欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

canal實現(xiàn)mysql數(shù)據(jù)同步的詳細(xì)過程

 更新時間:2025年06月12日 15:14:30   作者:那些樂趣  
這篇文章主要介紹了canal實現(xiàn)mysql數(shù)據(jù)同步的詳細(xì)過程,本文通過實例圖文相結(jié)合給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧

1、canal下載

canal實現(xiàn)mysql數(shù)據(jù)同步可以直接安裝canal server就可以了,但是為了方便管理(instance配置,canal server狀態(tài)管理,集群等),需要安裝canal admin,應(yīng)用下載地址:Releases · alibaba/canal · GitHub

進(jìn)入頁面可以選擇需要安裝的版本

下載canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz

2、mysql同步用戶創(chuàng)建和授權(quán)

登錄mysql
mysql -h 127.0.0.1 -P 3306 -u root -p
創(chuàng)建同步用戶 repl 密碼設(shè)為123456
CREATE USER 'repl'@'%' IDENTIFIED BY '123456';
給予同步權(quán)限
GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456';
給予repl只讀test庫的權(quán)限,test庫是用來同步數(shù)據(jù)的
GRANT SELECT ON test.* to 'repl'@'%' identified by '123456';
canal_manager是canal admin需要的,給予repl對該庫的讀寫權(quán)限
GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456';
mysql my.cnf配置文件增加主從配置master數(shù)據(jù)庫的配置信息
#主數(shù)據(jù)主從配置 唯一id
server_id=1
#開啟logbin
log-bin=mysql-bin
#寫入模式 row
binlog-format=ROW
#需要同步的庫
binlog-do-db=test
#忽略的數(shù)據(jù)庫
replicate-ignore-db=mysql
replicate-ignore-db=sys
replicate-ignore-db=information_schema
replicate-ignore-db=performance_schema

在canal-admin解壓文件的conf中有一個canal_manager.sql,導(dǎo)入到master數(shù)據(jù)庫

3、canal admin安裝和啟動

把canal.admin-1.1.8.tar.gz上傳到linux

解壓 tar -zvxf canal.admin-1.1.8.tar.gz 

進(jìn)入conf目錄下,編輯application.yml配置文件。

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: repl
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
canal:
  adminUser: admin
  adminPasswd: 123456

重點介紹以下幾個參數(shù):

address:我們需要訂閱(也就是mysql master服務(wù)器)mysql所在的服務(wù)器IP和數(shù)據(jù)庫端口。

database:canal.admin web系統(tǒng)必須的幾張表,需要在mysql master服務(wù)器上初始化conf/canal_manager.sql文件。

sername和password就是mysql master服務(wù)器創(chuàng)建的用于復(fù)制的用戶和密碼,也就是我們在canal server中配置的repl 和 123456。

driver-class-name:mysql的驅(qū)動,默認(rèn)是MYSQL5的驅(qū)動,如果你的MYSQL是8的(我的就是),要將驅(qū)動改為com.mysql.cj.jdbc.Driver。

另外,還需要在mysql連接后面加上allowPublicKeyRetrieval=true,不然啟動時,有可能報錯。

啟動canal.admin

進(jìn)入bin目錄,執(zhí)行如下命令,啟動canal.admin:

./startup.sh

查看 admin 日志

2022-12-10 03:13:58.995 [main] INFO  o.s.jmx.export.annotation.AnnotationMBeanExporter - 
Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2022-12-10 03:13:59.015 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894)

如果出現(xiàn)上述日志,說明啟動成功!

登錄admin

通過http://127.0.0.1:8089/訪問,默認(rèn)密碼:admin/123456。

注意,IP和密碼需要改成你自己配置的。如果是在服務(wù)器上配置的,別忘記放開8089端口。

輸入用戶名和密碼之后,出現(xiàn)上述頁面說明配置成功!

如果需要修改密碼,直接通過執(zhí)行 select upper(sha1(unhex(sha1('1234567')))) 這個sql得到結(jié)果,然后復(fù)制到canal_manager庫的canal_user表的password字段中就可以了,其中1234567是明文密碼,執(zhí)行上述sql會得到一個密碼。

4、canal server安裝和啟動

把canal.deployer-1.1.8.tar.gz上傳到linux

解壓 tar -zvxf ccanal.deployer-1.1.8.tar.gz

進(jìn)入conf目錄下,編輯canal.properties配置文件。

注意,如果直接編輯canal.properties,可能無法啟動,報如下錯誤:

可以通過如下方式修改

mv canal.properties canal.properties_bak
cp canal_local.properties canal.properties
vim canal.properties

canal.properties文件全部內(nèi)容如下:

# register ip
canal.register.ip =
# canal admin config  canalAdmin 的鏈接、端口、用戶名和MD5密碼
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
# admin auto register canal server啟動后自動注入到canal admin管理模塊
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =

一般只需要修改下面這3個

canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB

啟動canal.server

進(jìn)入bin目錄,執(zhí)行如下命令,啟動canal.server:

./startup.sh

查看canal日志

啟動后,canalAdmin的server管理模塊,對應(yīng)創(chuàng)建的canal server會動態(tài)識別到,狀態(tài)變?yōu)閱?/p>

5、canal數(shù)據(jù)同步

5.1、java 端集成監(jiān)聽canal 同步的mysql數(shù)據(jù)

1、引入依賴

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2、編寫測試代碼

package com.hy.das.config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements InitializingBean{
    private final static int BATCH_SIZE = 1000;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 創(chuàng)建鏈接 此處的11111為tcp端口 在canal admin Server管理模塊可以查看
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
                "test", "", "");
        try {
            //打開連接
            connector.connect();
            //訂閱數(shù)據(jù)庫表,全部表
            connector.subscribe(".*\\..*");
            //回滾到未進(jìn)行ack的地方,下次fetch的時候,可以從最后一個沒有ack的地方開始拿
            connector.rollback();
            while (true) {
                // 獲取指定數(shù)量的數(shù)據(jù)
                Message message = connector.getWithoutAck(BATCH_SIZE);
                System.out.println(message.getEntries().size());
                //獲取批量ID
                long batchId = message.getId();
                //獲取批量的數(shù)量
                int size = message.getEntries().size();
                //如果沒有數(shù)據(jù)
                if (batchId == -1 || size == 0) {
                    try {
                        //線程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("----------------");
                    //如果有數(shù)據(jù),處理數(shù)據(jù)
                    //遍歷entries,單條解析
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        //獲取表名
                        String tableName = entry.getHeader().getTableName();
                        //獲取類型
                        CanalEntry.EntryType entryType = entry.getEntryType();
                        //獲取序列化后的數(shù)據(jù)
                        ByteString storeValue = entry.getStoreValue();
                        //判斷entry類型是否為ROWDATA類型
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                            //反序列化
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            //獲取當(dāng)前事件操作類型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            //獲取數(shù)據(jù)集
                            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                            //遍歷
                            for (CanalEntry.RowData rowData : rowDatasList) {
                                //改變前數(shù)據(jù)
                                JSONObject jsonObjectBefore = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    jsonObjectBefore.put(column.getName(),column.getValue());
                                }
                                //改變后數(shù)據(jù)
                                JSONObject jsonObjectAfter = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    jsonObjectAfter.put(column.getName(),column.getValue());
                                }
                                System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
                            }
                        }else {
                            System.out.println("當(dāng)前操作類型為:"+entryType);
                        }
                    }
                }
                //進(jìn)行 batch id 的確認(rèn)。確認(rèn)之后,小于等于此 batchId 的 Message 都會被確認(rèn)。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}

newSingleConnector方法里面的test是一個instance實列,定義了需要同步的master庫的信息(ip、端口、用戶名、密碼、binlog文件名稱、同步位置、需要同步的庫、不需要同步的庫等)

在canal admin web管理界面的Instance 管理模塊,點擊新建Instance進(jìn)行創(chuàng)建,新建頁面的Instance名稱就是test,這個可以隨便填寫,代碼對應(yīng)修改就行,所屬集群/主機(jī),因為我這里是單機(jī)部署,直接選擇自動注入的canal server就行,點擊載入模板,獲取配置初始信息,下圖中標(biāo)出的信息按照實際的修改填入就行,點擊保存后,啟動這個Instance。

3、啟動服務(wù),對test庫的sys_user表進(jìn)行數(shù)據(jù)更新,可以看到后臺已經(jīng)收到變更數(shù)據(jù)

5.2、kafka同步數(shù)據(jù)

1:canal.properties配置文件增加如下配置

#數(shù)據(jù)變更發(fā)送到kafka
# 設(shè)置輸出目標(biāo)為 kafka
canal.serverMode = kafka
# Kafka 地址
canal.mq.servers  = xx.xx.xx.xx:9092
# 投遞失敗的重試次數(shù),默認(rèn)0,改為2
canal.mq.retries = 2
# Kafka batch.size,即producer一個微批次的大小,默認(rèn)16K,這里加倍
canal.mq.batchSize = 32768
# Kafka max.request.size,即一個請求的最大大小,默認(rèn)1M,這里也加倍
canal.mq.maxRequestSize = 2097152
# Kafka linger.ms,即sender線程在檢查微批次是否就緒時的超時,默認(rèn)0ms,這里改為200ms
# 滿足batch.size和linger.ms其中之一,就會發(fā)送消息
canal.mq.lingerMs = 200
# Kafka buffer.memory,緩存大小,默認(rèn)32M
canal.mq.bufferMemory = 33554432
# 獲取binlog數(shù)據(jù)的批次大小,默認(rèn)50
canal.mq.canalBatchSize = 50
# 獲取binlog數(shù)據(jù)的超時時間,默認(rèn)200ms
canal.mq.canalGetTimeout = 200
# 是否將binlog轉(zhuǎn)為JSON格式。如果為false,就是原生Protobuf格式
canal.mq.flatMessage = true
# 壓縮類型,官方文檔沒有描述
canal.mq.compressionType = none
# Kafka acks,默認(rèn)all,表示分區(qū)leader會等所有follower同步完才給producer發(fā)送ack
# 0表示不等待ack,1表示leader寫入完畢之后直接ack
canal.mq.acks = all
# Kafka消息投遞是否使用事務(wù)
# 主要針對flatMessage的異步發(fā)送和動態(tài)多topic消息投遞進(jìn)行事務(wù)控制來保持和Canal binlog位置的一致性
# flatMessage模式下建議開啟
canal.mq.transaction = true

2:在canal admin web界面修改instance mq配置,增加數(shù)據(jù)同步到kakfa的topic

3:如上兩步配置完成重啟后,在kafka監(jiān)聽配置的topic就可以接收到數(shù)據(jù)了

6、java tcp同步只是其中一種方式,還可以通過kafka、rabbitmq等方式進(jìn)行數(shù)據(jù)同步

注意上面需要提供對外訪問的端口需要開通安全組,比如8089、11111等端口。

參考文章:

【CanalAdmin部署文檔】

https://zhuanlan.zhihu.com/p/590705531

到此這篇關(guān)于canal實現(xiàn)mysql數(shù)據(jù)同步的文章就介紹到這了,更多相關(guān)canal mysql數(shù)據(jù)同步內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • CentOS 7下MySQL服務(wù)啟動失敗的快速解決方法

    CentOS 7下MySQL服務(wù)啟動失敗的快速解決方法

    CentOS 7下MySQL服務(wù)啟動失敗怎么辦?下面小編就為大家?guī)硪黄狢entOS 7下MySQL服務(wù)啟動失敗的快速解決方法?,F(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-03-03
  • 關(guān)于Mysql中文亂碼問題該如何解決(亂碼問題完美解決方案)

    關(guān)于Mysql中文亂碼問題該如何解決(亂碼問題完美解決方案)

    這篇文章給大家介紹關(guān)于Mysql中文亂碼問題該如何解決(亂碼問題完美解決方案)的相關(guān)資料,還給大家收集些關(guān)于MySQL會出現(xiàn)中文亂碼原因常見的幾點,小伙伴快來看看吧
    2015-11-11
  • MySQL中的三大日志用法及分析

    MySQL中的三大日志用法及分析

    這篇文章主要介紹了MySQL中的三大日志用法,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2025-05-05
  • mysql序號rownum行號實現(xiàn)方式

    mysql序號rownum行號實現(xiàn)方式

    這篇文章主要介紹了mysql序號rownum行號實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • MySQL數(shù)據(jù)庫鎖機(jī)制原理解析

    MySQL數(shù)據(jù)庫鎖機(jī)制原理解析

    這篇文章主要介紹了MySQL數(shù)據(jù)庫鎖機(jī)制原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-12-12
  • mysql 服務(wù)意外停止1067錯誤解決辦法小結(jié)

    mysql 服務(wù)意外停止1067錯誤解決辦法小結(jié)

    今天在配置服務(wù)器時安裝mysql5.5總是無法安裝,查看日志錯誤提示為1067錯誤,下面來看我的解決辦法
    2012-11-11
  • MySQL數(shù)據(jù)庫wait_timeout參數(shù)詳細(xì)介紹

    MySQL數(shù)據(jù)庫wait_timeout參數(shù)詳細(xì)介紹

    這篇文章主要介紹了MySQL數(shù)據(jù)庫wait_timeout參數(shù)詳細(xì)介紹的相關(guān)資料,wait_timeout是MySQL中用于控制非交互式連接等待時間的系統(tǒng)變量,影響服務(wù)器資源管理和安全性,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2024-12-12
  • Windows下mysql community server 8.0.16安裝配置方法圖文教程

    Windows下mysql community server 8.0.16安裝配置方法圖文教程

    這篇文章主要為大家詳細(xì)介紹了Windows下mysql community server 8.0.16安裝配置方法圖文教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-06-06
  • mysql單字段多值分割和合并的處理方法

    mysql單字段多值分割和合并的處理方法

    這篇文章主要給大家介紹了關(guān)于mysql單字段多值分割和合并的處理方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • MySQL索引的基本語法

    MySQL索引的基本語法

    這篇文章主要介紹了MySQL索引的基本語法,幫助大家更好的理解和學(xué)習(xí)MySQL,感興趣的朋友可以了解下
    2020-08-08

最新評論