Mysql到Elasticsearch高效實時同步Debezium實現(xiàn)
題記
來自Elasticsearch中文社區(qū)的問題——
MySQL中表無唯一遞增字段,也無唯一遞增時間字段,該怎么使用logstash實現(xiàn)MySQL實時增量導(dǎo)數(shù)據(jù)到es中?
logstash和kafka_connector都僅支持基于自增id或者時間戳更新的方式增量同步數(shù)據(jù)。
回到問題本身:如果庫表里沒有相關(guān)字段,該如何處理呢?
本文給出相關(guān)探討和解決方案。
1、 binlog認(rèn)知
1.1 啥是 binlog?
binlog是Mysql sever層維護(hù)的一種二進(jìn)制日志,與innodb引擎中的redo/undo log是完全不同的日志;
其主要是用來記錄對mysql數(shù)據(jù)更新或潛在發(fā)生更新的SQL語句,并以"事務(wù)"的形式保存在磁盤中;
作用主要有:
1)復(fù)制:達(dá)到master-slave數(shù)據(jù)一致的目的。2)數(shù)據(jù)恢復(fù):通過mysqlbinlog工具恢復(fù)數(shù)據(jù)。3)增量備份。
1.2 阿里的Canal實現(xiàn)了增量Mysql同步
一圖勝千言,canal是用java開發(fā)的基于數(shù)據(jù)庫增量日志解析、提供增量數(shù)據(jù)訂閱&消費的中間件。
目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用來處理獲得的相關(guān)數(shù)據(jù)。目的:增量數(shù)據(jù)訂閱&消費。
綜上,使用binlog可以突破logstash或者kafka-connector沒有自增id或者沒有時間戳字段的限制,實現(xiàn)增量同步。
2、基于binlog的同步方式
1)基于kafka Connect的Debezium 開源工程,地址:. https://debezium.io/
2)不依賴第三方的獨立應(yīng)用: Maxwell開源項目,地址:http://maxwells-daemon.io/
由于已經(jīng)部署過conluent(kafka的企業(yè)版本,自帶zookeeper、kafka、ksql、kafka-connector等),本文僅針對Debezium展開。
3、Debezium介紹
Debezium是捕獲數(shù)據(jù)實時動態(tài)變化的開源的分布式同步平臺。能實時捕獲到數(shù)據(jù)源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、刪除(deletes)操作,實時同步到Kafka,穩(wěn)定性強且速度非???。
特點:
1)簡單。無需修改應(yīng)用程序??蓪ν馓峁┓?wù)。
2)穩(wěn)定。持續(xù)跟蹤每一行的每一處變動。
3)快速。構(gòu)建于kafka之上,可擴(kuò)展,經(jīng)官方驗證可處理大容量的數(shù)據(jù)。
4、同步架構(gòu)
如圖,Mysql到ES的同步策略,采取“曲線救國”機制。
步驟1: 基Debezium的binlog機制,將Mysql數(shù)據(jù)同步到Kafka。
步驟2: 基于Kafka_connector機制,將kafka數(shù)據(jù)同步到Elasticsearch。
5、Debezium實現(xiàn)Mysql到ES增刪改實時同步
軟件版本:
confluent:5.1.2;
Debezium:0.9.2_Final;
Mysql:5.7.x.
Elasticsearch:6.6.1
5.1 Debezium安裝
confluent的安裝部署參見:http://t.cn/Ef5poZk,不再贅述。
Debezium的安裝只需要把debezium-connector-mysql的壓縮包解壓放到Confluent的解壓后的插件目錄(share/java)中。
MySQL Connector plugin 壓縮包的下載地址:https://debezium.io/docs/install/。
注意重啟一下confluent,以使得Debezium生效。
5.2 Mysql binlog等相關(guān)配置。
Debezium使用MySQL的binlog機制實現(xiàn)數(shù)據(jù)動態(tài)變化監(jiān)測,所以需要Mysql提前配置binlog。
核心配置如下,在Mysql機器的/etc/my.cnf的mysqld下添加如下配置。
[mysqld] server-id = 223344 log_bin = mysql-bin binlog_format = row binlog_row_image = full expire_logs_days = 10
然后,重啟一下Mysql以使得binlog生效。
systemctl start mysqld.service
5.3 配置connector連接器。
配置confluent路徑目錄 : /etc
創(chuàng)建文件夾命令 :
mkdir kafka-connect-debezium
在mysql2kafka_debezium.json存放connector的配置信息 :
[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json { "name" : "debezium-mysql-source-0223", "config": { "connector.class" : "io.debezium.connector.mysql.MySqlConnector", "database.hostname" : "192.168.1.22", "database.port" : "3306", "database.user" : "root", "database.password" : "XXXXXX", "database.whitelist" : "kafka_base_db", "table.whitlelist" : "accounts", "database.server.id" : "223344", "database.server.name" : "full", "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092", "database.history.kafka.topic" : "account_topic", "include.schema.changes" : "true" , "incrementing.column.name" : "id", "database.history.skip.unparseable.ddl" : "true", "transforms": "unwrap,changetopic", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.changetopic.regex":"(.*)", "transforms.changetopic.replacement":"$1-smt" } }
注意如下配置:
“database.server.id”:對應(yīng)Mysql中的server-id的配置。
“database.whitelist”: 待同步的Mysql數(shù)據(jù)庫名。
“table.whitlelist”:待同步的Mysq表名。
“database.history.kafka.topic”:存儲數(shù)據(jù)庫的Shcema的記錄信息,而非寫入數(shù)據(jù)的topic
“database.server.name”:邏輯名稱,每個connector確保唯一,作為寫入數(shù)據(jù)的kafka topic的前綴名稱。
坑1:transforms相關(guān)5行配置作用是寫入數(shù)據(jù)格式轉(zhuǎn)換。
如果沒有,輸入數(shù)據(jù)會包含:before、after記錄修改前對比信息以及元數(shù)據(jù)信息(source,op,ts_ms等)。
這些信息在后續(xù)數(shù)據(jù)寫入Elasticsearch是不需要的。(注意結(jié)合自己業(yè)務(wù)場景)。
格式轉(zhuǎn)換相關(guān)原理:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
5.4 啟動connector
curl -X POST -H "Content-Type:application/json" --data @mysql2kafka_debezium.json.json http://192.168.1.22:18083/connectors | jq
5.5 驗證寫入是否成功。
查看kafka-topic
kafka-topics --list --zookeeper localhost:2181
此處會看到寫入數(shù)據(jù)topic的信息。
注意新寫入數(shù)據(jù)topic的格式:database.schema.table-smt 三部分組成。
本示例topic名稱:full.kafka_base_db.account-smt。
5.6 驗證消費數(shù)據(jù)驗證寫入是否正常
./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning
至此,Debezium實現(xiàn)mysql同步kafka完成。
6、kafka-connector實現(xiàn)kafka同步Elasticsearch
6.1、Kafka-connector介紹
見官網(wǎng):https://docs.confluent.io/current/connect.html
Kafka Connect是一個用于連接Kafka與外部系統(tǒng)(如數(shù)據(jù)庫,鍵值存儲,檢索系統(tǒng)索引和文件系統(tǒng))的框架。
連接器實現(xiàn)公共數(shù)據(jù)源數(shù)據(jù)(如Mysql、Mongo、Pgsql等)寫入Kafka,或者Kafka數(shù)據(jù)寫入目標(biāo)數(shù)據(jù)庫,也可以自己開發(fā)連接器。
6.2、kafka到ES connector同步配置
配置路徑:
/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
配置內(nèi)容:
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": "1", "topics": "full.kafka_base_db.account-smt", "key.ignore": "true", "connection.url": "http://192.168.1.22:9200", "type.name": "_doc", "name": "elasticsearch-sink-test"
6.3 kafka到ES啟動connector
啟動命令
confluent load elasticsearch-sink-test -d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
6.4 Kafka-connctor RESTFul API查看
Mysql2kafka,kafka2ES的connector詳情信息可以借助postman或者瀏覽器或者命令行查看。
curl -X GET http://localhost:8083/connectors
7、坑復(fù)盤。
坑2: 同步的過程中可能出現(xiàn)錯誤,比如:kafka topic沒法消費到數(shù)據(jù)。
排解思路如下:
1)確認(rèn)消費的topic是否是寫入數(shù)據(jù)的topic;2)確認(rèn)同步的過程中沒有出錯。可以借助connector如下命令查看。
curl -X GET http://localhost:8083/connectors-xxx/status
坑3: Mysql2ES出現(xiàn)日期格式不能識別。
是Mysql jar包的問題,解決方案:在my.cnf中配置時區(qū)信息即可。
坑4: kafka2ES,ES沒有寫入數(shù)據(jù)。
排解思路:
1)建議:先創(chuàng)建同topic名稱一致的索引,注意:Mapping靜態(tài)自定義,不要動態(tài)識別生成。
2)通過connetor/status排查出錯原因,一步步分析。
8、小結(jié)
binlog的實現(xiàn)突破了字段的限制,實際上業(yè)界的go-mysql-elasticsearch已經(jīng)實現(xiàn)。
對比:logstash、kafka-connector,雖然Debezium“曲線救國”兩步實現(xiàn)了實時同步,但穩(wěn)定性+實時性能相對不錯。
參考:
[1] https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/
[2] https://www.smwenku.com/a/5c0a7b61bd9eee6fb21356a1/zh-cn
[3] https://juejin.im/post/5b7c036bf265da43506e8cfd
[4] https://debezium.io/docs/connectors/mysql/#configuration
[5] https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc
以上就是Mysql到Elasticsearch高效實時同步Debezium實現(xiàn)的詳細(xì)內(nèi)容,更多關(guān)于Mysql到Elasticsearch同步的資料請關(guān)注腳本之家其它相關(guān)文章!
- MySQL數(shù)據(jù)同步Elasticsearch的4種方案
- logstash將mysql數(shù)據(jù)同步到elasticsearch方法詳解
- 使用logstash同步mysql數(shù)據(jù)到elasticsearch實現(xiàn)
- 使用canal監(jiān)控mysql數(shù)據(jù)庫實現(xiàn)elasticsearch索引實時更新問題
- 詳解Mysql如何實現(xiàn)數(shù)據(jù)同步到Elasticsearch
- 用python簡單實現(xiàn)mysql數(shù)據(jù)同步到ElasticSearch的教程
- MySQL 與 Elasticsearch 數(shù)據(jù)不對稱問題解決辦法
- 如何在Elasticsearch中啟用和使用SQL功能
相關(guān)文章
具有負(fù)載均衡功能的MySQL服務(wù)器集群部署及實現(xiàn)
MySQL是一個高速度、高性能、多線程的關(guān)系型數(shù)據(jù)庫管理系統(tǒng),適用平臺多,可擴(kuò)展性強。2011-05-05MySql總彈出mySqlInstallerConsole窗口的解決方法
這篇文章主要介紹了MySql總彈出mySqlInstallerConsole窗口的解決方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-09-09mysql 8.0.16 winx64及Linux修改root用戶密碼 的方法
這篇文章主要介紹了mysql 8.0.16 winx64及Linux修改root用戶密碼 的方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2019-07-07詳解如何用SQL取出字段內(nèi)是json的數(shù)據(jù)
數(shù)據(jù)庫中會遇到字段里面存的JSON結(jié)果的數(shù)據(jù),那么如果我們想直接取到JSON里的值該怎么辦呢?其實SQL自帶的函數(shù)就可解決本文就詳細(xì)的給大家介紹了如何用SQL取出字段內(nèi)是json的數(shù)據(jù),需要的朋友可以參考下2023-10-10跳槽必備之你設(shè)計索引的原則是什么?怎么避免索引失效?
索引的設(shè)計可以遵循一些已有的原則,創(chuàng)建索引的時候請盡量符合這些原則,便于提升索引地使用效率,更高效地使用索引。今天給大家介紹跳槽必備之你設(shè)計索引的原則是什么?怎么避免索引失效?感興趣的朋友一起看看吧2021-05-05