欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更詳解

 更新時間:2024年12月30日 09:37:46   作者:星辰@Sea  
在現(xiàn)代分布式系統(tǒng)中,實時獲取數(shù)據(jù)庫的變更信息是一個常見的需求,本文將介紹SpringBoot如何通過整合Canal和RabbitMQ監(jiān)聽數(shù)據(jù)變更,需要的可以參考下

需求

在現(xiàn)代分布式系統(tǒng)中,實時獲取數(shù)據(jù)庫的變更信息是一個常見的需求。例如,在電商系統(tǒng)中,當(dāng)訂單表發(fā)生更新時,可能需要同步這些變更到搜索服務(wù)、緩存服務(wù)或者通知其他微服務(wù)。傳統(tǒng)的解決方案包括定時輪詢數(shù)據(jù)庫或通過觸發(fā)器將變更寫入消息隊列等方法,但這些方案要么效率低下,要么實現(xiàn)復(fù)雜。而使用 Canal + RabbitMQ 可以提供一種高效且可靠的方式來捕獲 MySQL 數(shù)據(jù)庫的變更,并將其發(fā)送到 RabbitMQ 中供其他服務(wù)消費(fèi)。

Canal 是阿里巴巴開源的一個用于增量訂閱和消費(fèi) MySQL 數(shù)據(jù)庫 Binlog 的工具,它模擬 MySQL 主從復(fù)制機(jī)制,無需侵入業(yè)務(wù)邏輯即可捕獲數(shù)據(jù)庫變更。RabbitMQ 是一個流行的開源消息代理,支持多種協(xié)議并提供了豐富的特性來確保消息傳遞的可靠性。結(jié)合這兩者,可以構(gòu)建一個強(qiáng)大的實時數(shù)據(jù)變更監(jiān)聽和處理系統(tǒng)。

步驟

環(huán)境搭建

整合SpringBoot與Canal實現(xiàn)客戶端

Canal整合RabbitMQ

SpringBoot整合RabbitMQ

環(huán)境搭建

1. 安裝MySQL

確保你有一個正在運(yùn)行的 MySQL 實例,并啟用了 binlog 日志記錄功能。這是 Canal 捕獲數(shù)據(jù)庫變更的基礎(chǔ)。

# 修改 MySQL 配置文件 my.cnf 或 my.ini
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

重啟 MySQL 服務(wù)使配置生效。

2. 安裝Canal Server

下載最新版本的 Canal Server 并解壓到合適的位置。根據(jù)官方文檔進(jìn)行必要的配置,特別是 instance.properties 文件中的數(shù)據(jù)庫連接信息。

3. 安裝RabbitMQ

可以通過 Docker 快速安裝 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

訪問 http://localhost:15672 登錄管理界面,默認(rèn)用戶名/密碼為 guest/guest。

整合SpringBoot與Canal實現(xiàn)客戶端

創(chuàng)建SpringBoot項目

使用 Spring Initializr 創(chuàng)建一個新的 Spring Boot 項目,添加 Web, JPA, 和 AMQP(用于后續(xù)整合 RabbitMQ)依賴。

引入Canal依賴

在 pom.xml 中添加 Canal Client 的依賴:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

編寫Canal客戶端代碼

創(chuàng)建一個 Canal 客戶端類,用來監(jiān)聽 MySQL 數(shù)據(jù)庫的變化,并將變更事件轉(zhuǎn)發(fā)給 RabbitMQ。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.rabbitmq.client.Channel;

public class CanalClient {

    private final CanalConnector connector;
    private final Channel channel;

    public CanalClient(CanalConnector connector, Channel channel) {
        this.connector = connector;
        this.channel = channel;
    }

    public void start() {
        // Canal 連接配置
        connector.connect();
        connector.subscribe(".*\\..*"); // 訂閱所有數(shù)據(jù)庫和表
        connector.rollback();

        while (true) {
            int batchSize = 1000;
            EntryBatch batch = connector.getWithoutAck(batchSize); // 獲取一批次數(shù)據(jù)
            long batchId = batch.getId();
            int size = batch.getEntries().size();

            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                printEntry(batch.getEntries());
                connector.ack(batchId); // 提交確認(rèn)
            }

            if (Thread.currentThread().isInterrupted()) {
                break;
            }
        }

        connector.disconnect();
    }

    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    sendToRabbitMQ(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    sendToRabbitMQ(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    sendToRabbitMQ(rowData.getBeforeColumnsList());

                    System.out.println("-------> after");
                    sendToRabbitMQ(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private void sendToRabbitMQ(List<Column> columns) {
        StringBuilder message = new StringBuilder();
        for (Column column : columns) {
            message.append(column.getName()).append("=").append(column.getValue()).append(",");
        }
        try {
            channel.basicPublish("", "canal_exchange", null, message.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Canal整合RabbitMQ

配置Canal Server

確保 Canal Server 已正確配置并啟動,能夠監(jiān)聽 MySQL 的 binlog 日志。修改 Canal Server 的配置文件以指向你的 MySQL 實例,并設(shè)置適當(dāng)?shù)倪^濾規(guī)則。

配置RabbitMQ Exchange

在 RabbitMQ 中創(chuàng)建一個名為 canal_exchange 的 exchange,類型可以根據(jù)需要選擇,如 fanout, direct, topic 或 headers。

rabbitmqadmin declare exchange name=canal_exchange type=fanout

SpringBoot整合RabbitMQ

添加依賴

確保在 pom.xml 中已經(jīng)包含了 RabbitMQ 的 Spring AMQP 依賴。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置RabbitMQ連接信息

在 application.yml 或 application.properties 中配置 RabbitMQ 的連接參數(shù)。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

創(chuàng)建消費(fèi)者

編寫一個消費(fèi)者類來接收來自 RabbitMQ 的消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class CanalMessageConsumer {

    @RabbitListener(queues = "canal_queue")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}

配置隊列和綁定

確保在應(yīng)用程序啟動時自動創(chuàng)建所需的隊列,并將它們綁定到之前創(chuàng)建的 exchange 上。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue canalQueue() {
        return new Queue("canal_queue", false);
    }

    @Bean
    public TopicExchange canalExchange() {
        return new TopicExchange("canal_exchange");
    }

    @Bean
    public Binding binding(Queue canalQueue, TopicExchange canalExchange) {
        return BindingBuilder.bind(canalQueue).to(canalExchange).with("#");
    }
}

總結(jié)

通過以上步驟,我們成功地將 Canal 與 RabbitMQ 整合到了 Spring Boot 應(yīng)用程序中。這使得我們可以實時監(jiān)聽 MySQL 數(shù)據(jù)庫的變更,并將這些變更作為消息發(fā)布到 RabbitMQ 中供其他微服務(wù)消費(fèi)。這種方法不僅提高了系統(tǒng)的響應(yīng)速度,也簡化了數(shù)據(jù)同步的過程,降低了開發(fā)和維護(hù)成本。

到此這篇關(guān)于SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更詳解的文章就介紹到這了,更多相關(guān)SpringBoot監(jiān)聽數(shù)據(jù)變更內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • springboot2.6.4集成swagger3.0遇到的坑及解決方法

    springboot2.6.4集成swagger3.0遇到的坑及解決方法

    這篇文章主要介紹了springboot2.6.4如何集成swagger3.0,在集成的過程中遇到很多問題,本文給大家分享四種問題及相應(yīng)的解決方案,需要的朋友可以參考下
    2022-03-03
  • Spring的定時任務(wù)@Scheduled源碼詳解

    Spring的定時任務(wù)@Scheduled源碼詳解

    這篇文章主要介紹了Spring的定時任務(wù)@Scheduled源碼詳解,@Scheduled注解是包org.springframework.scheduling.annotation中的一個注解,主要是用來開啟定時任務(wù),本文提供了部分實現(xiàn)代碼與思路,需要的朋友可以參考下
    2023-09-09
  • Spring JdbcTemplate執(zhí)行數(shù)據(jù)庫操作詳解

    Spring JdbcTemplate執(zhí)行數(shù)據(jù)庫操作詳解

    JdbcTemplate是Spring框架自帶的對JDBC操作的封裝,目的是提供統(tǒng)一的模板方法使對數(shù)據(jù)庫的操作更加方便、友好,效率也不錯,這篇文章主要介紹了Spring JdbcTemplate執(zhí)行數(shù)據(jù)庫操作,需要的朋友可以參考下
    2022-10-10
  • Hibernate一級緩存和二級緩存詳解

    Hibernate一級緩存和二級緩存詳解

    今天小編就為大家分享一篇關(guān)于Hibernate一級緩存和二級緩存詳解,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-03-03
  • 詳解Android開發(fā)中Fragment的使用

    詳解Android開發(fā)中Fragment的使用

    這篇文章主要介紹了詳解Android開發(fā)中Fragment的使用,包括Java代碼中調(diào)用Fragment的方法,需要的朋友可以參考下
    2015-07-07
  • SpringBoot請求轉(zhuǎn)發(fā)的方式小結(jié)

    SpringBoot請求轉(zhuǎn)發(fā)的方式小結(jié)

    本文主要介紹了SpringBoot請求轉(zhuǎn)發(fā)的方式,一共有兩大類,一種是controller控制器轉(zhuǎn)發(fā)一種是使用HttpServletRequest進(jìn)行轉(zhuǎn)發(fā),本文就詳細(xì)的介紹一下,感興趣的可以了解一下
    2023-09-09
  • Java list foreach修改元素方式

    Java list foreach修改元素方式

    這篇文章主要介紹了Java list foreach修改元素方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • RateLimit-使用guava來做接口限流代碼示例

    RateLimit-使用guava來做接口限流代碼示例

    這篇文章主要介紹了RateLimit-使用guava來做接口限流代碼示例,具有一定借鑒價值,需要的朋友可以參考下
    2018-01-01
  • IntelliJ IDEA中查看文件內(nèi)所有已聲明的方法(類似eclipse的outline)

    IntelliJ IDEA中查看文件內(nèi)所有已聲明的方法(類似eclipse的outline)

    今天小編就為大家分享一篇關(guān)于IntelliJ IDEA中查看文件內(nèi)所有已聲明的方法(類似eclipse的outline),小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2018-10-10
  • IDEA全局查找關(guān)鍵字的用法解讀

    IDEA全局查找關(guān)鍵字的用法解讀

    這篇文章主要介紹了IDEA全局查找關(guān)鍵字的用法解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-02-02

最新評論