欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何利用Java實(shí)現(xiàn)MySQL的數(shù)據(jù)變化監(jiān)聽(tīng)

 更新時(shí)間:2025年02月11日 08:35:39   作者:Huooya  
在高并發(fā)和大數(shù)據(jù)環(huán)境下,實(shí)時(shí)獲取?MySQL?數(shù)據(jù)庫(kù)的增量變化對(duì)數(shù)據(jù)同步、數(shù)據(jù)分析、緩存更新等場(chǎng)景至關(guān)重要,下面我們就來(lái)看看如何通過(guò)Java實(shí)現(xiàn)MySQL的數(shù)據(jù)變化監(jiān)聽(tīng)吧

在高并發(fā)和大數(shù)據(jù)環(huán)境下,實(shí)時(shí)獲取 MySQL 數(shù)據(jù)庫(kù)的增量變化對(duì)數(shù)據(jù)同步、數(shù)據(jù)分析、緩存更新等場(chǎng)景至關(guān)重要。MySQL 的 binlog(Binary Log) 記錄了數(shù)據(jù)庫(kù)的所有變更,可以用來(lái)實(shí)現(xiàn) 增量數(shù)據(jù)監(jiān)聽(tīng)。本文將介紹如何利用 binlog 監(jiān)聽(tīng) MySQL 數(shù)據(jù)增量,并提供基于 Java 的 Canal 實(shí)現(xiàn)示例。

1.binlog 簡(jiǎn)介

1.1 什么是 binlog

binlog(Binary Log) 是 MySQL 記錄 DDL(數(shù)據(jù)定義語(yǔ)言,如 CREATE、ALTER)和 DML(數(shù)據(jù)操作語(yǔ)言,如 INSERT、UPDATEDELETE)的日志文件,它用于:

  • 主從復(fù)制:MySQL 主庫(kù)將 binlog 傳輸?shù)綇膸?kù),實(shí)現(xiàn)數(shù)據(jù)同步。
  • 數(shù)據(jù)恢復(fù):通過(guò) mysqlbinlog 工具解析 binlog 恢復(fù)數(shù)據(jù)。
  • 數(shù)據(jù)同步:第三方工具(如 Canal)解析 binlog,進(jìn)行數(shù)據(jù)同步。

1.2 binlog 的三種格式

binlog 格式說(shuō)明
STATEMENT記錄 SQL 語(yǔ)句本身
ROW記錄行數(shù)據(jù)變更(推薦)
MIXED結(jié)合前兩者,MySQL 自動(dòng)判斷

由于 ROW 格式能提供精確的行級(jí)別變更信息,因此推薦使用它。

2. 開(kāi)啟 binlog 并配置 MySQL

2.1 檢查 binlog 是否開(kāi)啟

SHOW VARIABLES LIKE 'log_bin';

如果 log_bin 值為 OFF,說(shuō)明 binlog 未開(kāi)啟。

2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)

[mysqld] 部分添加以下內(nèi)容:

server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
expire_logs_days=7

重啟 MySQL:

systemctl restart mysql  # Linux
net stop mysql && net start mysql  # Windows

2.3 驗(yàn)證 binlog 配置

執(zhí)行:

SHOW BINARY LOGS;

如果有 binlog 文件,如 mysql-bin.000001,說(shuō)明已開(kāi)啟。

3. 使用 Java 監(jiān)聽(tīng) binlog

3.1 選擇工具:Canal

阿里巴巴開(kāi)源的 Canal 可以模擬 MySQL 從庫(kù)協(xié)議,解析 binlog 并實(shí)時(shí)推送增量數(shù)據(jù)。

3.2 Java 代碼監(jiān)聽(tīng) binlog

引入 Maven 依賴

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.6</version>
    </dependency>
</dependencies>

編寫 Java 代碼

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class BinlogListener {
    public static void main(String[] args) {
        // 連接 Canal
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), 
                "example", "canal", "canal");
        

        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // 監(jiān)聽(tīng)所有庫(kù)表
            connector.rollback();
    
            while (true) {
                Message message = connector.getWithoutAck(100); // 獲取數(shù)據(jù)
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
    
                if (batchId != -1 && !entries.isEmpty()) {
                    for (CanalEntry.Entry entry : entries) {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            processEntry(entry);
                        }
                    }
                }
                connector.ack(batchId); // 確認(rèn)消息
            }
        } finally {
            connector.disconnect();
        }
    }
    
    private static void processEntry(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
    
            System.out.println("變更表:" + entry.getHeader().getTableName());
            System.out.println("變更類型:" + eventType);
    
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    System.out.println("刪除數(shù)據(jù):" + rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    System.out.println("新增數(shù)據(jù):" + rowData.getAfterColumnsList());
                } else {
                    System.out.println("更新前數(shù)據(jù):" + rowData.getBeforeColumnsList());
                    System.out.println("更新后數(shù)據(jù):" + rowData.getAfterColumnsList());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

4. 代碼解析

1.創(chuàng)建 Canal 連接

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("127.0.0.1", 11111), 
    "example", "canal", "canal");
  • 127.0.0.1:Canal 服務(wù)器地址
  • 11111:Canal 端口
  • example:Canal 實(shí)例
  • canal/canal:默認(rèn)賬號(hào)密碼

2.獲取 binlog 變更數(shù)據(jù)

Message message = connector.getWithoutAck(100);

getWithoutAck(100):拉取 100 條 binlog 事件。

3.解析 binlog

for (CanalEntry.Entry entry : entries) {
    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
        processEntry(entry);
    }
}

僅處理 ROWDATA 類型的變更,忽略事務(wù)等其他信息。

4.分類處理 INSERT、UPDATE、DELETE

if (eventType == CanalEntry.EventType.DELETE) {
    System.out.println("刪除數(shù)據(jù):" + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
    System.out.println("新增數(shù)據(jù):" + rowData.getAfterColumnsList());
} else {
    System.out.println("更新前數(shù)據(jù):" + rowData.getBeforeColumnsList());
    System.out.println("更新后數(shù)據(jù):" + rowData.getAfterColumnsList());
}

總結(jié)

  • MySQL binlog 記錄數(shù)據(jù)庫(kù)變更,可用于監(jiān)聽(tīng)增量數(shù)據(jù)。
  • Canal 作為 MySQL 從庫(kù)解析 binlog,實(shí)現(xiàn)數(shù)據(jù)同步。
  • Java 代碼示例 展示如何用 Canal 監(jiān)聽(tīng) INSERT、UPDATE、DELETE 操作,并解析變更數(shù)據(jù)。

這種方案適用于 分布式數(shù)據(jù)同步、緩存一致性數(shù)據(jù)變更通知,是實(shí)時(shí)數(shù)據(jù)處理的重要手段。

到此這篇關(guān)于如何利用Java實(shí)現(xiàn)MySQL的數(shù)據(jù)變化監(jiān)聽(tīng)的文章就介紹到這了,更多相關(guān)Java監(jiān)聽(tīng)MySQL數(shù)據(jù)變化內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論