使用Canal監(jiān)聽MySQL Binlog日志的實(shí)現(xiàn)方案
引入
原始實(shí)現(xiàn)
首先看下面的代碼,這段代碼的作用是關(guān)閉超時(shí)未支付的訂單,包括兩個(gè)步驟
1、將訂單狀態(tài)修改為取消狀態(tài)
2、調(diào)用遠(yuǎn)程服務(wù),恢復(fù)數(shù)據(jù)庫和緩存中的庫存
@Override
@Transactional(rollbackFor = Throwable.class)
public void closeOrder(String orderSn) {
OrderDO orderDO = baseMapper.selectByOrderSn(orderSn);
if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) {
// --if-- 到時(shí)間了,訂單還沒有支付,取消該訂單
// 修改訂單狀態(tài)為取消狀態(tài)
orderDO.setOrderStatus(OrderStatusConstant.CANCEL);
// 分片鍵不能更新
orderDO.setVenueId(null);
baseMapper.updateByOrderSn(orderDO);
// 還原數(shù)據(jù)庫和緩存中的庫存
Result<OrderDO> result = null;
try {
result = timePeriodFeignService.release(TimePeriodStockRestoreReqDTO.builder()
.timePeriodId(orderDO.getTimePeriodId())
.partitionId(orderDO.getPartitionId())
.courtIndex(orderDO.getCourtIndex())
.userId(orderDO.getUserId())
.build());
} catch (Exception e) {
// --if-- 庫存恢復(fù)遠(yuǎn)程接口調(diào)用失敗
throw new ServiceException(BaseErrorCode.REMOTE_ERROR);
}
if (result != null && !result.isSuccess()) {
// 因?yàn)槭褂昧薚ransactional,如果這里出現(xiàn)了異常,訂單的關(guān)閉修改會(huì)回退
throw new ServiceException("調(diào)用遠(yuǎn)程服務(wù)釋放時(shí)間段數(shù)據(jù)庫庫存失敗", BaseErrorCode.SERVICE_ERROR);
}
}
}
存在問題
為了確保這兩個(gè)步驟要么全部成功,要么全部失敗,在這段代碼中,使用了@Transactional注解來管理本地?cái)?shù)據(jù)庫事務(wù)。如果說調(diào)用遠(yuǎn)程服務(wù)恢復(fù)庫存時(shí),調(diào)用失敗,事務(wù)會(huì)進(jìn)行回滾,即訂單狀態(tài)還是保持原樣,不會(huì)被取消。然而,在分布式環(huán)境中,當(dāng)涉及到調(diào)用遠(yuǎn)程服務(wù)時(shí),@Transactional只能保證本地事務(wù)的一致性,而不能保證跨服務(wù)的一致性。例如在極端情況下會(huì)出現(xiàn)如下問題:
- 遠(yuǎn)程服務(wù)實(shí)際上已經(jīng)成功處理了請求,完成了庫存的恢復(fù)。
- 但由于網(wǎng)絡(luò)延遲或中斷,本地服務(wù)未能接收到遠(yuǎn)程服務(wù)的成功響應(yīng)。
- 結(jié)果是本地服務(wù)認(rèn)為庫存恢復(fù)失敗,觸發(fā)了本地事務(wù)的回滾,使訂單狀態(tài)回到未取消的狀態(tài)。
這種情況下,就會(huì)產(chǎn)生事務(wù)不一致的問題:庫存已經(jīng)被正確地恢復(fù),但訂單仍然處于可支付狀態(tài)。這可能導(dǎo)致客戶繼續(xù)嘗試支付一個(gè)實(shí)際上應(yīng)該被取消的訂單,或者導(dǎo)致庫存數(shù)據(jù)與訂單狀態(tài)之間的不匹配。
替代方案
開啟 MySQL 的 Binlog 日志,通過 Canal 監(jiān)聽訂單狀態(tài)的變化并異步發(fā)送消息至消息隊(duì)列。消費(fèi)者從隊(duì)列中接收消息后,如果檢測到訂單的狀態(tài)是從未支付修改為已取消,就負(fù)責(zé)調(diào)用庫存服務(wù)恢復(fù)商品庫存。
這種方式解耦了訂單服務(wù)與庫存服務(wù),提高了系統(tǒng)的容錯(cuò)性和處理效率,支持異步操作和流量削峰,確保了最終一致性,并通過冪等性設(shè)計(jì)保障了數(shù)據(jù)的準(zhǔn)確性和系統(tǒng)的穩(wěn)定性。
**為什么說確保了最終一致性?**當(dāng)訂單關(guān)閉之后,消息隊(duì)列會(huì)保證消息至少被成功消費(fèi)一次,即庫存如果還原失敗,消息隊(duì)列會(huì)多次重發(fā)消息,如果達(dá)到重發(fā)上限可以接入人工來處理死信隊(duì)列的消息
操作
MySQL 開啟 Binlog
log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)

通過 CMD 命令行窗口重啟 MySQL 數(shù)據(jù)庫,讓配置生效
C:\Windows\System32>net stop mysql8 mysql8 服務(wù)正在停止.. mysql8 服務(wù)已成功停止。 C:\Windows\System32>net start mysql8 mysql8 服務(wù)正在啟動(dòng) . mysql8 服務(wù)已經(jīng)啟動(dòng)成功。
連接進(jìn)入MySQL之后,使用show variables like 'log_%';查看BinLog啟動(dòng)是否成功,如果查詢出來log_bin對應(yīng)的值為ON,說明啟動(dòng)成功
C:\Windows\System32>mysql -u root -p12345678 mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 9 Server version: 8.0.27 MySQL Community Server - GPL Copyright (c) 2000, 2021, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> show variables like 'log_%'; +----------------------------------------+----------------------------------------------------------------------------------+ | Variable_name | Value | +----------------------------------------+----------------------------------------------------------------------------------+ | log_bin | ON | | log_bin_basename | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin | | log_bin_index | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin.index | | log_bin_trust_function_creators | OFF | | log_bin_use_v1_row_events | OFF | | log_error | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\DESKTOP-TQSE9JO.err | | log_error_services | log_filter_internal; log_sink_internal | | log_error_suppression_list | | | log_error_verbosity | 2 | | log_output | FILE | | log_queries_not_using_indexes | OFF | | log_raw | OFF | | log_replica_updates | ON | | log_slave_updates | ON | | log_slow_admin_statements | OFF | | log_slow_extra | OFF | | log_slow_replica_statements | OFF | | log_slow_slave_statements | OFF | | log_statements_unsafe_for_binlog | ON | | log_throttle_queries_not_using_indexes | 0 | | log_timestamps | UTC | +----------------------------------------+----------------------------------------------------------------------------------+ 21 rows in set, 1 warning (0.01 sec)

給canal創(chuàng)建一個(gè)單獨(dú)使用的賬號來進(jìn)行 Binlog 的同步和監(jiān)聽
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
執(zhí)行成功,MySQL的user表就多了一天canal的記錄
mysql> CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES; Query OK, 0 rows affected (0.01 sec) Query OK, 0 rows affected (0.00 sec) Query OK, 0 rows affected (0.00 sec)

Canal 中間件
下載方式
源碼地址:https://github.com/alibaba/canal
下載地址:https://github.com/alibaba/canal/releases

解壓之后,目錄如下:

修改配置文件
instance.properties
首先修改配置文件instance.properties
- 修改
canal.instance.master.address,指向真正的 MySQL 的 IP 和端口 - 修改
canal.mq.topic,聲明發(fā)送到消息隊(duì)列的消息的 Topic



如果你給canal提供的賬號密碼不是canal,需要修改

最后,并不是所有數(shù)據(jù)庫或數(shù)據(jù)表改動(dòng)我們都需要做出反應(yīng)的,這里我只針對我需要監(jiān)聽的數(shù)據(jù)庫和數(shù)據(jù)表即可,通過設(shè)置canal.instance.filter.regex進(jìn)行過濾
我的設(shè)置為canal.instance.filter.regex=^(venue-reservation)\\.(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1]))$,解釋如下:
^表示字符串的開始。(venue-reservation)匹配名為venue-reservation的數(shù)據(jù)庫。這里使用了括號()來創(chuàng)建一個(gè)捕獲組。如果說項(xiàng)目使用分庫,需要匹配多個(gè)數(shù)據(jù)庫的話,可以這樣寫(venue-reservation_0|venue-reservation_1|venue-reservation_2)\\.匹配實(shí)際的點(diǎn)號,這是數(shù)據(jù)庫名和表名之間的分隔符。(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1]))匹配特定模式的表名。這里也是用括號創(chuàng)建了一個(gè)捕獲組。因?yàn)轫?xiàng)目對time_period_order進(jìn)行了分表,所以需要這樣設(shè)置。具體地:time_period_order_匹配固定前綴time_period_order_。([0-9]|1[0-9]|2[0-9]|3[0-1])這一部分是用來匹配數(shù)字部分,看起來像是為了匹配類似于時(shí)間間隔或者編號的表。更具體地說:[0-9]匹配從0到9的任何數(shù)字。1[0-9]匹配從10到19的兩位數(shù)。2[0-9]匹配從20到29的兩位數(shù)。3[0-1]匹配30或31。
$表示字符串的結(jié)束。
canal.properties
修改配置文件canal.properties
- 因?yàn)槲宜褂玫南㈥?duì)列是RocketMQ,首先將模式
canal.serverMode設(shè)置為RocketMQ - 將
rocketmq.namesrv.addr指向的RocketMQ服務(wù)器指向正確的IP和端口



啟動(dòng)
啟動(dòng)Canal,如果是win,直接雙擊startup.bat啟動(dòng)即可

下圖啟動(dòng)之后彈出的窗口,如果要關(guān)閉 canal ,就點(diǎn)右上角的 x 即可

想要查看 canal 是否啟動(dòng)成功,可以通過日志文件查看,如果出現(xiàn)如下紅色部分的輸出,說明啟動(dòng)成功

測試
將訂單狀態(tài)從0改成2

去RocketMQ中查看收到的消息,topic選擇剛剛配置文件中設(shè)置的vrs_canal_common_topic

剛剛接收到的消息詳情如下

使用json格式化工具查看
{
"data": [
{
"order_sn": "1866821518450221056850432",
"is_deleted": "0",
"order_time": "2024-12-11 20:25:09",
"venue_id": "1865271207637635072",
"partition_id": "1865276571322015744",
"partition_index": "0",
"time_period_id": "1866776397058904064",
"user_id": "1864637732760850432",
"order_status": "2",
"payment_method": null,
"transaction_id": null,
"pay_time": null,
"pay_amount": null,
"refund_status": null,
"refund_amount": null,
"refund_time": null
}
],
"database": "venue-reservation",
"es": 1734228903000,
"gtid": "",
"id": 16,
"isDdl": false,
"mysqlType": {
"order_sn": "varchar(30)",
"is_deleted": "tinyint",
"order_time": "datetime",
"venue_id": "bigint",
"partition_id": "bigint",
"partition_index": "int",
"time_period_id": "bigint",
"user_id": "bigint",
"order_status": "tinyint",
"payment_method": "tinyint",
"transaction_id": "varchar(255)",
"pay_time": "datetime",
"pay_amount": "decimal(10,2)",
"refund_status": "tinyint",
"refund_amount": "decimal(10,2)",
"refund_time": "datetime"
},
"old": [
{
"order_status": "0"
}
],
"pkNames": [
"order_sn"
],
"sql": "",
"sqlType": {
"order_sn": 12,
"is_deleted": -6,
"order_time": 93,
"venue_id": -5,
"partition_id": -5,
"partition_index": 4,
"time_period_id": -5,
"user_id": -5,
"order_status": -6,
"payment_method": -6,
"transaction_id": 12,
"pay_time": 93,
"pay_amount": 3,
"refund_status": -6,
"refund_amount": 3,
"refund_time": 93
},
"table": "time_period_order_0",
"ts": 1734228903999,
"type": "UPDATE"
}

消息監(jiān)聽處理
實(shí)體類
首先定義一個(gè)實(shí)體類,用來接收Canal推送過來的消息
import lombok.Data;
import java.util.List;
import java.util.Map;
/**
* 用來接收canal發(fā)送過來的消息的數(shù)據(jù)
* @Author dam
* @create 2024/12/10 14:11
*/
@Data
public class CanalBinlogDTO {
/**
* 變更之后的數(shù)據(jù)
*/
private List<Map<String, Object>> data;
/**
* 數(shù)據(jù)庫名稱
*/
private String database;
/**
* es 是指 Mysql Binlog 里原始的時(shí)間戳,也就是數(shù)據(jù)原始變更的時(shí)間
* Canal 的消費(fèi)延遲 = ts - es
*/
private Long es;
/**
* 遞增 ID,從 1 開始
*/
private Long id;
/**
* 當(dāng)前變更是否是 DDL 語句
*/
private Boolean isDdl;
/**
* 表結(jié)構(gòu)字段類型
*/
private Map<String, Object> mysqlType;
/**
* 修改之前的舊數(shù)據(jù)
*/
private List<Map<String, Object>> old;
/**
* 主鍵名稱
*/
private List<String> pkNames;
/**
* SQL 語句
*/
private String sql;
/**
* SQL 類型
*/
private Map<String, Object> sqlType;
/**
* 表名
*/
private String table;
/**
* ts 是指 Canal 收到這個(gè) Binlog,構(gòu)造為自己協(xié)議對象的時(shí)間
* 應(yīng)用消費(fèi)的延遲 = now - ts
*/
private Long ts;
/**
* INSERT(新增)、UPDATE(更新)、DELETE(刪除)等等
*/
private String type;
}
監(jiān)聽
獲取到消息之后,如果判斷到所做的修改是UPDATE類型,而且修改的是訂單號,即oldDataMap.containsKey("order_status"),則進(jìn)一步判斷是否為將訂單號從0修改為2,如果是則調(diào)用恢復(fù)庫存方法
import cn.hutool.core.util.ObjectUtil;
import com.vrs.annotation.Idempotent;
import com.vrs.constant.OrderStatusConstant;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.TimePeriodService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author dam
* @create 2024/12/10 14:12
*/
@Slf4j(topic = RocketMqConstant.CANAL_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC,
consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP,
messageModel = MessageModel.CLUSTERING
)
@RequiredArgsConstructor
public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> {
private final TimePeriodService timePeriodService;
/**
* 消費(fèi)消息的方法
* 方法報(bào)錯(cuò)就會(huì)拒收消息
*
* @param messageWrapper 消息內(nèi)容,類型和上面的泛型一致。如果泛型指定了固定的類型,消息體就是我們的參數(shù)
*/
@Idempotent(
uniqueKeyPrefix = "canal_binlog_common:",
key = "#canalBinlogDTO.getId()+''",
scene = IdempotentSceneEnum.MQ,
keyTimeout = 3600L
)
@SneakyThrows
@Override
public void onMessage(CanalBinlogDTO canalBinlogDTO) {
if (canalBinlogDTO.getOld() == null) {
return;
}
Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);
Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);
if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) {
log.info("[消費(fèi)者] 消費(fèi)canal的消息,恢復(fù)時(shí)間段的庫存和空閑場號,時(shí)間段ID:{}", alterDataMap.get("time_period_id"));
Long userId = Long.parseLong(alterDataMap.get("user_id").toString());
Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString());
Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());
Long courtIndex;
if (alterDataMap.containsKey("partition_index")) {
courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString());
} else {
courtIndex = Long.parseLong(alterDataMap.get("court_index").toString());
}
Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString());
Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString());
if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) {
// 恢復(fù)庫存
timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder()
.userId(userId)
.courtIndex(courtIndex)
.timePeriodId(timePeriodId)
.partitionId(partitionId)
.build());
}
}
}
}
以上就是使用Canal監(jiān)聽MySQL Binlog日志的實(shí)現(xiàn)方案的詳細(xì)內(nèi)容,更多關(guān)于Canal監(jiān)聽MySQL Binlog的資料請關(guān)注腳本之家其它相關(guān)文章!
- MySQL使用binlog日志恢復(fù)數(shù)據(jù)的方法步驟
- MySQL刪除binlog日志文件的三種實(shí)現(xiàn)方式
- 開啟mysql的binlog日志步驟詳解
- Python解析MySQL Binlog日志分析情況
- mysql查看binlog日志的實(shí)現(xiàn)方法
- MySQL使用binlog日志進(jìn)行數(shù)據(jù)庫遷移和數(shù)據(jù)恢復(fù)
- Docker內(nèi)部MySQL如何開啟binlog日志
- mysql binlog日志查詢不出語句問題及解決
- MySQL中根據(jù)binlog日志進(jìn)行恢復(fù)的實(shí)現(xiàn)
相關(guān)文章
MySQL5.7的sql腳本導(dǎo)入到MySQL5.5出錯(cuò)3種解決方案
筆者需要將使用MySQL5.7數(shù)據(jù)庫的網(wǎng)站挪入winows服務(wù)器,目標(biāo)服務(wù)器使用的是MySQL5.5,因?yàn)榧骖櫟揭郧暗木W(wǎng)站,MySQL不能升級。遇到MySQL5.7的sql腳本導(dǎo)入到MySQL5.5出錯(cuò),總結(jié)了3種解決方案,總有一個(gè)方案適合你。2023-06-06
關(guān)于Mysql搭建主從復(fù)制功能的步驟實(shí)現(xiàn)
這篇文章主要介紹了關(guān)于Mysql搭建主從復(fù)制功能的步驟實(shí)現(xiàn),在實(shí)際的生產(chǎn)中,為了解決Mysql的單點(diǎn)故障已經(jīng)提高M(jìn)ySQL的整體服務(wù)性能,一般都會(huì)采用主從復(fù)制,需要的朋友可以參考下2023-05-05
MySQL多實(shí)例的配置應(yīng)用實(shí)例場景
在一臺服務(wù)器上,運(yùn)行多個(gè)數(shù)據(jù)庫服務(wù),這些服務(wù)進(jìn)程通過不同的socket監(jiān)聽不同的服務(wù)端口來提供各自的服務(wù),這篇文章主要介紹了MySQL多實(shí)例的配置場景分析,需要的朋友可以參考下2021-12-12
MySQL 5.6主從報(bào)錯(cuò)的實(shí)戰(zhàn)記錄
這篇文章主要給大家介紹了關(guān)于MySQL 5.6主從報(bào)錯(cuò)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
MySQL數(shù)據(jù)庫中外鍵(foreign?key)用法詳解
這篇文章主要給大家介紹了關(guān)于MySQL數(shù)據(jù)庫中外鍵(foreign?key)的相關(guān)資料,MySQL 外鍵約束可以用來保證表與表之間的關(guān)系完整性,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-10-10

