欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

實(shí)現(xiàn)MySQL與elasticsearch的數(shù)據(jù)同步的代碼示例

 更新時(shí)間:2023年07月06日 11:13:08   作者:又拍云  
MySQL 自身簡單、高效、可靠,是又拍云內(nèi)部使用最廣泛的數(shù)據(jù)庫,但是當(dāng)數(shù)據(jù)量達(dá)到一定程度的時(shí)候,對(duì)整個(gè) MySQL 的操作會(huì)變得非常遲緩,這個(gè)時(shí)候我們就需要MySQL與elasticsearch數(shù)據(jù)同步,接下來就給大家介紹如何實(shí)現(xiàn)數(shù)據(jù)同步

原數(shù)據(jù)庫的同步問題

由于傳統(tǒng)的 mysql 數(shù)據(jù)庫并不擅長海量數(shù)據(jù)的檢索,當(dāng)數(shù)據(jù)量到達(dá)一定規(guī)模時(shí)(估算單表兩千萬左右),查詢和插入的耗時(shí)會(huì)明顯增加。同樣,當(dāng)需要對(duì)這些數(shù)據(jù)進(jìn)行模糊查詢或是數(shù)據(jù)分析時(shí),MySQL作為事務(wù)型關(guān)系數(shù)據(jù)庫很難提供良好的性能支持。使用適合的數(shù)據(jù)庫來實(shí)現(xiàn)模糊查詢是解決這個(gè)問題的關(guān)鍵。

但是,切換數(shù)據(jù)庫會(huì)迎來兩個(gè)問題,一是已有的服務(wù)對(duì)現(xiàn)在的 MySQL 重度依賴,二是 MySQL 的事務(wù)能力和軟件生態(tài)仍然不可替代,直接遷移數(shù)據(jù)庫的成本過大。我們綜合考慮了下,決定同時(shí)使用多個(gè)數(shù)據(jù)庫的方案,不同的數(shù)據(jù)庫應(yīng)用于不同的使用場景。而在支持模糊查詢功能的數(shù)據(jù)庫中,elasticsearch 自然是首選的查詢數(shù)據(jù)庫。這樣后續(xù)對(duì)業(yè)務(wù)需求的切換也會(huì)非常靈活。

那具體該如何實(shí)現(xiàn)呢?在又拍云以往的項(xiàng)目中,也有遇到相似的問題。之前采用的方法是在業(yè)務(wù)中編寫代碼,然后同步到 elasticsearch 中。具體是這樣實(shí)施的:每個(gè)系統(tǒng)編寫特定的代碼,修改 MySQL 數(shù)據(jù)庫后,再將更新的數(shù)據(jù)直接推送到需要同步的數(shù)據(jù)庫中,或推送到隊(duì)列由消費(fèi)程序來寫入到數(shù)據(jù)庫中。

但這個(gè)方案有一些明顯的缺點(diǎn):

  • 系統(tǒng)高耦合,侵入式代碼,使得業(yè)務(wù)邏輯復(fù)雜度增加

  • 方案不通用,每一套同步都需要額外定制,不僅增加業(yè)務(wù)處理時(shí)間,還會(huì)提升軟件復(fù)復(fù)雜度

  • 工作量和復(fù)雜度增加

在業(yè)務(wù)中編寫同步方案,雖然在項(xiàng)目早期比較方便,但隨著數(shù)據(jù)量和系統(tǒng)的發(fā)展壯大,往往最后會(huì)成為業(yè)務(wù)的大痛點(diǎn)。

解決思路及方案

調(diào)整架構(gòu)

既然以往的方案有明顯的缺點(diǎn),那我們?nèi)绾蝸斫鉀Q它呢?優(yōu)秀的解決方案往往是 “通過架構(gòu)來解決問題“,那么能不能通過架構(gòu)的思想來解決問題呢?

答案是可以的。我們可以將程序偽裝成 “從數(shù)據(jù)庫”,主庫的增量變化會(huì)傳遞到從庫,那這個(gè)偽裝成 “從數(shù)據(jù)庫” 的程序就能實(shí)時(shí)獲取到數(shù)據(jù)變化,然后將增量的變化推送到消息隊(duì)列 MQ,后續(xù)消費(fèi)者消耗 MQ 的數(shù)據(jù),然后經(jīng)過處理之后再推送到各自需要的數(shù)據(jù)庫。

這個(gè)架構(gòu)的核心是通過監(jiān)聽 MySQL 的 binlog 來同步增量數(shù)據(jù),通過基于 query 的查詢舊表來同步舊數(shù)據(jù),這就是本文要講的一種異構(gòu)數(shù)據(jù)庫同步的實(shí)踐。

改進(jìn)數(shù)據(jù)庫

經(jīng)過深度的調(diào)研,成功得到了一套異構(gòu)數(shù)據(jù)庫同步方案,并且成功將公司生產(chǎn)環(huán)境下的 robin/logs 的表同步到了 elasticsearch 上。

首先對(duì) MySQL 開啟 binlog,但是由于 maxwell 需要的 binlog_format=row 原本的生產(chǎn)環(huán)境的數(shù)據(jù)庫不宜修改。這里請(qǐng)教了海楊前輩,他提供了”從庫聯(lián)級(jí)“的思路,在從庫中監(jiān)聽 binlog 繞過了操作生產(chǎn)環(huán)境重啟主庫的操作,大大降低了系統(tǒng)風(fēng)險(xiǎn)。

后續(xù)操作比較順利,啟動(dòng) maxwell 監(jiān)聽從庫變化,然后將增量變化推送到 kafka ,最后配置 logstash 消費(fèi) kafka中的數(shù)據(jù)變化事件信息,將結(jié)果推送到 elasticsearch。配置 logstash需要結(jié)合表結(jié)構(gòu),這是整套方案實(shí)施的重點(diǎn)。

這套方案使用到了kafka、maxwell、logstash、elasticsearch。其中 elasticsearch 與 kafka已經(jīng)在生產(chǎn)環(huán)境中有部署,所以無需單獨(dú)部署維護(hù)。而 logstash 與 maxwell 只需要修改配置文件和啟動(dòng)命令即可快速上線。整套方案的意義不僅在于成本低,而且可以大規(guī)模使用,公司內(nèi)有 MySQL 同步到其它數(shù)據(jù)庫的需求時(shí),都可以上任。

成果展示前后對(duì)比

使用該方案同步和業(yè)務(wù)實(shí)現(xiàn)同步的對(duì)比

寫入到 elasticsearch 性能對(duì)比 (8核4G內(nèi)存)

經(jīng)過對(duì)比測試,800w 數(shù)據(jù)量全量同步,使用 logstash 寫到 elasticsearch,實(shí)際需要大概 3 小時(shí),而舊方案的寫入時(shí)間需要 2.5 天。

方案實(shí)施細(xì)節(jié)

接下來,我們來看看具體是如何實(shí)現(xiàn)的。

本方案無需編寫額外代碼,非侵入式的,實(shí)現(xiàn) MySQL 數(shù)據(jù)與 elasticsearch 數(shù)據(jù)庫的同步。

下列是本次方案需要使用所有的組件:

  • MySQL

  • Kafka

  • Maxwell(監(jiān)聽 binlog)

  • Logstash(將數(shù)據(jù)同步給 elasticsearch)

  • Elasticsearch

1. MySQL配置

本次使用 MySQL 5.5 作示范,其他版本的配置可能稍許不同需要

首先我們需要增加一個(gè)數(shù)據(jù)庫只讀的用戶,如果已有的可以跳過。

-- 創(chuàng)建一個(gè) 用戶名為 maxwell 密碼為 xxxxxx 的用戶
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

開啟數(shù)據(jù)庫的 binlog,修改 mysql 配置文件,注意 maxwell 需要的 binlog 格式必須是row

# /etc/mysql/my.cnf
[mysqld]
# maxwell 需要的 binlog 格式必須是 row
binlog_format=row
# 指定 server_id 此配置關(guān)系到主從同步需要按情況設(shè)置,
# 由于此mysql沒有開啟主從同步,這邊默認(rèn)設(shè)置為 1
server_id=1
# logbin 輸出的文件名, 按需配置
log-bin=master

重啟 MySQL 并查看配置是否生效:

sudo systemctl restart mysqld
select @@log_bin;
-- 正確結(jié)果是 1
select @@binlog_format;
-- 正確結(jié)果是 ROW

如果要監(jiān)聽的數(shù)據(jù)庫開啟了主從同步,并且不是主數(shù)據(jù)庫,需要再從數(shù)據(jù)庫開啟 binlog 聯(lián)級(jí)同步。

# /etc/my.cnf
log_slave_updates = 1

需要被同步到 elasticsearch 的表結(jié)構(gòu)。

-- robin.logs
show create table robin.logs;
-- 表結(jié)構(gòu)
CREATE TABLE `logs` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `content` text NOT NULL,
  `user_id` int(11) NOT NULL,
  `status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL,
  `type` varchar(20) DEFAULT '',
  `meta` text,
  `created_at` bigint(15) NOT NULL,
  `idx_host` varchar(255) DEFAULT '',
  `idx_domain_id` int(11) unsigned DEFAULT NULL,
  `idx_record_value` varchar(255) DEFAULT '',
  `idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL,
  `idx_orig_record_value` varchar(255) DEFAULT '',
  PRIMARY KEY (`id`),
  KEY `created_at` (`created_at`)
) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8

2. Maxwell 配置

本次使用 maxwell-1.39.2 作示范, 確保機(jī)器中包含 java 環(huán)境, 推薦 openjdk11

下載 maxwell 程序

wget https://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gz
tar zxvf maxwell-1.39.2.tar.gz **&&**  cd maxwell-1.39.2

maxwell 使用了兩個(gè)數(shù)據(jù)庫:

  • 一個(gè)是需要被監(jiān)聽binlog的數(shù)據(jù)庫(只需要讀權(quán)限)

  • 另一個(gè)是記錄maxwell服務(wù)狀態(tài)的數(shù)據(jù)庫,當(dāng)前這兩個(gè)數(shù)據(jù)庫可以是同一個(gè)

重要參數(shù)說明:

  • host 需要監(jiān)聽binlog的數(shù)據(jù)庫地址

  • port 需要監(jiān)聽binlog的數(shù)據(jù)庫端口

  • user 需要監(jiān)聽binlog的數(shù)據(jù)庫用戶名

  • password 需要監(jiān)聽binlog的密碼

  • replication_host 記錄maxwell服務(wù)的數(shù)據(jù)庫地址

  • replication_port 記錄maxwell服務(wù)的數(shù)據(jù)庫端口

  • replication_user 記錄maxwell服務(wù)的數(shù)據(jù)庫用戶名

  • filter 用于監(jiān)聽binlog數(shù)據(jù)時(shí)過濾不需要的數(shù)據(jù)庫數(shù)據(jù)或指定需要的數(shù)據(jù)庫

  • producer 將監(jiān)聽到的增量變化數(shù)據(jù)提交給的消費(fèi)者 (如 stdout、kafka)

  • kafka.bootstrap.servers kafka 服務(wù)地址

  • kafka_version kafka 版本

  • kafka_topic 推送到kafka的主題

啟動(dòng) maxwell

注意,如果 kafka 配置了禁止自動(dòng)創(chuàng)建主題,需要先自行在 kafka 上創(chuàng)建主題,kafka_version 需要根據(jù)情況指定, 此次使用了兩張不同的庫

./bin/maxwell 
        --host=mysql-maxwell.mysql.svc.cluster.fud3 
        --port=3306 
        --user=root 
        --password=password 
        --replication_host=192.168.5.38 
        --replication_port=3306 
        --replication_user=cloner 
        --replication_password=password
        --filter='exclude: *.*, include: robin.logs' 
        --producer=kafka 
        --kafka.bootstrap.servers=192.168.30.10:9092 
        --kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1

3. 安裝 Logstash

Logstash 包中已經(jīng)包含了 openjdk,無需額外安裝。

wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz
tar zxvf logstash-8.5.0-linux-x86_64.tar.gz

刪除不需要的配置文件。

rm config/logstash.yml

修改 logstash 配置文件

# config/logstash-sample.conf
input {
 kafka {
    bootstrap_servers => "192.168.30.10:9092"
    group_id => "main"
    topics => ["maxwell-robinlogs"]
 }
}
filter {
  json {
    source => "message"
  }
  # 將maxwell的事件類型轉(zhuǎn)化為es的事件類型
  # 如增加 -> index 修改-> update
  translate {
    source => "[type]"
    target => "[action]"
    dictionary => {
      "insert" => "index"
      "bootstrap-insert" => "index"
      "update" => "update"
      "delete" => "delete"
    }
    fallback => "unknown"
  }
  # 過濾無效的數(shù)據(jù)
  if ([action] == "unknown") {
    drop {}
  }
  # 處理數(shù)據(jù)格式
  if [data][idx_host] {
    mutate {
      add_field => { "idx_host" => "%{[data][idx_host]}" }
    }
  } else {
    mutate {
      add_field => { "idx_host" => "" }
    }
  }
  if [data][idx_domain_id] {
    mutate {
      add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" }
    }
  } else {
    mutate {
      add_field => { "idx_domain_id" => "" }
    }
  }
  if [data][idx_record_value] {
    mutate {
      add_field => { "idx_record_value" => "%{[data][idx_record_value]}" }
    }
  } else {
    mutate {
      add_field => { "idx_record_value" => "" }
    }
  }
   if [data][idx_record_opt] {
    mutate {
      add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" }
    }
  } else {
    mutate {
      add_field => { "idx_record_opt" => "" }
    }
  }
  if [data][idx_orig_record_value] {
    mutate {
      add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" }
    }
  } else {
    mutate {
      add_field => { "idx_orig_record_value" => "" }
    }
  }
  if [data][type] {
    mutate {
      replace => { "type" => "%{[data][type]}" }
    }
  } else {
    mutate {
      replace => { "type" => "" }
    }
  }
  mutate {
    add_field => {
      "id" => "%{[data][id]}"
      "content" => "%{[data][content]}"
      "user_id" => "%{[data][user_id]}"
      "status" => "%{[data][status]}"
      "meta" => "%{[data][meta]}"
      "created_at" => "%{[data][created_at]}"
    }
    remove_field => ["data"]
  }
  mutate {
    convert => {
      "id" => "integer"
      "user_id" => "integer"
      "idx_domain_id" => "integer"
      "created_at" => "integer"
    }
  }
  # 只提煉需要的字段
  mutate {
    remove_field => [
      "message",
      "original",
      "@version",
      "@timestamp",
      "event",
      "database",
      "table",
      "ts",
      "xid",
      "commit",
      "tags"
    ]
   }
}
output {
  # 結(jié)果寫到es
  elasticsearch {
    hosts => ["http://es-zico2.service.upyun:9500"]
    index => "robin_logs"
    action => "%{action}"
    document_id => "%{id}"
    document_type => "robin_logs"
  }
  # 結(jié)果打印到標(biāo)準(zhǔn)輸出
  stdout {
    codec => rubydebug
  }
}

執(zhí)行程序:

# 測試配置文件*
bin/logstash -f config/logstash-sample.conf --config.test_and_exit
# 啟動(dòng)*
bin/logstash -f config/logstash-sample.conf --config.reload.automatic

4. 全量同步

完成啟動(dòng)后,后續(xù)的增量數(shù)據(jù) maxwell 會(huì)自動(dòng)推送給 logstash 最終推送到 elasticsearch ,而之前的舊數(shù)據(jù)可以通過 maxwell 的 bootstrap 來同步,往下面表中插入一條任務(wù),那么 maxwell 會(huì)自動(dòng)將所有符合條件的 where_clause 的數(shù)據(jù)推送更新。

INSERT INTO maxwell.bootstrap 
        ( database_name, table_name, where_clause, client_id ) 
values 
        ( 'robin', 'logs', 'id > 1', 'maxwell' );

后續(xù)可以在 elasticsearch 檢測數(shù)據(jù)是否同步完成,可以先查看數(shù)量是否一致,然后抽樣對(duì)比詳細(xì)數(shù)據(jù)。

# 檢測 elasticsearch  中的數(shù)據(jù)量
GET robin_logs/robin_logs/_count

以上就是實(shí)現(xiàn)MySQL與elasticsearch的數(shù)據(jù)同步的代碼示例的詳細(xì)內(nèi)容,更多關(guān)于MySQ與elasticsearch數(shù)據(jù)同步的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Mysql指定某個(gè)字符串字段前面幾位排序查詢方式

    Mysql指定某個(gè)字符串字段前面幾位排序查詢方式

    這篇文章主要介紹了Mysql指定某個(gè)字符串字段前面幾位排序查詢方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • MySQL按天分組統(tǒng)計(jì)一定時(shí)間內(nèi)的數(shù)據(jù)實(shí)例(沒有數(shù)據(jù)補(bǔ)0)

    MySQL按天分組統(tǒng)計(jì)一定時(shí)間內(nèi)的數(shù)據(jù)實(shí)例(沒有數(shù)據(jù)補(bǔ)0)

    我們?cè)谟肕ysql制作數(shù)據(jù)可視化圖表時(shí)候,經(jīng)常需要按照天對(duì)數(shù)據(jù)進(jìn)行分組統(tǒng)計(jì),下面這篇文章主要給大家介紹了關(guān)于MySQL按天分組統(tǒng)計(jì)一定時(shí)間內(nèi)的數(shù)據(jù),沒有數(shù)據(jù)補(bǔ)0的相關(guān)資料,需要的朋友可以參考下
    2023-03-03
  • OEL7.6源碼安裝MYSQL5.7的教程

    OEL7.6源碼安裝MYSQL5.7的教程

    這篇文章主要介紹了OEL7.6源碼安裝MYSQL5.7 的教程,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-07-07
  • MySQL索引失效的問題解決

    MySQL索引失效的問題解決

    索引可以大大提高查詢速度和效率,但如果索引失效了,查詢的效率會(huì)變得非常低,本文主要介紹了MySQL索引失效的問題解決,感興趣的可以了解一下
    2024-05-05
  • MySQL之臨時(shí)表的實(shí)現(xiàn)示例

    MySQL之臨時(shí)表的實(shí)現(xiàn)示例

    MySQL臨時(shí)表是存儲(chǔ)在內(nèi)存或者磁盤上的臨時(shí)數(shù)據(jù)表,它們的生命周期只限于當(dāng)前數(shù)據(jù)庫會(huì)話,臨時(shí)表的創(chuàng)建和使用方式與普通表類似,本文就詳細(xì)的介紹了MySQL之臨時(shí)表,感興趣的可以了解一下
    2023-08-08
  • mysql數(shù)據(jù)庫插入速度和讀取速度的調(diào)整記錄

    mysql數(shù)據(jù)庫插入速度和讀取速度的調(diào)整記錄

    由于項(xiàng)目變態(tài)需求;需要在一個(gè)比較短時(shí)間段急劇增加數(shù)據(jù)庫記錄(兩三天內(nèi),由于0增加至4億)。在整個(gè)過程調(diào)優(yōu)過程非常艱辛
    2012-07-07
  • mysql?blocked?because?of?many?connection?errors解決記錄

    mysql?blocked?because?of?many?connection?errors解決記錄

    這篇文章主要為大家介紹了mysql?blocked?because?of?many?connection?errors解決方法記錄,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-10-10
  • 十億級(jí)MySQL大表的查詢優(yōu)化10種方法

    十億級(jí)MySQL大表的查詢優(yōu)化10種方法

    針對(duì)十億級(jí)MySQL大表的查詢優(yōu)化,本文主要介紹了10種經(jīng)過驗(yàn)證的核心優(yōu)化策略及對(duì)應(yīng)SQL實(shí)現(xiàn)方案,具有一定的參考價(jià)值,感興趣的可以了解一下
    2025-03-03
  • MySQL服務(wù)器 IO 100%的分析與優(yōu)化方案

    MySQL服務(wù)器 IO 100%的分析與優(yōu)化方案

    這篇文章主要給大家介紹了關(guān)于MySQL服務(wù)器 IO 100%的相關(guān)資料,文中通過示例代碼介紹的介紹非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用mysql具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-10-10
  • 華為歐拉openEuler在線安裝MySQL8的實(shí)現(xiàn)步驟

    華為歐拉openEuler在線安裝MySQL8的實(shí)現(xiàn)步驟

    本文主要介紹了華為歐拉openEuler在線安裝MySQL8的實(shí)現(xiàn)步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-01-01

最新評(píng)論