欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java使用Canal同步MySQL數(shù)據(jù)到Redis

 更新時(shí)間:2024年11月08日 09:59:10   作者:binbinxyz  
在現(xiàn)代微服務(wù)架構(gòu)中,數(shù)據(jù)同步是一個(gè)常見的需求,特別是將?MySQL?數(shù)據(jù)實(shí)時(shí)同步到?Redis,下面我們就來看看Java如何使用Canal同步MySQL數(shù)據(jù)到Redis吧

一、引言

在現(xiàn)代微服務(wù)架構(gòu)中,數(shù)據(jù)同步是一個(gè)常見的需求。特別是將 MySQL 數(shù)據(jù)實(shí)時(shí)同步到 Redis,可以顯著提升應(yīng)用的性能和響應(yīng)速度。本文將詳細(xì)介紹如何使用 Canal 實(shí)現(xiàn)這一目標(biāo)。Canal 是阿里巴巴開源的一個(gè)數(shù)據(jù)庫 Binlog 同步工具,可以實(shí)時(shí)捕獲 MySQL 的 Binlog 日志并將其同步到其他存儲(chǔ)系統(tǒng)。

項(xiàng)目地址:alibaba/canal

二、工作原理

1. MySQL主備復(fù)制原理

MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events,可以通過 show binlog events 進(jìn)行查看)

MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)

MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)

2. canal 工作原理

canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議

MySQL master 收到 dump 請(qǐng)求,開始推送 binary log 給 slave (即 canal )

canal 解析 binary log 對(duì)象(原始為 byte 流)

三、環(huán)境準(zhǔn)備

1. 安裝和配置 MySQL

Canal的原理是基于mysql binlog技術(shù),所以這里一定需要開啟mysql的binlog寫入功能,并且配置binlog模式為row。編輯 MySQL 配置文件 my.cnf 或 my.ini,添加或修改以下內(nèi)容:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

授權(quán) canal 鏈接 MySQL 賬號(hào)具有作為 MySQL slave 的權(quán)限, 如果已有賬戶可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

重啟 MySQL 服務(wù)以使配置生效:

sudo service mysql restart

2. 安裝和配置 Canal

下載并解壓 Canal 服務(wù)端:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal
cd /opt/canal

編輯 Canal 配置文件 conf/example/instance.properties,配置 MySQL 服務(wù)器的相關(guān)信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*

啟動(dòng) Canal 服務(wù):

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

如果啟動(dòng)失敗,注意檢查配置文件conf/example/instance.properties的內(nèi)容,還要注意JDK版本及配置。建議使用1.6.25。我用openjdk 21啟動(dòng)報(bào)錯(cuò),改回JDK8u421啟動(dòng)成功。

3. 安裝和配置 Redis

確保 Redis 服務(wù)已經(jīng)安裝并啟動(dòng)??梢栽?Redis 客戶端中執(zhí)行以下命令檢查:

redis-cli
ping

四、開發(fā) Java 應(yīng)用

1. 添加依賴

在你的 pom.xml 文件中添加 Canal 客戶端和 Redis 客戶端的依賴。以下是一個(gè)示例:

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.1.5</version>
    </dependency>
</dependencies>

2. 編寫 Canal 客戶端代碼

創(chuàng)建一個(gè) Java 類來連接 Canal 服務(wù)并處理 Binlog 事件,將數(shù)據(jù)同步到 Redis:

package org.hbin.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalToRedisSync {

    public static void main(String[] args) {
        // 創(chuàng)建 Canal 連接
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(address, "example", "", "");

        // 連接到 Canal 服務(wù)
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();

        // 創(chuàng)建 Redis 客戶端
        Jedis jedis = new Jedis("127.0.0.1", 6379);

        while (true) {
            Message message = connector.getWithoutAck(100); // 獲取最多 100 條記錄
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                handleEntry(message.getEntries(), jedis);
            }

            connector.ack(batchId); // 提交確認(rèn)
            // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
        }
    }

    private static void handleEntry(List<CanalEntry.Entry> entries, Jedis jedis) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    syncDelete(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    syncInsert(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else {
                    System.out.println("-------> before");
                    syncUpdate(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                    System.out.println("-------> after");
                    syncUpdate(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                }
            }
        }
    }

    private static void syncInsert(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Insert: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncUpdate(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Update: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncDelete(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            }
        }
        System.out.println("Delete: " + key.toString());
        jedis.hdel(schema + ":" + table, key.toString());
    }
}

3. 運(yùn)行和測(cè)試

3.1 啟動(dòng) Canal 服務(wù):

sh /opt/canal/bin/startup.sh

3.2 啟動(dòng) Redis 服務(wù):

確保 Redis 服務(wù)已經(jīng)啟動(dòng),可以在 Redis 客戶端中執(zhí)行以下命令檢查:

redis-cli
ping

3.3 啟動(dòng) Java 應(yīng)用:

編譯并運(yùn)行上述 Java 應(yīng)用,確保 Canal 服務(wù)和 MySQL 服務(wù)器正常運(yùn)行。

3.4 測(cè)試數(shù)據(jù)同步:

在 MySQL 中插入、更新或刪除數(shù)據(jù),觀察 Java 應(yīng)用是否能夠?qū)崟r(shí)捕獲這些變化并將數(shù)據(jù)同步到 Redis。

相關(guān)SQL如下:

drop database if exists canal;
create database canal;
use canal;

drop table if exists user;
create table user(
  `id` bigint AUTO_INCREMENT primary key,
  `name` varchar(20) NOT NULL,
  `age` tinyint DEFAULT 0,
  `detail` varchar(100) DEFAULT '',
  `create_time` date,
  `update_time` date
);

insert into user value(1, 'Tom1', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(2, 'Tom2', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(3, 'Tom3', 25, 'canal', '2024-11-07', '2024-11-07');
update user set age=26 where id=2;
delete from user where id=3;

輸出信息:

================> binlog[binlog.000008:6390] , name[canal,user] , eventType : CREATE
================> binlog[binlog.000008:6899] , name[canal,user] , eventType : INSERT
Insert: 1 -> name:Tom1,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7213] , name[canal,user] , eventType : INSERT
Insert: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7527] , name[canal,user] , eventType : INSERT
Insert: 3 -> name:Tom3,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7850] , name[canal,user] , eventType : UPDATE
-------> before
Update: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
-------> after
Update: 2 -> name:Tom2,age:26,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:8193] , name[canal,user] , eventType : DELETE
Delete: 3

五、注意事項(xiàng)

性能優(yōu)化:根據(jù)實(shí)際需求調(diào)整 Canal 和 Redis 的配置,以優(yōu)化性能。

錯(cuò)誤處理:在生產(chǎn)環(huán)境中,需要增加錯(cuò)誤處理和重試機(jī)制,確保數(shù)據(jù)同步的可靠性。

安全性:確保 Canal 和 Redis 的連接是安全的,使用適當(dāng)?shù)恼J(rèn)證和授權(quán)機(jī)制。

六、結(jié)論

通過使用 Canal,我們可以輕松地將 MySQL 數(shù)據(jù)實(shí)時(shí)同步到 Redis、Kafka 或其他系統(tǒng)。這不僅提高了數(shù)據(jù)的一致性和實(shí)時(shí)性,還為應(yīng)用提供了更高的性能和響應(yīng)速度。

以上就是Java使用Canal同步MySQL數(shù)據(jù)到Redis的詳細(xì)內(nèi)容,更多關(guān)于Java Canal同步MySQL數(shù)據(jù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Mybatis查詢時(shí),區(qū)分大小寫操作

    Mybatis查詢時(shí),區(qū)分大小寫操作

    這篇文章主要介紹了Mybatis查詢時(shí),區(qū)分大小寫操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Windows下將JAVA?jar注冊(cè)成windows服務(wù)的方法

    Windows下將JAVA?jar注冊(cè)成windows服務(wù)的方法

    這篇文章主要介紹了Windows下將JAVA?jar注冊(cè)成windows服務(wù)的方法,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-07-07
  • 常用輸入字節(jié)流InputStream介紹

    常用輸入字節(jié)流InputStream介紹

    下面小編就為大家?guī)硪黄S幂斎胱止?jié)流InputStream介紹。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-08-08
  • java IP地址網(wǎng)段計(jì)算的示例代碼

    java IP地址網(wǎng)段計(jì)算的示例代碼

    這篇文章主要介紹了java IP地址網(wǎng)段計(jì)算的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-03-03
  • java 多線程死鎖詳解及簡(jiǎn)單實(shí)例

    java 多線程死鎖詳解及簡(jiǎn)單實(shí)例

    這篇文章主要介紹了java 多線程死鎖詳解及簡(jiǎn)單實(shí)例的相關(guān)資料,需要的朋友可以參考下
    2017-01-01
  • SpringBoot Starter自定義之創(chuàng)建可復(fù)用的自動(dòng)配置模塊方式

    SpringBoot Starter自定義之創(chuàng)建可復(fù)用的自動(dòng)配置模塊方式

    本文將詳細(xì)介紹如何設(shè)計(jì)和實(shí)現(xiàn)一個(gè)自定義的Spring Boot Starter,幫助讀者掌握這一強(qiáng)大技術(shù),提升代碼復(fù)用性和開發(fā)效率,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2025-04-04
  • Java 指定微信好友自動(dòng)發(fā)送消息的實(shí)現(xiàn)示例

    Java 指定微信好友自動(dòng)發(fā)送消息的實(shí)現(xiàn)示例

    這篇文章主要介紹了Java 指定微信好友自動(dòng)發(fā)送消息的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • 一小時(shí)迅速入門Mybatis之Prepared Statement與符號(hào)的使用

    一小時(shí)迅速入門Mybatis之Prepared Statement與符號(hào)的使用

    這篇文章主要介紹了一小時(shí)迅速入門Mybatis之Prepared Statement與符號(hào)的使用,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-09-09
  • SpringBoot中自定義首頁(默認(rèn)頁)及favicon的方法

    SpringBoot中自定義首頁(默認(rèn)頁)及favicon的方法

    這篇文章主要介紹了SpringBoot中如何自定義首頁(默認(rèn)頁)及favicon,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-08-08
  • 一文帶你了解RabbitMQ消息轉(zhuǎn)換器

    一文帶你了解RabbitMQ消息轉(zhuǎn)換器

    這篇文章主要為大家詳細(xì)介紹了RabbitMQ中消息轉(zhuǎn)換器的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,感興趣的小伙伴可以了解一下
    2023-04-04

最新評(píng)論