欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合Canal方法詳解

 更新時間:2022年12月21日 09:43:07   作者:兩個蝴蝶飛  
這篇文章主要介紹了SpringBoot整合Canal,canal可以用來監(jiān)控數據庫數據的變化,從而獲得新增數據,或者修改的數據,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧

pom.xml 添加 canal.client 依賴

(1.1.5 改動很大,這兒客戶端用 1.1.4)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>top.yueshushu</groupId>
    <artifactId>learn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Canal</name>
    <description>學習 Canal</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 導入配置文件處理器,配置文件進行綁定就會有提示,需要重啟 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!--導入自動熱步署的依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <!--引入MySql的驅動-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--引入springboot與mybatis整合的依賴-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.4</version>
        </dependency>
        <!-- 引入pagehelper分頁插件 -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.2.5</version>
        </dependency>
        <!--添加 druid-spring-boot-starter的依賴的依賴-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.14</version>
        </dependency>
        <!--SpringBoot 的aop 模塊-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <!--添加canal的依賴. 重要.  使用  1.1.4-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.4</version>
        </dependency>
    </dependencies>
    <build>
        <!--將該目錄下的文件全部打包成類的路徑-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

業(yè)務功能處理

簡單連接程序

/**
     * 一個簡單的canal 的連接測試程序
     */
    @Test
    public void connectionTest() {
        //1. 創(chuàng)建連接  填充對應的地址信息 ,要監(jiān)控的實例和相應的用戶名和密碼
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal",
                "canal"
        );
        //2. 進行連接
        canalConnector.connect();
        log.info(">>>連接成功:{}", canalConnector);
    }

17:26:32.179 [main] INFO top.yueshushu.learn.CanalDemoTest - >>>連接成功:com.alibaba.otter.canal.client.impl.SimpleCanalConnector@31ef45e3

單次獲取數據

/**
     * 獲取數據信息. 可以發(fā)現(xiàn),未獲取到數據 .  這個應該是實時的.
     */
    @Test
    public void getDataTest() {
        //1. 創(chuàng)建連接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "canal",
                "canal"
        );
        // 進行連接
        canalConnector.connect();
        //3. 注冊,看使用哪個數據庫表
        canalConnector.subscribe("springboot.user");
        //4. 獲取 1條數據
        Message message = canalConnector.get(1);
        log.info("獲取的數據:id:{},數據:{}", message.getId(), message);
        if (message.getId() == -1) {
            log.info(">>>未獲取到數據");
            return;
        }
        //5. 獲取相應的數據集合
        List<CanalEntry.Entry> entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            log.info(">>>獲取數據 {}", entry);
            //獲取表名
            CanalEntry.Header header = entry.getHeader();
            log.info(">>>獲取表名:{}", header.getTableName());
            CanalEntry.EntryType entryType = entry.getEntryType();
            log.info(">>獲取類型 {}:,對應的信息:{}", entryType.getNumber(), entryType.name());
            //獲取數據
            ByteString storeValue = entry.getStoreValue();
            log.info(">>>輸出存儲的值:{}", storeValue);
        }
    }

在主庫里面插入一條數據

insert into springboot.user(id,name,age,sex,description) values(1,'canal添加用戶',24,'男','學習canal');

再次執(zhí)行:

循環(huán)獲取數據

/**
     * 獲取數據信息. 獲取現(xiàn)在的數據.  再次執(zhí)行時,就沒有這個數據了.
     */
    @Test
    public void getNowDataTest() {
        //1. 創(chuàng)建連接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "canal",
                "canal"
        );
        // 進行連接
        canalConnector.connect();
        //3. 注冊,看使用哪個數據庫表
        canalConnector.subscribe("springboot.user");
        for (;;) {
            //4. 獲取 1條數據
            Message message = canalConnector.get(1);
            log.info("獲取的數據:id:{},數據:{}", message.getId(), message);
            if (message.getId() == -1) {
                log.info(">>>未獲取到數據");
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            //5. 獲取相應的數據集合
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                log.info(">>>獲取數據 {}", entry);
                //獲取表名
                CanalEntry.Header header = entry.getHeader();
                log.info(">>>獲取表名:{}", header.getTableName());
                CanalEntry.EntryType entryType = entry.getEntryType();
                log.info(">>獲取類型 {}:,對應的信息:{}", entryType.getNumber(), entryType.name());
                //獲取數據
                ByteString storeValue = entry.getStoreValue();
                log.info(">>>輸出存儲的值:{}", storeValue);
            }
        }
    }

可以隨時獲取相應的數據變更信息。

會發(fā)現(xiàn), storeValue 的值是很難解讀的。 需要將這個數據解析出來。

解析 storeValue 值

/**
     * 將 storeValue 進行解析,解析成我們能看懂的語句.
     * 對數據庫 cud 進行處理操作觀看一下.
     * 發(fā)現(xiàn),點是不好的,也有多余的記錄信息.
     *
     * @throws Exception 異常
     */
    @Test
    public void convertDataTest() throws Exception {
        //1. 創(chuàng)建連接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "canal", "canal"
        );
        //2. 進行連接
        canalConnector.connect();
        canalConnector.subscribe("springboot.user");
        for (;;) {
            //獲取信息
            Message message = canalConnector.get(1);
            if (message.getId() == -1L) {
                // log.info("未獲取到數據");
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            List<CanalEntry.Entry> entryList = message.getEntries();
            //對獲取到的數據進行處理
            log.info(">>獲取到{}條數據", entryList.size());
            for (CanalEntry.Entry entry : entryList) {
                CanalEntry.Header header = entry.getHeader();
                log.info(">>>獲取表名:{}", header.getTableName());
                //獲取類型.
                CanalEntry.EntryType entryType = entry.getEntryType();
                log.info(">>類型編號 {},類型名稱:{}", entryType.getNumber(), entryType.name());
                //獲取存入日志的值
                ByteString storeValue = entry.getStoreValue();
                //將這個值進行解析
                CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue);
                String sql = rowChange.getSql();
                log.info(">>>獲取對應的sql:{}", sql);
                // 這個sql 可能是 批量的sql語句
                List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                for (CanalEntry.RowData rowData : rowDatasList) {
                    log.info(">>>獲取信息:{}", rowData);
                    //對數據進行處理
                    List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                    List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                    beforeColumnsList.forEach(
                            n -> log.info("哪個列{},原先是{},是否被更新{}", n.getName(),
                                    n.getValue(), n.getUpdated())
                    );
                    afterColumnsList.forEach(
                            n -> log.info("哪個列{},后來是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated())
                    );
                }
            }
        }
    }

再次執(zhí)行sql

insert into springboot.user(id,name,age,sex,description) 
values(2,'canal添加用戶2',25,'男','學習canal2');

不同的類型進行不同的處理

發(fā)現(xiàn) 其他類型的 如: TRANSACTIONBEGIN 也進行了處理

/**
     * 類型轉換數據
     *
     * @throws Exception 異常
     */
    @Test
    public void dataTypeTest() throws Exception {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal", "canal"
        );
        canalConnector.connect();
        canalConnector.subscribe("springboot.user");
        for(;;){
            Message message = canalConnector.get(1);
            if (message.getId() == -1) {
                TimeUnit.SECONDS.sleep(1);
                continue;
            }
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                //只要 RowData 數據類型的
                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    continue;
                }
                String tableName = entry.getHeader().getTableName();
                log.info(">>>對表 {} 進行操作", tableName);
                ByteString storeValue = entry.getStoreValue();
                RowChange rowChange = RowChange.parseFrom(storeValue);
                //行改變
                CanalEntry.EventType eventType = rowChange.getEventType();
                switch (eventType) {
                    case INSERT: {
                        insertHandler(rowChange);
                        break;
                    }
                    case UPDATE: {
                        updateHandler(rowChange);
                        break;
                    }
                    case DELETE: {
                        deleteHandler(rowChange);
                        break;
                    }
                    default: {
                        break;
                    }
                }
            }
        }
    }
    private void deleteHandler(RowChange rowChange) {
        log.info(">>>>執(zhí)行刪除的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            for (CanalEntry.Column column : beforeColumnsList) {
                log.info(">>>>>字段 {} 刪除數據 {}", column.getName(), column.getValue());
            }
        }
    }
    private void updateHandler(RowChange rowChange) {
        log.info(">>>執(zhí)行更新的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            Map<String, String> afterValueMap = afterColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            beforeValueMap.forEach((column, beforeValue) -> {
                String afterValue = afterValueMap.get(column);
                Boolean update = beforeValue.equals(afterValue);
                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
                        update);
            });
        }
    }
    /**
     * 插入數據. 只有后的數據.
     *
     * @param rowChange 行改變
     */
    private void insertHandler(RowChange rowChange) {
        log.info(">>>執(zhí)行添加 的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                if (!StringUtils.hasText(column.getValue())) {
                    continue;
                }
                log.info("字段 {} 插入了數據 {}", column.getName(), column.getValue());
            }

        }
    }

插入,更新,刪除,分別進行了處理.

先啟動測試程序:

不打印任何信息。

主表執(zhí)行添加語句:

insert into springboot.user(id,name,age,sex,description) 
values(4,'canal添加用戶4',25,'男','學習canal4');

會打印信息:

這個可讀性就非常高了.

主表執(zhí)行修改的操作.

update springboot.user set name='開開心心',age=26,description='岳澤霖' where id =4;

更新時,若每一個字段都跟原先一樣,不會產生日志消費。

主表執(zhí)行刪除的操作:

delete from springboot.user where id =4;

上面的獲取,都是一條數據一條數據獲取的。效率比較低

一次性獲取多條數據

/**
     * 一次性獲取多條數據。
     * sql 執(zhí)行多條。
     */
    @Test
    public void dataMoreTest() throws Exception {
        //1. 創(chuàng)建 canal連接對象
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal",
                "canal"
        );
        canalConnector.connect();
        // 訂閱哪個對象
        canalConnector.subscribe("springboot.user");
        for (; ; ) {
           // Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);
            Message message = canalConnector.get(3);
            if (message.getId() == -1) {
                // 未獲取到數據
                continue;
            }
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    continue;
                }
                String tableName = entry.getHeader().getTableName();
                log.info(">>>>對表{} 執(zhí)行操作", tableName);
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                //對類型進行處理
                CanalEntry.EventType eventType = rowChange.getEventType();
                switch (eventType) {
                    case INSERT: {
                        insertHandler(rowChange);
                        break;
                    }
                    case UPDATE: {
                        updateHandler(rowChange);
                        break;
                    }
                    case DELETE: {
                        deleteHandler(rowChange);
                        break;
                    }
                    default: {
                        break;
                    }
                }
            }
        }
    }
    private void deleteHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>>執(zhí)行刪除的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            for (CanalEntry.Column column : beforeColumnsList) {
                log.info(">>>>>字段 {} 刪除數據 {}", column.getName(), column.getValue());
            }
        }
    }
    private void updateHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>執(zhí)行更新的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            Map<String, String> afterValueMap = afterColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            beforeValueMap.forEach((column, beforeValue) -> {
                String afterValue = afterValueMap.get(column);
                Boolean update = beforeValue.equals(afterValue);
                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
                        update);
            });
        }
    }
    /**
     * 插入數據. 只有后的數據.
     *
     * @param rowChange 行改變
     */
    private void insertHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>執(zhí)行添加 的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                if (!StringUtils.hasText(column.getValue())) {
                    continue;
                }
                log.info("字段 {} 插入了數據 {}", column.getName(), column.getValue());
            }
        }
    }

修改點:

// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);
    Message message = canalConnector.get(3);

.get(3) 表示 一次性獲取3條記錄.

canalConnector.get(3, 5L, TimeUnit.SECONDS); 表示5秒之內獲取3條記錄,

有兩個觸發(fā)條件,一個是獲取了3條,一個是到了5秒。

效果展示信息與之前是一致的,就不重新演示了。

ack 配置信息

/**
     * 一次性獲取多條數據。
     * sql 執(zhí)行多條。 
     */
    @Test
    public void dataMoreTest() throws Exception {
        //1. 創(chuàng)建 canal連接對象
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal",
                "canal"
        );
        canalConnector.connect();
        // 訂閱哪個對象
        canalConnector.subscribe("springboot.user");
        for (; ; ) {
             Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
            if (message.getId() == -1) {
                // 未獲取到數據
                TimeUnit.MILLISECONDS.sleep(500);
                continue;
            }
            log.info(">>>>獲取對應的 id: {}",message.getId());
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    continue;
                }
                String tableName = entry.getHeader().getTableName();
                log.info(">>>>對表{} 執(zhí)行操作", tableName);
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                //對類型進行處理
                CanalEntry.EventType eventType = rowChange.getEventType();
                switch (eventType) {
                    case INSERT: {
                        insertHandler(rowChange);
                        break;
                    }
                    case UPDATE: {
                        updateHandler(rowChange);
                        break;
                    }
                    case DELETE: {
                        deleteHandler(rowChange);
                        break;
                    }
                    default: {
                        break;
                    }
                }
            }
            //進行回滾
           // canalConnector.rollback();
            //確認ack 配置
           canalConnector.ack(message.getId());
        }
    }
    private void deleteHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>>執(zhí)行刪除的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            for (CanalEntry.Column column : beforeColumnsList) {
                log.info(">>>>>字段 {} 刪除數據 {}", column.getName(), column.getValue());
            }
        }
    }
    private void updateHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>執(zhí)行更新的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            Map<String, String> afterValueMap = afterColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            beforeValueMap.forEach((column, beforeValue) -> {
                String afterValue = afterValueMap.get(column);
                Boolean update = beforeValue.equals(afterValue);
                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
                        update);
            });
        }
    }
    /**
     * 插入數據. 只有后的數據.
     *
     * @param rowChange 行改變
     */
    private void insertHandler(CanalEntry.RowChange rowChange) {
        log.info(">>>執(zhí)行添加 的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                if (!StringUtils.hasText(column.getValue())) {
                    continue;
                }
                log.info("字段 {} 插入了數據 {}", column.getName(), column.getValue());
            }
        }
    }

主要信息:

Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);

//進行回滾 // canalConnector.rollback();

//確認ack 配置canalConnector.ack(message.getId());

手動確認消息消費了.

當消息 rollback() 回滾后,會再次消費這條消息.

canalConnector.rollback();

執(zhí)行語句:

insert into springboot.user(id,name,age,sex,description) 
values(5,'canal添加用戶5',25,'男','學習canal5');

如果變成 手動確認,

canalConnector.ack(message.getId());

則只消費一次.

到此這篇關于SpringBoot整合Canal方法詳解的文章就介紹到這了,更多相關SpringBoot整合Canal內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • MyBatis和MyBatis Plus并存問題及解決

    MyBatis和MyBatis Plus并存問題及解決

    最近需要使用MyBatis和MyBatis Plus,就會導致MyBatis和MyBatis Plus并存,本文主要介紹了MyBatis和MyBatis Plus并存問題及解決,具有一定的參考價值,感興趣的可以了解一下
    2024-07-07
  • ReentrantLock可重入鎖原理解析

    ReentrantLock可重入鎖原理解析

    這篇文章主要為大家介紹了ReentrantLock可重入鎖原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-10-10
  • 詳解SpringMVC使用MultipartFile實現(xiàn)文件的上傳

    詳解SpringMVC使用MultipartFile實現(xiàn)文件的上傳

    本篇文章主要介紹了SpringMVC使用MultipartFile實現(xiàn)文件的上傳,本地的文件上傳到資源服務器上,比較好的辦法就是通過ftp上傳。這里是結合SpringMVC+ftp的形式上傳的,有興趣的可以了解一下。
    2016-12-12
  • Java國密加密SM2代碼詳細使用步驟

    Java國密加密SM2代碼詳細使用步驟

    SM2算法可以用較少的計算能力提供比RSA算法更高的安全強度,而所需的密鑰長度卻遠比RSA算法低,下面這篇文章主要給大家介紹了關于Java國密加密SM2代碼的相關資料,需要的朋友可以參考下
    2024-07-07
  • idea 配置checkstyle詳細步驟

    idea 配置checkstyle詳細步驟

    checkstyle是提高代碼質量,檢查代碼規(guī)范的很好用的一款工具,本文簡單介紹一下集成的步驟,并提供一份完整的checkstyle的代碼規(guī)范格式文件,以及常見的格式問題的解決方法,需要的朋友可以參考下
    2023-11-11
  • JMeter配置元件詳解

    JMeter配置元件詳解

    JMeter提供了豐富的配置元件,常用的包括參數化配置元件、HTTP請求默認值、HTTP信息頭管理器、計數器等,本文就詳細的介紹一下這些元件的使用,感興趣的可以了解一下
    2021-12-12
  • 深入了解Java中Synchronized關鍵字的實現(xiàn)原理

    深入了解Java中Synchronized關鍵字的實現(xiàn)原理

    synchronized是JVM的內置鎖,基于Monitor機制實現(xiàn),每一個對象都有一個與之關聯(lián)的監(jiān)視器?(Monitor),這個監(jiān)視器充當了一種互斥鎖的角色,本文就詳細聊一聊Synchronized關鍵字的實現(xiàn)原理,需要的朋友可以參考下
    2023-06-06
  • MyBatis的逆向工程詳解

    MyBatis的逆向工程詳解

    這篇文章主要介紹了MyBatis的逆向工程詳解,詳細的介紹了逆行工程的概念和實現(xiàn),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-08-08
  • Java中Object類常用的12個方法(小結)

    Java中Object類常用的12個方法(小結)

    Java 中的 Object 方法在面試中是一個非常高頻的點,本文主要介紹了Java中Object類常用的12個方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • Java Web實現(xiàn)文件上傳和下載接口功能詳解

    Java Web實現(xiàn)文件上傳和下載接口功能詳解

    這篇文章主要為大家詳細介紹了Java Web實現(xiàn)文件上傳和下載接口功能的相關知識,文中的示例代碼講解詳細,對我們學習有一定的借鑒價值,需要的可以參考一下
    2022-12-12

最新評論