SpringBoot整合Canal方法詳解
pom.xml 添加 canal.client 依賴
(1.1.5 改動(dòng)很大,這兒客戶端用 1.1.4)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>top.yueshushu</groupId>
<artifactId>learn</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Canal</name>
<description>學(xué)習(xí) Canal</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- 導(dǎo)入配置文件處理器,配置文件進(jìn)行綁定就會(huì)有提示,需要重啟 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!--導(dǎo)入自動(dòng)熱步署的依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!--引入MySql的驅(qū)動(dòng)-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--引入springboot與mybatis整合的依賴-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.4</version>
</dependency>
<!-- 引入pagehelper分頁(yè)插件 -->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.5</version>
</dependency>
<!--添加 druid-spring-boot-starter的依賴的依賴-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.14</version>
</dependency>
<!--SpringBoot 的aop 模塊-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!--添加canal的依賴. 重要. 使用 1.1.4-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.4</version>
</dependency>
</dependencies>
<build>
<!--將該目錄下的文件全部打包成類的路徑-->
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>業(yè)務(wù)功能處理
簡(jiǎn)單連接程序
/**
* 一個(gè)簡(jiǎn)單的canal 的連接測(cè)試程序
*/
@Test
public void connectionTest() {
//1. 創(chuàng)建連接 填充對(duì)應(yīng)的地址信息 ,要監(jiān)控的實(shí)例和相應(yīng)的用戶名和密碼
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(
"127.0.0.1", 11111
),
"example",
"canal",
"canal"
);
//2. 進(jìn)行連接
canalConnector.connect();
log.info(">>>連接成功:{}", canalConnector);
}
17:26:32.179 [main] INFO top.yueshushu.learn.CanalDemoTest - >>>連接成功:com.alibaba.otter.canal.client.impl.SimpleCanalConnector@31ef45e3
單次獲取數(shù)據(jù)
/**
* 獲取數(shù)據(jù)信息. 可以發(fā)現(xiàn),未獲取到數(shù)據(jù) . 這個(gè)應(yīng)該是實(shí)時(shí)的.
*/
@Test
public void getDataTest() {
//1. 創(chuàng)建連接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"canal",
"canal"
);
// 進(jìn)行連接
canalConnector.connect();
//3. 注冊(cè),看使用哪個(gè)數(shù)據(jù)庫(kù)表
canalConnector.subscribe("springboot.user");
//4. 獲取 1條數(shù)據(jù)
Message message = canalConnector.get(1);
log.info("獲取的數(shù)據(jù):id:{},數(shù)據(jù):{}", message.getId(), message);
if (message.getId() == -1) {
log.info(">>>未獲取到數(shù)據(jù)");
return;
}
//5. 獲取相應(yīng)的數(shù)據(jù)集合
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
log.info(">>>獲取數(shù)據(jù) {}", entry);
//獲取表名
CanalEntry.Header header = entry.getHeader();
log.info(">>>獲取表名:{}", header.getTableName());
CanalEntry.EntryType entryType = entry.getEntryType();
log.info(">>獲取類型 {}:,對(duì)應(yīng)的信息:{}", entryType.getNumber(), entryType.name());
//獲取數(shù)據(jù)
ByteString storeValue = entry.getStoreValue();
log.info(">>>輸出存儲(chǔ)的值:{}", storeValue);
}
}

在主庫(kù)里面插入一條數(shù)據(jù)
insert into springboot.user(id,name,age,sex,description) values(1,'canal添加用戶',24,'男','學(xué)習(xí)canal');
再次執(zhí)行:

循環(huán)獲取數(shù)據(jù)
/**
* 獲取數(shù)據(jù)信息. 獲取現(xiàn)在的數(shù)據(jù). 再次執(zhí)行時(shí),就沒(méi)有這個(gè)數(shù)據(jù)了.
*/
@Test
public void getNowDataTest() {
//1. 創(chuàng)建連接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"canal",
"canal"
);
// 進(jìn)行連接
canalConnector.connect();
//3. 注冊(cè),看使用哪個(gè)數(shù)據(jù)庫(kù)表
canalConnector.subscribe("springboot.user");
for (;;) {
//4. 獲取 1條數(shù)據(jù)
Message message = canalConnector.get(1);
log.info("獲取的數(shù)據(jù):id:{},數(shù)據(jù):{}", message.getId(), message);
if (message.getId() == -1) {
log.info(">>>未獲取到數(shù)據(jù)");
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
//5. 獲取相應(yīng)的數(shù)據(jù)集合
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
log.info(">>>獲取數(shù)據(jù) {}", entry);
//獲取表名
CanalEntry.Header header = entry.getHeader();
log.info(">>>獲取表名:{}", header.getTableName());
CanalEntry.EntryType entryType = entry.getEntryType();
log.info(">>獲取類型 {}:,對(duì)應(yīng)的信息:{}", entryType.getNumber(), entryType.name());
//獲取數(shù)據(jù)
ByteString storeValue = entry.getStoreValue();
log.info(">>>輸出存儲(chǔ)的值:{}", storeValue);
}
}
}可以隨時(shí)獲取相應(yīng)的數(shù)據(jù)變更信息。
會(huì)發(fā)現(xiàn), storeValue 的值是很難解讀的。 需要將這個(gè)數(shù)據(jù)解析出來(lái)。
解析 storeValue 值
/**
* 將 storeValue 進(jìn)行解析,解析成我們能看懂的語(yǔ)句.
* 對(duì)數(shù)據(jù)庫(kù) cud 進(jìn)行處理操作觀看一下.
* 發(fā)現(xiàn),點(diǎn)是不好的,也有多余的記錄信息.
*
* @throws Exception 異常
*/
@Test
public void convertDataTest() throws Exception {
//1. 創(chuàng)建連接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"canal", "canal"
);
//2. 進(jìn)行連接
canalConnector.connect();
canalConnector.subscribe("springboot.user");
for (;;) {
//獲取信息
Message message = canalConnector.get(1);
if (message.getId() == -1L) {
// log.info("未獲取到數(shù)據(jù)");
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
List<CanalEntry.Entry> entryList = message.getEntries();
//對(duì)獲取到的數(shù)據(jù)進(jìn)行處理
log.info(">>獲取到{}條數(shù)據(jù)", entryList.size());
for (CanalEntry.Entry entry : entryList) {
CanalEntry.Header header = entry.getHeader();
log.info(">>>獲取表名:{}", header.getTableName());
//獲取類型.
CanalEntry.EntryType entryType = entry.getEntryType();
log.info(">>類型編號(hào) {},類型名稱:{}", entryType.getNumber(), entryType.name());
//獲取存入日志的值
ByteString storeValue = entry.getStoreValue();
//將這個(gè)值進(jìn)行解析
CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue);
String sql = rowChange.getSql();
log.info(">>>獲取對(duì)應(yīng)的sql:{}", sql);
// 這個(gè)sql 可能是 批量的sql語(yǔ)句
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
log.info(">>>獲取信息:{}", rowData);
//對(duì)數(shù)據(jù)進(jìn)行處理
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
beforeColumnsList.forEach(
n -> log.info("哪個(gè)列{},原先是{},是否被更新{}", n.getName(),
n.getValue(), n.getUpdated())
);
afterColumnsList.forEach(
n -> log.info("哪個(gè)列{},后來(lái)是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated())
);
}
}
}
}再次執(zhí)行sql
insert into springboot.user(id,name,age,sex,description) values(2,'canal添加用戶2',25,'男','學(xué)習(xí)canal2');

不同的類型進(jìn)行不同的處理
發(fā)現(xiàn) 其他類型的 如: TRANSACTIONBEGIN 也進(jìn)行了處理
/**
* 類型轉(zhuǎn)換數(shù)據(jù)
*
* @throws Exception 異常
*/
@Test
public void dataTypeTest() throws Exception {
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(
"127.0.0.1", 11111
),
"example",
"canal", "canal"
);
canalConnector.connect();
canalConnector.subscribe("springboot.user");
for(;;){
Message message = canalConnector.get(1);
if (message.getId() == -1) {
TimeUnit.SECONDS.sleep(1);
continue;
}
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
CanalEntry.EntryType entryType = entry.getEntryType();
//只要 RowData 數(shù)據(jù)類型的
if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
continue;
}
String tableName = entry.getHeader().getTableName();
log.info(">>>對(duì)表 {} 進(jìn)行操作", tableName);
ByteString storeValue = entry.getStoreValue();
RowChange rowChange = RowChange.parseFrom(storeValue);
//行改變
CanalEntry.EventType eventType = rowChange.getEventType();
switch (eventType) {
case INSERT: {
insertHandler(rowChange);
break;
}
case UPDATE: {
updateHandler(rowChange);
break;
}
case DELETE: {
deleteHandler(rowChange);
break;
}
default: {
break;
}
}
}
}
}
private void deleteHandler(RowChange rowChange) {
log.info(">>>>執(zhí)行刪除的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
log.info(">>>>>字段 {} 刪除數(shù)據(jù) {}", column.getName(), column.getValue());
}
}
}
private void updateHandler(RowChange rowChange) {
log.info(">>>執(zhí)行更新的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
Map<String, String> afterValueMap = afterColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
beforeValueMap.forEach((column, beforeValue) -> {
String afterValue = afterValueMap.get(column);
Boolean update = beforeValue.equals(afterValue);
log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
update);
});
}
}
/**
* 插入數(shù)據(jù). 只有后的數(shù)據(jù).
*
* @param rowChange 行改變
*/
private void insertHandler(RowChange rowChange) {
log.info(">>>執(zhí)行添加 的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
if (!StringUtils.hasText(column.getValue())) {
continue;
}
log.info("字段 {} 插入了數(shù)據(jù) {}", column.getName(), column.getValue());
}
}
}
插入,更新,刪除,分別進(jìn)行了處理.
先啟動(dòng)測(cè)試程序:

不打印任何信息。
主表執(zhí)行添加語(yǔ)句:
insert into springboot.user(id,name,age,sex,description) values(4,'canal添加用戶4',25,'男','學(xué)習(xí)canal4');
會(huì)打印信息:

這個(gè)可讀性就非常高了.
主表執(zhí)行修改的操作.
update springboot.user set name='開開心心',age=26,description='岳澤霖' where id =4;
更新時(shí),若每一個(gè)字段都跟原先一樣,不會(huì)產(chǎn)生日志消費(fèi)。

主表執(zhí)行刪除的操作:
delete from springboot.user where id =4;

上面的獲取,都是一條數(shù)據(jù)一條數(shù)據(jù)獲取的。效率比較低
一次性獲取多條數(shù)據(jù)
/**
* 一次性獲取多條數(shù)據(jù)。
* sql 執(zhí)行多條。
*/
@Test
public void dataMoreTest() throws Exception {
//1. 創(chuàng)建 canal連接對(duì)象
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(
"127.0.0.1", 11111
),
"example",
"canal",
"canal"
);
canalConnector.connect();
// 訂閱哪個(gè)對(duì)象
canalConnector.subscribe("springboot.user");
for (; ; ) {
// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);
Message message = canalConnector.get(3);
if (message.getId() == -1) {
// 未獲取到數(shù)據(jù)
continue;
}
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
CanalEntry.EntryType entryType = entry.getEntryType();
if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
continue;
}
String tableName = entry.getHeader().getTableName();
log.info(">>>>對(duì)表{} 執(zhí)行操作", tableName);
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
//對(duì)類型進(jìn)行處理
CanalEntry.EventType eventType = rowChange.getEventType();
switch (eventType) {
case INSERT: {
insertHandler(rowChange);
break;
}
case UPDATE: {
updateHandler(rowChange);
break;
}
case DELETE: {
deleteHandler(rowChange);
break;
}
default: {
break;
}
}
}
}
}
private void deleteHandler(CanalEntry.RowChange rowChange) {
log.info(">>>>執(zhí)行刪除的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
log.info(">>>>>字段 {} 刪除數(shù)據(jù) {}", column.getName(), column.getValue());
}
}
}
private void updateHandler(CanalEntry.RowChange rowChange) {
log.info(">>>執(zhí)行更新的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
Map<String, String> afterValueMap = afterColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
beforeValueMap.forEach((column, beforeValue) -> {
String afterValue = afterValueMap.get(column);
Boolean update = beforeValue.equals(afterValue);
log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
update);
});
}
}
/**
* 插入數(shù)據(jù). 只有后的數(shù)據(jù).
*
* @param rowChange 行改變
*/
private void insertHandler(CanalEntry.RowChange rowChange) {
log.info(">>>執(zhí)行添加 的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
if (!StringUtils.hasText(column.getValue())) {
continue;
}
log.info("字段 {} 插入了數(shù)據(jù) {}", column.getName(), column.getValue());
}
}
}修改點(diǎn):
// Message message = canalConnector.get(3, 5L, TimeUnit.SECONDS);
Message message = canalConnector.get(3);
.get(3) 表示 一次性獲取3條記錄.
canalConnector.get(3, 5L, TimeUnit.SECONDS); 表示5秒之內(nèi)獲取3條記錄,
有兩個(gè)觸發(fā)條件,一個(gè)是獲取了3條,一個(gè)是到了5秒。
效果展示信息與之前是一致的,就不重新演示了。
ack 配置信息
/**
* 一次性獲取多條數(shù)據(jù)。
* sql 執(zhí)行多條。
*/
@Test
public void dataMoreTest() throws Exception {
//1. 創(chuàng)建 canal連接對(duì)象
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(
"127.0.0.1", 11111
),
"example",
"canal",
"canal"
);
canalConnector.connect();
// 訂閱哪個(gè)對(duì)象
canalConnector.subscribe("springboot.user");
for (; ; ) {
Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
if (message.getId() == -1) {
// 未獲取到數(shù)據(jù)
TimeUnit.MILLISECONDS.sleep(500);
continue;
}
log.info(">>>>獲取對(duì)應(yīng)的 id: {}",message.getId());
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
CanalEntry.EntryType entryType = entry.getEntryType();
if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
continue;
}
String tableName = entry.getHeader().getTableName();
log.info(">>>>對(duì)表{} 執(zhí)行操作", tableName);
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
//對(duì)類型進(jìn)行處理
CanalEntry.EventType eventType = rowChange.getEventType();
switch (eventType) {
case INSERT: {
insertHandler(rowChange);
break;
}
case UPDATE: {
updateHandler(rowChange);
break;
}
case DELETE: {
deleteHandler(rowChange);
break;
}
default: {
break;
}
}
}
//進(jìn)行回滾
// canalConnector.rollback();
//確認(rèn)ack 配置
canalConnector.ack(message.getId());
}
}
private void deleteHandler(CanalEntry.RowChange rowChange) {
log.info(">>>>執(zhí)行刪除的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
log.info(">>>>>字段 {} 刪除數(shù)據(jù) {}", column.getName(), column.getValue());
}
}
}
private void updateHandler(CanalEntry.RowChange rowChange) {
log.info(">>>執(zhí)行更新的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
Map<String, String> afterValueMap = afterColumnsList.stream().collect(
Collectors.toMap(
CanalEntry.Column::getName,
CanalEntry.Column::getValue
)
);
beforeValueMap.forEach((column, beforeValue) -> {
String afterValue = afterValueMap.get(column);
Boolean update = beforeValue.equals(afterValue);
log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
update);
});
}
}
/**
* 插入數(shù)據(jù). 只有后的數(shù)據(jù).
*
* @param rowChange 行改變
*/
private void insertHandler(CanalEntry.RowChange rowChange) {
log.info(">>>執(zhí)行添加 的方法");
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
if (!StringUtils.hasText(column.getValue())) {
continue;
}
log.info("字段 {} 插入了數(shù)據(jù) {}", column.getName(), column.getValue());
}
}
}主要信息:
Message message = canalConnector.getWithoutAck(3, 2L, TimeUnit.SECONDS);
//進(jìn)行回滾 // canalConnector.rollback();
//確認(rèn)ack 配置canalConnector.ack(message.getId());
手動(dòng)確認(rèn)消息消費(fèi)了.
當(dāng)消息 rollback() 回滾后,會(huì)再次消費(fèi)這條消息.
canalConnector.rollback();
執(zhí)行語(yǔ)句:
insert into springboot.user(id,name,age,sex,description) values(5,'canal添加用戶5',25,'男','學(xué)習(xí)canal5');

如果變成 手動(dòng)確認(rèn),
canalConnector.ack(message.getId());
則只消費(fèi)一次.
到此這篇關(guān)于SpringBoot整合Canal方法詳解的文章就介紹到這了,更多相關(guān)SpringBoot整合Canal內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MyBatis和MyBatis Plus并存問(wèn)題及解決
最近需要使用MyBatis和MyBatis Plus,就會(huì)導(dǎo)致MyBatis和MyBatis Plus并存,本文主要介紹了MyBatis和MyBatis Plus并存問(wèn)題及解決,具有一定的參考價(jià)值,感興趣的可以了解一下2024-07-07
詳解SpringMVC使用MultipartFile實(shí)現(xiàn)文件的上傳
本篇文章主要介紹了SpringMVC使用MultipartFile實(shí)現(xiàn)文件的上傳,本地的文件上傳到資源服務(wù)器上,比較好的辦法就是通過(guò)ftp上傳。這里是結(jié)合SpringMVC+ftp的形式上傳的,有興趣的可以了解一下。2016-12-12
深入了解Java中Synchronized關(guān)鍵字的實(shí)現(xiàn)原理
synchronized是JVM的內(nèi)置鎖,基于Monitor機(jī)制實(shí)現(xiàn),每一個(gè)對(duì)象都有一個(gè)與之關(guān)聯(lián)的監(jiān)視器?(Monitor),這個(gè)監(jiān)視器充當(dāng)了一種互斥鎖的角色,本文就詳細(xì)聊一聊Synchronized關(guān)鍵字的實(shí)現(xiàn)原理,需要的朋友可以參考下2023-06-06
Java中Object類常用的12個(gè)方法(小結(jié))
Java 中的 Object 方法在面試中是一個(gè)非常高頻的點(diǎn),本文主要介紹了Java中Object類常用的12個(gè)方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12
Java Web實(shí)現(xiàn)文件上傳和下載接口功能詳解
這篇文章主要為大家詳細(xì)介紹了Java Web實(shí)現(xiàn)文件上傳和下載接口功能的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)有一定的借鑒價(jià)值,需要的可以參考一下2022-12-12

