欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成canal實現(xiàn)示例解析

 更新時間:2024年10月10日 11:00:12   作者:Ascend1797  
這篇文章主要為大家介紹了springboot整合canal的示例實現(xiàn)解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多多進步,早日升職加薪

什么是 Canal

        阿里巴巴 B2B 公司,因為業(yè)務的特性,賣家主要集中在國內(nèi),買家主要集中在國外,所以衍生出了同步杭州和美國異地機房的需求,從 2010 年開始,阿里系公司開始逐步的嘗試基于數(shù)據(jù)庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業(yè)務。

        Canal 是用 Java 開發(fā)的基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費的中間件。目前,Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 來處理獲得的相關(guān)數(shù)據(jù)。(數(shù)據(jù)庫同步需要阿里的 Otter 中間件,基于 Canal)。

MySQL 的 Binlog

        MySQL 的二進制日志可以說 MySQL 最重要的日志了,它記錄了所有的 DDL 和 DML(除了數(shù)據(jù)查詢語句)語句,以事件形式記錄,還包含語句所執(zhí)行的消耗的時間,MySQL 的二進制日志是事務安全型的。

        一般來說開啟二進制日志大概會有 1%的性能損耗。二進制有兩個最重要的使用場景:

        其一:MySQL Replication 在 Master 端開啟 Binlog,Master 把它的二進制日志傳遞給Slaves來達到 Master-Slave 數(shù)據(jù)一致的目的。

        其二:自然就是數(shù)據(jù)恢復了,通過使用 MySQL Binlog 工具來使恢復數(shù)據(jù)。

        二進制日志包括兩類文件:二進制日志索引文件(文件名后綴為.index)用于記錄所有的二進制文件,二進制日志文件(文件名后綴為.00000*)記錄數(shù)據(jù)庫所有的 DDL 和 DML(除了數(shù)據(jù)查詢語句)語句事件。

Binlog 的分類

MySQL Binlog 的格式有三種,分別是 STATEMENT,MIXED,ROW。在配置文件中可以選擇配置 binlog_format= statement|mixed|row。三種格式的區(qū)別:

        1)statement:語句級,binlog 會記錄每次一執(zhí)行寫操作的語句。相對 row 模式節(jié)省空間  但是可能產(chǎn)生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志進行恢復,由于執(zhí)行時間不同可能產(chǎn)生的數(shù)據(jù)就不同。

        優(yōu)點:節(jié)省空間。

        缺點:有可能造成數(shù)據(jù)不一致。

        2)row:行級, binlog 會記錄每次操作后每行記錄的變化。

        優(yōu)點:保持數(shù)據(jù)的絕對一致性。因為不管 sql 是什么,引用了什么函數(shù),他只記錄執(zhí)行后的效果。

        缺點:占用較大空間。

        3)mixed:statement 的升級版,一定程度上解決了,因為一些情況而造成的 statement模式不一致問題,默認還是 statement。

             statement在某些情況下 會按照ROW 的方式進行處理譬如:

              1:當函數(shù)中包含 UUID() 時;

              2: 包含AUTO_INCREMENT 字段的表被更新時;

              3: 執(zhí)行 INSERT DELAYED 語句時;

              4: 用 UDF 時;

        優(yōu)點:節(jié)省空間,同時兼顧了一定的一致性。

        缺點:還有些極個別情況依舊會造成不一致,另外 statement 和 mixed 對于需要對binlog 的監(jiān)控的情況都不方便。
綜合上面對比,Canal 想做監(jiān)控分析,選擇 row 格式比較合適。 

Canal 的工作原理 

MySQL 主從復制過程

        1)Master 主庫將改變記錄,寫到二進制日志(Binary Log)中;

        2)Slave 從庫向 MySQL Master 發(fā)送 dump 協(xié)議,將 Master 主庫的 binary log events 拷到它的中繼日志(relay log);

        3)Slave 從庫讀取并重做中繼日志中的事件,將改變的數(shù)據(jù)同步到自己的數(shù)據(jù)庫。 

Canal 的工作原理

原理相對比較簡單:

  • canal 模擬 mysql slave 的交互協(xié)議,偽裝自己為 mysql slave,向 mysql master 發(fā)送 dump 協(xié)議

  • mysql master 收到 dump 請求,開始推送 binary log 給 slave (也就是 canal)

  • canal 解析 binary log 對象 (原始為 byte 流)。

Canal使用場景

1)原始場景: 阿里 Otter 中間件的一部分 Otter 是阿里用于進行異地數(shù)據(jù)庫之間的同步框架,Canal 是其中一部分。

 2)常見場景 1:更新緩存

3)常見場景 2:抓取業(yè)務表的新增變化數(shù)據(jù),用于制作實時統(tǒng)計。 

Canal使用實戰(zhàn) 

檢查Mysql binlog功能是否有開啟

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.10 sec)

 如果顯示狀態(tài)為OFF表示該功能未開啟,開啟binlog功能,修改 mysql 的配置文件my.ini,追加內(nèi)容:

log-bin=mysql-bin #binlog文件名
binlog_format=ROW #選擇row模式
server_id=1 #mysql實例id,不能和canal的slaveId重復

service mysql restart 重啟 mysql。

創(chuàng)建具有作為 MySQL slave的MySQL 賬號

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

下載安裝Canal服務端

 https://github.com/alibaba/canal/releases

  • canal-adapter(canal-client)

        相當于canal的客戶端,會從canal-server中獲取數(shù)據(jù)(需要配置為tcp方式),然后對數(shù)據(jù)進行同步,可以同步到MySQL、Elasticsearch和HBase等存儲中去。相較于canal-server自帶的canal.serverMode,canal-adapter提供的下游數(shù)據(jù)接受更為廣泛。

  • canal-admin

        為canal提供整體配置管理、節(jié)點運維等面向運維的功能,提供相對友好的WebUI操作界面,方便更多用戶快速和安全的操作。

  • canal-deployer(canal-server)

        可以直接監(jiān)聽MySQL的binlog,把自己偽裝成MySQL的從庫,只負責接收數(shù)據(jù),并不做處理。接收到MySQL的binlog數(shù)據(jù)后可以通過配置canal.serverMode:tcp, kafka, RocketMQ, RabbitMQ連接方式發(fā)送到對應的下游。其中tcp方式可以自定義canal客戶端進行接受數(shù)據(jù),較為靈活。

修改 instance.properties配置文件,

#需要改成數(shù)據(jù)源mysql數(shù)據(jù)庫的信息
canal.instance.master.address=127.0.0.1:3306
#需要改成自己的數(shù)據(jù)庫創(chuàng)建的從庫用戶名與密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#需要改成同步的數(shù)據(jù)庫表規(guī)則
canal.instance.filter.regex=.*\\..*

常見的匹配規(guī)則:

        所有表:.* or .\…

        canal 數(shù)據(jù)庫下所有表: canal\…*

        canal數(shù)據(jù)庫下的以canal打頭的表:canal.canal.*

        canal 數(shù)據(jù)庫下的一張表:canal.test1

        多個規(guī)則組合使用:canal\…*,mysql.test1,mysql.test2 (逗號分隔)

監(jiān)聽多個Mysql實例配置

        如果需要監(jiān)聽多個Mysql實例,通過前面 canal 架構(gòu),我們可以知道,一個 canal 服務 中可以有多個 instance,conf/下的每一個 example 即是一個實例,每個實例下面都有獨立的 配置文件。默認只有一個實例 example,如果需要多個實例處理不同的 MySQL 數(shù)據(jù)的話,直 接拷貝出多個 example,并對其重新命名,命名和配置文件中指定的名稱一致,然后修改 canal.properties 中的 canal.destinations=實例 1,實例 2,實例 3。

運行Canal服務端 sh bin/startup.sh(win下是運行 startup.bat)

Springboot集成Canal客戶端

創(chuàng)建canal-clint SpringBoot工程

 在canal-clint 模塊中配置 pom.xml

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>

 創(chuàng)建單機版Canal客戶端SimpleCanalClientExampleTest 

package com.canal.clint.clint;

/**
 * <p>
 * </p>
 * @since 2023-03-30 17:13
 */

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

public class SimpleCanalClientExampleTest {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // 1.獲取 canal 連接對象
        CanalConnector canalConnector =
            CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        while (true) {
            // 2.獲取連接
            canalConnector.connect();
            //  3.指定要監(jiān)控的數(shù)據(jù)庫,此處指定了要監(jiān)聽的庫,會覆蓋instance.properties配置的數(shù)據(jù)庫表規(guī)則
            canalConnector.subscribe("intl.*");
            // 4.獲取 Message
            Message message = canalConnector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size() <= 0) {
                System.out.println("沒有數(shù)據(jù),休息一會");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                for (CanalEntry.Entry entry : entries) {
                    // TODO 獲取表名
                    String tableName = entry.getHeader().getTableName();
                    // TODO Entry 類型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // TODO 判斷 entryType 是否為 ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // TODO 序列化數(shù)據(jù)
                        ByteString storeValue = entry.getStoreValue();
                        // TODO 反序列化
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // TODO 獲取事件類型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // TODO 獲取具體的數(shù)據(jù)
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        // TODO 遍歷并打印數(shù)據(jù)
                        for (CanalEntry.RowData rowData : rowDataList) {
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            JSONObject beforeData = new JSONObject();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }
                            System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",After:"
                                + beforeData + ",After:" + afterData);
                        }
                    }
                }
            }
        }
    }
}

 創(chuàng)建數(shù)據(jù)庫user表

CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `name` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  `remark` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

插入數(shù)據(jù)

INSERT INTO `intl`.`user`(`id`, `name`, `remark`) VALUES (1, '哈嘍', 'Canal測試');

輸出結(jié)果

注意坑:

  • 如果是基于阿里云服務器安裝的Canal,記得開放11111端口(Canal的默認端口號);
  • 如果客戶端調(diào)用了connector.subscribe("intl.*")方法,指定要監(jiān)聽的庫,會覆蓋instance.properties配置的數(shù)據(jù)庫表規(guī)則;
  • 如果Mysq binlog日志類型設置為mixed可能會導致connector.subscribe("intl.*")方法失效,進而監(jiān)聽整個Mysql實例。

Canal的部署也是支持集群的,需要配合ZooKeeper進行集群管理。Canal還有一個Web管理界面。

總結(jié)

到此這篇關(guān)于SpringBoot集成canal的文章就介紹到這了,更多相關(guān)SpringBoot集成canal內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 手寫redis@Cacheable注解?支持過期時間設置方式

    手寫redis@Cacheable注解?支持過期時間設置方式

    這篇文章主要介紹了手寫redis@Cacheable注解?支持過期時間設置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • 詳解Java如何實現(xiàn)一個像String一樣不可變的類

    詳解Java如何實現(xiàn)一個像String一樣不可變的類

    說到?String?大家都知道?String?是一個不可變的類;雖然用的很多,那不知道小伙伴們有沒有想過怎么樣創(chuàng)建一個自己的不可變的類呢?這篇文章就帶大家來實踐一下,創(chuàng)建一個自己的不可變的類
    2022-11-11
  • Java基礎之簡單介紹一下Maven

    Java基礎之簡單介紹一下Maven

    今天給大家復習一下Java基礎知識,簡單介紹Maven,文中有非常詳細的解釋,對Java初學者很有幫助喲,需要的朋友可以參考下
    2021-05-05
  • Spring中的@ExceptionHandler注解統(tǒng)一異常處理詳解

    Spring中的@ExceptionHandler注解統(tǒng)一異常處理詳解

    這篇文章主要介紹了Spring中的@ExceptionHandler注解統(tǒng)一異常處理詳解,當我們使用這個@ExceptionHandler注解時,定義一個異常的處理方法,加上@ExceptionHandler注解,這個方法就會處理類中其他方法拋出的異常,需要的朋友可以參考下
    2024-01-01
  • Springcould多模塊搭建Eureka服務器端口過程詳解

    Springcould多模塊搭建Eureka服務器端口過程詳解

    這篇文章主要介紹了Springcould多模塊搭建Eureka服務器端口過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-11-11
  • 一文帶你看懂SpringBoot中的全局配置文件

    一文帶你看懂SpringBoot中的全局配置文件

    這篇文章主要介紹了一文帶你看懂SpringBoot中的全局配置文件,全局配置文件能夠?qū)σ恍┠J配置值進行修改,Spring Boot使用一個application.properties或者application.yaml的文件作為全局配置文件,需要的朋友可以參考下
    2023-08-08
  • 詳解Java如何優(yōu)雅的使用裝飾器模式

    詳解Java如何優(yōu)雅的使用裝飾器模式

    裝飾器設計模式大家肯定都聽說過,但是有沒有使用過呢,今天本君就跟大家分享一下裝飾器模式應該如何使用,感興趣的小伙伴可以學習一下
    2022-09-09
  • idea中創(chuàng)建多module的maven工程的方法

    idea中創(chuàng)建多module的maven工程的方法

    這篇文章主要介紹了idea中創(chuàng)建多module的maven工程的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-10-10
  • 你知道Tomcat安裝之前為什么要安裝JDK

    你知道Tomcat安裝之前為什么要安裝JDK

    這篇文章主要介紹了你知道Tomcat安裝之前為什么要安裝JDK嗎?具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-03-03
  • SpringBoot項目中獲取IP地址的實現(xiàn)示例

    SpringBoot項目中獲取IP地址的實現(xiàn)示例

    OkHttp是一個由Square開發(fā)的高效、現(xiàn)代的HTTP客戶端庫,本文主要介紹了SpringBoot項目中獲取IP地址的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解一下
    2024-08-08

最新評論