SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更詳解
需求
在現(xiàn)代分布式系統(tǒng)中,實時獲取數(shù)據(jù)庫的變更信息是一個常見的需求。例如,在電商系統(tǒng)中,當(dāng)訂單表發(fā)生更新時,可能需要同步這些變更到搜索服務(wù)、緩存服務(wù)或者通知其他微服務(wù)。傳統(tǒng)的解決方案包括定時輪詢數(shù)據(jù)庫或通過觸發(fā)器將變更寫入消息隊列等方法,但這些方案要么效率低下,要么實現(xiàn)復(fù)雜。而使用 Canal + RabbitMQ 可以提供一種高效且可靠的方式來捕獲 MySQL 數(shù)據(jù)庫的變更,并將其發(fā)送到 RabbitMQ 中供其他服務(wù)消費(fèi)。
Canal 是阿里巴巴開源的一個用于增量訂閱和消費(fèi) MySQL 數(shù)據(jù)庫 Binlog 的工具,它模擬 MySQL 主從復(fù)制機(jī)制,無需侵入業(yè)務(wù)邏輯即可捕獲數(shù)據(jù)庫變更。RabbitMQ 是一個流行的開源消息代理,支持多種協(xié)議并提供了豐富的特性來確保消息傳遞的可靠性。結(jié)合這兩者,可以構(gòu)建一個強(qiáng)大的實時數(shù)據(jù)變更監(jiān)聽和處理系統(tǒng)。
步驟
環(huán)境搭建
整合SpringBoot與Canal實現(xiàn)客戶端
Canal整合RabbitMQ
SpringBoot整合RabbitMQ
環(huán)境搭建
1. 安裝MySQL
確保你有一個正在運(yùn)行的 MySQL 實例,并啟用了 binlog 日志記錄功能。這是 Canal 捕獲數(shù)據(jù)庫變更的基礎(chǔ)。
# 修改 MySQL 配置文件 my.cnf 或 my.ini [mysqld] server-id=1 log-bin=mysql-bin binlog-format=ROW
重啟 MySQL 服務(wù)使配置生效。
2. 安裝Canal Server
下載最新版本的 Canal Server 并解壓到合適的位置。根據(jù)官方文檔進(jìn)行必要的配置,特別是 instance.properties 文件中的數(shù)據(jù)庫連接信息。
3. 安裝RabbitMQ
可以通過 Docker 快速安裝 RabbitMQ:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
訪問 http://localhost:15672 登錄管理界面,默認(rèn)用戶名/密碼為 guest/guest。
整合SpringBoot與Canal實現(xiàn)客戶端
創(chuàng)建SpringBoot項目
使用 Spring Initializr 創(chuàng)建一個新的 Spring Boot 項目,添加 Web, JPA, 和 AMQP(用于后續(xù)整合 RabbitMQ)依賴。
引入Canal依賴
在 pom.xml 中添加 Canal Client 的依賴:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency>
編寫Canal客戶端代碼
創(chuàng)建一個 Canal 客戶端類,用來監(jiān)聽 MySQL 數(shù)據(jù)庫的變化,并將變更事件轉(zhuǎn)發(fā)給 RabbitMQ。
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.rabbitmq.client.Channel; public class CanalClient { private final CanalConnector connector; private final Channel channel; public CanalClient(CanalConnector connector, Channel channel) { this.connector = connector; this.channel = channel; } public void start() { // Canal 連接配置 connector.connect(); connector.subscribe(".*\\..*"); // 訂閱所有數(shù)據(jù)庫和表 connector.rollback(); while (true) { int batchSize = 1000; EntryBatch batch = connector.getWithoutAck(batchSize); // 獲取一批次數(shù)據(jù) long batchId = batch.getId(); int size = batch.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { printEntry(batch.getEntries()); connector.ack(batchId); // 提交確認(rèn) } if (Thread.currentThread().isInterrupted()) { break; } } connector.disconnect(); } private void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { sendToRabbitMQ(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { sendToRabbitMQ(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); sendToRabbitMQ(rowData.getBeforeColumnsList()); System.out.println("-------> after"); sendToRabbitMQ(rowData.getAfterColumnsList()); } } } } private void sendToRabbitMQ(List<Column> columns) { StringBuilder message = new StringBuilder(); for (Column column : columns) { message.append(column.getName()).append("=").append(column.getValue()).append(","); } try { channel.basicPublish("", "canal_exchange", null, message.toString().getBytes()); } catch (IOException e) { e.printStackTrace(); } } }
Canal整合RabbitMQ
配置Canal Server
確保 Canal Server 已正確配置并啟動,能夠監(jiān)聽 MySQL 的 binlog 日志。修改 Canal Server 的配置文件以指向你的 MySQL 實例,并設(shè)置適當(dāng)?shù)倪^濾規(guī)則。
配置RabbitMQ Exchange
在 RabbitMQ 中創(chuàng)建一個名為 canal_exchange 的 exchange,類型可以根據(jù)需要選擇,如 fanout, direct, topic 或 headers。
rabbitmqadmin declare exchange name=canal_exchange type=fanout
SpringBoot整合RabbitMQ
添加依賴
確保在 pom.xml 中已經(jīng)包含了 RabbitMQ 的 Spring AMQP 依賴。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置RabbitMQ連接信息
在 application.yml 或 application.properties 中配置 RabbitMQ 的連接參數(shù)。
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
創(chuàng)建消費(fèi)者
編寫一個消費(fèi)者類來接收來自 RabbitMQ 的消息。
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class CanalMessageConsumer { @RabbitListener(queues = "canal_queue") public void receive(String message) { System.out.println("Received message: " + message); } }
配置隊列和綁定
確保在應(yīng)用程序啟動時自動創(chuàng)建所需的隊列,并將它們綁定到之前創(chuàng)建的 exchange 上。
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue canalQueue() { return new Queue("canal_queue", false); } @Bean public TopicExchange canalExchange() { return new TopicExchange("canal_exchange"); } @Bean public Binding binding(Queue canalQueue, TopicExchange canalExchange) { return BindingBuilder.bind(canalQueue).to(canalExchange).with("#"); } }
總結(jié)
通過以上步驟,我們成功地將 Canal 與 RabbitMQ 整合到了 Spring Boot 應(yīng)用程序中。這使得我們可以實時監(jiān)聽 MySQL 數(shù)據(jù)庫的變更,并將這些變更作為消息發(fā)布到 RabbitMQ 中供其他微服務(wù)消費(fèi)。這種方法不僅提高了系統(tǒng)的響應(yīng)速度,也簡化了數(shù)據(jù)同步的過程,降低了開發(fā)和維護(hù)成本。
到此這篇關(guān)于SpringBoot整合Canal+RabbitMQ監(jiān)聽數(shù)據(jù)變更詳解的文章就介紹到這了,更多相關(guān)SpringBoot監(jiān)聽數(shù)據(jù)變更內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot2.6.4集成swagger3.0遇到的坑及解決方法
這篇文章主要介紹了springboot2.6.4如何集成swagger3.0,在集成的過程中遇到很多問題,本文給大家分享四種問題及相應(yīng)的解決方案,需要的朋友可以參考下2022-03-03Spring JdbcTemplate執(zhí)行數(shù)據(jù)庫操作詳解
JdbcTemplate是Spring框架自帶的對JDBC操作的封裝,目的是提供統(tǒng)一的模板方法使對數(shù)據(jù)庫的操作更加方便、友好,效率也不錯,這篇文章主要介紹了Spring JdbcTemplate執(zhí)行數(shù)據(jù)庫操作,需要的朋友可以參考下2022-10-10SpringBoot請求轉(zhuǎn)發(fā)的方式小結(jié)
本文主要介紹了SpringBoot請求轉(zhuǎn)發(fā)的方式,一共有兩大類,一種是controller控制器轉(zhuǎn)發(fā)一種是使用HttpServletRequest進(jìn)行轉(zhuǎn)發(fā),本文就詳細(xì)的介紹一下,感興趣的可以了解一下2023-09-09IntelliJ IDEA中查看文件內(nèi)所有已聲明的方法(類似eclipse的outline)
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA中查看文件內(nèi)所有已聲明的方法(類似eclipse的outline),小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10