欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java使用Canal同步MySQL數(shù)據(jù)到Redis

 更新時間:2024年11月08日 09:59:10   作者:binbinxyz  
在現(xiàn)代微服務(wù)架構(gòu)中,數(shù)據(jù)同步是一個常見的需求,特別是將?MySQL?數(shù)據(jù)實時同步到?Redis,下面我們就來看看Java如何使用Canal同步MySQL數(shù)據(jù)到Redis吧

一、引言

在現(xiàn)代微服務(wù)架構(gòu)中,數(shù)據(jù)同步是一個常見的需求。特別是將 MySQL 數(shù)據(jù)實時同步到 Redis,可以顯著提升應(yīng)用的性能和響應(yīng)速度。本文將詳細(xì)介紹如何使用 Canal 實現(xiàn)這一目標(biāo)。Canal 是阿里巴巴開源的一個數(shù)據(jù)庫 Binlog 同步工具,可以實時捕獲 MySQL 的 Binlog 日志并將其同步到其他存儲系統(tǒng)。

項目地址:alibaba/canal

二、工作原理

1. MySQL主備復(fù)制原理

MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events,可以通過 show binlog events 進(jìn)行查看)

MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)

MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)

2. canal 工作原理

canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議

MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )

canal 解析 binary log 對象(原始為 byte 流)

三、環(huán)境準(zhǔn)備

1. 安裝和配置 MySQL

Canal的原理是基于mysql binlog技術(shù),所以這里一定需要開啟mysql的binlog寫入功能,并且配置binlog模式為row。編輯 MySQL 配置文件 my.cnf 或 my.ini,添加或修改以下內(nèi)容:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

授權(quán) canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權(quán)限, 如果已有賬戶可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

重啟 MySQL 服務(wù)以使配置生效:

sudo service mysql restart

2. 安裝和配置 Canal

下載并解壓 Canal 服務(wù)端:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal
cd /opt/canal

編輯 Canal 配置文件 conf/example/instance.properties,配置 MySQL 服務(wù)器的相關(guān)信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*

啟動 Canal 服務(wù):

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

如果啟動失敗,注意檢查配置文件conf/example/instance.properties的內(nèi)容,還要注意JDK版本及配置。建議使用1.6.25。我用openjdk 21啟動報錯,改回JDK8u421啟動成功。

3. 安裝和配置 Redis

確保 Redis 服務(wù)已經(jīng)安裝并啟動。可以在 Redis 客戶端中執(zhí)行以下命令檢查:

redis-cli
ping

四、開發(fā) Java 應(yīng)用

1. 添加依賴

在你的 pom.xml 文件中添加 Canal 客戶端和 Redis 客戶端的依賴。以下是一個示例:

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.1.5</version>
    </dependency>
</dependencies>

2. 編寫 Canal 客戶端代碼

創(chuàng)建一個 Java 類來連接 Canal 服務(wù)并處理 Binlog 事件,將數(shù)據(jù)同步到 Redis:

package org.hbin.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalToRedisSync {

    public static void main(String[] args) {
        // 創(chuàng)建 Canal 連接
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(address, "example", "", "");

        // 連接到 Canal 服務(wù)
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();

        // 創(chuàng)建 Redis 客戶端
        Jedis jedis = new Jedis("127.0.0.1", 6379);

        while (true) {
            Message message = connector.getWithoutAck(100); // 獲取最多 100 條記錄
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                handleEntry(message.getEntries(), jedis);
            }

            connector.ack(batchId); // 提交確認(rèn)
            // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
        }
    }

    private static void handleEntry(List<CanalEntry.Entry> entries, Jedis jedis) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    syncDelete(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    syncInsert(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else {
                    System.out.println("-------> before");
                    syncUpdate(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                    System.out.println("-------> after");
                    syncUpdate(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                }
            }
        }
    }

    private static void syncInsert(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Insert: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncUpdate(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Update: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncDelete(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            }
        }
        System.out.println("Delete: " + key.toString());
        jedis.hdel(schema + ":" + table, key.toString());
    }
}

3. 運行和測試

3.1 啟動 Canal 服務(wù):

sh /opt/canal/bin/startup.sh

3.2 啟動 Redis 服務(wù):

確保 Redis 服務(wù)已經(jīng)啟動,可以在 Redis 客戶端中執(zhí)行以下命令檢查:

redis-cli
ping

3.3 啟動 Java 應(yīng)用:

編譯并運行上述 Java 應(yīng)用,確保 Canal 服務(wù)和 MySQL 服務(wù)器正常運行。

3.4 測試數(shù)據(jù)同步:

在 MySQL 中插入、更新或刪除數(shù)據(jù),觀察 Java 應(yīng)用是否能夠?qū)崟r捕獲這些變化并將數(shù)據(jù)同步到 Redis。

相關(guān)SQL如下:

drop database if exists canal;
create database canal;
use canal;

drop table if exists user;
create table user(
  `id` bigint AUTO_INCREMENT primary key,
  `name` varchar(20) NOT NULL,
  `age` tinyint DEFAULT 0,
  `detail` varchar(100) DEFAULT '',
  `create_time` date,
  `update_time` date
);

insert into user value(1, 'Tom1', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(2, 'Tom2', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(3, 'Tom3', 25, 'canal', '2024-11-07', '2024-11-07');
update user set age=26 where id=2;
delete from user where id=3;

輸出信息:

================> binlog[binlog.000008:6390] , name[canal,user] , eventType : CREATE
================> binlog[binlog.000008:6899] , name[canal,user] , eventType : INSERT
Insert: 1 -> name:Tom1,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7213] , name[canal,user] , eventType : INSERT
Insert: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7527] , name[canal,user] , eventType : INSERT
Insert: 3 -> name:Tom3,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7850] , name[canal,user] , eventType : UPDATE
-------> before
Update: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
-------> after
Update: 2 -> name:Tom2,age:26,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:8193] , name[canal,user] , eventType : DELETE
Delete: 3

五、注意事項

性能優(yōu)化:根據(jù)實際需求調(diào)整 Canal 和 Redis 的配置,以優(yōu)化性能。

錯誤處理:在生產(chǎn)環(huán)境中,需要增加錯誤處理和重試機制,確保數(shù)據(jù)同步的可靠性。

安全性:確保 Canal 和 Redis 的連接是安全的,使用適當(dāng)?shù)恼J(rèn)證和授權(quán)機制。

六、結(jié)論

通過使用 Canal,我們可以輕松地將 MySQL 數(shù)據(jù)實時同步到 Redis、Kafka 或其他系統(tǒng)。這不僅提高了數(shù)據(jù)的一致性和實時性,還為應(yīng)用提供了更高的性能和響應(yīng)速度。

以上就是Java使用Canal同步MySQL數(shù)據(jù)到Redis的詳細(xì)內(nèi)容,更多關(guān)于Java Canal同步MySQL數(shù)據(jù)的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 深入理解Java8新特性之Optional容器類的應(yīng)用

    深入理解Java8新特性之Optional容器類的應(yīng)用

    Optional<T> 類(java.util.Optional) 是一個容器類,代表一個值存在或不存在,原來用 null 表示一個值不存在,現(xiàn)在 Optional 可以更好的表達(dá)這個概念。并且可以避免空指針異常,需要的朋友可以參考下本文
    2021-11-11
  • Mybatis攔截器實現(xiàn)公共字段填充的示例代碼

    Mybatis攔截器實現(xiàn)公共字段填充的示例代碼

    本文介紹了使用Spring Boot和MyBatis實現(xiàn)公共字段的自動填充功能,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-12-12
  • SpringBoot集成JWT生成token及校驗方法過程解析

    SpringBoot集成JWT生成token及校驗方法過程解析

    這篇文章主要介紹了SpringBoot集成JWT生成token及校驗方法過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-04-04
  • Java集成presto查詢方式

    Java集成presto查詢方式

    這篇文章主要介紹了Java集成presto查詢方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • 實例代碼講解JAVA 觀察者模式

    實例代碼講解JAVA 觀察者模式

    這篇文章主要介紹了JAVA 觀察者模式的的相關(guān)資料,文中代碼非常詳細(xì),幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下
    2020-06-06
  • Java?Socket實現(xiàn)文件發(fā)送和接收功能以及遇到的Bug問題

    Java?Socket實現(xiàn)文件發(fā)送和接收功能以及遇到的Bug問題

    這篇文章主要介紹了Java?Socket實現(xiàn)文件發(fā)送和接收功能以及遇到的Bug問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • 講解Java編程中finally語句的基本使用方法

    講解Java編程中finally語句的基本使用方法

    這篇文章主要介紹了講解Java編程中finally語句的基本使用方法,finally在異常處理中的使用時Java入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下
    2015-11-11
  • 詳解SpringBoot是如何整合JPA的

    詳解SpringBoot是如何整合JPA的

    借助于開發(fā)框架,我們已經(jīng)不用編寫原始的訪問數(shù)據(jù)庫的代碼,也不用調(diào)用JDBC或者連接池等諸如此類的被稱作底層的代碼,我們將從更高的層次上訪問數(shù)據(jù)庫,本章我們將詳細(xì)介紹在Springboot中使用 Spring Data JPA 來實現(xiàn)對數(shù)據(jù)庫的操作,需要的朋友可以參考下
    2021-06-06
  • SpringBoot字段注入和構(gòu)造函數(shù)注入的區(qū)別及說明

    SpringBoot字段注入和構(gòu)造函數(shù)注入的區(qū)別及說明

    這篇文章主要介紹了SpringBoot字段注入和構(gòu)造函數(shù)注入的區(qū)別及說明,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2025-04-04
  • Java實現(xiàn)經(jīng)典游戲2048的示例代碼

    Java實現(xiàn)經(jīng)典游戲2048的示例代碼

    2014年Gabriele Cirulli利用周末的時間寫2048這個游戲的程序。本文將用java語言實現(xiàn)這一經(jīng)典游戲,并采用了swing技術(shù)進(jìn)行了界面化處理,需要的可以參考一下
    2022-02-02

最新評論