欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何使用 Spring Boot 和 Canal 實現(xiàn) MySQL 數(shù)據(jù)庫同步

 更新時間:2025年02月18日 16:56:25   作者:歸宿樂瑤  
本文介紹了如何使用SpringBoot和Canal實現(xiàn)MySQL數(shù)據(jù)庫之間的數(shù)據(jù)同步,通過配置主庫、創(chuàng)建Canal用戶、配置CanalServer以及開發(fā)SpringBoot客戶端,實現(xiàn)了將主庫的數(shù)據(jù)實時同步到多個從庫,感興趣的朋友跟隨小編一起看看吧

前言

在分布式系統(tǒng)中,數(shù)據(jù)同步是一個常見的需求。例如,我們可能需要將主庫的數(shù)據(jù)實時同步到多個從庫,或者將數(shù)據(jù)從一個數(shù)據(jù)庫集群同步到另一個集群。本篇內(nèi)容通過一個實際案例,介紹如何使用 Spring Boot 和 Canal 實現(xiàn) MySQL 數(shù)據(jù)庫之間的數(shù)據(jù)同步。

一、背景

假設我們有以下數(shù)據(jù)庫架構(gòu):

  • 兩個主庫:db_1 和 db_2。
  • 每個主庫對應兩個從庫:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。
  • 我們的目標是:
  • 將 db_1 的數(shù)據(jù)同步到 db_1_bk_1 和 db_1_bk_2。
  • 將 db_2 的數(shù)據(jù)同步到 db_2_bk_1 和 db_2_bk_2。

二、Canal 簡介

Canal 是阿里巴巴開源的一款基于 MySQL Binlog 的增量數(shù)據(jù)訂閱與分發(fā)工具。它通過模擬 MySQL 的從節(jié)點,實時捕獲主庫的 Binlog 日志,并將數(shù)據(jù)變更事件推送給下游消費者。Canal 支持多種下游適配器,如 Kafka、RabbitMQ 和直接消費。

三、主庫數(shù)據(jù)庫配置

1.主庫配置

為了使 Canal 能夠正常解析 Binlog 日志,主庫需要進行以下配置:

  • 開啟 Binlog 日志:確保主庫開啟了 Binlog 日志,并且設置為 ROW 模式。
  • 配置 server-id:為每個主庫設置唯一的 server-id。
  • 創(chuàng)建 Canal 用戶并授予權(quán)限:創(chuàng)建一個用戶供 Canal 使用,并授予必要的權(quán)限。

編輯主庫的配置文件(my.cnf 或 my.ini),添加以下內(nèi)容:

[mysqld]
# 開啟 Binlog 日志
log-bin=mysql-bin
# 設置 Binlog 格式為 ROW 模式
binlog-format=ROW
# 設置唯一的 server-id
server-id=1

注意:

  • 如果你有多個主庫,每個主庫的 server-id 必須是唯一的。
  • 修改配置后,需要重啟 MySQL 服務以使配置生效。

2.創(chuàng)建 Canal 用戶并授予權(quán)限

Canal 需要一個具有讀取 Binlog 權(quán)限的 MySQL 用戶。以下是創(chuàng)建用戶并授予權(quán)限的步驟:

# 登錄 MySQL
mysql -u root -p
# 創(chuàng)建用戶
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 授予權(quán)限
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新權(quán)限
FLUSH PRIVILEGES;

說明:

  • canal 用戶需要足夠的權(quán)限來讀取 Binlog 數(shù)據(jù),但不需要對數(shù)據(jù)庫進行寫操作。
  • 如果你的 MySQL 版本較新(8.x),可能需要使用 ALTER USER 命令來設置密碼:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';

四.配置 Canal Server

Canal Server 是 Canal 的核心組件,負責連接主庫并解析 Binlog 數(shù)據(jù)。我們需要為每個主庫配置一個 Canal 實例。

1.Canal Server 配置文件

在 Canal Server 的配置目錄下,創(chuàng)建兩個實例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。
conf/db_1/instance.properties:

# 主庫的地址和端口
canal.instance.master.address=db_1_ip:3306
# Canal 連接主庫的用戶名和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正則表達式,這里表示同步 db_1 數(shù)據(jù)庫的所有表
canal.instance.filter.regex=db_1\\..*

conf/db_2/instance.properties:

# 主庫的地址和端口
canal.instance.master.address=db_2_ip:3306
# Canal 連接主庫的用戶名和密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正則表達式,這里表示同步 db_2 數(shù)據(jù)庫的所有表
canal.instance.filter.regex=db_2\\..*

2.啟動 Canal Server

使用以下命令啟動 Canal Server:

nohup sh bin/canal.sh start &

注意:

  • 確保主庫的 Binlog 位置和文件名正確。如果不確定,可以通過 SHOW MASTER STATUS; 命令查看。
  • 如果主庫已經(jīng)運行了一段時間,需要指定 Binlog 的起始位置,避免重復同步舊數(shù)據(jù)。

五.開發(fā) Spring Boot 客戶端

Spring Boot 客戶端作為 Canal 的消息消費者,負責接收數(shù)據(jù)變更事件并同步到目標從庫。

1. 引入依賴

在 Spring Boot 項目的 pom.xml文件中,引入 Canal 客戶端依賴:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.8</version>
</dependency>

2. 配置 Canal 客戶端

application.yml 文件中,配置 Canal Server 的地址:

canal:
  server.ip: canal_server_ip
  server.port: 11111

3. 實現(xiàn)數(shù)據(jù)同步邏輯

創(chuàng)建一個 Canal 客戶端服務類,用于接收和處理數(shù)據(jù)變更事件。
CanalClientService.java:

@Service
public class CanalClientService {
    private final CanalConnector canalConnector;
    public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) {
        this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", "");
    }
    @PostConstruct
    public void start() {
        canalConnector.connect();
        canalConnector.subscribe("db_1..*, db_2..*"); // 訂閱 db_1 和 db_2 的所有表
        new Thread(this::process).start();
    }
    private void process() {
        while (true) {
            Message message = canalConnector.getWithoutAck(100);
            long batchId = message.getId();
            if (batchId == -1 || message.getEntries().isEmpty()) {
                continue;
            }
            for (Entry entry : message.getEntries()) {
                handleData(entry);
            }
            canalConnector.ack(batchId);
        }
    }
    private void handleData(Entry entry) {
        String schemaName = entry.getHeader().getSchemaName(); // 數(shù)據(jù)庫名
        String tableName = entry.getHeader().getTableName();  // 表名
        EventType eventType = entry.getHeader().getEventType(); // 數(shù)據(jù)變更類型
        System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType);
        // 根據(jù)來源數(shù)據(jù)庫同步到對應的從庫
        if ("db_1".equals(schemaName)) {
            syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2");
        } else if ("db_2".equals(schemaName)) {
            syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2");
        }
    }
    private void syncToBackupDbs(Entry entry, String... backupDbs) {
        // 根據(jù)事件類型同步到從庫
        if (entry.getHeader().getEventType() == EventType.INSERT) {
            for (String db : backupDbs) {
                syncInsert(entry, db);
            }
        } else if (entry.getHeader().getEventType() == EventType.UPDATE) {
            for (String db : backupDbs) {
                syncUpdate(entry, db);
            }
        } else if (entry.getHeader().getEventType() == EventType.DELETE) {
            for (String db : backupDbs) {
                syncDelete(entry, db);
            }
        }
    }
    private void syncInsert(Entry entry, String backupDb) {
        // 使用 MyBatis 將數(shù)據(jù)插入到對應的從庫
        System.out.println("INSERT into " + backupDb);
    }
    private void syncUpdate(Entry entry, String backupDb) {
        // 使用 MyBatis 將數(shù)據(jù)更新到對應的從庫
        System.out.println("UPDATE into " + backupDb);
    }
    private void syncDelete(Entry entry, String backupDb) {
        // 使用 MyBatis 將數(shù)據(jù)從對應的從庫刪除
        System.out.println("DELETE from " + backupDb);
    }
}

六.啟動并測試

  • 啟動 Canal Server。
  • 啟動 Spring Boot 應用。
  • 在主庫 db_1 或 db_2 中插入、更新或刪除數(shù)據(jù)。
  • 觀察從庫 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。

 七.注意事項

  • 數(shù)據(jù)一致性:確保從庫的數(shù)據(jù)與主庫保持一致??梢酝ㄟ^事務或鎖機制來避免沖突。
  • 性能優(yōu)化:如果數(shù)據(jù)量較大,建議結(jié)合中間件(如 Kafka)進行緩沖和負載均衡。
  • 錯誤處理:在同步過程中,需要處理網(wǎng)絡異常、數(shù)據(jù)庫連接異常等情況。
  • Canal Server 高可用:在生產(chǎn)環(huán)境中,建議部署 Canal Server 的集群,以提高系統(tǒng)的可用性。

八.總結(jié)

通過 Spring Boot 和 Canal,我們可以實現(xiàn) MySQL 數(shù)據(jù)庫之間的高效數(shù)據(jù)同步。Canal 提供了強大的 Binlog 解析能力,而 Spring Boot 則提供了靈活的開發(fā)框架,兩者結(jié)合可以輕松應對復雜的分布式數(shù)據(jù)同步需求。希望本文對你有所幫助,如果有任何問題,歡迎在評論區(qū)留言。

到此這篇關于使用 Spring Boot 和 Canal 實現(xiàn) MySQL 數(shù)據(jù)庫同步的文章就介紹到這了,更多相關Spring Boot MySQL 數(shù)據(jù)庫同步內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • SpringBoot利用@Retryable注解實現(xiàn)接口重試

    SpringBoot利用@Retryable注解實現(xiàn)接口重試

    本文主要介紹了springboot如何利用@Retryable注解實現(xiàn)接口重試功能,文中示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-06-06
  • 詳解RestTemplate的三種使用方式

    詳解RestTemplate的三種使用方式

    這篇文章主要介紹了詳解RestTemplate的三種使用方式,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-10-10
  • Java語法基礎之選擇結(jié)構(gòu)的if語句、switch語句詳解

    Java語法基礎之選擇結(jié)構(gòu)的if語句、switch語句詳解

    這篇文章主要為大詳細介紹了Java語法基礎之選擇結(jié)構(gòu)的if語句、switch語句,感興趣的小伙伴們可以參考一下
    2016-09-09
  • Spring Boot maven框架搭建教程圖解

    Spring Boot maven框架搭建教程圖解

    這篇文章主要介紹了Spring Boot maven框架搭建教程圖解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-07-07
  • Springmvc文件上傳實現(xiàn)流程解析

    Springmvc文件上傳實現(xiàn)流程解析

    這篇文章主要介紹了Springmvc文件上傳實現(xiàn)流程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-09-09
  • 關于SpringCloud分布式系統(tǒng)中實現(xiàn)冪等性的幾種方式

    關于SpringCloud分布式系統(tǒng)中實現(xiàn)冪等性的幾種方式

    這篇文章主要介紹了關于SpringCloud分布式系統(tǒng)中實現(xiàn)冪等性的幾種方式,冪等函數(shù),或冪等方法,是指可以使用相同參數(shù)重復執(zhí)行,并能獲得相同結(jié)果的函數(shù),這些函數(shù)不會影響系統(tǒng)狀態(tài),也不用擔心重復執(zhí)行會對系統(tǒng)造成改變,需要的朋友可以參考下
    2023-10-10
  • Maven解決jar包版本沖突的4種方法詳解

    Maven解決jar包版本沖突的4種方法詳解

    這篇文章主要給大家介紹了關于Maven解決jar包版本沖突的4種方法代碼,maven工程要導入jar包的坐標,就必須要考慮解決jar包沖突,文中介紹的非常詳細,需要的朋友可以參考下
    2023-07-07
  • Java獲取調(diào)用當前方法的類名或方法名(棧堆信息)的四種方式舉例

    Java獲取調(diào)用當前方法的類名或方法名(棧堆信息)的四種方式舉例

    在Java編程中我們經(jīng)常需要在運行時獲取當前執(zhí)行的方法名稱,這在日志記錄、性能監(jiān)控、調(diào)試等方面非常有用,這篇文章主要給大家介紹了關于Java獲取調(diào)用當前方法的類名或方法名(棧堆信息)的四種方式,需要的朋友可以參考下
    2024-09-09
  • Java并發(fā)編程中的生產(chǎn)者與消費者模型簡述

    Java并發(fā)編程中的生產(chǎn)者與消費者模型簡述

    這篇文章主要介紹了Java并發(fā)編程中的生產(chǎn)者與消費者模型簡述,多線程并發(fā)是Java編程中最終要的部分之一,需要的朋友可以參考下
    2015-07-07
  • Java實現(xiàn)棧和隊列面試題

    Java實現(xiàn)棧和隊列面試題

    這篇文章主要介紹了Java實現(xiàn)棧和隊列的面試題,每個例題代碼實現(xiàn)非常詳細,每一個方法講解也很到位,特別適合參加Java面試的朋友閱讀。
    2015-09-09

最新評論