SpringBoot整合canal實(shí)現(xiàn)數(shù)據(jù)同步的示例代碼
一、前言
canal:阿里巴巴 MySQL binlog 增量訂閱&消費(fèi)組件
https://github.com/alibaba/canal
tips: 環(huán)境要求和配置參考 https://github.com/alibaba/canal/wiki/AdminGuide
這里額外提下Redis緩存和MySQL數(shù)據(jù)一致性解決方案
- 延時(shí)雙刪策略
- 異步更新緩存(基于訂閱binlog的同步機(jī)制)
- …
我們的canal即可作為MySQL binlog增量訂閱消費(fèi)組件+MQ消息隊(duì)列將增量數(shù)據(jù)更新到redis
二、docker-compose部署canal
tips: 詳情可查看 https://gitee.com/zhengqingya/docker-compose
# 準(zhǔn)備 git clone https://gitee.com/zhengqingya/docker-compose.git cd docker-compose/Liunx # 導(dǎo)入初始化SQL Liunx/canal/canal_admin/canal_manager.sql # 運(yùn)行 (tips:先修改配置文件信息) docker-compose -f docker-compose-canal.yml -p canal up -d
相關(guān)配置文件
docker-compose-canal.yml
# 可參考 https://github.com/alibaba/canal/wiki/QuickStart version: '3' # 網(wǎng)橋canal -> 方便相互通訊 networks: canal: services: canal_admin: image: registry.cn-hangzhou.aliyuncs.com/zhengqing/canal-admin:v1.1.5 # 原鏡像`canal/canal-admin:v1.1.5` container_name: canal_admin # 容器名為'canal_admin' restart: unless-stopped # 指定容器退出后的重啟策略為始終重啟,但是不考慮在Docker守護(hù)進(jìn)程啟動(dòng)時(shí)就已經(jīng)停止了的容器 volumes: # 數(shù)據(jù)卷掛載路徑設(shè)置,將本機(jī)目錄映射到容器目錄 - "./canal/canal-admin/bin/startup.sh:/home/admin/canal-admin/bin/startup.sh" - "./canal/canal-admin/logs:/home/admin/canal-admin/logs" environment: # 設(shè)置環(huán)境變量,相當(dāng)于docker run命令中的-e TZ: Asia/Shanghai LANG: en_US.UTF-8 canal.adminUser: admin canal.adminPasswd: 123456 spring.datasource.address: www.zhengqingya.com:3306 spring.datasource.database: canal_manager spring.datasource.username: root spring.datasource.password: root ports: - "8089:8089" networks: - canal canal_server: image: registry.cn-hangzhou.aliyuncs.com/zhengqing/canal-server:v1.1.5 # 原鏡像`canal/canal-server:v1.1.5` container_name: canal_server # 容器名為'canal_server' restart: unless-stopped # 指定容器退出后的重啟策略為始終重啟,但是不考慮在Docker守護(hù)進(jìn)程啟動(dòng)時(shí)就已經(jīng)停止了的容器 volumes: # 數(shù)據(jù)卷掛載路徑設(shè)置,將本機(jī)目錄映射到容器目錄 - "./canal/canal-server/logs:/home/admin/canal-server/logs" environment: # 設(shè)置環(huán)境變量,相當(dāng)于docker run命令中的-e TZ: Asia/Shanghai LANG: en_US.UTF-8 canal.register.ip: www.zhengqingya.com canal.admin.manager: canal_admin:8089 canal.admin.port: 11110 canal.admin.user: admin canal.admin.passwd: 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 ports: - "11110:11110" - "11111:11111" - "11112:11112" depends_on: - canal_admin links: - canal_admin networks: - canal
訪問(wèn)地址:http://ip地址:8089
默認(rèn)登錄賬號(hào)密碼:admin/123456
三、canal-admin可視化管理
tips: 頁(yè)面使用自行多點(diǎn)點(diǎn)就會(huì)了
^_^
將數(shù)據(jù)投遞到RabbitMQ配置為例,附帶些其中重要的配置信息
mq的交換機(jī)和隊(duì)列可通過(guò)后面java代碼自動(dòng)生成,無(wú)需自己手動(dòng)建立
1、canal.properties
# canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 # tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = rabbitMQ rabbitmq.host = www.zhengqingya.com:5672 rabbitmq.virtual.host = my_vhost rabbitmq.exchange = canal.exchange rabbitmq.username = admin rabbitmq.password = admin
2、example/instance.propertios
canal.instance.master.address=www.zhengqingya.com:3306 # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # table regex 只同步test數(shù)據(jù)庫(kù)下的t_user表 canal.instance.filter.regex=test\\.t_user # table black regex canal.instance.filter.black.regex=mysql\\.slave_.* # mq config canal.mq.topic=canal_routing_key
四、springboot整合canal實(shí)現(xiàn)數(shù)據(jù)同步
1、pom.xml
引入mq依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、application.yml
配置
# RabbitMQ配置 spring: rabbitmq: addresses: www.zhengqingya.com:5672,www.zhengqingya.com:5673 # 指定client連接到的server的地址,多個(gè)以逗號(hào)分隔 # 填寫自己安裝rabbitmq時(shí)設(shè)置的賬號(hào)密碼,默認(rèn)賬號(hào)密碼為`guest` username: admin password: admin virtual-host: my_vhost # 填寫自己的虛擬機(jī)名,對(duì)應(yīng)可查看 `127.0.0.1:15672/#/users` 下Admin中的`Can access virtual hosts`信息
3、mq監(jiān)聽(tīng)canal消息數(shù)據(jù)
@Slf4j @Component public class CanalRabbitMqListener { @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = MqConstant.CANAL_QUEUE, durable = "true"), exchange = @Exchange(value = MqConstant.CANAL_EXCHANGE), key = MqConstant.CANAL_ROUTING_KEY ) }) public void handleCanalDataChange(String message) { log.info("[canal] 接收消息: {}", JSON.toJSONString(message)); } }
public interface MqConstant { String CANAL_EXCHANGE = "canal.exchange"; String CANAL_QUEUE = "canal_queue"; String CANAL_ROUTING_KEY = "canal_routing_key"; }
4、測(cè)試
先啟動(dòng)項(xiàng)目讓程序自動(dòng)建立所需mq中的交換機(jī)和隊(duì)列
再修改canal監(jiān)聽(tīng)的表數(shù)據(jù)
查看程序監(jiān)聽(tīng)的mq消息數(shù)據(jù)如下,拿到數(shù)據(jù)就可以進(jìn)行數(shù)據(jù)解析處理了…
五、canal-spring-boot-starter
tips: 可參考 https://github.com/NormanGyllenhaal/canal-client
此方式需將canal.properties
配置文件中的canal.serverMode
屬性值修改為tcp
1、pom.xml
中引入依賴
<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter --> <dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency>
2、application.yml
配置
canal: server: www.zhengqingya.com:11111 destination: example # tips:canal-admin中Instance管理下需存在example實(shí)例配置
3、監(jiān)聽(tīng)canal數(shù)據(jù) – 訂閱數(shù)據(jù)庫(kù)增刪改操作
@Slf4j @Component @CanalTable(value = "t_user") public class UserHandler implements EntryHandler<User> { @Override public void insert(User user) { log.info("insert message {}", user); } @Override public void update(User before, User after) { log.info("update before {} ", before); log.info("update after {}", after); } @Override public void delete(User user) { log.info("delete {}", user); } }
@Data @Table(name = "t_user") public class User implements Serializable { /** * 主鍵 */ @Id @GeneratedValue(strategy = GenerationType.IDENTITY) @Column(name = "user_id") private Integer userId; /** * 用戶名 */ @Column(name = "username") private String username; /** * 密碼 */ @Column(name = "password") private String password; /** * 性別 */ @Column(name = "sex") private Integer sex; /** * 備注 */ private String remark; /** * 時(shí)間 */ private Date date; }
經(jīng)測(cè)試發(fā)現(xiàn)這個(gè)jar存在一些bug,ex:針對(duì)表字段,數(shù)據(jù)原本為空,修改為有值的時(shí)候,如果java這邊用非String字段類型去接收會(huì)報(bào)錯(cuò)!
本文案例demo源碼
https://gitee.com/zhengqingya/java-workspace
到此這篇關(guān)于SpringBoot整合canal實(shí)現(xiàn)數(shù)據(jù)同步的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot canal數(shù)據(jù)同步內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springBoot+webMagic實(shí)現(xiàn)網(wǎng)站爬蟲(chóng)的實(shí)例代碼
這篇文章主要介紹了springBoot+webMagic實(shí)現(xiàn)網(wǎng)站爬蟲(chóng)的實(shí)例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05解決@Scheduled定時(shí)器使用@Thransactional事物問(wèn)題
這篇文章主要介紹了解決@Scheduled定時(shí)器使用@Thransactional事物問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08Spring中的事件監(jiān)聽(tīng)器使用學(xué)習(xí)記錄
Spring框架中的事件監(jiān)聽(tīng)機(jī)制是一種設(shè)計(jì)模式,它允許你定義和觸發(fā)事件,同時(shí)允許其他組件監(jiān)聽(tīng)這些事件并在事件發(fā)生時(shí)作出響應(yīng),這篇文章主要介紹了Spring中的事件監(jiān)聽(tīng)器使用學(xué)習(xí),需要的朋友可以參考下2024-07-07java為什么會(huì)出現(xiàn)精度丟失這種現(xiàn)象你知道嗎
這篇文章主要介紹了Java精度丟失的問(wèn)題,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下,希望能夠給你帶來(lái)幫助2021-08-08springboot使用swagger-ui 2.10.5 有關(guān)版本更新帶來(lái)的問(wèn)題小結(jié)
這篇文章主要介紹了springboot使用swagger-ui 2.10.5 有關(guān)版本更新帶來(lái)的問(wèn)題小結(jié),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12