基于Spring監(jiān)聽Binlog日志的方法詳解
binlog的三種模式
MySQL 的二進(jìn)制日志(binlog)有三種不同的格式,通常被稱為 binlog 模式。這三種模式分別是 Statement 模式、Row 模式和Mixed 模式。
Statement 模式:
- 在 Statement 模式下,MySQL 記錄每個(gè)會(huì)更改數(shù)據(jù)的 SQL 語句。
- binlog 記錄的是執(zhí)行的 SQL 語句本身,而不是具體的數(shù)據(jù)變化。
- 例如,如果執(zhí)行了
UPDATE語句,binlog 記錄的是這個(gè)UPDATE語句的文本。
Row 模式:
- 在 Row 模式下,MySQL 記錄每一行數(shù)據(jù)的變化。
- binlog 記錄的是行數(shù)據(jù)的變化,而不是 SQL 語句。
- 例如,如果執(zhí)行了
UPDATE語句,binlog 記錄的是被修改的行的實(shí)際數(shù)據(jù)。
Mixed 模式:
- Mixed 模式是 Statement 模式和 Row 模式的結(jié)合。
- 在 Mixed 模式下,MySQL 根據(jù)執(zhí)行的 SQL 語句的類型來決定是記錄語句還是記錄行。
- 通常,對(duì)于簡(jiǎn)單的語句,使用 Statement 模式,對(duì)于涉及到行變化的復(fù)雜語句,使用 Row 模式。
這些模式可以通過 MySQL 配置文件中的 binlog_format 參數(shù)進(jìn)行配置。例如:
[mysqld] binlog_format=mixed
其中,statement、row 和 mixed 分別代表 Statement 模式、Row 模式和 Mixed 模式。選擇適當(dāng)?shù)?binlog 模式取決于應(yīng)用的特定需求和性能要求。不同的模式具有不同的優(yōu)劣勢(shì),例如,Statement 模式可能會(huì)更輕量,而 Row 模式可能提供更詳細(xì)的數(shù)據(jù)變化信息。
以Mixed 為例
查看binlog是否開啟
show variables like '%log_bin%'

啟動(dòng)springboot程序

新建數(shù)據(jù)庫

這個(gè)事件是一個(gè) binlog 事件,其內(nèi)容表示一個(gè) SQL 查詢事件。讓我解釋一下這個(gè)事件的各個(gè)部分:
- 事件類型 (
eventType): 該事件的類型是QUERY,表示這是一個(gè) SQL 查詢事件。 - 時(shí)間戳 (
timestamp): 事件的時(shí)間戳為1700045267000,表示事件發(fā)生的時(shí)間。 - 線程ID (
threadId): 線程ID 是189,表示執(zhí)行這個(gè)查詢的線程的標(biāo)識(shí)符。 - 執(zhí)行時(shí)間 (
executionTime): 執(zhí)行時(shí)間為0,表示執(zhí)行這個(gè)查詢所花費(fèi)的時(shí)間。 - 錯(cuò)誤代碼 (
errorCode): 錯(cuò)誤代碼為0,表示查詢執(zhí)行沒有錯(cuò)誤。 - 數(shù)據(jù)庫 (
database): 數(shù)據(jù)庫為test2023,表示這個(gè)查詢發(fā)生在test2023數(shù)據(jù)庫中。 - SQL 查詢 (
sql): 實(shí)際的 SQL 查詢?yōu)?CREATE DATABASEtest2023CHARACTER SET utf8 COLLATE utf8_general_ci,表示執(zhí)行了創(chuàng)建數(shù)據(jù)庫的操作。
這個(gè)事件的作用是在 test2023 數(shù)據(jù)庫中執(zhí)行了一個(gè)創(chuàng)建數(shù)據(jù)庫的 SQL 查詢。這是 binlog 中的一部分,用于記錄數(shù)據(jù)庫中的變化,以便進(jìn)行數(shù)據(jù)備份、主從同步等操作。

新建表數(shù)據(jù)
CREATE TABLE `t_user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `userName` varchar(100) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4;

這個(gè)事件也是一個(gè) binlog 事件,表示一個(gè) SQL 查詢事件。讓我解釋一下這個(gè)事件的各個(gè)部分:
- 事件類型 (
eventType): 該事件的類型是QUERY,表示這是一個(gè) SQL 查詢事件。 - 時(shí)間戳 (
timestamp): 事件的時(shí)間戳為1700045422000,表示事件發(fā)生的時(shí)間。 - 線程ID (
threadId): 線程ID 是204,表示執(zhí)行這個(gè)查詢的線程的標(biāo)識(shí)符。 - 執(zhí)行時(shí)間 (
executionTime): 執(zhí)行時(shí)間為0,表示執(zhí)行這個(gè)查詢所花費(fèi)的時(shí)間。 - 錯(cuò)誤代碼 (
errorCode): 錯(cuò)誤代碼為0,表示查詢執(zhí)行沒有錯(cuò)誤。 - 數(shù)據(jù)庫 (
database): 數(shù)據(jù)庫為test2023,表示這個(gè)查詢發(fā)生在test2023數(shù)據(jù)庫中。 - SQL 查詢 (
sql): 實(shí)際的 SQL 查詢?yōu)?CREATE TABLEt_user(idbigint(20) NOT NULL AUTO_INCREMENT,userNamevarchar(100) NOT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4,表示執(zhí)行了在test2023數(shù)據(jù)庫中創(chuàng)建名為t_user的表的操作。
這個(gè)事件的作用是在 test2023 數(shù)據(jù)庫中創(chuàng)建了一個(gè)名為 t_user 的表,該表包含 id 和 userName 兩個(gè)字段,其中 id 是自增的主鍵。這種類型的事件常常用于記錄數(shù)據(jù)庫結(jié)構(gòu)的變化,以便進(jìn)行數(shù)據(jù)備份、遷移和版本控制等操作。

插入表數(shù)據(jù)
INSERT INTO `test2023`.`t_user` (`id`, `userName`)
VALUES
(
"10086",
"用心記錄技術(shù),走心分享,始于后端,不止于后端,勵(lì)志成為一名優(yōu)秀的全棧架構(gòu)師,真正的實(shí)現(xiàn)碼中致富。"
);

這個(gè)事件也是一個(gè) binlog 事件,表示一個(gè) SQL 查詢事件,具體如下:
- 事件類型 (
eventType): 該事件的類型是QUERY,表示這是一個(gè) SQL 查詢事件。 - 時(shí)間戳 (
timestamp): 事件的時(shí)間戳為1700045547000,表示事件發(fā)生的時(shí)間。 - 線程ID (
threadId): 線程ID 是204,表示執(zhí)行這個(gè)查詢的線程的標(biāo)識(shí)符。 - 執(zhí)行時(shí)間 (
executionTime): 執(zhí)行時(shí)間為0,表示執(zhí)行這個(gè)查詢所花費(fèi)的時(shí)間。 - 錯(cuò)誤代碼 (
errorCode): 錯(cuò)誤代碼為0,表示查詢執(zhí)行沒有錯(cuò)誤。 - 數(shù)據(jù)庫 (
database): 數(shù)據(jù)庫為test2023,表示這個(gè)查詢發(fā)生在test2023數(shù)據(jù)庫中。 - SQL 查詢 (
sql): 實(shí)際的 SQL 查詢?yōu)?INSERT INTOtest2023.t_user(id,userName) VALUES ( "10086", "用心記錄技術(shù),走心分享,始于后端,不止于后端,勵(lì)志成為一名優(yōu)秀的全棧架構(gòu)師,真正的實(shí)現(xiàn)碼中致富。",表示執(zhí)行了向test2023數(shù)據(jù)庫的t_user表中插入一行數(shù)據(jù)的操作。
這個(gè)事件的作用是向 t_user 表中插入了一行數(shù)據(jù),包含了 id 和 userName 兩個(gè)字段的值。這種類型的事件通常用于記錄數(shù)據(jù)的變化,以便進(jìn)行數(shù)據(jù)備份、同步和遷移等操作。

修改表數(shù)據(jù)修改表數(shù)據(jù)
修改表數(shù)據(jù)
UPDATE `test2023`.`t_user` SET `id` = '10086', `userName` = '我的修改數(shù)據(jù)!??!' WHERE (`id` = '10086');

這個(gè)事件同樣是一個(gè) binlog 事件,表示一個(gè) SQL 查詢事件,具體如下:
- 事件類型 (
eventType): 該事件的類型是QUERY,表示這是一個(gè) SQL 查詢事件。 - 時(shí)間戳 (
timestamp): 事件的時(shí)間戳為1700045675000,表示事件發(fā)生的時(shí)間。 - 線程ID (
threadId): 線程ID 是204,表示執(zhí)行這個(gè)查詢的線程的標(biāo)識(shí)符。 - 執(zhí)行時(shí)間 (
executionTime): 執(zhí)行時(shí)間為0,表示執(zhí)行這個(gè)查詢所花費(fèi)的時(shí)間。 - 錯(cuò)誤代碼 (
errorCode): 錯(cuò)誤代碼為0,表示查詢執(zhí)行沒有錯(cuò)誤。 - 數(shù)據(jù)庫 (
database): 數(shù)據(jù)庫為test2023,表示這個(gè)查詢發(fā)生在test2023數(shù)據(jù)庫中。 - SQL 查詢 (
sql): 實(shí)際的 SQL 查詢?yōu)?UPDATEtest2023.t_userSETid= '10086',userName= '我的修改數(shù)據(jù)?。?!' WHERE (id= '10086'),表示執(zhí)行了更新test2023數(shù)據(jù)庫中的t_user表中一行數(shù)據(jù)的操作。
這個(gè)事件的作用是將 t_user 表中 id 為 10086 的行的數(shù)據(jù)進(jìn)行更新,將 id 修改為 10086,userName 修改為 '我的修改數(shù)據(jù)!?。?#39;。這種類型的事件通常用于記錄數(shù)據(jù)的變化,以便進(jìn)行數(shù)據(jù)備份、同步和遷移等操作。

刪除表數(shù)據(jù)
DELETE FROM t_user WHERE id = '10086';

這個(gè)事件同樣是一個(gè) binlog 事件,表示一個(gè) SQL 查詢事件,具體如下:
- 事件類型 (
eventType): 該事件的類型是QUERY,表示這是一個(gè) SQL 查詢事件。 - 時(shí)間戳 (
timestamp): 事件的時(shí)間戳為1700045755000,表示事件發(fā)生的時(shí)間。 - 線程ID (
threadId): 線程ID 是204,表示執(zhí)行這個(gè)查詢的線程的標(biāo)識(shí)符。 - 執(zhí)行時(shí)間 (
executionTime): 執(zhí)行時(shí)間為0,表示執(zhí)行這個(gè)查詢所花費(fèi)的時(shí)間。 - 錯(cuò)誤代碼 (
errorCode): 錯(cuò)誤代碼為0,表示查詢執(zhí)行沒有錯(cuò)誤。 - 數(shù)據(jù)庫 (
database): 數(shù)據(jù)庫為test2023,表示這個(gè)查詢發(fā)生在test2023數(shù)據(jù)庫中。 - SQL 查詢 (
sql): 實(shí)際的 SQL 查詢?yōu)?DELETE FROM t_user WHERE id = '10086',表示執(zhí)行了刪除test2023數(shù)據(jù)庫中的t_user表中一行數(shù)據(jù)的操作。
這個(gè)事件的作用是刪除 t_user 表中 id 為 10086 的行。這種類型的事件通常用于記錄數(shù)據(jù)的刪除操作,以便進(jìn)行數(shù)據(jù)備份、同步和遷移等操作。

總結(jié): binlog_format 設(shè)置為 mixed 時(shí),對(duì)于 INSERT、UPDATE 和 DELETE 操作,它們?cè)?binlog 中的事件類型都會(huì)被表示為 QUERY 事件。這是因?yàn)樵?mixed 模式下,MySQL 使用了不同的方式來記錄不同類型的操作,但在 binlog 中,它們都被包裝成了 QUERY 事件。
在 mixed 模式下:
- 對(duì)于某些語句級(jí)別的操作(例如非確定性的語句或不支持事務(wù)的存儲(chǔ)引擎),會(huì)使用 STATEMENT 事件。
- 對(duì)于其他一些情況,會(huì)使用 ROW 事件,將變更的行作為事件的一部分進(jìn)行記錄。
這就是為什么看到的 INSERT、UPDATE 和 DELETE 操作的事件類型都是 QUERY。在處理這些事件時(shí),需要根據(jù)具體的 SQL 查詢語句或其他信息來確定操作的類型。
源碼示例
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> <!-- 查看最新版本 --> </dependency> <dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.21.0</version> </dependency>
Java示例
package com.example.demo.listener;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.AuthenticationException;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeoutException;
public class BinlogListenerMixed {
private static final Logger logger = LoggerFactory.getLogger(BinlogListenerMixed.class);
private static final String MYSQL_HOST = "8.130.74.105";
private static final int MYSQL_PORT = 3306;
private static final String MYSQL_USERNAME = "root";
private static final String MYSQL_PASSWORD = "zhang.ting.123";
public static void main(String[] args) {
try {
BinaryLogClient client = new BinaryLogClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
// client.setBinlogFilename(null);
// client.setBinlogPosition(-1); // 或者設(shè)置為其他適當(dāng)?shù)某跏嘉恢?
// client.setServerId(1);
// client.setBinlogFilename("mysql-bin.000005");
// client.setBinlogPosition(154);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
logger.info("使用主機(jī)={}, 端口={}, 用戶名={}, 密碼={} 連接到 MySQL", MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(BinlogListenerMixed::handleEvent);
client.registerLifecycleListener(new BinaryLogClient.LifecycleListener() {
@Override
public void onConnect(BinaryLogClient client) {
logger.info("Connected to MySQL server");
}
@Override
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
logger.error("Communication failure with MySQL server", ex);
}
@Override
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
logger.error("Event deserialization failure", ex);
}
@Override
public void onDisconnect(BinaryLogClient client) {
logger.warn("Disconnected from MySQL server");
// 在這里添加重新連接或其他處理邏輯
}
});
client.connect();
} catch (IOException e) {
logger.error("@@ 連接到 MySQL 時(shí)發(fā)生錯(cuò)誤", e);
logger.error("@@ Error connecting to MySQL", e);
}
}
private static void handleEvent(Event event) {
logger.info("@@ 打印 event: {}", event);
logger.info("@@ Received event type: {}", event.getHeader().getEventType());
switch (event.getHeader().getEventType()) {
case WRITE_ROWS:
case EXT_WRITE_ROWS:
handleWriteRowsEvent((WriteRowsEventData) event.getData());
break;
case QUERY:
handleQueryEvent((QueryEventData) event.getData());
break;
case TABLE_MAP:
handleTableMapEvent((TableMapEventData) event.getData());
break;
// 其他事件處理...
}
}
private static void handleWriteRowsEvent(WriteRowsEventData eventData) {
List<Serializable[]> rows = eventData.getRows();
// 獲取表名
String tableName = getTableName(eventData);
// 處理每一行數(shù)據(jù)
for (Serializable[] row : rows) {
// 根據(jù)需要調(diào)整以下代碼以獲取具體的列值
String column1Value = row[0].toString();
String column2Value = row[1].toString();
// 將數(shù)據(jù)備份到另一個(gè)數(shù)據(jù)庫
backupToAnotherDatabase(tableName, column1Value, column2Value);
}
}
private static void handleQueryEvent(QueryEventData eventData) {
String sql = eventData.getSql();
logger.info("@@ handleQueryEvent函數(shù)執(zhí)行Query event SQL: {}", sql);
// 解析SQL語句,根據(jù)需要處理
// 例如,檢查是否包含寫入操作,然后執(zhí)行相應(yīng)的邏輯
}
private static void handleTableMapEvent(TableMapEventData eventData) {
// 獲取表映射信息,根據(jù)需要處理
logger.info("@@ handleTableMapEvent函數(shù)執(zhí)行TableMap event: {}", eventData);
}
private static String getTableName(EventData eventData) {
// 獲取表名的邏輯,可以使用TableMapEventData等信息
// 根據(jù)實(shí)際情況實(shí)現(xiàn)
return "example_table";
}
private static void backupToAnotherDatabase(String tableName, String column1Value, String column2Value) {
// 將數(shù)據(jù)備份到另一個(gè)數(shù)據(jù)庫的邏輯
logger.info("Backup to another database: Table={}, Column1={}, Column2={}", tableName, column1Value, column2Value);
}
}
以上就是基于Spring監(jiān)聽Binlog日志的方法詳解的詳細(xì)內(nèi)容,更多關(guān)于Spring監(jiān)聽Binlog日志的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
30w+數(shù)據(jù)使用RedisTemplate?pipeline空指針NullPointerException異常分析
這篇文章主要為大家介紹了30w+數(shù)據(jù)使用RedisTemplate?pipeline空指針NullPointerException異常分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08
spring中的@Value讀取配置文件的細(xì)節(jié)處理過程
這篇文章主要介紹了spring中的@Value讀取配置文件的細(xì)節(jié)處理過程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09
JDK自帶的序列化方式優(yōu)缺點(diǎn)及實(shí)現(xiàn)原理面試精講
這篇文章主要為大家介紹了JDK自帶的序列化方式優(yōu)缺點(diǎn)及實(shí)現(xiàn)原理面試精講,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
使用google.kaptcha來生成圖片驗(yàn)證碼的實(shí)現(xiàn)方法
這篇文章主要介紹了使用google.kaptcha來生成圖片驗(yàn)證碼的實(shí)現(xiàn)方法,非常不錯(cuò)具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-09-09

