欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot2.3.x整合Canal的示例代碼

 更新時(shí)間:2022年02月19日 14:46:37   作者:保護(hù)我方胖虎  
canal是阿里開源mysql?binlog?數(shù)據(jù)組件,canal-server?才是canal的核心我們前邊所講的canal的功能,實(shí)際上講述的就是canal-server的功能,本文給大家介紹Springboot2.3.x整合Canal的示例代碼,需要的朋友可以參考下

一、故事背景

前言…

最近工作中遇到了一個(gè)數(shù)據(jù)同步的問題

我們這邊系統(tǒng)的一個(gè)子業(yè)務(wù)需要依賴另一個(gè)系統(tǒng)的數(shù)據(jù),當(dāng)另一個(gè)系統(tǒng)數(shù)據(jù)變更時(shí),我們這邊的數(shù)據(jù)庫(kù)要對(duì)數(shù)據(jù)進(jìn)行同步…

那么我自己想到的同步方式呢就兩種:

1、MQ訂閱,另一個(gè)系統(tǒng)數(shù)據(jù)變更后將變更數(shù)據(jù)方式到MQ 我們這邊訂閱接受

2、數(shù)據(jù)庫(kù)的觸發(fā)器

但是呢,兩者都被組長(zhǎng)paas了!

1、MQ呢,會(huì)造成代碼侵入,但是另一個(gè)系統(tǒng)暫時(shí)不會(huì)做任何代碼更改…

2、數(shù)據(jù)庫(kù)的觸發(fā)器會(huì)直接跟生產(chǎn)數(shù)據(jù)庫(kù)強(qiáng)關(guān)聯(lián),會(huì)搶占資源,甚至有可能造成生產(chǎn)數(shù)據(jù)庫(kù)的不穩(wěn)定…

對(duì)此很是苦惱…

于是啊,只能借由強(qiáng)大的google、百度,看看能不能解決我這個(gè)問題!一番搜索,有學(xué)習(xí)了一個(gè)很有趣的東西…

Canal

二、什么是Canal

canal:阿里開源mysql binlog 數(shù)據(jù)組件

官網(wǎng)解釋的相當(dāng)詳細(xì)了(國(guó)產(chǎn)牛逼)…下邊我也是照搬過來的…

官網(wǎng)地址如下:https://github.com/alibaba/canal/wiki

早期,阿里巴巴B2B公司因?yàn)榇嬖诤贾莺兔绹?guó)雙機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求。不過早期的數(shù)據(jù)庫(kù)同步業(yè)務(wù),主要是基于trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基于數(shù)據(jù)庫(kù)的日志解析,獲取增量變更進(jìn)行同步,由此衍生出了增量訂閱&消費(fèi)的業(yè)務(wù),從此開啟了一段新紀(jì)元。ps. 目前內(nèi)部使用的同步,已經(jīng)支持mysql5.x和oracle部分版本的日志解析

image-20200926161534433

canal [k?’næl],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)

工作原理

  • canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送 dump 協(xié)議
  • MySQL master 收到 dump 請(qǐng)求,開始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 對(duì)象(原始為 byte 流)

canal呢,實(shí)際是就是運(yùn)用了Mysql的主從復(fù)制原理…

MySQL主從復(fù)制實(shí)現(xiàn)

image-20200926161859141

復(fù)制遵循三步過程:

  • 主服務(wù)器將更改記錄到binlog中(這些記錄稱為binlog事件,可以通過來查看show binary events
  • 從服務(wù)器將主服務(wù)器的二進(jìn)制日志事件復(fù)制到其中繼日志。
  • 中繼日志中的從服務(wù)器重做事件隨后將更新其舊數(shù)據(jù)。

如何運(yùn)作

image-20200926161913848

原理很簡(jiǎn)單:

  • Canal模擬MySQL從站的交互協(xié)議,偽裝成MySQL從站,然后將轉(zhuǎn)儲(chǔ)協(xié)議發(fā)送到MySQL主服務(wù)器。
  • MySQL Master接收到轉(zhuǎn)儲(chǔ)請(qǐng)求,并開始將二進(jìn)制日志推送到slave(即運(yùn)河)。
  • 運(yùn)河將二進(jìn)制日志對(duì)象解析為其自己的數(shù)據(jù)類型(最初為字節(jié)流)

通過官網(wǎng)的介紹,讓我們了解到,canal實(shí)際上就是偽裝為了一個(gè)從庫(kù),我們只需要訂閱到數(shù)據(jù)變更的主庫(kù),那么canal就會(huì)以從庫(kù)的身份讀取到其主庫(kù)的binlog日志!我們拿到canal解析好的binlog日志信息,就等于拿到了變更的數(shù)據(jù)啦!…

這樣的話呢,我們即保證了不影響其系統(tǒng)數(shù)據(jù)庫(kù)正常使用,又不會(huì)侵入他的項(xiàng)目代碼,一舉兩得

ok,接下來開始實(shí)戰(zhàn)篇…

三、Canal安裝

(1)事前準(zhǔn)備

(1)數(shù)據(jù)庫(kù)開啟binlog

使用canal呢,有一個(gè)前提條件,即被訂閱的數(shù)據(jù)庫(kù)需要開啟binlog

如何查看是否開啟binlog呢?

登錄服務(wù)器上數(shù)據(jù)庫(kù)或在可視化工具中 執(zhí)行查詢語(yǔ)句: 如果出現(xiàn) log_bin ON 表示已開啟Binlog

show variables like 'log_bin'; 

image-20200927230024401

如果服務(wù)器上的數(shù)據(jù)庫(kù)為自己安裝的,則找到配置文件my.conf 添加以下內(nèi)容,如果買的云實(shí)例,則詢問廠商開啟即可

image-20200926164518333

在my.conf文件中的 [mysqld] 下添加以下三行內(nèi)容

log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式 讀行
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)

(2)數(shù)據(jù)庫(kù)新建賬號(hào),開啟MySQL slav權(quán)限

canaltest:作為slave 角色的賬戶 Canal123…:為密碼

CREATE USER canaltest IDENTIFIED BY 'Canal123..';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canaltest'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'canaltest'@'%' ;
FLUSH PRIVILEGES;

image-20200926165912070

連接測(cè)試

image-20200926170124796

那么到這里,準(zhǔn)備工作就好了!

可能呢,有的小伙伴有點(diǎn)懵,你這是在干啥?那么咱們就來理那么一理! 敲黑板了哈!

image-20200926170334603

1、事前準(zhǔn)備,是針對(duì)于訂閱數(shù)據(jù)庫(kù)的(即主庫(kù))

2、實(shí)際步驟也就兩步 1:更改配置,開啟binlog 2:設(shè)置新賬號(hào),賦予slave權(quán)限,供canal讀取Binlog橋梁使用

3、以上操作與canal本身沒啥關(guān)系,僅僅是使用canal的前提條件罷遼…

(2)Canal Admin 安裝

canal admin 是 一個(gè)可視化的 canal web管理運(yùn)維工程,脫離以往服務(wù)器運(yùn)維,面向web…

canal-admin設(shè)計(jì)上是為canal提供整體配置管理、節(jié)點(diǎn)運(yùn)維等面向運(yùn)維的功能,提供相對(duì)友好的WebUI操作界面,方便更多用戶快速和安全的操作

canal-admin的限定依賴:

  • MySQL,用于存儲(chǔ)配置和節(jié)點(diǎn)等相關(guān)數(shù)據(jù)
  • canal版本,要求>=1.1.4 (需要依賴canal-server提供面向admin的動(dòng)態(tài)運(yùn)維管理接口)
  • 需要JRE 環(huán)境 (安裝JDK)

下載

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

解壓

mkdir /usr/local/canal-admin
tar zxvf canal.admin-1.1.4.tar.gz  -C /usr/local/canal-admin

進(jìn)入canal-admin目錄下查看

cd /usr/local/canal-admin

修改配置

vim conf/application.yml

里邊的配置 按照自己的實(shí)際情況更改…

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
#這里是配置canal-admin 所依賴的數(shù)據(jù)庫(kù),,,存放web管理中設(shè)置的配置等,,,
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root	
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
# 連接所用的賬戶密碼 
canal:
  adminUser: admin
  adminPasswd: leitest

導(dǎo)入canaladmin 所需要的數(shù)據(jù)庫(kù)文件

這里需要注意了,要和 application.yml中的數(shù)據(jù)庫(kù)名對(duì)應(yīng),你可以選擇命令導(dǎo)入,也可以Navicat 可視化拖sql文件導(dǎo)入…一切…看你喜歡.

我這個(gè)玩canal的服務(wù)器呢,是新安裝的,mysql直接用docker安裝即可,具體可查看我的博客:

Docker在CentOS7下不能下載鏡像timeout的解決辦法(圖解)

CentOS 7安裝Docker

需要注意的是,使用docker 安裝的mysql 是無法直接使用 mysql -uroot -p 命令的哦,需要先將腳本復(fù)制到容器中,docker不熟練或覺得麻煩的同鞋,請(qǐng)直接使用Navicat可視化工具…

導(dǎo)入canal-admin服務(wù)所必需的sql文件

如果是服務(wù)器軟件軟件安裝的mysql 則直接執(zhí)行以下命令即可

mysql -uroot -p
#.........
# 導(dǎo)入初始化SQL
> source conf/canal_manager.sql

image-20200926215220008

啟動(dòng)

直接執(zhí)行啟動(dòng)腳本即可

cd bin
./startup.sh

image-20200926220024950

默認(rèn)賬戶密碼:

admin:123456

image-20200926220129110

(3)Canal Server 安裝

canal-server 才是canal的核心我們前邊所講的canal的功能,實(shí)際上講述的就是canal-server的功能…admin 僅僅只是一個(gè)web管理而已,不要搞混主次關(guān)系…

下載

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解壓

mkdir /usr/local/canal-server
tar zxvf canal.deployer-1.1.4.tar.gz  -C /usr/local/canal-server

啟動(dòng),并連接到canal-admin web端

首先,我們需要修改配置文件

cd /usr/local/canal-server

vim /conf/canal_local.properties

image-20200926221410324

image-20200926221344585

注意了,密碼如何加密!??!

要記得,前邊 canal-admin 的 aplication.yml 中設(shè)置了賬戶密碼為 admin:leitest

# 連接所用的賬戶密碼 
canal:
  adminUser: admin
  adminPasswd: leitest

所以,我們這里需要對(duì)明文 leitest 加密并替換即可

使用數(shù)據(jù)庫(kù)函數(shù) PASSWORD 加密即可

SELECT PASSWORD(‘要加密的明文’),然后去掉前邊的* 號(hào)就行

image-20200926221859106

啟動(dòng)并連接到admin

sh bin/startup.sh local

查看端口看是否有 11110 、11111、11112

netstat -untlp 看了一下,發(fā)現(xiàn)沒有,說明server 沒有啟動(dòng)成功

image-20200926222317087

看下日志

vim logs/canal/canal.log

image-20200926222442734

解決辦法:

1、canal-admin 先停止后從起

2、canal server 先以之前的形式運(yùn)行,不輸入后邊 local 命令

3、關(guān)閉canal server

4、再以canal server 連接 admin 形式啟動(dòng)

image-20200926230725865

admin頁(yè)面上新建server

image-20200926230834367

修改配置,注釋 (instance連接信息,我們還是以前邊設(shè)置的 admin:leitest 為準(zhǔn),所有這里需要注釋掉,如果不注釋,那么我們代碼中連接則需要使用此賬號(hào)以及密碼)

image-20200926234156162

接下來咱們創(chuàng)建instance

如何理解server 和instance 呢,我認(rèn)為,可以把它當(dāng)做 java 中的 class 和 bean 即 類和對(duì)象

server 為類 instance 為其具體的實(shí)例對(duì)象 ,可創(chuàng)建多個(gè)不同的實(shí)例…

而我們這邊監(jiān)聽到主庫(kù)變化的呢,則是根據(jù)業(yè)務(wù),對(duì)不同的實(shí)例即(instance )做不同配置即可…

image-20200926231123562

image-20200926231135593

image-20200926231429803

image-20200926231516786

根據(jù)自己情況進(jìn)行過濾數(shù)據(jù)

canal.instance.filter.regexmysql 數(shù)據(jù)解析關(guān)注的表,Perl正則表達(dá)式.多個(gè)正則之間以逗號(hào)(,)分隔,轉(zhuǎn)義符需要雙斜杠(\) 常見例子:1. 所有表:.* or .\… 2. canal schema下所有表: canal\…* 3. canal下的以canal打頭的表:canal\.canal.* 4. canal schema下的一張表:canal\.test15. 多個(gè)規(guī)則組合使用:canal\…*,mysql.test1,mysql.test2 (逗號(hào)分隔)  
canal.instance.filter.druid.ddl是否使用druid處理所有的ddl解析來獲取庫(kù)和表名true 
canal.instance.filter.query.dcl是否忽略dcl語(yǔ)句false 
canal.instance.filter.query.dml是否忽略dml語(yǔ)句 (mysql5.6之后,在row模式下每條DML語(yǔ)句也會(huì)記錄SQL到binlog中,可參考MySQL文檔)false 
canal.instance.filter.query.ddl是否忽略ddl語(yǔ)句false 

更多設(shè)置請(qǐng)見官網(wǎng):https://github.com/alibaba/canal/wiki/AdminGuide

如此一來,一個(gè)簡(jiǎn)單的canal環(huán)境就搭建好了,接下來,咱們開始測(cè)試吧!

(4)springboot demo示例

引入canal所需依賴

<dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

配置

canal:
  # instance 實(shí)例所在ip
  host: 192.168.96.129
  # tcp通信端口
  port: 11111
  # 賬號(hào)  canal-admin application.yml 設(shè)置的
  username: admin
  # 密碼
  password: leitest
  #實(shí)例名稱
  instance: test

代碼

package com.leilei;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
/**
 * @author lei
 * @version 1.0
 * @date 2020/9/27 22:23
 * @desc 讀取binlog日志
 */
@Component
public class ReadBinLogService implements ApplicationRunner {
    @Value("${canal.host}")
    private String host;
    @Value("${canal.port}")
    private int port;
    @Value("${canal.username}")
    private String username;
    @Value("${canal.password}")
    private String password;
    @Value("${canal.instance}")
    private String instance;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        CanalConnector conn = getConn();
        while (true) {
            conn.connect();
            //訂閱實(shí)例中所有的數(shù)據(jù)庫(kù)和表
            conn.subscribe(".*\\..*");
            // 回滾到未進(jìn)行ack的地方
            conn.rollback();
            // 獲取數(shù)據(jù) 每次獲取一百條改變數(shù)據(jù)
            Message message = conn.getWithoutAck(100);
            long id = message.getId();
            int size = message.getEntries().size();
            if (id != -1 && size > 0) {
                // 數(shù)據(jù)解析
                analysis(message.getEntries());
            }else {
                Thread.sleep(1000);
            }
            // 確認(rèn)消息
            conn.ack(message.getId());
            // 關(guān)閉連接
            conn.disconnect();
        }
    }
    /**
     * 數(shù)據(jù)解析
     */
    private void analysis(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // 只解析mysql事務(wù)的操作,其他的不解析
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN) {
                continue;
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            // 解析binlog
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("解析出現(xiàn)異常 data:" + entry.toString(), e);
            if (rowChange != null) {
                // 獲取操作類型
                CanalEntry.EventType eventType = rowChange.getEventType();
                // 獲取當(dāng)前操作所屬的數(shù)據(jù)庫(kù)
                String dbName = entry.getHeader().getSchemaName();
                // 獲取當(dāng)前操作所屬的表
                String tableName = entry.getHeader().getTableName();
                // 事務(wù)提交時(shí)間
                long timestamp = entry.getHeader().getExecuteTime();
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    dataDetails(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), dbName, tableName, eventType, timestamp);
                    System.out.println("-------------------------------------------------------------");
                }
     * 解析具體一條Binlog消息的數(shù)據(jù)
     *
     * @param dbName    當(dāng)前操作所屬數(shù)據(jù)庫(kù)名稱
     * @param tableName 當(dāng)前操作所屬表名稱
     * @param eventType 當(dāng)前操作類型(新增、修改、刪除)
    private static void dataDetails(List<CanalEntry.Column> beforeColumns,
                                    List<CanalEntry.Column> afterColumns,
                                    String dbName,
                                    String tableName,
                                    CanalEntry.EventType eventType,
                                    long timestamp) {
        System.out.println("數(shù)據(jù)庫(kù):" + dbName);
        System.out.println("表名:" + tableName);
        System.out.println("操作類型:" + eventType);
        if (CanalEntry.EventType.INSERT.equals(eventType)) {
            System.out.println("新增數(shù)據(jù):");
            printColumn(afterColumns);
        } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
            System.out.println("刪除數(shù)據(jù):");
            printColumn(beforeColumns);
        } else {
            System.out.println("更新數(shù)據(jù):更新前數(shù)據(jù)--");
            System.out.println("更新數(shù)據(jù):更新后數(shù)據(jù)--");
        System.out.println("操作時(shí)間:" + timestamp);
    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
     * 獲取連接
    public CanalConnector getConn() {
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
}

測(cè)試查看

數(shù)據(jù)庫(kù)修改數(shù)據(jù)庫(kù)時(shí)

image-20200927222635688

數(shù)據(jù)新增數(shù)據(jù)時(shí)

image-20200927222734833

刪除數(shù)據(jù)(把我們才添加的小明刪掉)

image-20200927222834002

當(dāng)我們操作監(jiān)控的數(shù)據(jù)庫(kù)DM L操作的時(shí)候呢,會(huì)被canal監(jiān)聽到…我們呢,通過canal監(jiān)聽,拿到修改的庫(kù),修改的表,修改的字段,便可以根據(jù)自己業(yè)務(wù)進(jìn)行數(shù)據(jù)處理了!

哎,這個(gè)時(shí)候啊,可能有小伙伴就要問了,那么,我能不能直接獲取其操作的sql語(yǔ)句呢?

目前,我是自己解析其列來手動(dòng)拼接的sql語(yǔ)句實(shí)現(xiàn)了

話不多說,先上效果:

canal 監(jiān)聽到主庫(kù)sql變化----> update students set  id = '2', age = '999', name = '小三', city = '11', date = '2020-09-27 17:41:44', birth = '2020-09-27 18:00:48' where id=2
canal 監(jiān)聽到主庫(kù)sql變化----> delete from students where id=6
canal 監(jiān)聽到主庫(kù)sql變化----> insert into students (id,age,name,city,date,birth) VALUES ('89','98','測(cè)試新增','深圳','2020-09-27 22:46:53','')
canal 監(jiān)聽到主庫(kù)sql變化----> update students set  id = '89', age = '98', name = '測(cè)試新增', city = '深圳', date = '2020-09-27 22:46:53', birth = '2020-09-27 22:46:56' where id=89

image-20200927224716304

實(shí)際上呢,我們也就是拿到其執(zhí)行前列數(shù)據(jù)變化 執(zhí)行后列數(shù)據(jù)變化,自己拼接了一個(gè)sql罷了…附上代碼

package com.leilei;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * @author lei
 * @version 1.0
 * @date 2020/9/27 22:33
 * @desc 讀取binlog日志
 */
@Component
public class ReadBinLogToSql implements ApplicationRunner {
    //讀取的binlog sql 隊(duì)列緩存 一邊Push 一邊poll
    private Queue<String> canalQueue = new ConcurrentLinkedQueue<>();
    @Value("${canal.host}")
    private String host;
    @Value("${canal.port}")
    private int port;
    @Value("${canal.username}")
    private String username;
    @Value("${canal.password}")
    private String password;
    @Value("${canal.instance}")
    private String instance;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        CanalConnector conn = getConn();
        while (true) {
            try {
                conn.connect();
                //訂閱實(shí)例中所有的數(shù)據(jù)庫(kù)和表
                conn.subscribe(".*\\..*");
                // 回滾到未進(jìn)行ack的地方
                conn.rollback();
                // 獲取數(shù)據(jù) 每次獲取一百條改變數(shù)據(jù)
                Message message = conn.getWithoutAck(100);
                long id = message.getId();
                int size = message.getEntries().size();
                if (id != -1 && size > 0) {
                    // 數(shù)據(jù)解析
                    analysis(message.getEntries());
                } else {
                    Thread.sleep(1000);
                }
                // 確認(rèn)消息
                conn.ack(message.getId());
            } catch (CanalClientException | InvalidProtocolBufferException | InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 關(guān)閉連接
                conn.disconnect();
            }
        }
    }
    private void analysis(List<Entry> entries) throws InvalidProtocolBufferException {
        for (Entry entry : entries) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }
    /**
     * 保存更新語(yǔ)句
     *
     * @param entry
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> dataList = rowChange.getRowDatasList();
            for (RowData rowData : dataList) {
                List<Column> afterColumnsList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " +
                        entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < afterColumnsList.size(); i++) {
                    sql.append(" ")
                            .append(afterColumnsList.get(i).getName())
                            .append(" = '").append(afterColumnsList.get(i).getValue())
                            .append("'");
                    if (i != afterColumnsList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        sql.append(column.getName()).append("=").append(column.getValue());
                        break;
                    }
                }
                canalQueue.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * 保存刪除語(yǔ)句
     *
     * @param entry
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " +
                        entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        sql.append(column.getName()).append("=").append(column.getValue());
                        break;
                    }
                }
                canalQueue.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * 保存插入語(yǔ)句
     *
     * @param entry
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> datasList = rowChange.getRowDatasList();
            for (RowData rowData : datasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " +
                        entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                canalQueue.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * 獲取連接
     */
    public CanalConnector getConn() {
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
    }
    /**
     * 模擬消費(fèi)canal轉(zhuǎn)換的sql語(yǔ)句
     */
    public void executeQueueSql() {
        int size = canalQueue.size();
        for (int i = 0; i < size; i++) {
            String sql = canalQueue.poll();
            System.out.println("canal 監(jiān)聽到主庫(kù)sql變化----> " + sql);
        }
    }
}

當(dāng)然了,這只是簡(jiǎn)單的demo 演示,您可根據(jù)自己的業(yè)務(wù)進(jìn)行修改完善即可…

上邊的安裝步驟呢,我也是不斷的測(cè)試過,沒有問題,當(dāng)然可能或多或少有些坑沒有踩到,但是如果您按照我的步驟來,大概率是一馬平川的…

附上項(xiàng)目源碼:springboot-canal

到此這篇關(guān)于Springboot2.3.x整合Canal的文章就介紹到這了,更多相關(guān)Springboot2.3.x整合Canal內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java Spring詳解如何配置數(shù)據(jù)源注解開發(fā)以及整合Junit

    Java Spring詳解如何配置數(shù)據(jù)源注解開發(fā)以及整合Junit

    Spring 是目前主流的 Java Web 開發(fā)框架,是 Java 世界最為成功的框架。該框架是一個(gè)輕量級(jí)的開源框架,具有很高的凝聚力和吸引力,本篇文章帶你了解如何配置數(shù)據(jù)源、注解開發(fā)以及整合Junit
    2021-10-10
  • MyBatis如何配置多sql腳本執(zhí)行

    MyBatis如何配置多sql腳本執(zhí)行

    這篇文章主要介紹了MyBatis如何配置多sql腳本執(zhí)行問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • Java中如何靈活獲取excel中的數(shù)據(jù)

    Java中如何靈活獲取excel中的數(shù)據(jù)

    這篇文章主要給大家介紹了關(guān)于Java中如何靈活獲取excel中的數(shù)據(jù),在日常工作中我們常常會(huì)進(jìn)行文件讀寫操作,除去我們最常用的純文本文件讀寫,更多時(shí)候我們需要對(duì)Excel中的數(shù)據(jù)進(jìn)行讀取操作,需要的朋友可以參考下
    2023-07-07
  • SSH框架網(wǎng)上商城項(xiàng)目第30戰(zhàn)之項(xiàng)目總結(jié)(附源碼下載地址)

    SSH框架網(wǎng)上商城項(xiàng)目第30戰(zhàn)之項(xiàng)目總結(jié)(附源碼下載地址)

    這篇文章主要介紹了SSH框架網(wǎng)上商城項(xiàng)目第30戰(zhàn)之項(xiàng)目總結(jié),并附源碼下載地址,感興趣的小伙伴們可以參考一下
    2016-06-06
  • Java基礎(chǔ)知識(shí)精通各種運(yùn)算符

    Java基礎(chǔ)知識(shí)精通各種運(yùn)算符

    計(jì)算機(jī)的最基本用途之一就是執(zhí)行數(shù)學(xué)運(yùn)算,作為一門計(jì)算機(jī)語(yǔ)言,Java也提供了一套豐富的運(yùn)算符來操縱變量,本篇對(duì)大家的學(xué)習(xí)或工作具有一定的價(jià)值,需要的朋友可以參考下
    2022-04-04
  • 優(yōu)化Java內(nèi)存管理來防止“GC”錯(cuò)誤的方法詳解

    優(yōu)化Java內(nèi)存管理來防止“GC”錯(cuò)誤的方法詳解

    垃圾回收(GC)是 Java 中的一個(gè)重要機(jī)制,它可以管理內(nèi)存并回收不再使用的對(duì)象所占用的資源,在本文中,我們將探討一些技巧,幫助您避免這一錯(cuò)誤,確保您的 Java 應(yīng)用程序順利運(yùn)行,需要的朋友可以參考下
    2023-11-11
  • Java虛擬機(jī)調(diào)用Java主類的main()方法

    Java虛擬機(jī)調(diào)用Java主類的main()方法

    這篇文章主要介紹了Java虛擬機(jī)調(diào)用Java主類的main()方法,前一篇文章我們介紹了關(guān)于Java虛擬機(jī)HotSpot
    2021-11-11
  • java讀取csv文件示例分享(java解析csv文件)

    java讀取csv文件示例分享(java解析csv文件)

    這篇文章主要介紹了java讀取csv文件示例,這個(gè)java解析csv文件的例子很簡(jiǎn)單,下面直接上代碼,大家參考使用吧
    2014-03-03
  • Spring Cloud Gateway替代zuul作為API網(wǎng)關(guān)的方法

    Spring Cloud Gateway替代zuul作為API網(wǎng)關(guān)的方法

    本文簡(jiǎn)要介紹如何使用Spring Cloud Gateway 作為API 網(wǎng)關(guān)(不是使用zuul作為網(wǎng)關(guān)),結(jié)合實(shí)例代碼給大家詳細(xì)講解,感興趣的朋友跟隨小編一起看看吧
    2023-02-02
  • Java中減少if-else的幾種方式

    Java中減少if-else的幾種方式

    if判斷語(yǔ)句是很多編程語(yǔ)言的重要組成部分,但是,若我們最終編寫了大量嵌套的if語(yǔ)句,這將使得我們的代碼更加復(fù)雜和難以維護(hù),本文主要介紹了Java中減少if-else的幾種方式,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-01-01

最新評(píng)論