SpringBoot整合canal實(shí)現(xiàn)數(shù)據(jù)同步的示例代碼
一、前言
canal:阿里巴巴 MySQL binlog 增量訂閱&消費(fèi)組件
https://github.com/alibaba/canal
tips: 環(huán)境要求和配置參考 https://github.com/alibaba/canal/wiki/AdminGuide
這里額外提下Redis緩存和MySQL數(shù)據(jù)一致性解決方案
- 延時(shí)雙刪策略
- 異步更新緩存(基于訂閱binlog的同步機(jī)制)
- …
我們的canal即可作為MySQL binlog增量訂閱消費(fèi)組件+MQ消息隊(duì)列將增量數(shù)據(jù)更新到redis
二、docker-compose部署canal
tips: 詳情可查看 https://gitee.com/zhengqingya/docker-compose
# 準(zhǔn)備 git clone https://gitee.com/zhengqingya/docker-compose.git cd docker-compose/Liunx # 導(dǎo)入初始化SQL Liunx/canal/canal_admin/canal_manager.sql # 運(yùn)行 (tips:先修改配置文件信息) docker-compose -f docker-compose-canal.yml -p canal up -d
相關(guān)配置文件

docker-compose-canal.yml
# 可參考 https://github.com/alibaba/canal/wiki/QuickStart
version: '3'
# 網(wǎng)橋canal -> 方便相互通訊
networks:
canal:
services:
canal_admin:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/canal-admin:v1.1.5 # 原鏡像`canal/canal-admin:v1.1.5`
container_name: canal_admin # 容器名為'canal_admin'
restart: unless-stopped # 指定容器退出后的重啟策略為始終重啟,但是不考慮在Docker守護(hù)進(jìn)程啟動時(shí)就已經(jīng)停止了的容器
volumes: # 數(shù)據(jù)卷掛載路徑設(shè)置,將本機(jī)目錄映射到容器目錄
- "./canal/canal-admin/bin/startup.sh:/home/admin/canal-admin/bin/startup.sh"
- "./canal/canal-admin/logs:/home/admin/canal-admin/logs"
environment: # 設(shè)置環(huán)境變量,相當(dāng)于docker run命令中的-e
TZ: Asia/Shanghai
LANG: en_US.UTF-8
canal.adminUser: admin
canal.adminPasswd: 123456
spring.datasource.address: www.zhengqingya.com:3306
spring.datasource.database: canal_manager
spring.datasource.username: root
spring.datasource.password: root
ports:
- "8089:8089"
networks:
- canal
canal_server:
image: registry.cn-hangzhou.aliyuncs.com/zhengqing/canal-server:v1.1.5 # 原鏡像`canal/canal-server:v1.1.5`
container_name: canal_server # 容器名為'canal_server'
restart: unless-stopped # 指定容器退出后的重啟策略為始終重啟,但是不考慮在Docker守護(hù)進(jìn)程啟動時(shí)就已經(jīng)停止了的容器
volumes: # 數(shù)據(jù)卷掛載路徑設(shè)置,將本機(jī)目錄映射到容器目錄
- "./canal/canal-server/logs:/home/admin/canal-server/logs"
environment: # 設(shè)置環(huán)境變量,相當(dāng)于docker run命令中的-e
TZ: Asia/Shanghai
LANG: en_US.UTF-8
canal.register.ip: www.zhengqingya.com
canal.admin.manager: canal_admin:8089
canal.admin.port: 11110
canal.admin.user: admin
canal.admin.passwd: 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
ports:
- "11110:11110"
- "11111:11111"
- "11112:11112"
depends_on:
- canal_admin
links:
- canal_admin
networks:
- canal
訪問地址:http://ip地址:8089
默認(rèn)登錄賬號密碼:admin/123456


三、canal-admin可視化管理
tips: 頁面使用自行多點(diǎn)點(diǎn)就會了
^_^
將數(shù)據(jù)投遞到RabbitMQ配置為例,附帶些其中重要的配置信息
mq的交換機(jī)和隊(duì)列可通過后面java代碼自動生成,無需自己手動建立
1、canal.properties
# canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 # tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = rabbitMQ rabbitmq.host = www.zhengqingya.com:5672 rabbitmq.virtual.host = my_vhost rabbitmq.exchange = canal.exchange rabbitmq.username = admin rabbitmq.password = admin
2、example/instance.propertios
canal.instance.master.address=www.zhengqingya.com:3306 # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # table regex 只同步test數(shù)據(jù)庫下的t_user表 canal.instance.filter.regex=test\\.t_user # table black regex canal.instance.filter.black.regex=mysql\\.slave_.* # mq config canal.mq.topic=canal_routing_key
四、springboot整合canal實(shí)現(xiàn)數(shù)據(jù)同步
1、pom.xml引入mq依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application.yml配置
# RabbitMQ配置
spring:
rabbitmq:
addresses: www.zhengqingya.com:5672,www.zhengqingya.com:5673 # 指定client連接到的server的地址,多個(gè)以逗號分隔
# 填寫自己安裝rabbitmq時(shí)設(shè)置的賬號密碼,默認(rèn)賬號密碼為`guest`
username: admin
password: admin
virtual-host: my_vhost # 填寫自己的虛擬機(jī)名,對應(yīng)可查看 `127.0.0.1:15672/#/users` 下Admin中的`Can access virtual hosts`信息
3、mq監(jiān)聽canal消息數(shù)據(jù)
@Slf4j
@Component
public class CanalRabbitMqListener {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = MqConstant.CANAL_QUEUE, durable = "true"),
exchange = @Exchange(value = MqConstant.CANAL_EXCHANGE),
key = MqConstant.CANAL_ROUTING_KEY
)
})
public void handleCanalDataChange(String message) {
log.info("[canal] 接收消息: {}", JSON.toJSONString(message));
}
}
public interface MqConstant {
String CANAL_EXCHANGE = "canal.exchange";
String CANAL_QUEUE = "canal_queue";
String CANAL_ROUTING_KEY = "canal_routing_key";
}
4、測試
先啟動項(xiàng)目讓程序自動建立所需mq中的交換機(jī)和隊(duì)列

再修改canal監(jiān)聽的表數(shù)據(jù)

查看程序監(jiān)聽的mq消息數(shù)據(jù)如下,拿到數(shù)據(jù)就可以進(jìn)行數(shù)據(jù)解析處理了…

五、canal-spring-boot-starter
tips: 可參考 https://github.com/NormanGyllenhaal/canal-client
此方式需將canal.properties配置文件中的canal.serverMode屬性值修改為tcp

1、pom.xml中引入依賴
<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter -->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
2、application.yml配置
canal: server: www.zhengqingya.com:11111 destination: example # tips:canal-admin中Instance管理下需存在example實(shí)例配置
3、監(jiān)聽canal數(shù)據(jù) – 訂閱數(shù)據(jù)庫增刪改操作
@Slf4j
@Component
@CanalTable(value = "t_user")
public class UserHandler implements EntryHandler<User> {
@Override
public void insert(User user) {
log.info("insert message {}", user);
}
@Override
public void update(User before, User after) {
log.info("update before {} ", before);
log.info("update after {}", after);
}
@Override
public void delete(User user) {
log.info("delete {}", user);
}
}
@Data
@Table(name = "t_user")
public class User implements Serializable {
/**
* 主鍵
*/
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "user_id")
private Integer userId;
/**
* 用戶名
*/
@Column(name = "username")
private String username;
/**
* 密碼
*/
@Column(name = "password")
private String password;
/**
* 性別
*/
@Column(name = "sex")
private Integer sex;
/**
* 備注
*/
private String remark;
/**
* 時(shí)間
*/
private Date date;
}
經(jīng)測試發(fā)現(xiàn)這個(gè)jar存在一些bug,ex:針對表字段,數(shù)據(jù)原本為空,修改為有值的時(shí)候,如果java這邊用非String字段類型去接收會報(bào)錯!

本文案例demo源碼
https://gitee.com/zhengqingya/java-workspace
到此這篇關(guān)于SpringBoot整合canal實(shí)現(xiàn)數(shù)據(jù)同步的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot canal數(shù)據(jù)同步內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springBoot+webMagic實(shí)現(xiàn)網(wǎng)站爬蟲的實(shí)例代碼
這篇文章主要介紹了springBoot+webMagic實(shí)現(xiàn)網(wǎng)站爬蟲的實(shí)例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05
解決@Scheduled定時(shí)器使用@Thransactional事物問題
這篇文章主要介紹了解決@Scheduled定時(shí)器使用@Thransactional事物問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08
Spring中的事件監(jiān)聽器使用學(xué)習(xí)記錄
Spring框架中的事件監(jiān)聽機(jī)制是一種設(shè)計(jì)模式,它允許你定義和觸發(fā)事件,同時(shí)允許其他組件監(jiān)聽這些事件并在事件發(fā)生時(shí)作出響應(yīng),這篇文章主要介紹了Spring中的事件監(jiān)聽器使用學(xué)習(xí),需要的朋友可以參考下2024-07-07
java為什么會出現(xiàn)精度丟失這種現(xiàn)象你知道嗎
這篇文章主要介紹了Java精度丟失的問題,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下,希望能夠給你帶來幫助2021-08-08
springboot使用swagger-ui 2.10.5 有關(guān)版本更新帶來的問題小結(jié)
這篇文章主要介紹了springboot使用swagger-ui 2.10.5 有關(guān)版本更新帶來的問題小結(jié),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12

