MySQL 與 Elasticsearch 數(shù)據(jù)不對稱問題解決辦法
MySQL 與 Elasticsearch 數(shù)據(jù)不對稱問題解決辦法
jdbc-input-plugin 只能實現(xiàn)數(shù)據(jù)庫的追加,對于 elasticsearch 增量寫入,但經(jīng)常jdbc源一端的數(shù)據(jù)庫可能會做數(shù)據(jù)庫刪除或者更新操作。這樣一來數(shù)據(jù)庫與搜索引擎的數(shù)據(jù)庫就出現(xiàn)了不對稱的情況。
當(dāng)然你如果有開發(fā)團隊可以寫程序在刪除或者更新的時候同步對搜索引擎操作。如果你沒有這個能力,可以嘗試下面的方法。
這里有一個數(shù)據(jù)表 article , mtime 字段定義了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的時間都會變化
mysql> desc article; +-------------+--------------+------+-----+--------------------------------+-------+ | Field | Type | Null | Key | Default | Extra | +-------------+--------------+------+-----+--------------------------------+-------+ | id | int(11) | NO | | 0 | | | title | mediumtext | NO | | NULL | | | description | mediumtext | YES | | NULL | | | author | varchar(100) | YES | | NULL | | | source | varchar(100) | YES | | NULL | | | content | longtext | YES | | NULL | | | status | enum('Y','N')| NO | | 'N' | | | ctime | timestamp | NO | | CURRENT_TIMESTAMP | | | mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | | +-------------+--------------+------+-----+--------------------------------+-------+ 7 rows in set (0.00 sec)
logstash 增加 mtime 的查詢規(guī)則
jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/cms" jdbc_user => "cms" jdbc_password => "password" schedule => "* * * * *" #定時cron的表達(dá)式,這里是每分鐘執(zhí)行一次 statement => "select * from article where mtime > :sql_last_value" use_column_value => true tracking_column => "mtime" tracking_column_type => "timestamp" record_last_run => true last_run_metadata_path => "/var/tmp/article-mtime.last" }
創(chuàng)建回收站表,這個事用于解決數(shù)據(jù)庫刪除,或者禁用 status = 'N' 這種情況的。
CREATE TABLE `elasticsearch_trash` ( `id` int(11) NOT NULL, `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
為 article 表創(chuàng)建觸發(fā)器
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW BEGIN -- 此處的邏輯是解決文章狀態(tài)變?yōu)?N 的時候,需要將搜索引擎中對應(yīng)的數(shù)據(jù)刪除。 IF NEW.status = 'N' THEN insert into elasticsearch_trash(id) values(OLD.id); END IF; -- 此處邏輯是修改狀態(tài)到 Y 的時候,方式elasticsearch_trash仍然存在該文章ID,導(dǎo)致誤刪除。所以需要刪除回收站中得回收記錄。 IF NEW.status = 'Y' THEN delete from elasticsearch_trash where id = OLD.id; END IF; END CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW BEGIN -- 此處邏輯是文章被刪除同事將改文章放入搜索引擎回收站。 insert into elasticsearch_trash(id) values(OLD.id); END
接下來我們需要寫一個簡單地 Shell 每分鐘運行一次,從 elasticsearch_trash 數(shù)據(jù)表中取出數(shù)據(jù),然后使用 curl 命令調(diào)用 elasticsearch restful 接口,刪除被收回的數(shù)據(jù)。
你還可以開發(fā)相關(guān)的程序,這里提供一個 Spring boot 定時任務(wù)例子。
實體
package cn.netkiller.api.domain.elasticsearch; import java.util.Date; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; @Entity @Table public class ElasticsearchTrash { @Id private int id; @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP") private Date ctime; public int getId() { return id; } public void setId(int id) { this.id = id; } public Date getCtime() { return ctime; } public void setCtime(Date ctime) { this.ctime = ctime; } }
倉庫
package cn.netkiller.api.repository.elasticsearch; import org.springframework.data.repository.CrudRepository; import com.example.api.domain.elasticsearch.ElasticsearchTrash; public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{ }
定時任務(wù)
package cn.netkiller.api.schedule; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.example.api.domain.elasticsearch.ElasticsearchTrash; import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository; @Component public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); @Autowired private TransportClient client; @Autowired private ElasticsearchTrashRepository alasticsearchTrashRepository; public ScheduledTasks() { } @Scheduled(fixedRate = 1000 * 60) // 60秒運行一次調(diào)度任務(wù) public void cleanTrash() { for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) { DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get(); RestStatus status = response.status(); logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString()); if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) { alasticsearchTrashRepository.delete(elasticsearchTrash); } } } }
Spring boot 啟動主程序。
package cn.netkiller.api; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
以上就是MySQL 與 Elasticsearch 數(shù)據(jù)不對稱問題解決辦法的講解,如有疑問請留言或者到本站社區(qū)交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!
- MySQL數(shù)據(jù)同步Elasticsearch的4種方案
- logstash將mysql數(shù)據(jù)同步到elasticsearch方法詳解
- 使用logstash同步mysql數(shù)據(jù)到elasticsearch實現(xiàn)
- 使用canal監(jiān)控mysql數(shù)據(jù)庫實現(xiàn)elasticsearch索引實時更新問題
- Mysql到Elasticsearch高效實時同步Debezium實現(xiàn)
- 詳解Mysql如何實現(xiàn)數(shù)據(jù)同步到Elasticsearch
- 用python簡單實現(xiàn)mysql數(shù)據(jù)同步到ElasticSearch的教程
- 如何在Elasticsearch中啟用和使用SQL功能
相關(guān)文章
MySQL實現(xiàn)快速刪除所有表而不刪除數(shù)據(jù)庫的方法
這篇文章主要介紹了MySQL實現(xiàn)快速刪除所有表而不刪除數(shù)據(jù)庫的方法,涉及mysql批量執(zhí)行語句的相關(guān)操作技巧,需要的朋友可以參考下2017-09-09Mysql經(jīng)典高逼格/命令行操作(速成)(推薦)
這篇文章主要介紹了Mysql命令行操作,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04教你如何讓spark?sql寫mysql的時候支持update操作
spark提供了一個枚舉類,用來支撐對接數(shù)據(jù)源的操作模式,本文重點給大家介紹如何讓spark?sql寫mysql的時候支持update操作,本文通過實例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2022-02-02mysql 批量更新與批量更新多條記錄的不同值實現(xiàn)方法
在mysql中批量更新我們可能使用update,replace into來操作,下面小編來給各位同學(xué)詳細(xì)介紹mysql 批量更新與性能吧2013-10-10k8s搭建mysql集群實現(xiàn)主從復(fù)制的方法步驟
本文是基于已有k8s環(huán)境下,介紹在k8s環(huán)境中部署mysql主從集群的實現(xiàn)步驟,對mysql學(xué)習(xí)有一定的幫助,感興趣的可以學(xué)習(xí)一下2023-01-01