欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Canal入門使用小結(jié)

 更新時(shí)間:2025年02月07日 10:37:15   作者:何中應(yīng)  
Canal是一款MySQL數(shù)據(jù)庫增量日志解析工具,用于實(shí)現(xiàn)數(shù)據(jù)庫之間的數(shù)據(jù)同步,本文主要介紹了Canal入門使用小結(jié),感興趣的可以了解一下

說明:canal [k?’næl],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)(官方介紹)。一言以蔽之,Canal是一款實(shí)現(xiàn)數(shù)據(jù)同步的組件??梢詫?shí)現(xiàn)數(shù)據(jù)庫之間、數(shù)據(jù)庫與Redis、ES之間的數(shù)據(jù)同步。本文介紹Canal的入門使用。

Canal介紹

Canal實(shí)現(xiàn)原理是偽裝成MySQL主節(jié)點(diǎn)的從節(jié)點(diǎn),接收主節(jié)點(diǎn)的binlog日志,解析、提取數(shù)據(jù)庫操作,將對數(shù)據(jù)庫的操作通過代碼更新到其他組件中,如其他數(shù)據(jù)庫、ES、Redis等,官方解釋如下:

在這里插入圖片描述

官方提供的結(jié)構(gòu)圖如下:

在這里插入圖片描述

Canal安裝

首先,從官網(wǎng)上下載Canal服務(wù)器,地址:https://github.com/alibaba/canal/releases

在這里插入圖片描述

下載下來,解壓,如下:

在這里插入圖片描述

canal配置文件暫時(shí)不用管,先修改一下example文件中監(jiān)測的目前節(jié)點(diǎn)配置,修改成自己需要監(jiān)測的MySQL配置,如下:

在這里插入圖片描述

修改完,啟動(dòng)canal服務(wù),雙擊startup.bat文件,如下:

在這里插入圖片描述

Canal使用

只要你的MySQL服務(wù)器的IP、賬號密碼沒輸錯(cuò),且測試過能用Navicat或其他數(shù)據(jù)庫連接工具成功連接數(shù)據(jù)庫,那么就可以進(jìn)行下面的編碼工作了。

首先,創(chuàng)建一個(gè)Maven項(xiàng)目,pom.xml如下,導(dǎo)個(gè)canal依賴就行了

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hezy</groupId>
    <artifactId>canal_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!--canal客戶端-->
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
    </dependencies>

</project>

測試代碼如下,用來連接canal服務(wù)器,打印canal監(jiān)測到的數(shù)據(jù)內(nèi)容;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;

/**
 * Canal處理器
 * 作用:打印canal服務(wù)器監(jiān)測到的數(shù)據(jù)
 */
public class CanalHandler {
    public static void main(String[] args) throws InvalidProtocolBufferException {

        // 1.創(chuàng)建連接
        CanalConnector canalConnector = CanalConnectors
                .newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", "");

        // 2.抓取數(shù)據(jù)
        while (true) {

            // 3.開始連接
            canalConnector.connect();

            // 4.訂閱數(shù)據(jù),所有的庫和表
            canalConnector.subscribe(".*\\..*");

            // 5.抓取數(shù)據(jù),每次抓取100條
            Message message = canalConnector.get(100);

            // 6.獲取entry集合
            List<CanalEntry.Entry> entries = message.getEntries();

            // 7.判斷是否有數(shù)據(jù)
            if (entries.size() == 0) {
                System.out.println(">>>暫無數(shù)據(jù)<<<");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // 8.解析數(shù)據(jù)
                for (CanalEntry.Entry entry : entries) {
                    // 獲取表名
                    String tableName = entry.getHeader().getTableName();
                    // 獲取操作類型
                    CanalEntry.EntryType entryType = entry.getEntryType();

                    // 判斷entryType是否為ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 序列化數(shù)據(jù)
                        ByteString storeValue = entry.getStoreValue();
                        // 反序列化數(shù)據(jù)
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

                        // 獲取事件類型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 獲取具體的數(shù)據(jù)
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        // 遍歷打印
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            // 獲取拉取前后的數(shù)據(jù)
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

                            // 用Map存儲(chǔ)每條數(shù)據(jù)
                            HashMap<String, Object> beforeMap = new HashMap<>();
                            HashMap<String, Object> afterMap = new HashMap<>();

                            // 獲取不同操作的數(shù)據(jù)
                            if (CanalEntry.EventType.INSERT.equals(eventType)) {
                                System.out.println("【" + tableName + "】表插入數(shù)據(jù)");
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("新增數(shù)據(jù):" + afterMap);
                            } else if (CanalEntry.EventType.UPDATE.equals(eventType)) {
                                System.out.println("【" + tableName + "】表更新數(shù)據(jù)");
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("更新前:" + beforeMap);
                                System.out.println("----");
                                for (CanalEntry.Column column : afterColumnsList) {
                                    afterMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("更新后:" + afterMap);
                            } else if (CanalEntry.EventType.DELETE.equals(eventType)) {
                                System.out.println("【" + tableName + "】表刪除數(shù)據(jù)");
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    beforeMap.put(column.getName(), column.getValue());
                                }
                                System.out.println("被刪除的數(shù)據(jù):" + beforeMap);
                            }
                        }
                    }
                }
            }
        }
    }
}

啟動(dòng)程序,查看控制臺(tái),檢測中……

在這里插入圖片描述

使用Navicat連接數(shù)據(jù)庫,查看數(shù)據(jù)庫test庫,i_user表內(nèi)容;

在這里插入圖片描述

此時(shí),我們新增一條數(shù)據(jù),看控制臺(tái),canal成功接收到了這次修改;

在這里插入圖片描述

更新數(shù)據(jù);

在這里插入圖片描述

刪除數(shù)據(jù);

在這里插入圖片描述

頭能過身體就能過,接下來不就好辦了。將Canal接收到的數(shù)據(jù)轉(zhuǎn)為對象,根據(jù)不同的操作類型分發(fā)給自己想要同步的組件,同步給從MySQL,就調(diào)用對應(yīng)的Mapper;同步給Redis,就調(diào)用Redis對應(yīng)的方法,ES同樣。

總結(jié)

本文介紹了Canal入門使用,參考B站視頻:Canal極簡入門:一小時(shí)讓你快速上手Canal數(shù)據(jù)同步神技~

到此這篇關(guān)于Canal入門使用小結(jié)的文章就介紹到這了,更多相關(guān)Canal入門內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支

相關(guān)文章

最新評論