canal實現(xiàn)mysql數(shù)據(jù)同步的詳細(xì)過程
1、canal下載
canal實現(xiàn)mysql數(shù)據(jù)同步可以直接安裝canal server就可以了,但是為了方便管理(instance配置,canal server狀態(tài)管理,集群等),需要安裝canal admin,應(yīng)用下載地址:Releases · alibaba/canal · GitHub
進(jìn)入頁面可以選擇需要安裝的版本
下載canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz
2、mysql同步用戶創(chuàng)建和授權(quán)
登錄mysql mysql -h 127.0.0.1 -P 3306 -u root -p 創(chuàng)建同步用戶 repl 密碼設(shè)為123456 CREATE USER 'repl'@'%' IDENTIFIED BY '123456'; 給予同步權(quán)限 GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456'; 給予repl只讀test庫的權(quán)限,test庫是用來同步數(shù)據(jù)的 GRANT SELECT ON test.* to 'repl'@'%' identified by '123456'; canal_manager是canal admin需要的,給予repl對該庫的讀寫權(quán)限 GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456'; mysql my.cnf配置文件增加主從配置master數(shù)據(jù)庫的配置信息 #主數(shù)據(jù)主從配置 唯一id server_id=1 #開啟logbin log-bin=mysql-bin #寫入模式 row binlog-format=ROW #需要同步的庫 binlog-do-db=test #忽略的數(shù)據(jù)庫 replicate-ignore-db=mysql replicate-ignore-db=sys replicate-ignore-db=information_schema replicate-ignore-db=performance_schema
在canal-admin解壓文件的conf中有一個canal_manager.sql,導(dǎo)入到master數(shù)據(jù)庫
3、canal admin安裝和啟動
把canal.admin-1.1.8.tar.gz上傳到linux
解壓 tar -zvxf canal.admin-1.1.8.tar.gz
進(jìn)入conf目錄下,編輯application.yml配置文件。
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: repl password: 123456 driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: 123456
重點介紹以下幾個參數(shù):
address:我們需要訂閱(也就是mysql master服務(wù)器)mysql所在的服務(wù)器IP和數(shù)據(jù)庫端口。
database:canal.admin web系統(tǒng)必須的幾張表,需要在mysql master服務(wù)器上初始化conf/canal_manager.sql文件。
sername和password就是mysql master服務(wù)器創(chuàng)建的用于復(fù)制的用戶和密碼,也就是我們在canal server中配置的repl 和 123456。
driver-class-name:mysql的驅(qū)動,默認(rèn)是MYSQL5的驅(qū)動,如果你的MYSQL是8的(我的就是),要將驅(qū)動改為com.mysql.cj.jdbc.Driver。
另外,還需要在mysql連接后面加上allowPublicKeyRetrieval=true,不然啟動時,有可能報錯。
啟動canal.admin
進(jìn)入bin目錄,執(zhí)行如下命令,啟動canal.admin:
./startup.sh
查看 admin 日志
2022-12-10 03:13:58.995 [main] INFO o.s.jmx.export.annotation.AnnotationMBeanExporter -
Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2022-12-10 03:13:59.015 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 [main] INFO com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894)
如果出現(xiàn)上述日志,說明啟動成功!
登錄admin
通過http://127.0.0.1:8089/訪問,默認(rèn)密碼:admin/123456。
注意,IP和密碼需要改成你自己配置的。如果是在服務(wù)器上配置的,別忘記放開8089端口。
輸入用戶名和密碼之后,出現(xiàn)上述頁面說明配置成功!
如果需要修改密碼,直接通過執(zhí)行 select upper(sha1(unhex(sha1('1234567')))) 這個sql得到結(jié)果,然后復(fù)制到canal_manager庫的canal_user表的password字段中就可以了,其中1234567是明文密碼,執(zhí)行上述sql會得到一個密碼。
4、canal server安裝和啟動
把canal.deployer-1.1.8.tar.gz上傳到linux
解壓 tar -zvxf ccanal.deployer-1.1.8.tar.gz
進(jìn)入conf目錄下,編輯canal.properties配置文件。
注意,如果直接編輯canal.properties,可能無法啟動,報如下錯誤:
可以通過如下方式修改
mv canal.properties canal.properties_bak cp canal_local.properties canal.properties vim canal.properties
canal.properties文件全部內(nèi)容如下:
# register ip canal.register.ip = # canal admin config canalAdmin 的鏈接、端口、用戶名和MD5密碼 canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB # admin auto register canal server啟動后自動注入到canal admin管理模塊 canal.admin.register.auto = true canal.admin.register.cluster = canal.admin.register.name =
一般只需要修改下面這3個
canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
啟動canal.server
進(jìn)入bin目錄,執(zhí)行如下命令,啟動canal.server:
./startup.sh
查看canal日志
啟動后,canalAdmin的server管理模塊,對應(yīng)創(chuàng)建的canal server會動態(tài)識別到,狀態(tài)變?yōu)閱?/p>
5、canal數(shù)據(jù)同步
5.1、java 端集成監(jiān)聽canal 同步的mysql數(shù)據(jù)
1、引入依賴
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency>
2、編寫測試代碼
package com.hy.das.config; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.List; @Component public class CanalClient implements InitializingBean{ private final static int BATCH_SIZE = 1000; @Override public void afterPropertiesSet() throws Exception { // 創(chuàng)建鏈接 此處的11111為tcp端口 在canal admin Server管理模塊可以查看 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "test", "", ""); try { //打開連接 connector.connect(); //訂閱數(shù)據(jù)庫表,全部表 connector.subscribe(".*\\..*"); //回滾到未進(jìn)行ack的地方,下次fetch的時候,可以從最后一個沒有ack的地方開始拿 connector.rollback(); while (true) { // 獲取指定數(shù)量的數(shù)據(jù) Message message = connector.getWithoutAck(BATCH_SIZE); System.out.println(message.getEntries().size()); //獲取批量ID long batchId = message.getId(); //獲取批量的數(shù)量 int size = message.getEntries().size(); //如果沒有數(shù)據(jù) if (batchId == -1 || size == 0) { try { //線程休眠2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println("----------------"); //如果有數(shù)據(jù),處理數(shù)據(jù) //遍歷entries,單條解析 for (CanalEntry.Entry entry : message.getEntries()) { //獲取表名 String tableName = entry.getHeader().getTableName(); //獲取類型 CanalEntry.EntryType entryType = entry.getEntryType(); //獲取序列化后的數(shù)據(jù) ByteString storeValue = entry.getStoreValue(); //判斷entry類型是否為ROWDATA類型 if (CanalEntry.EntryType.ROWDATA.equals(entryType)){ //反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //獲取當(dāng)前事件操作類型 CanalEntry.EventType eventType = rowChange.getEventType(); //獲取數(shù)據(jù)集 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); //遍歷 for (CanalEntry.RowData rowData : rowDatasList) { //改變前數(shù)據(jù) JSONObject jsonObjectBefore = new JSONObject(); List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { jsonObjectBefore.put(column.getName(),column.getValue()); } //改變后數(shù)據(jù) JSONObject jsonObjectAfter = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { jsonObjectAfter.put(column.getName(),column.getValue()); } System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter); } }else { System.out.println("當(dāng)前操作類型為:"+entryType); } } } //進(jìn)行 batch id 的確認(rèn)。確認(rèn)之后,小于等于此 batchId 的 Message 都會被確認(rèn)。 connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } finally { connector.disconnect(); } } }
newSingleConnector方法里面的test是一個instance實列,定義了需要同步的master庫的信息(ip、端口、用戶名、密碼、binlog文件名稱、同步位置、需要同步的庫、不需要同步的庫等)
在canal admin web管理界面的Instance 管理模塊,點擊新建Instance進(jìn)行創(chuàng)建,新建頁面的Instance名稱就是test,這個可以隨便填寫,代碼對應(yīng)修改就行,所屬集群/主機(jī),因為我這里是單機(jī)部署,直接選擇自動注入的canal server就行,點擊載入模板,獲取配置初始信息,下圖中標(biāo)出的信息按照實際的修改填入就行,點擊保存后,啟動這個Instance。
3、啟動服務(wù),對test庫的sys_user表進(jìn)行數(shù)據(jù)更新,可以看到后臺已經(jīng)收到變更數(shù)據(jù)
5.2、kafka同步數(shù)據(jù)
1:canal.properties配置文件增加如下配置
#數(shù)據(jù)變更發(fā)送到kafka # 設(shè)置輸出目標(biāo)為 kafka canal.serverMode = kafka # Kafka 地址 canal.mq.servers = xx.xx.xx.xx:9092 # 投遞失敗的重試次數(shù),默認(rèn)0,改為2 canal.mq.retries = 2 # Kafka batch.size,即producer一個微批次的大小,默認(rèn)16K,這里加倍 canal.mq.batchSize = 32768 # Kafka max.request.size,即一個請求的最大大小,默認(rèn)1M,這里也加倍 canal.mq.maxRequestSize = 2097152 # Kafka linger.ms,即sender線程在檢查微批次是否就緒時的超時,默認(rèn)0ms,這里改為200ms # 滿足batch.size和linger.ms其中之一,就會發(fā)送消息 canal.mq.lingerMs = 200 # Kafka buffer.memory,緩存大小,默認(rèn)32M canal.mq.bufferMemory = 33554432 # 獲取binlog數(shù)據(jù)的批次大小,默認(rèn)50 canal.mq.canalBatchSize = 50 # 獲取binlog數(shù)據(jù)的超時時間,默認(rèn)200ms canal.mq.canalGetTimeout = 200 # 是否將binlog轉(zhuǎn)為JSON格式。如果為false,就是原生Protobuf格式 canal.mq.flatMessage = true # 壓縮類型,官方文檔沒有描述 canal.mq.compressionType = none # Kafka acks,默認(rèn)all,表示分區(qū)leader會等所有follower同步完才給producer發(fā)送ack # 0表示不等待ack,1表示leader寫入完畢之后直接ack canal.mq.acks = all # Kafka消息投遞是否使用事務(wù) # 主要針對flatMessage的異步發(fā)送和動態(tài)多topic消息投遞進(jìn)行事務(wù)控制來保持和Canal binlog位置的一致性 # flatMessage模式下建議開啟 canal.mq.transaction = true
2:在canal admin web界面修改instance mq配置,增加數(shù)據(jù)同步到kakfa的topic
3:如上兩步配置完成重啟后,在kafka監(jiān)聽配置的topic就可以接收到數(shù)據(jù)了
6、java tcp同步只是其中一種方式,還可以通過kafka、rabbitmq等方式進(jìn)行數(shù)據(jù)同步
注意上面需要提供對外訪問的端口需要開通安全組,比如8089、11111等端口。
參考文章:
https://zhuanlan.zhihu.com/p/590705531
到此這篇關(guān)于canal實現(xiàn)mysql數(shù)據(jù)同步的文章就介紹到這了,更多相關(guān)canal mysql數(shù)據(jù)同步內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
CentOS 7下MySQL服務(wù)啟動失敗的快速解決方法
CentOS 7下MySQL服務(wù)啟動失敗怎么辦?下面小編就為大家?guī)硪黄狢entOS 7下MySQL服務(wù)啟動失敗的快速解決方法?,F(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-03-03關(guān)于Mysql中文亂碼問題該如何解決(亂碼問題完美解決方案)
這篇文章給大家介紹關(guān)于Mysql中文亂碼問題該如何解決(亂碼問題完美解決方案)的相關(guān)資料,還給大家收集些關(guān)于MySQL會出現(xiàn)中文亂碼原因常見的幾點,小伙伴快來看看吧2015-11-11mysql 服務(wù)意外停止1067錯誤解決辦法小結(jié)
今天在配置服務(wù)器時安裝mysql5.5總是無法安裝,查看日志錯誤提示為1067錯誤,下面來看我的解決辦法2012-11-11MySQL數(shù)據(jù)庫wait_timeout參數(shù)詳細(xì)介紹
這篇文章主要介紹了MySQL數(shù)據(jù)庫wait_timeout參數(shù)詳細(xì)介紹的相關(guān)資料,wait_timeout是MySQL中用于控制非交互式連接等待時間的系統(tǒng)變量,影響服務(wù)器資源管理和安全性,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-12-12Windows下mysql community server 8.0.16安裝配置方法圖文教程
這篇文章主要為大家詳細(xì)介紹了Windows下mysql community server 8.0.16安裝配置方法圖文教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-06-06