欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java實現(xiàn)MySQL數(shù)據(jù)實時同步至Elasticsearch的方法詳解

 更新時間:2025年03月10日 08:43:37   作者:小諸葛IT課堂  
MySQL擅長事務(wù)處理,而Elasticsearch(ES)則專注于搜索與分析,將MySQL數(shù)據(jù)實時同步到ES,可以充分發(fā)揮兩者的優(yōu)勢,下面我們就來看看如何使用Java實現(xiàn)這一功能吧

引言:為什么需要實時同步?

MySQL擅長事務(wù)處理,而Elasticsearch(ES)則專注于搜索與分析。將MySQL數(shù)據(jù)實時同步到ES,可以充分發(fā)揮兩者的優(yōu)勢,例如:

  • 構(gòu)建高性能搜索服務(wù)
  • 實時數(shù)據(jù)分析與大屏展示
  • 提升復(fù)雜查詢效率

傳統(tǒng)方案(如定時全量同步)存在延遲高、資源浪費等問題。本文將基于MySQL Binlog監(jiān)聽實現(xiàn)毫秒級實時同步,并提供完整Java代碼及深度源碼解析。

一、技術(shù)選型與核心原理

1.1 核心組件

MySQL Binlog:MySQL的二進(jìn)制日志,記錄所有數(shù)據(jù)變更事件(增刪改)。

Canal/OpenReplicator:解析Binlog的工具(本文使用輕量級mysql-binlog-connector-java)。

Elasticsearch High Level REST Client:ES官方Java客戶端,用于數(shù)據(jù)寫入。

1.2 架構(gòu)流程圖

MySQL Server → Binlog → Java監(jiān)聽程序 → 數(shù)據(jù)轉(zhuǎn)換 → Elasticsearch

二、環(huán)境準(zhǔn)備與配置

2.1 MySQL開啟Binlog

# 修改my.cnf(Linux)或my.ini(Windows)
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW  # 必須為ROW模式

2.2 創(chuàng)建ES索引

PUT /user
{
  "mappings": {
    "properties": {
      "id": {"type": "integer"},
      "name": {"type": "text"},
      "email": {"type": "keyword"},
      "create_time": {"type": "date"}
    }
  }
}

三、Java代碼實現(xiàn)

3.1 Maven依賴

<dependency>
    <groupId>com.github.shyiko</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.25.4</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.3</version>
</dependency>

3.2 核心代碼(Binlog監(jiān)聽與同步)

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
 
public class MySQL2ESSyncer {
 
    private static final String ES_INDEX = "user";
 
    public static void main(String[] args) throws Exception {
        // 初始化ES客戶端
        RestHighLevelClient esClient = ESClientFactory.createClient();
 
        // 配置Binlog監(jiān)聽
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");
        client.setServerId(1001); // 唯一ID,避免沖突
 
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof WriteRowsEventData) {
                // 處理插入事件
                handleWriteEvent((WriteRowsEventData) data, esClient);
            } else if (data instanceof UpdateRowsEventData) {
                // 處理更新事件
                handleUpdateEvent((UpdateRowsEventData) data, esClient);
            } else if (data instanceof DeleteRowsEventData) {
                // 處理刪除事件
                handleDeleteEvent((DeleteRowsEventData) data, esClient);
            }
        });
 
        client.connect(); // 啟動監(jiān)聽
    }
 
    private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) {
        eventData.getRows().forEach(row -> {
            // 假設(shè)表結(jié)構(gòu)為:id, name, email, create_time
            String json = String.format(
                "{\"id\":%d,\"name\":\"%s\",\"email\":\"%s\",\"create_time\":\"%s\"}",
                row[0], row[1], row[2], row[3]
            );
            IndexRequest request = new IndexRequest(ES_INDEX)
                .id(row[0].toString())
                .source(json, XContentType.JSON);
            esClient.index(request, RequestOptions.DEFAULT);
        });
    }
 
    // 更新和刪除處理類似,代碼略(完整源碼見文末鏈接)
}

四、源碼深度解析

4.1 Binlog監(jiān)聽流程

BinaryLogClient:核心類,負(fù)責(zé)連接MySQL并監(jiān)聽Binlog。

事件類型判斷:根據(jù)WriteRowsEventData、UpdateRowsEventData、DeleteRowsEventData區(qū)分增、改、刪操作。

4.2 數(shù)據(jù)轉(zhuǎn)換關(guān)鍵點

Row數(shù)據(jù)解析:從事件中提取變更的行的具體值,需與表結(jié)構(gòu)順序?qū)?yīng)。

ES文檔ID:建議使用MySQL主鍵,確保更新/刪除操作能精準(zhǔn)定位文檔。

4.3 異常處理與優(yōu)化

重試機制:ES寫入失敗時,可加入重試隊列。

批量提交:攢批寫入ES提升性能(需權(quán)衡實時性)。

事務(wù)一致性:確保Binlog位置持久化,避免數(shù)據(jù)丟失。

五、方案優(yōu)缺點對比

方案實時性復(fù)雜度資源消耗
定時全量同步低(分鐘級)
基于觸發(fā)器高(需改表)
Binlog監(jiān)聽

六、總結(jié)與擴展

本文實現(xiàn)了基于Binlog的MySQL到ES的實時同步,具備以下優(yōu)勢:

  • 實時性:毫秒級延遲,滿足大部分業(yè)務(wù)場景。
  • 無侵入:無需修改MySQL表結(jié)構(gòu)。
  • 可擴展:可輕松適配其他數(shù)據(jù)源(如PostgreSQL)。

擴展方向:

  • 使用Kafka作為中間層,解耦生產(chǎn)與消費。
  • 增加監(jiān)控報警,保障數(shù)據(jù)一致性。
  • 支持DDL變更自動同步(如表結(jié)構(gòu)修改)。

到此這篇關(guān)于Java實現(xiàn)MySQL數(shù)據(jù)實時同步至Elasticsearch的方法詳解的文章就介紹到這了,更多相關(guān)Java MySQL數(shù)據(jù)同步至Elasticsearch內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Springboot2.1.6集成activiti7出現(xiàn)登錄驗證的實現(xiàn)

    Springboot2.1.6集成activiti7出現(xiàn)登錄驗證的實現(xiàn)

    這篇文章主要介紹了Springboot2.1.6集成activiti7出現(xiàn)登錄驗證的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • SpringBoot新特性之全局懶加載機制

    SpringBoot新特性之全局懶加載機制

    這篇文章主要介紹了SpringBoot新特性之全局懶加載機制,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07
  • Java的RTTI和反射機制代碼分析

    Java的RTTI和反射機制代碼分析

    這篇文章主要涉及了Java的RTTI和反射機制代碼分析的相關(guān)內(nèi)容,在介紹運行時類型識別的同時,又向大家展示了其實例以及什么時候會用到反射機制,內(nèi)容豐富,需要的朋友可以參考下。
    2017-09-09
  • 基于StringBuilder類中的重要方法(介紹)

    基于StringBuilder類中的重要方法(介紹)

    下面小編就為大家?guī)硪黄赟tringBuilder類中的重要方法(介紹)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-07-07
  • Spring OAuth2.0 單元測試解決方案

    Spring OAuth2.0 單元測試解決方案

    這篇文章主要介紹了Spring OAuth2.0 單元測試解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • java獲取中文拼音首字母的實例

    java獲取中文拼音首字母的實例

    下面小編就為大家?guī)硪黄猨ava獲取中文拼音首字母的實例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-09-09
  • IDEA maven引入SSL證書校驗問題及處理

    IDEA maven引入SSL證書校驗問題及處理

    這篇文章主要討論了在Maven項目中遇到依賴導(dǎo)入問題,特別是關(guān)于PKIX路徑構(gòu)建失敗的錯誤,文章提供了三種解決方法:手動下載依賴、忽略SSL證書校驗以及生成并導(dǎo)入SSL證書,每種方法都有詳細(xì)的步驟和示例代碼,幫助開發(fā)者解決這個問題
    2025-02-02
  • Java 鎖的知識總結(jié)及實例代碼

    Java 鎖的知識總結(jié)及實例代碼

    這篇文章主要介紹了Java 鎖的知識總結(jié)及實例代碼,需要的朋友可以參考下
    2016-09-09
  • java LRU算法介紹與用法示例

    java LRU算法介紹與用法示例

    這篇文章主要介紹了java LRU算法,簡單介紹了LRU算法的概念并結(jié)合實例形式分析了LRU算法的具體使用方法,需要的朋友可以參考下
    2017-09-09
  • Java基于fork/koin類實現(xiàn)并發(fā)排序

    Java基于fork/koin類實現(xiàn)并發(fā)排序

    這篇文章主要介紹了Java基于fork/koin類實現(xiàn)并發(fā)排序,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-02-02

最新評論