使用Canal監(jiān)聽MySQL Binlog日志的實現(xiàn)方案
引入
原始實現(xiàn)
首先看下面的代碼,這段代碼的作用是關(guān)閉超時未支付的訂單,包括兩個步驟
1、將訂單狀態(tài)修改為取消狀態(tài)
2、調(diào)用遠(yuǎn)程服務(wù),恢復(fù)數(shù)據(jù)庫和緩存中的庫存
@Override @Transactional(rollbackFor = Throwable.class) public void closeOrder(String orderSn) { OrderDO orderDO = baseMapper.selectByOrderSn(orderSn); if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) { // --if-- 到時間了,訂單還沒有支付,取消該訂單 // 修改訂單狀態(tài)為取消狀態(tài) orderDO.setOrderStatus(OrderStatusConstant.CANCEL); // 分片鍵不能更新 orderDO.setVenueId(null); baseMapper.updateByOrderSn(orderDO); // 還原數(shù)據(jù)庫和緩存中的庫存 Result<OrderDO> result = null; try { result = timePeriodFeignService.release(TimePeriodStockRestoreReqDTO.builder() .timePeriodId(orderDO.getTimePeriodId()) .partitionId(orderDO.getPartitionId()) .courtIndex(orderDO.getCourtIndex()) .userId(orderDO.getUserId()) .build()); } catch (Exception e) { // --if-- 庫存恢復(fù)遠(yuǎn)程接口調(diào)用失敗 throw new ServiceException(BaseErrorCode.REMOTE_ERROR); } if (result != null && !result.isSuccess()) { // 因為使用了Transactional,如果這里出現(xiàn)了異常,訂單的關(guān)閉修改會回退 throw new ServiceException("調(diào)用遠(yuǎn)程服務(wù)釋放時間段數(shù)據(jù)庫庫存失敗", BaseErrorCode.SERVICE_ERROR); } } }
存在問題
為了確保這兩個步驟要么全部成功,要么全部失敗,在這段代碼中,使用了@Transactional
注解來管理本地數(shù)據(jù)庫事務(wù)。如果說調(diào)用遠(yuǎn)程服務(wù)恢復(fù)庫存時,調(diào)用失敗,事務(wù)會進(jìn)行回滾,即訂單狀態(tài)還是保持原樣,不會被取消。然而,在分布式環(huán)境中,當(dāng)涉及到調(diào)用遠(yuǎn)程服務(wù)時,@Transactional
只能保證本地事務(wù)的一致性,而不能保證跨服務(wù)的一致性。例如在極端情況下會出現(xiàn)如下問題:
- 遠(yuǎn)程服務(wù)實際上已經(jīng)成功處理了請求,完成了庫存的恢復(fù)。
- 但由于網(wǎng)絡(luò)延遲或中斷,本地服務(wù)未能接收到遠(yuǎn)程服務(wù)的成功響應(yīng)。
- 結(jié)果是本地服務(wù)認(rèn)為庫存恢復(fù)失敗,觸發(fā)了本地事務(wù)的回滾,使訂單狀態(tài)回到未取消的狀態(tài)。
這種情況下,就會產(chǎn)生事務(wù)不一致的問題:庫存已經(jīng)被正確地恢復(fù),但訂單仍然處于可支付狀態(tài)。這可能導(dǎo)致客戶繼續(xù)嘗試支付一個實際上應(yīng)該被取消的訂單,或者導(dǎo)致庫存數(shù)據(jù)與訂單狀態(tài)之間的不匹配。
替代方案
開啟 MySQL 的 Binlog 日志,通過 Canal 監(jiān)聽訂單狀態(tài)的變化并異步發(fā)送消息至消息隊列。消費者從隊列中接收消息后,如果檢測到訂單的狀態(tài)是從未支付
修改為已取消
,就負(fù)責(zé)調(diào)用庫存服務(wù)恢復(fù)商品庫存。
這種方式解耦了訂單服務(wù)與庫存服務(wù),提高了系統(tǒng)的容錯性和處理效率,支持異步操作和流量削峰,確保了最終一致性,并通過冪等性設(shè)計保障了數(shù)據(jù)的準(zhǔn)確性和系統(tǒng)的穩(wěn)定性。
**為什么說確保了最終一致性?**當(dāng)訂單關(guān)閉之后,消息隊列會保證消息至少被成功消費一次,即庫存如果還原失敗,消息隊列會多次重發(fā)消息,如果達(dá)到重發(fā)上限可以接入人工來處理死信隊列的消息
操作
MySQL 開啟 Binlog
log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)
通過 CMD 命令行窗口重啟 MySQL 數(shù)據(jù)庫,讓配置生效
C:\Windows\System32>net stop mysql8 mysql8 服務(wù)正在停止.. mysql8 服務(wù)已成功停止。 C:\Windows\System32>net start mysql8 mysql8 服務(wù)正在啟動 . mysql8 服務(wù)已經(jīng)啟動成功。
連接進(jìn)入MySQL之后,使用show variables like 'log_%';
查看BinLog啟動是否成功,如果查詢出來log_bin
對應(yīng)的值為ON,說明啟動成功
C:\Windows\System32>mysql -u root -p12345678 mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 9 Server version: 8.0.27 MySQL Community Server - GPL Copyright (c) 2000, 2021, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> show variables like 'log_%'; +----------------------------------------+----------------------------------------------------------------------------------+ | Variable_name | Value | +----------------------------------------+----------------------------------------------------------------------------------+ | log_bin | ON | | log_bin_basename | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin | | log_bin_index | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin.index | | log_bin_trust_function_creators | OFF | | log_bin_use_v1_row_events | OFF | | log_error | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\DESKTOP-TQSE9JO.err | | log_error_services | log_filter_internal; log_sink_internal | | log_error_suppression_list | | | log_error_verbosity | 2 | | log_output | FILE | | log_queries_not_using_indexes | OFF | | log_raw | OFF | | log_replica_updates | ON | | log_slave_updates | ON | | log_slow_admin_statements | OFF | | log_slow_extra | OFF | | log_slow_replica_statements | OFF | | log_slow_slave_statements | OFF | | log_statements_unsafe_for_binlog | ON | | log_throttle_queries_not_using_indexes | 0 | | log_timestamps | UTC | +----------------------------------------+----------------------------------------------------------------------------------+ 21 rows in set, 1 warning (0.01 sec)
給canal創(chuàng)建一個單獨使用的賬號來進(jìn)行 Binlog 的同步和監(jiān)聽
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
執(zhí)行成功,MySQL的user表就多了一天canal的記錄
mysql> CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES; Query OK, 0 rows affected (0.01 sec) Query OK, 0 rows affected (0.00 sec) Query OK, 0 rows affected (0.00 sec)
Canal 中間件
下載方式
源碼地址:https://github.com/alibaba/canal
下載地址:https://github.com/alibaba/canal/releases
解壓之后,目錄如下:
修改配置文件
instance.properties
首先修改配置文件instance.properties
- 修改
canal.instance.master.address
,指向真正的 MySQL 的 IP 和端口 - 修改
canal.mq.topic
,聲明發(fā)送到消息隊列的消息的 Topic
如果你給canal提供的賬號密碼不是canal,需要修改
最后,并不是所有數(shù)據(jù)庫或數(shù)據(jù)表改動我們都需要做出反應(yīng)的,這里我只針對我需要監(jiān)聽的數(shù)據(jù)庫和數(shù)據(jù)表即可,通過設(shè)置canal.instance.filter.regex
進(jìn)行過濾
我的設(shè)置為canal.instance.filter.regex=^(venue-reservation)\\.(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1]))$
,解釋如下:
^
表示字符串的開始。(venue-reservation)
匹配名為venue-reservation
的數(shù)據(jù)庫。這里使用了括號()
來創(chuàng)建一個捕獲組。如果說項目使用分庫,需要匹配多個數(shù)據(jù)庫的話,可以這樣寫(venue-reservation_0|venue-reservation_1|venue-reservation_2)
\\.
匹配實際的點號,這是數(shù)據(jù)庫名和表名之間的分隔符。(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1]))
匹配特定模式的表名。這里也是用括號創(chuàng)建了一個捕獲組。因為項目對time_period_order
進(jìn)行了分表,所以需要這樣設(shè)置。具體地:time_period_order_
匹配固定前綴time_period_order_
。([0-9]|1[0-9]|2[0-9]|3[0-1])
這一部分是用來匹配數(shù)字部分,看起來像是為了匹配類似于時間間隔或者編號的表。更具體地說:[0-9]
匹配從0到9的任何數(shù)字。1[0-9]
匹配從10到19的兩位數(shù)。2[0-9]
匹配從20到29的兩位數(shù)。3[0-1]
匹配30或31。
$
表示字符串的結(jié)束。
canal.properties
修改配置文件canal.properties
- 因為我所使用的消息隊列是RocketMQ,首先將模式
canal.serverMode
設(shè)置為RocketMQ - 將
rocketmq.namesrv.addr
指向的RocketMQ服務(wù)器指向正確的IP和端口
啟動
啟動Canal,如果是win,直接雙擊startup.bat
啟動即可
下圖啟動之后彈出的窗口,如果要關(guān)閉 canal ,就點右上角的 x 即可
想要查看 canal 是否啟動成功,可以通過日志文件查看,如果出現(xiàn)如下紅色部分的輸出,說明啟動成功
測試
將訂單狀態(tài)從0改成2
去RocketMQ中查看收到的消息,topic選擇剛剛配置文件中設(shè)置的vrs_canal_common_topic
剛剛接收到的消息詳情如下
使用json格式化工具查看
{ "data": [ { "order_sn": "1866821518450221056850432", "is_deleted": "0", "order_time": "2024-12-11 20:25:09", "venue_id": "1865271207637635072", "partition_id": "1865276571322015744", "partition_index": "0", "time_period_id": "1866776397058904064", "user_id": "1864637732760850432", "order_status": "2", "payment_method": null, "transaction_id": null, "pay_time": null, "pay_amount": null, "refund_status": null, "refund_amount": null, "refund_time": null } ], "database": "venue-reservation", "es": 1734228903000, "gtid": "", "id": 16, "isDdl": false, "mysqlType": { "order_sn": "varchar(30)", "is_deleted": "tinyint", "order_time": "datetime", "venue_id": "bigint", "partition_id": "bigint", "partition_index": "int", "time_period_id": "bigint", "user_id": "bigint", "order_status": "tinyint", "payment_method": "tinyint", "transaction_id": "varchar(255)", "pay_time": "datetime", "pay_amount": "decimal(10,2)", "refund_status": "tinyint", "refund_amount": "decimal(10,2)", "refund_time": "datetime" }, "old": [ { "order_status": "0" } ], "pkNames": [ "order_sn" ], "sql": "", "sqlType": { "order_sn": 12, "is_deleted": -6, "order_time": 93, "venue_id": -5, "partition_id": -5, "partition_index": 4, "time_period_id": -5, "user_id": -5, "order_status": -6, "payment_method": -6, "transaction_id": 12, "pay_time": 93, "pay_amount": 3, "refund_status": -6, "refund_amount": 3, "refund_time": 93 }, "table": "time_period_order_0", "ts": 1734228903999, "type": "UPDATE" }
消息監(jiān)聽處理
實體類
首先定義一個實體類,用來接收Canal推送過來的消息
import lombok.Data; import java.util.List; import java.util.Map; /** * 用來接收canal發(fā)送過來的消息的數(shù)據(jù) * @Author dam * @create 2024/12/10 14:11 */ @Data public class CanalBinlogDTO { /** * 變更之后的數(shù)據(jù) */ private List<Map<String, Object>> data; /** * 數(shù)據(jù)庫名稱 */ private String database; /** * es 是指 Mysql Binlog 里原始的時間戳,也就是數(shù)據(jù)原始變更的時間 * Canal 的消費延遲 = ts - es */ private Long es; /** * 遞增 ID,從 1 開始 */ private Long id; /** * 當(dāng)前變更是否是 DDL 語句 */ private Boolean isDdl; /** * 表結(jié)構(gòu)字段類型 */ private Map<String, Object> mysqlType; /** * 修改之前的舊數(shù)據(jù) */ private List<Map<String, Object>> old; /** * 主鍵名稱 */ private List<String> pkNames; /** * SQL 語句 */ private String sql; /** * SQL 類型 */ private Map<String, Object> sqlType; /** * 表名 */ private String table; /** * ts 是指 Canal 收到這個 Binlog,構(gòu)造為自己協(xié)議對象的時間 * 應(yīng)用消費的延遲 = now - ts */ private Long ts; /** * INSERT(新增)、UPDATE(更新)、DELETE(刪除)等等 */ private String type; }
監(jiān)聽
獲取到消息之后,如果判斷到所做的修改是UPDATE
類型,而且修改的是訂單號,即oldDataMap.containsKey("order_status")
,則進(jìn)一步判斷是否為將訂單號從0
修改為2
,如果是則調(diào)用恢復(fù)庫存方法
import cn.hutool.core.util.ObjectUtil; import com.vrs.annotation.Idempotent; import com.vrs.constant.OrderStatusConstant; import com.vrs.constant.RocketMqConstant; import com.vrs.domain.dto.mq.CanalBinlogDTO; import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO; import com.vrs.enums.IdempotentSceneEnum; import com.vrs.service.TimePeriodService; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author dam * @create 2024/12/10 14:12 */ @Slf4j(topic = RocketMqConstant.CANAL_TOPIC) @Component @RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC, consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP, messageModel = MessageModel.CLUSTERING ) @RequiredArgsConstructor public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> { private final TimePeriodService timePeriodService; /** * 消費消息的方法 * 方法報錯就會拒收消息 * * @param messageWrapper 消息內(nèi)容,類型和上面的泛型一致。如果泛型指定了固定的類型,消息體就是我們的參數(shù) */ @Idempotent( uniqueKeyPrefix = "canal_binlog_common:", key = "#canalBinlogDTO.getId()+''", scene = IdempotentSceneEnum.MQ, keyTimeout = 3600L ) @SneakyThrows @Override public void onMessage(CanalBinlogDTO canalBinlogDTO) { if (canalBinlogDTO.getOld() == null) { return; } Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0); Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0); if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) { log.info("[消費者] 消費canal的消息,恢復(fù)時間段的庫存和空閑場號,時間段ID:{}", alterDataMap.get("time_period_id")); Long userId = Long.parseLong(alterDataMap.get("user_id").toString()); Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString()); Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString()); Long courtIndex; if (alterDataMap.containsKey("partition_index")) { courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString()); } else { courtIndex = Long.parseLong(alterDataMap.get("court_index").toString()); } Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString()); Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString()); if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) { // 恢復(fù)庫存 timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder() .userId(userId) .courtIndex(courtIndex) .timePeriodId(timePeriodId) .partitionId(partitionId) .build()); } } } }
以上就是使用Canal監(jiān)聽MySQL Binlog日志的實現(xiàn)方案的詳細(xì)內(nèi)容,更多關(guān)于Canal監(jiān)聽MySQL Binlog的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MySQL5.7的sql腳本導(dǎo)入到MySQL5.5出錯3種解決方案
筆者需要將使用MySQL5.7數(shù)據(jù)庫的網(wǎng)站挪入winows服務(wù)器,目標(biāo)服務(wù)器使用的是MySQL5.5,因為兼顧到以前的網(wǎng)站,MySQL不能升級。遇到MySQL5.7的sql腳本導(dǎo)入到MySQL5.5出錯,總結(jié)了3種解決方案,總有一個方案適合你。2023-06-06關(guān)于Mysql搭建主從復(fù)制功能的步驟實現(xiàn)
這篇文章主要介紹了關(guān)于Mysql搭建主從復(fù)制功能的步驟實現(xiàn),在實際的生產(chǎn)中,為了解決Mysql的單點故障已經(jīng)提高M(jìn)ySQL的整體服務(wù)性能,一般都會采用主從復(fù)制,需要的朋友可以參考下2023-05-05MySQL數(shù)據(jù)庫中外鍵(foreign?key)用法詳解
這篇文章主要給大家介紹了關(guān)于MySQL數(shù)據(jù)庫中外鍵(foreign?key)的相關(guān)資料,MySQL 外鍵約束可以用來保證表與表之間的關(guān)系完整性,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-10-10