使用Canal監(jiān)聽MySQL Binlog日志的實(shí)現(xiàn)方案
引入
原始實(shí)現(xiàn)
首先看下面的代碼,這段代碼的作用是關(guān)閉超時(shí)未支付的訂單,包括兩個(gè)步驟
1、將訂單狀態(tài)修改為取消狀態(tài)
2、調(diào)用遠(yuǎn)程服務(wù),恢復(fù)數(shù)據(jù)庫(kù)和緩存中的庫(kù)存
@Override @Transactional(rollbackFor = Throwable.class) public void closeOrder(String orderSn) { OrderDO orderDO = baseMapper.selectByOrderSn(orderSn); if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) { // --if-- 到時(shí)間了,訂單還沒(méi)有支付,取消該訂單 // 修改訂單狀態(tài)為取消狀態(tài) orderDO.setOrderStatus(OrderStatusConstant.CANCEL); // 分片鍵不能更新 orderDO.setVenueId(null); baseMapper.updateByOrderSn(orderDO); // 還原數(shù)據(jù)庫(kù)和緩存中的庫(kù)存 Result<OrderDO> result = null; try { result = timePeriodFeignService.release(TimePeriodStockRestoreReqDTO.builder() .timePeriodId(orderDO.getTimePeriodId()) .partitionId(orderDO.getPartitionId()) .courtIndex(orderDO.getCourtIndex()) .userId(orderDO.getUserId()) .build()); } catch (Exception e) { // --if-- 庫(kù)存恢復(fù)遠(yuǎn)程接口調(diào)用失敗 throw new ServiceException(BaseErrorCode.REMOTE_ERROR); } if (result != null && !result.isSuccess()) { // 因?yàn)槭褂昧薚ransactional,如果這里出現(xiàn)了異常,訂單的關(guān)閉修改會(huì)回退 throw new ServiceException("調(diào)用遠(yuǎn)程服務(wù)釋放時(shí)間段數(shù)據(jù)庫(kù)庫(kù)存失敗", BaseErrorCode.SERVICE_ERROR); } } }
存在問(wèn)題
為了確保這兩個(gè)步驟要么全部成功,要么全部失敗,在這段代碼中,使用了@Transactional
注解來(lái)管理本地?cái)?shù)據(jù)庫(kù)事務(wù)。如果說(shuō)調(diào)用遠(yuǎn)程服務(wù)恢復(fù)庫(kù)存時(shí),調(diào)用失敗,事務(wù)會(huì)進(jìn)行回滾,即訂單狀態(tài)還是保持原樣,不會(huì)被取消。然而,在分布式環(huán)境中,當(dāng)涉及到調(diào)用遠(yuǎn)程服務(wù)時(shí),@Transactional
只能保證本地事務(wù)的一致性,而不能保證跨服務(wù)的一致性。例如在極端情況下會(huì)出現(xiàn)如下問(wèn)題:
- 遠(yuǎn)程服務(wù)實(shí)際上已經(jīng)成功處理了請(qǐng)求,完成了庫(kù)存的恢復(fù)。
- 但由于網(wǎng)絡(luò)延遲或中斷,本地服務(wù)未能接收到遠(yuǎn)程服務(wù)的成功響應(yīng)。
- 結(jié)果是本地服務(wù)認(rèn)為庫(kù)存恢復(fù)失敗,觸發(fā)了本地事務(wù)的回滾,使訂單狀態(tài)回到未取消的狀態(tài)。
這種情況下,就會(huì)產(chǎn)生事務(wù)不一致的問(wèn)題:庫(kù)存已經(jīng)被正確地恢復(fù),但訂單仍然處于可支付狀態(tài)。這可能導(dǎo)致客戶繼續(xù)嘗試支付一個(gè)實(shí)際上應(yīng)該被取消的訂單,或者導(dǎo)致庫(kù)存數(shù)據(jù)與訂單狀態(tài)之間的不匹配。
替代方案
開啟 MySQL 的 Binlog 日志,通過(guò) Canal 監(jiān)聽訂單狀態(tài)的變化并異步發(fā)送消息至消息隊(duì)列。消費(fèi)者從隊(duì)列中接收消息后,如果檢測(cè)到訂單的狀態(tài)是從未支付
修改為已取消
,就負(fù)責(zé)調(diào)用庫(kù)存服務(wù)恢復(fù)商品庫(kù)存。
這種方式解耦了訂單服務(wù)與庫(kù)存服務(wù),提高了系統(tǒng)的容錯(cuò)性和處理效率,支持異步操作和流量削峰,確保了最終一致性,并通過(guò)冪等性設(shè)計(jì)保障了數(shù)據(jù)的準(zhǔn)確性和系統(tǒng)的穩(wěn)定性。
**為什么說(shuō)確保了最終一致性?**當(dāng)訂單關(guān)閉之后,消息隊(duì)列會(huì)保證消息至少被成功消費(fèi)一次,即庫(kù)存如果還原失敗,消息隊(duì)列會(huì)多次重發(fā)消息,如果達(dá)到重發(fā)上限可以接入人工來(lái)處理死信隊(duì)列的消息
操作
MySQL 開啟 Binlog
log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)
通過(guò) CMD 命令行窗口重啟 MySQL 數(shù)據(jù)庫(kù),讓配置生效
C:\Windows\System32>net stop mysql8 mysql8 服務(wù)正在停止.. mysql8 服務(wù)已成功停止。 C:\Windows\System32>net start mysql8 mysql8 服務(wù)正在啟動(dòng) . mysql8 服務(wù)已經(jīng)啟動(dòng)成功。
連接進(jìn)入MySQL之后,使用show variables like 'log_%';
查看BinLog啟動(dòng)是否成功,如果查詢出來(lái)log_bin
對(duì)應(yīng)的值為ON,說(shuō)明啟動(dòng)成功
C:\Windows\System32>mysql -u root -p12345678 mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 9 Server version: 8.0.27 MySQL Community Server - GPL Copyright (c) 2000, 2021, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> show variables like 'log_%'; +----------------------------------------+----------------------------------------------------------------------------------+ | Variable_name | Value | +----------------------------------------+----------------------------------------------------------------------------------+ | log_bin | ON | | log_bin_basename | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin | | log_bin_index | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin.index | | log_bin_trust_function_creators | OFF | | log_bin_use_v1_row_events | OFF | | log_error | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\DESKTOP-TQSE9JO.err | | log_error_services | log_filter_internal; log_sink_internal | | log_error_suppression_list | | | log_error_verbosity | 2 | | log_output | FILE | | log_queries_not_using_indexes | OFF | | log_raw | OFF | | log_replica_updates | ON | | log_slave_updates | ON | | log_slow_admin_statements | OFF | | log_slow_extra | OFF | | log_slow_replica_statements | OFF | | log_slow_slave_statements | OFF | | log_statements_unsafe_for_binlog | ON | | log_throttle_queries_not_using_indexes | 0 | | log_timestamps | UTC | +----------------------------------------+----------------------------------------------------------------------------------+ 21 rows in set, 1 warning (0.01 sec)
給canal創(chuàng)建一個(gè)單獨(dú)使用的賬號(hào)來(lái)進(jìn)行 Binlog 的同步和監(jiān)聽
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
執(zhí)行成功,MySQL的user表就多了一天canal的記錄
mysql> CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES; Query OK, 0 rows affected (0.01 sec) Query OK, 0 rows affected (0.00 sec) Query OK, 0 rows affected (0.00 sec)
Canal 中間件
下載方式
源碼地址:https://github.com/alibaba/canal
下載地址:https://github.com/alibaba/canal/releases
解壓之后,目錄如下:
修改配置文件
instance.properties
首先修改配置文件instance.properties
- 修改
canal.instance.master.address
,指向真正的 MySQL 的 IP 和端口 - 修改
canal.mq.topic
,聲明發(fā)送到消息隊(duì)列的消息的 Topic
如果你給canal提供的賬號(hào)密碼不是canal,需要修改
最后,并不是所有數(shù)據(jù)庫(kù)或數(shù)據(jù)表改動(dòng)我們都需要做出反應(yīng)的,這里我只針對(duì)我需要監(jiān)聽的數(shù)據(jù)庫(kù)和數(shù)據(jù)表即可,通過(guò)設(shè)置canal.instance.filter.regex
進(jìn)行過(guò)濾
我的設(shè)置為canal.instance.filter.regex=^(venue-reservation)\\.(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1]))$
,解釋如下:
^
表示字符串的開始。(venue-reservation)
匹配名為venue-reservation
的數(shù)據(jù)庫(kù)。這里使用了括號(hào)()
來(lái)創(chuàng)建一個(gè)捕獲組。如果說(shuō)項(xiàng)目使用分庫(kù),需要匹配多個(gè)數(shù)據(jù)庫(kù)的話,可以這樣寫(venue-reservation_0|venue-reservation_1|venue-reservation_2)
\\.
匹配實(shí)際的點(diǎn)號(hào),這是數(shù)據(jù)庫(kù)名和表名之間的分隔符。(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1]))
匹配特定模式的表名。這里也是用括號(hào)創(chuàng)建了一個(gè)捕獲組。因?yàn)轫?xiàng)目對(duì)time_period_order
進(jìn)行了分表,所以需要這樣設(shè)置。具體地:time_period_order_
匹配固定前綴time_period_order_
。([0-9]|1[0-9]|2[0-9]|3[0-1])
這一部分是用來(lái)匹配數(shù)字部分,看起來(lái)像是為了匹配類似于時(shí)間間隔或者編號(hào)的表。更具體地說(shuō):[0-9]
匹配從0到9的任何數(shù)字。1[0-9]
匹配從10到19的兩位數(shù)。2[0-9]
匹配從20到29的兩位數(shù)。3[0-1]
匹配30或31。
$
表示字符串的結(jié)束。
canal.properties
修改配置文件canal.properties
- 因?yàn)槲宜褂玫南㈥?duì)列是RocketMQ,首先將模式
canal.serverMode
設(shè)置為RocketMQ - 將
rocketmq.namesrv.addr
指向的RocketMQ服務(wù)器指向正確的IP和端口
啟動(dòng)
啟動(dòng)Canal,如果是win,直接雙擊startup.bat
啟動(dòng)即可
下圖啟動(dòng)之后彈出的窗口,如果要關(guān)閉 canal ,就點(diǎn)右上角的 x 即可
想要查看 canal 是否啟動(dòng)成功,可以通過(guò)日志文件查看,如果出現(xiàn)如下紅色部分的輸出,說(shuō)明啟動(dòng)成功
測(cè)試
將訂單狀態(tài)從0改成2
去RocketMQ中查看收到的消息,topic選擇剛剛配置文件中設(shè)置的vrs_canal_common_topic
剛剛接收到的消息詳情如下
使用json格式化工具查看
{ "data": [ { "order_sn": "1866821518450221056850432", "is_deleted": "0", "order_time": "2024-12-11 20:25:09", "venue_id": "1865271207637635072", "partition_id": "1865276571322015744", "partition_index": "0", "time_period_id": "1866776397058904064", "user_id": "1864637732760850432", "order_status": "2", "payment_method": null, "transaction_id": null, "pay_time": null, "pay_amount": null, "refund_status": null, "refund_amount": null, "refund_time": null } ], "database": "venue-reservation", "es": 1734228903000, "gtid": "", "id": 16, "isDdl": false, "mysqlType": { "order_sn": "varchar(30)", "is_deleted": "tinyint", "order_time": "datetime", "venue_id": "bigint", "partition_id": "bigint", "partition_index": "int", "time_period_id": "bigint", "user_id": "bigint", "order_status": "tinyint", "payment_method": "tinyint", "transaction_id": "varchar(255)", "pay_time": "datetime", "pay_amount": "decimal(10,2)", "refund_status": "tinyint", "refund_amount": "decimal(10,2)", "refund_time": "datetime" }, "old": [ { "order_status": "0" } ], "pkNames": [ "order_sn" ], "sql": "", "sqlType": { "order_sn": 12, "is_deleted": -6, "order_time": 93, "venue_id": -5, "partition_id": -5, "partition_index": 4, "time_period_id": -5, "user_id": -5, "order_status": -6, "payment_method": -6, "transaction_id": 12, "pay_time": 93, "pay_amount": 3, "refund_status": -6, "refund_amount": 3, "refund_time": 93 }, "table": "time_period_order_0", "ts": 1734228903999, "type": "UPDATE" }
消息監(jiān)聽處理
實(shí)體類
首先定義一個(gè)實(shí)體類,用來(lái)接收Canal推送過(guò)來(lái)的消息
import lombok.Data; import java.util.List; import java.util.Map; /** * 用來(lái)接收canal發(fā)送過(guò)來(lái)的消息的數(shù)據(jù) * @Author dam * @create 2024/12/10 14:11 */ @Data public class CanalBinlogDTO { /** * 變更之后的數(shù)據(jù) */ private List<Map<String, Object>> data; /** * 數(shù)據(jù)庫(kù)名稱 */ private String database; /** * es 是指 Mysql Binlog 里原始的時(shí)間戳,也就是數(shù)據(jù)原始變更的時(shí)間 * Canal 的消費(fèi)延遲 = ts - es */ private Long es; /** * 遞增 ID,從 1 開始 */ private Long id; /** * 當(dāng)前變更是否是 DDL 語(yǔ)句 */ private Boolean isDdl; /** * 表結(jié)構(gòu)字段類型 */ private Map<String, Object> mysqlType; /** * 修改之前的舊數(shù)據(jù) */ private List<Map<String, Object>> old; /** * 主鍵名稱 */ private List<String> pkNames; /** * SQL 語(yǔ)句 */ private String sql; /** * SQL 類型 */ private Map<String, Object> sqlType; /** * 表名 */ private String table; /** * ts 是指 Canal 收到這個(gè) Binlog,構(gòu)造為自己協(xié)議對(duì)象的時(shí)間 * 應(yīng)用消費(fèi)的延遲 = now - ts */ private Long ts; /** * INSERT(新增)、UPDATE(更新)、DELETE(刪除)等等 */ private String type; }
監(jiān)聽
獲取到消息之后,如果判斷到所做的修改是UPDATE
類型,而且修改的是訂單號(hào),即oldDataMap.containsKey("order_status")
,則進(jìn)一步判斷是否為將訂單號(hào)從0
修改為2
,如果是則調(diào)用恢復(fù)庫(kù)存方法
import cn.hutool.core.util.ObjectUtil; import com.vrs.annotation.Idempotent; import com.vrs.constant.OrderStatusConstant; import com.vrs.constant.RocketMqConstant; import com.vrs.domain.dto.mq.CanalBinlogDTO; import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO; import com.vrs.enums.IdempotentSceneEnum; import com.vrs.service.TimePeriodService; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author dam * @create 2024/12/10 14:12 */ @Slf4j(topic = RocketMqConstant.CANAL_TOPIC) @Component @RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC, consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP, messageModel = MessageModel.CLUSTERING ) @RequiredArgsConstructor public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> { private final TimePeriodService timePeriodService; /** * 消費(fèi)消息的方法 * 方法報(bào)錯(cuò)就會(huì)拒收消息 * * @param messageWrapper 消息內(nèi)容,類型和上面的泛型一致。如果泛型指定了固定的類型,消息體就是我們的參數(shù) */ @Idempotent( uniqueKeyPrefix = "canal_binlog_common:", key = "#canalBinlogDTO.getId()+''", scene = IdempotentSceneEnum.MQ, keyTimeout = 3600L ) @SneakyThrows @Override public void onMessage(CanalBinlogDTO canalBinlogDTO) { if (canalBinlogDTO.getOld() == null) { return; } Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0); Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0); if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) { log.info("[消費(fèi)者] 消費(fèi)canal的消息,恢復(fù)時(shí)間段的庫(kù)存和空閑場(chǎng)號(hào),時(shí)間段ID:{}", alterDataMap.get("time_period_id")); Long userId = Long.parseLong(alterDataMap.get("user_id").toString()); Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString()); Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString()); Long courtIndex; if (alterDataMap.containsKey("partition_index")) { courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString()); } else { courtIndex = Long.parseLong(alterDataMap.get("court_index").toString()); } Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString()); Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString()); if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) { // 恢復(fù)庫(kù)存 timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder() .userId(userId) .courtIndex(courtIndex) .timePeriodId(timePeriodId) .partitionId(partitionId) .build()); } } } }
以上就是使用Canal監(jiān)聽MySQL Binlog日志的實(shí)現(xiàn)方案的詳細(xì)內(nèi)容,更多關(guān)于Canal監(jiān)聽MySQL Binlog的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- MySQL使用binlog日志恢復(fù)數(shù)據(jù)的方法步驟
- MySQL刪除binlog日志文件的三種實(shí)現(xiàn)方式
- 開啟mysql的binlog日志步驟詳解
- Python解析MySQL Binlog日志分析情況
- mysql查看binlog日志的實(shí)現(xiàn)方法
- MySQL使用binlog日志進(jìn)行數(shù)據(jù)庫(kù)遷移和數(shù)據(jù)恢復(fù)
- Docker內(nèi)部MySQL如何開啟binlog日志
- mysql binlog日志查詢不出語(yǔ)句問(wèn)題及解決
- MySQL中根據(jù)binlog日志進(jìn)行恢復(fù)的實(shí)現(xiàn)
相關(guān)文章
MySQL5.7的sql腳本導(dǎo)入到MySQL5.5出錯(cuò)3種解決方案
筆者需要將使用MySQL5.7數(shù)據(jù)庫(kù)的網(wǎng)站挪入winows服務(wù)器,目標(biāo)服務(wù)器使用的是MySQL5.5,因?yàn)榧骖櫟揭郧暗木W(wǎng)站,MySQL不能升級(jí)。遇到MySQL5.7的sql腳本導(dǎo)入到MySQL5.5出錯(cuò),總結(jié)了3種解決方案,總有一個(gè)方案適合你。2023-06-06關(guān)于Mysql搭建主從復(fù)制功能的步驟實(shí)現(xiàn)
這篇文章主要介紹了關(guān)于Mysql搭建主從復(fù)制功能的步驟實(shí)現(xiàn),在實(shí)際的生產(chǎn)中,為了解決Mysql的單點(diǎn)故障已經(jīng)提高M(jìn)ySQL的整體服務(wù)性能,一般都會(huì)采用主從復(fù)制,需要的朋友可以參考下2023-05-05MySQL多實(shí)例的配置應(yīng)用實(shí)例場(chǎng)景
在一臺(tái)服務(wù)器上,運(yùn)行多個(gè)數(shù)據(jù)庫(kù)服務(wù),這些服務(wù)進(jìn)程通過(guò)不同的socket監(jiān)聽不同的服務(wù)端口來(lái)提供各自的服務(wù),這篇文章主要介紹了MySQL多實(shí)例的配置場(chǎng)景分析,需要的朋友可以參考下2021-12-12MySQL 5.6主從報(bào)錯(cuò)的實(shí)戰(zhàn)記錄
這篇文章主要給大家介紹了關(guān)于MySQL 5.6主從報(bào)錯(cuò)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03mysql經(jīng)典4張表問(wèn)題詳細(xì)講解
MySQL是一種關(guān)系型數(shù)據(jù)庫(kù)管理系統(tǒng),可以通過(guò)連接不同的表將數(shù)據(jù)進(jìn)行關(guān)聯(lián)查詢,下面這篇文章主要給大家介紹了關(guān)于mysql經(jīng)典4張表問(wèn)題的相關(guān)資料,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-03-03MySQL數(shù)據(jù)庫(kù)中外鍵(foreign?key)用法詳解
這篇文章主要給大家介紹了關(guān)于MySQL數(shù)據(jù)庫(kù)中外鍵(foreign?key)的相關(guān)資料,MySQL 外鍵約束可以用來(lái)保證表與表之間的關(guān)系完整性,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-10-10