如何使用 Spring Boot 和 Canal 實現(xiàn) MySQL 數(shù)據(jù)庫同步
前言
在分布式系統(tǒng)中,數(shù)據(jù)同步是一個常見的需求。例如,我們可能需要將主庫的數(shù)據(jù)實時同步到多個從庫,或者將數(shù)據(jù)從一個數(shù)據(jù)庫集群同步到另一個集群。本篇內(nèi)容通過一個實際案例,介紹如何使用 Spring Boot 和 Canal 實現(xiàn) MySQL 數(shù)據(jù)庫之間的數(shù)據(jù)同步。
一、背景
假設我們有以下數(shù)據(jù)庫架構(gòu):
- 兩個主庫:db_1 和 db_2。
- 每個主庫對應兩個從庫:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。
- 我們的目標是:
- 將 db_1 的數(shù)據(jù)同步到 db_1_bk_1 和 db_1_bk_2。
- 將 db_2 的數(shù)據(jù)同步到 db_2_bk_1 和 db_2_bk_2。
二、Canal 簡介
Canal 是阿里巴巴開源的一款基于 MySQL Binlog 的增量數(shù)據(jù)訂閱與分發(fā)工具。它通過模擬 MySQL 的從節(jié)點,實時捕獲主庫的 Binlog 日志,并將數(shù)據(jù)變更事件推送給下游消費者。Canal 支持多種下游適配器,如 Kafka、RabbitMQ 和直接消費。
三、主庫數(shù)據(jù)庫配置
1.主庫配置
為了使 Canal 能夠正常解析 Binlog 日志,主庫需要進行以下配置:
- 開啟 Binlog 日志:確保主庫開啟了 Binlog 日志,并且設置為 ROW 模式。
- 配置 server-id:為每個主庫設置唯一的 server-id。
- 創(chuàng)建 Canal 用戶并授予權(quán)限:創(chuàng)建一個用戶供 Canal 使用,并授予必要的權(quán)限。
編輯主庫的配置文件(my.cnf 或 my.ini),添加以下內(nèi)容:
[mysqld] # 開啟 Binlog 日志 log-bin=mysql-bin # 設置 Binlog 格式為 ROW 模式 binlog-format=ROW # 設置唯一的 server-id server-id=1
注意:
- 如果你有多個主庫,每個主庫的 server-id 必須是唯一的。
- 修改配置后,需要重啟 MySQL 服務以使配置生效。
2.創(chuàng)建 Canal 用戶并授予權(quán)限
Canal 需要一個具有讀取 Binlog 權(quán)限的 MySQL 用戶。以下是創(chuàng)建用戶并授予權(quán)限的步驟:
# 登錄 MySQL mysql -u root -p # 創(chuàng)建用戶 CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; # 授予權(quán)限 GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal'; # 刷新權(quán)限 FLUSH PRIVILEGES;
說明:
- canal 用戶需要足夠的權(quán)限來讀取 Binlog 數(shù)據(jù),但不需要對數(shù)據(jù)庫進行寫操作。
- 如果你的 MySQL 版本較新(8.x),可能需要使用 ALTER USER 命令來設置密碼:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';
四.配置 Canal Server
Canal Server 是 Canal 的核心組件,負責連接主庫并解析 Binlog 數(shù)據(jù)。我們需要為每個主庫配置一個 Canal 實例。
1.Canal Server 配置文件
在 Canal Server 的配置目錄下,創(chuàng)建兩個實例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。conf/db_1/instance.properties:
# 主庫的地址和端口 canal.instance.master.address=db_1_ip:3306 # Canal 連接主庫的用戶名和密碼 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 需要同步的表正則表達式,這里表示同步 db_1 數(shù)據(jù)庫的所有表 canal.instance.filter.regex=db_1\\..*
conf/db_2/instance.properties:
# 主庫的地址和端口 canal.instance.master.address=db_2_ip:3306 # Canal 連接主庫的用戶名和密碼 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 需要同步的表正則表達式,這里表示同步 db_2 數(shù)據(jù)庫的所有表 canal.instance.filter.regex=db_2\\..*
2.啟動 Canal Server
使用以下命令啟動 Canal Server:
nohup sh bin/canal.sh start &
注意:
- 確保主庫的 Binlog 位置和文件名正確。如果不確定,可以通過 SHOW MASTER STATUS; 命令查看。
- 如果主庫已經(jīng)運行了一段時間,需要指定 Binlog 的起始位置,避免重復同步舊數(shù)據(jù)。
五.開發(fā) Spring Boot 客戶端
Spring Boot 客戶端作為 Canal 的消息消費者,負責接收數(shù)據(jù)變更事件并同步到目標從庫。
1. 引入依賴
在 Spring Boot 項目的 pom.xml
文件中,引入 Canal 客戶端依賴:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.8</version> </dependency>
2. 配置 Canal 客戶端
在 application.yml
文件中,配置 Canal Server 的地址:
canal: server.ip: canal_server_ip server.port: 11111
3. 實現(xiàn)數(shù)據(jù)同步邏輯
創(chuàng)建一個 Canal 客戶端服務類,用于接收和處理數(shù)據(jù)變更事件。CanalClientService.java:
@Service public class CanalClientService { private final CanalConnector canalConnector; public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) { this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", ""); } @PostConstruct public void start() { canalConnector.connect(); canalConnector.subscribe("db_1..*, db_2..*"); // 訂閱 db_1 和 db_2 的所有表 new Thread(this::process).start(); } private void process() { while (true) { Message message = canalConnector.getWithoutAck(100); long batchId = message.getId(); if (batchId == -1 || message.getEntries().isEmpty()) { continue; } for (Entry entry : message.getEntries()) { handleData(entry); } canalConnector.ack(batchId); } } private void handleData(Entry entry) { String schemaName = entry.getHeader().getSchemaName(); // 數(shù)據(jù)庫名 String tableName = entry.getHeader().getTableName(); // 表名 EventType eventType = entry.getHeader().getEventType(); // 數(shù)據(jù)變更類型 System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType); // 根據(jù)來源數(shù)據(jù)庫同步到對應的從庫 if ("db_1".equals(schemaName)) { syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2"); } else if ("db_2".equals(schemaName)) { syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2"); } } private void syncToBackupDbs(Entry entry, String... backupDbs) { // 根據(jù)事件類型同步到從庫 if (entry.getHeader().getEventType() == EventType.INSERT) { for (String db : backupDbs) { syncInsert(entry, db); } } else if (entry.getHeader().getEventType() == EventType.UPDATE) { for (String db : backupDbs) { syncUpdate(entry, db); } } else if (entry.getHeader().getEventType() == EventType.DELETE) { for (String db : backupDbs) { syncDelete(entry, db); } } } private void syncInsert(Entry entry, String backupDb) { // 使用 MyBatis 將數(shù)據(jù)插入到對應的從庫 System.out.println("INSERT into " + backupDb); } private void syncUpdate(Entry entry, String backupDb) { // 使用 MyBatis 將數(shù)據(jù)更新到對應的從庫 System.out.println("UPDATE into " + backupDb); } private void syncDelete(Entry entry, String backupDb) { // 使用 MyBatis 將數(shù)據(jù)從對應的從庫刪除 System.out.println("DELETE from " + backupDb); } }
六.啟動并測試
- 啟動 Canal Server。
- 啟動 Spring Boot 應用。
- 在主庫 db_1 或 db_2 中插入、更新或刪除數(shù)據(jù)。
- 觀察從庫 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。
七.注意事項
- 數(shù)據(jù)一致性:確保從庫的數(shù)據(jù)與主庫保持一致??梢酝ㄟ^事務或鎖機制來避免沖突。
- 性能優(yōu)化:如果數(shù)據(jù)量較大,建議結(jié)合中間件(如 Kafka)進行緩沖和負載均衡。
- 錯誤處理:在同步過程中,需要處理網(wǎng)絡異常、數(shù)據(jù)庫連接異常等情況。
- Canal Server 高可用:在生產(chǎn)環(huán)境中,建議部署 Canal Server 的集群,以提高系統(tǒng)的可用性。
八.總結(jié)
通過 Spring Boot 和 Canal,我們可以實現(xiàn) MySQL 數(shù)據(jù)庫之間的高效數(shù)據(jù)同步。Canal 提供了強大的 Binlog 解析能力,而 Spring Boot 則提供了靈活的開發(fā)框架,兩者結(jié)合可以輕松應對復雜的分布式數(shù)據(jù)同步需求。希望本文對你有所幫助,如果有任何問題,歡迎在評論區(qū)留言。
到此這篇關于使用 Spring Boot 和 Canal 實現(xiàn) MySQL 數(shù)據(jù)庫同步的文章就介紹到這了,更多相關Spring Boot MySQL 數(shù)據(jù)庫同步內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot利用@Retryable注解實現(xiàn)接口重試
本文主要介紹了springboot如何利用@Retryable注解實現(xiàn)接口重試功能,文中示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-06-06Java語法基礎之選擇結(jié)構(gòu)的if語句、switch語句詳解
這篇文章主要為大詳細介紹了Java語法基礎之選擇結(jié)構(gòu)的if語句、switch語句,感興趣的小伙伴們可以參考一下2016-09-09關于SpringCloud分布式系統(tǒng)中實現(xiàn)冪等性的幾種方式
這篇文章主要介紹了關于SpringCloud分布式系統(tǒng)中實現(xiàn)冪等性的幾種方式,冪等函數(shù),或冪等方法,是指可以使用相同參數(shù)重復執(zhí)行,并能獲得相同結(jié)果的函數(shù),這些函數(shù)不會影響系統(tǒng)狀態(tài),也不用擔心重復執(zhí)行會對系統(tǒng)造成改變,需要的朋友可以參考下2023-10-10Java獲取調(diào)用當前方法的類名或方法名(棧堆信息)的四種方式舉例
在Java編程中我們經(jīng)常需要在運行時獲取當前執(zhí)行的方法名稱,這在日志記錄、性能監(jiān)控、調(diào)試等方面非常有用,這篇文章主要給大家介紹了關于Java獲取調(diào)用當前方法的類名或方法名(棧堆信息)的四種方式,需要的朋友可以參考下2024-09-09Java并發(fā)編程中的生產(chǎn)者與消費者模型簡述
這篇文章主要介紹了Java并發(fā)編程中的生產(chǎn)者與消費者模型簡述,多線程并發(fā)是Java編程中最終要的部分之一,需要的朋友可以參考下2015-07-07