Canal入門使用小結(jié)
說明:canal [k?’næl],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)(官方介紹)。一言以蔽之,Canal是一款實(shí)現(xiàn)數(shù)據(jù)同步的組件??梢詫?shí)現(xiàn)數(shù)據(jù)庫之間、數(shù)據(jù)庫與Redis、ES之間的數(shù)據(jù)同步。本文介紹Canal的入門使用。
Canal介紹
Canal實(shí)現(xiàn)原理是偽裝成MySQL主節(jié)點(diǎn)的從節(jié)點(diǎn),接收主節(jié)點(diǎn)的binlog日志,解析、提取數(shù)據(jù)庫操作,將對數(shù)據(jù)庫的操作通過代碼更新到其他組件中,如其他數(shù)據(jù)庫、ES、Redis等,官方解釋如下:
官方提供的結(jié)構(gòu)圖如下:
Canal安裝
首先,從官網(wǎng)上下載Canal服務(wù)器,地址:https://github.com/alibaba/canal/releases
下載下來,解壓,如下:
canal配置文件暫時(shí)不用管,先修改一下example
文件中監(jiān)測的目前節(jié)點(diǎn)配置,修改成自己需要監(jiān)測的MySQL配置,如下:
修改完,啟動(dòng)canal服務(wù),雙擊startup.bat
文件,如下:
Canal使用
只要你的MySQL服務(wù)器的IP、賬號密碼沒輸錯(cuò),且測試過能用Navicat或其他數(shù)據(jù)庫連接工具成功連接數(shù)據(jù)庫,那么就可以進(jìn)行下面的編碼工作了。
首先,創(chuàng)建一個(gè)Maven項(xiàng)目,pom.xml如下,導(dǎo)個(gè)canal依賴就行了
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hezy</groupId> <artifactId>canal_demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!--canal客戶端--> <dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency> </dependencies> </project>
測試代碼如下,用來連接canal服務(wù)器,打印canal監(jiān)測到的數(shù)據(jù)內(nèi)容;
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; /** * Canal處理器 * 作用:打印canal服務(wù)器監(jiān)測到的數(shù)據(jù) */ public class CanalHandler { public static void main(String[] args) throws InvalidProtocolBufferException { // 1.創(chuàng)建連接 CanalConnector canalConnector = CanalConnectors .newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", ""); // 2.抓取數(shù)據(jù) while (true) { // 3.開始連接 canalConnector.connect(); // 4.訂閱數(shù)據(jù),所有的庫和表 canalConnector.subscribe(".*\\..*"); // 5.抓取數(shù)據(jù),每次抓取100條 Message message = canalConnector.get(100); // 6.獲取entry集合 List<CanalEntry.Entry> entries = message.getEntries(); // 7.判斷是否有數(shù)據(jù) if (entries.size() == 0) { System.out.println(">>>暫無數(shù)據(jù)<<<"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { // 8.解析數(shù)據(jù) for (CanalEntry.Entry entry : entries) { // 獲取表名 String tableName = entry.getHeader().getTableName(); // 獲取操作類型 CanalEntry.EntryType entryType = entry.getEntryType(); // 判斷entryType是否為ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { // 序列化數(shù)據(jù) ByteString storeValue = entry.getStoreValue(); // 反序列化數(shù)據(jù) CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 獲取事件類型 CanalEntry.EventType eventType = rowChange.getEventType(); // 獲取具體的數(shù)據(jù) List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); // 遍歷打印 for (CanalEntry.RowData rowData : rowDatasList) { // 獲取拉取前后的數(shù)據(jù) List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); // 用Map存儲(chǔ)每條數(shù)據(jù) HashMap<String, Object> beforeMap = new HashMap<>(); HashMap<String, Object> afterMap = new HashMap<>(); // 獲取不同操作的數(shù)據(jù) if (CanalEntry.EventType.INSERT.equals(eventType)) { System.out.println("【" + tableName + "】表插入數(shù)據(jù)"); for (CanalEntry.Column column : afterColumnsList) { afterMap.put(column.getName(), column.getValue()); } System.out.println("新增數(shù)據(jù):" + afterMap); } else if (CanalEntry.EventType.UPDATE.equals(eventType)) { System.out.println("【" + tableName + "】表更新數(shù)據(jù)"); for (CanalEntry.Column column : beforeColumnsList) { beforeMap.put(column.getName(), column.getValue()); } System.out.println("更新前:" + beforeMap); System.out.println("----"); for (CanalEntry.Column column : afterColumnsList) { afterMap.put(column.getName(), column.getValue()); } System.out.println("更新后:" + afterMap); } else if (CanalEntry.EventType.DELETE.equals(eventType)) { System.out.println("【" + tableName + "】表刪除數(shù)據(jù)"); for (CanalEntry.Column column : beforeColumnsList) { beforeMap.put(column.getName(), column.getValue()); } System.out.println("被刪除的數(shù)據(jù):" + beforeMap); } } } } } } } }
啟動(dòng)程序,查看控制臺(tái),檢測中……
使用Navicat連接數(shù)據(jù)庫,查看數(shù)據(jù)庫test庫,i_user表內(nèi)容;
此時(shí),我們新增一條數(shù)據(jù),看控制臺(tái),canal成功接收到了這次修改;
更新數(shù)據(jù);
刪除數(shù)據(jù);
頭能過身體就能過,接下來不就好辦了。將Canal接收到的數(shù)據(jù)轉(zhuǎn)為對象,根據(jù)不同的操作類型分發(fā)給自己想要同步的組件,同步給從MySQL,就調(diào)用對應(yīng)的Mapper;同步給Redis,就調(diào)用Redis對應(yīng)的方法,ES同樣。
總結(jié)
本文介紹了Canal入門使用,參考B站視頻:Canal極簡入門:一小時(shí)讓你快速上手Canal數(shù)據(jù)同步神技~
到此這篇關(guān)于Canal入門使用小結(jié)的文章就介紹到這了,更多相關(guān)Canal入門內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支
- MySQL高性能實(shí)現(xiàn)Canal數(shù)據(jù)同步神器
- Canal監(jiān)聽MySQL的實(shí)現(xiàn)步驟
- 使用Canal實(shí)現(xiàn)PHP應(yīng)用程序與MySQL數(shù)據(jù)庫的實(shí)時(shí)數(shù)據(jù)同步
- Canal搭建?idea設(shè)置及采集數(shù)據(jù)到kafka的操作方法
- 使用Canal實(shí)現(xiàn)MySQL主從同步的流程步驟
- Canal實(shí)現(xiàn)MYSQL實(shí)時(shí)數(shù)據(jù)同步的示例代碼
- Java使用Canal同步MySQL數(shù)據(jù)到Redis
相關(guān)文章
Mysql連接無效(invalid connection)問題及解決
這篇文章主要介紹了Mysql連接無效(invalid connection)問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02MySQL和Oracle的元數(shù)據(jù)抽取實(shí)例分析
MySQL和Oracle雖然在架構(gòu)上有很大的不同,但是如果從某些方面比較起來,它們有些方面也是相通的,下面這篇文章主要給大家介紹了關(guān)于MySQL和Oracle元數(shù)據(jù)抽取的相關(guān)資料,需要的朋友可以參考下2021-12-12MySQL5.7中的sql_mode默認(rèn)值帶來的坑及解決方法
這篇文章主要介紹了MySQL5.7中的sql_mode默認(rèn)值帶來的坑及解決方法,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-11-11大批量數(shù)據(jù)用mysql批量更新數(shù)據(jù)的4種方法總結(jié)
這篇文章主要給大家介紹了關(guān)于大批量數(shù)據(jù)用mysql批量更新數(shù)據(jù)的4種方法,要在MySQL中新增大批量數(shù)據(jù),可以通過以下幾種方法來實(shí)現(xiàn),文中給出了詳細(xì)的代碼示例,需要的朋友可以參考下2024-05-05批量 kill mysql 中運(yùn)行時(shí)間長的sql
這篇文章主要介紹了批量 kill mysql 中運(yùn)行時(shí)間長的sql,需要的朋友可以參考下2016-01-01