欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

MySQL數(shù)據(jù)變化監(jiān)聽的實現(xiàn)方案

 更新時間:2025年02月10日 10:33:31   作者:Huooya  
在高并發(fā)和大數(shù)據(jù)環(huán)境下,實時獲取MySQL數(shù)據(jù)庫的增量變化對數(shù)據(jù)同步、數(shù)據(jù)分析、緩存更新等場景至關(guān)重要,MySQL的binlog(Binary Log) 記錄了數(shù)據(jù)庫的所有變更,可以用來實現(xiàn) 增量數(shù)據(jù)監(jiān)聽,本文將介紹如何利用binlog監(jiān)聽MySQL數(shù)據(jù)增量,并提供基 Java的Canal實現(xiàn)示例

1. binlog 簡介

1.1 什么是 binlog?

binlog(Binary Log) 是 MySQL 記錄 DDL(數(shù)據(jù)定義語言,如 CREATE、ALTER)和 DML(數(shù)據(jù)操作語言,如 INSERTUPDATE、DELETE)的日志文件,它用于:

  • 主從復制:MySQL 主庫將 binlog 傳輸?shù)綇膸?,實現(xiàn)數(shù)據(jù)同步。
  • 數(shù)據(jù)恢復:通過 mysqlbinlog 工具解析 binlog 恢復數(shù)據(jù)。
  • 數(shù)據(jù)同步:第三方工具(如 Canal)解析 binlog,進行數(shù)據(jù)同步。

1.2 binlog 的三種格式

binlog 格式說明
STATEMENT記錄 SQL 語句本身
ROW記錄行數(shù)據(jù)變更(推薦)
MIXED結(jié)合前兩者,MySQL 自動判斷

由于 ROW 格式能提供精確的行級別變更信息,因此推薦使用它。

2. 開啟 binlog 并配置 MySQL

2.1 檢查 binlog 是否開啟

SHOW VARIABLES LIKE 'log_bin';

如果 log_bin 值為 OFF,說明 binlog 未開啟。

2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)

在 [mysqld] 部分添加以下內(nèi)容:

server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
expire_logs_days=7

重啟 MySQL:

systemctl restart mysql  # Linux
net stop mysql && net start mysql  # Windows

2.3 驗證 binlog 配置

執(zhí)行:

SHOW BINARY LOGS;

如果有 binlog 文件,如 mysql-bin.000001,說明已開啟。

3. 使用 Java 監(jiān)聽 binlog

3.1 選擇工具:Canal

阿里巴巴開源的 Canal 可以模擬 MySQL 從庫協(xié)議,解析 binlog 并實時推送增量數(shù)據(jù)。

3.2 Java 代碼監(jiān)聽 binlog

引入 Maven 依賴

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.6</version>
    </dependency>
</dependencies>

編寫 Java 代碼

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class BinlogListener {
    public static void main(String[] args) {
        // 連接 Canal
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), 
                "example", "canal", "canal");
        

        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // 監(jiān)聽所有庫表
            connector.rollback();
    
            while (true) {
                Message message = connector.getWithoutAck(100); // 獲取數(shù)據(jù)
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
    
                if (batchId != -1 && !entries.isEmpty()) {
                    for (CanalEntry.Entry entry : entries) {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            processEntry(entry);
                        }
                    }
                }
                connector.ack(batchId); // 確認消息
            }
        } finally {
            connector.disconnect();
        }
    }
    
    private static void processEntry(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
    
            System.out.println("變更表:" + entry.getHeader().getTableName());
            System.out.println("變更類型:" + eventType);
    
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    System.out.println("刪除數(shù)據(jù):" + rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    System.out.println("新增數(shù)據(jù):" + rowData.getAfterColumnsList());
                } else {
                    System.out.println("更新前數(shù)據(jù):" + rowData.getBeforeColumnsList());
                    System.out.println("更新后數(shù)據(jù):" + rowData.getAfterColumnsList());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

4. 代碼解析

  • 創(chuàng)建 Canal 連接

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("127.0.0.1", 11111), 
    "example", "canal", "canal");
    • 127.0.0.1:Canal 服務(wù)器地址
    • 11111:Canal 端口
    • example:Canal 實例
    • canal/canal:默認賬號密碼
  • 獲取 binlog 變更數(shù)據(jù)

Message message = connector.getWithoutAck(100);
    • getWithoutAck(100):拉取 100 條 binlog 事件。
  • 解析 binlog

for (CanalEntry.Entry entry : entries) {
    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
        processEntry(entry);
    }
}
  • 僅處理 ROWDATA 類型的變更,忽略事務(wù)等其他信息。

  • 分類處理 INSERTUPDATE、DELETE

if (eventType == CanalEntry.EventType.DELETE) {
    System.out.println("刪除數(shù)據(jù):" + rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
    System.out.println("新增數(shù)據(jù):" + rowData.getAfterColumnsList());
} else {
    System.out.println("更新前數(shù)據(jù):" + rowData.getBeforeColumnsList());
    System.out.println("更新后數(shù)據(jù):" + rowData.getAfterColumnsList());
}

總結(jié)

  • MySQL binlog 記錄數(shù)據(jù)庫變更,可用于監(jiān)聽增量數(shù)據(jù)。
  • Canal 作為 MySQL 從庫解析 binlog,實現(xiàn)數(shù)據(jù)同步。
  • Java 代碼示例 展示如何用 Canal 監(jiān)聽 INSERT、UPDATE、DELETE 操作,并解析變更數(shù)據(jù)。

這種方案適用于 分布式數(shù)據(jù)同步、緩存一致性數(shù)據(jù)變更通知,是實時數(shù)據(jù)處理的重要手段。

以上就是MySQL數(shù)據(jù)變化監(jiān)聽的實現(xiàn)方案的詳細內(nèi)容,更多關(guān)于MySQL數(shù)據(jù)變化監(jiān)聽的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • MySQL中int(10)和int(11)的區(qū)別詳解

    MySQL中int(10)和int(11)的區(qū)別詳解

    本文主要介紹了MySQL中int(10)和int(11)的區(qū)別詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-03-03
  • MySQL中UPDATE與DELETE語句的使用教程

    MySQL中UPDATE與DELETE語句的使用教程

    這篇文章主要介紹了MySQL中UPDATE與DELETE語句的使用教程,是MySQL入門學習中的基礎(chǔ)知識,需要的朋友可以參考下
    2015-12-12
  • 提升MYSQL查詢效率的10個SQL語句優(yōu)化技巧

    提升MYSQL查詢效率的10個SQL語句優(yōu)化技巧

    MySQL數(shù)據(jù)庫執(zhí)行效率對程序的執(zhí)行速度有很大的影響,有效的處理優(yōu)化數(shù)據(jù)庫是非常有用的。尤其是大量數(shù)據(jù)需要處理的時候
    2018-03-03
  • Linux下rpm方式安裝mysql教程

    Linux下rpm方式安裝mysql教程

    這篇文章主要為大家詳細介紹了Linux下rpm方式安裝mysql教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-10-10
  • MySQL表的重命名字段添加及字段屬性修改操作語法

    MySQL表的重命名字段添加及字段屬性修改操作語法

    這篇文章主要為大家介紹了MySQL表的重命名字段添加及字段屬性修改語法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-05-05
  • 淺談MySQL的性能優(yōu)化

    淺談MySQL的性能優(yōu)化

    這篇文章主要介紹了淺談MySQL的性能優(yōu)化,MySQL性能優(yōu)化是通過對數(shù)據(jù)庫的配置、查詢優(yōu)化以及索引優(yōu)化等手段提高數(shù)據(jù)庫的響應(yīng)速度和處理能力,本文從多個層面對mysql性能優(yōu)化進行了小結(jié),需要的朋友可以參考下
    2023-08-08
  • MySQL 使用索引掃描進行排序

    MySQL 使用索引掃描進行排序

    mysql可以使用同一個索引既滿足排序,又用于查找行,因此,如果可能,設(shè)計索引時應(yīng)該盡可能地同時滿足這兩種任務(wù),這樣是最好的。本文將介紹如何利用索引來進行排序
    2021-06-06
  • MySQL下200GB大表備份的操作(利用傳輸表空間解決停服發(fā)版表備份問題)

    MySQL下200GB大表備份的操作(利用傳輸表空間解決停服發(fā)版表備份問題)

    這篇文章主要介紹了MySQL下200GB大表備份的操作(利用傳輸表空間解決停服發(fā)版表備份問題),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2025-04-04
  • MySQL Order by 語句用法與優(yōu)化詳解

    MySQL Order by 語句用法與優(yōu)化詳解

    Order by語句是用來排序的,經(jīng)常我們會使用到Order by來進行排序,下面我給大家來講講Order by用法與優(yōu)化排序,有需要的同學可參考
    2013-06-06
  • MySQL Json類型字段IN查詢分組優(yōu)化

    MySQL Json類型字段IN查詢分組優(yōu)化

    這篇文章主要為大家介紹了MySQL Json類型字段IN查詢分組優(yōu)化,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-08-08

最新評論