欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

MySQL和Elasticsearch數(shù)據(jù)同步方案詳解

 更新時間:2025年10月24日 11:00:08   作者:匆匆忙忙游刃有余  
在現(xiàn)代電商系統(tǒng)中,MySQL 作為關(guān)系型數(shù)據(jù)庫負(fù)責(zé)數(shù)據(jù)的持久化存儲,而 Elasticsearch 則作為搜索引擎提供高效的全文檢索能力,本文就來詳細(xì)的介紹一下MySQL和Elasticsearch數(shù)據(jù)同步方案,感興趣的可以了解一下

在現(xiàn)代電商系統(tǒng)中,MySQL 作為關(guān)系型數(shù)據(jù)庫負(fù)責(zé)數(shù)據(jù)的持久化存儲,而 Elasticsearch 則作為搜索引擎提供高效的全文檢索能力。保證兩者之間的數(shù)據(jù)一致性是系統(tǒng)設(shè)計的關(guān)鍵挑戰(zhàn)。本文將詳細(xì)介紹主流的同步方案、實現(xiàn)方式及其優(yōu)缺點。

一、同步方案對比

同步方案實時性實現(xiàn)復(fù)雜度一致性保證性能影響適用場景
同步雙寫極高簡單強一致性金融交易、核心訂單
異步雙寫較高中等最終一致性電商商品、用戶信息
Canal + MQ較高最終一致性大規(guī)模數(shù)據(jù)同步
Logstash 定時簡單弱一致性報表、分析數(shù)據(jù)
Debezium最終一致性復(fù)雜數(shù)據(jù)同步

二、詳細(xì)實現(xiàn)方案

1. 同步雙寫

原理:在業(yè)務(wù)代碼中同時寫入 MySQL 和 Elasticsearch,確保兩者數(shù)據(jù)同步更新。

實現(xiàn)代碼

@Service
@Transactional
public class ProductServiceImpl implements ProductService {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @Override
    public void createProduct(Product product) {
        // 1. 寫入 MySQL
        productMapper.insert(product);
        
        // 2. 同步寫入 ES
        try {
            ProductIndex productIndex = convertToIndex(product);
            IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
                .index("product_index")
                .id(product.getId().toString())
                .document(productIndex)
            );
            esClient.index(request);
        } catch (Exception e) {
            // 處理 ES 寫入失敗的情況
            log.error("ES同步失敗: {}", e.getMessage());
            // 可以選擇拋出異?;貪L事務(wù),或者記錄失敗日志后續(xù)補償
            throw new RuntimeException("數(shù)據(jù)同步失敗", e);
        }
    }
    
    @Override
    public void updateProduct(Product product) {
        // 類似 createProduct,同時更新 MySQL 和 ES
        productMapper.updateById(product);
        // ES 更新邏輯...
    }
    
    private ProductIndex convertToIndex(Product product) {
        // 實體轉(zhuǎn)換邏輯
        ProductIndex index = new ProductIndex();
        index.setId(product.getId());
        index.setTitle(product.getTitle());
        index.setPrice(product.getPrice());
        // 其他字段轉(zhuǎn)換...
        return index;
    }
}

優(yōu)缺點

  • 優(yōu)點:實現(xiàn)簡單,數(shù)據(jù)一致性強,實時性最高
  • 缺點
    • 代碼耦合度高,業(yè)務(wù)邏輯與數(shù)據(jù)同步混合
    • ES 寫入延遲影響主業(yè)務(wù)性能
    • 故障處理復(fù)雜,需考慮回滾機制

2. 異步雙寫

原理:通過消息隊列解耦,業(yè)務(wù)代碼只負(fù)責(zé)寫 MySQL,然后發(fā)送消息到 MQ,由消費者異步更新 ES。

實現(xiàn)代碼

// 生產(chǎn)者端
@Service
public class ProductServiceImpl implements ProductService {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Override
    @Transactional
    public void createProduct(Product product) {
        // 1. 寫入 MySQL
        productMapper.insert(product);
        
        // 2. 發(fā)送消息到隊列
        ProductEvent event = new ProductEvent();
        event.setType("CREATE");
        event.setProductId(product.getId());
        rabbitTemplate.convertAndSend("product-event-exchange", "product.create", event);
    }
}

// 消費者端
@Component
public class ProductSyncConsumer {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @RabbitListener(queues = "product-sync-queue")
    public void handleProductEvent(ProductEvent event) {
        try {
            Product product = productMapper.selectById(event.getProductId());
            if (product == null) {
                // 刪除 ES 中的數(shù)據(jù)
                deleteFromEs(event.getProductId());
                return;
            }
            
            // 同步到 ES
            ProductIndex index = convertToIndex(product);
            IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
                .index("product_index")
                .id(product.getId().toString())
                .document(index)
            );
            esClient.index(request);
            
            log.info("產(chǎn)品 {} 同步到 ES 成功", event.getProductId());
        } catch (Exception e) {
            log.error("同步 ES 失敗: {}", e.getMessage());
            // 可以根據(jù)需要進(jìn)行重試或記錄到死信隊列
        }
    }
}

優(yōu)缺點

  • 優(yōu)點
    • 解耦業(yè)務(wù)與同步邏輯
    • 消息隊列提供削峰填谷能力
    • ES 故障不影響主業(yè)務(wù)流程
  • 缺點
    • 存在短暫的數(shù)據(jù)不一致
    • 增加了系統(tǒng)復(fù)雜度
    • 需要處理消息丟失、重復(fù)消費等問題

3. Canal + 消息隊列 方案

原理:利用 Canal 監(jiān)聽 MySQL 的 binlog,解析數(shù)據(jù)變更并發(fā)送到消息隊列,再由消費者同步到 ES。

3.1 環(huán)境準(zhǔn)備

MySQL 配置

# 開啟 binlog
log-bin=mysql-bin
# 選擇 ROW 模式
binlog-format=ROW
# 服務(wù)器唯一ID
server-id=1
# 開啟 binlog 實時更新
sync_binlog=1

Canal Server 配置

# canal-server/conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\..*

Canal Adapter 配置

# canal-adapter/conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/shop?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example  # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 127.0.0.1:9200
        properties:
          mode: rest
          cluster.name: elasticsearch

表映射配置

# canal-adapter/conf/es/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId:
topic: example
database: shop
table: tb_product
esMapping:
  _index: product_index
  _type: _doc
  _id: _id
  upsert: true
  sql: |
    select 
      p.id as _id, 
      p.title, 
      p.sub_title as subTitle, 
      p.price, 
      p.sales, 
      c.name as categoryName 
    from tb_product p 
    left join tb_category c on p.cid1 = c.id
  commitBatch: 3000

3.2 自定義消費者實現(xiàn)

如果需要更靈活的處理,可以自定義 Kafka 消費者:

@Component
public class ProductSyncConsumer {
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @KafkaListener(topics = "example")
    public void processMessage(String message) {
        try {
            // 解析 Canal 消息
            CanalMessage canalMsg = JSON.parseObject(message, CanalMessage.class);
            
            for (CanalData data : canalMsg.getData()) {
                // 根據(jù)操作類型處理
                switch (canalMsg.getEventType()) {
                    case INSERT:
                    case UPDATE:
                        syncToEs(data);
                        break;
                    case DELETE:
                        deleteFromEs(data);
                        break;
                }
            }
        } catch (Exception e) {
            log.error("處理 Canal 消息失敗: {}", e.getMessage());
        }
    }
    
    private void syncToEs(CanalData data) throws IOException {
        // 構(gòu)建 ES 文檔
        ProductIndex index = new ProductIndex();
        index.setId(Long.valueOf(data.get("id").toString()));
        index.setTitle(data.get("title").toString());
        // 其他字段映射...
        
        // 寫入 ES
        esClient.index(i -> i
            .index("product_index")
            .id(index.getId().toString())
            .document(index)
        );
    }
}

優(yōu)缺點

  • 優(yōu)點
    • 完全解耦,對業(yè)務(wù)代碼零侵入
    • 高性能,只同步變更數(shù)據(jù)
    • 支持全量和增量同步
    • 可靠性高,基于 binlog 保證不丟失
  • 缺點
    • 部署復(fù)雜度高
    • 配置相對復(fù)雜
    • 對 MySQL binlog 有依賴

三、數(shù)據(jù)一致性保障策略

1. 冪等性設(shè)計

確保重復(fù)同步不會導(dǎo)致數(shù)據(jù)異常:

// ES 操作冪等性實現(xiàn)
public void syncProduct(Long productId) {
    // 使用文檔ID作為唯一標(biāo)識
    IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
        .index("product_index")
        .id(productId.toString())
        .document(buildProductIndex(productId))
        // 設(shè)置樂觀鎖版本控制
        .versionType(VersionType.EXTERNAL)
        .version(getCurrentVersion(productId))
    );
    
    try {
        esClient.index(request);
    } catch (VersionConflictException e) {
        // 版本沖突,需要重新獲取最新數(shù)據(jù)
        log.warn("版本沖突,重新同步: {}", productId);
        // 重試邏輯...
    }
}

2. 重試機制

@Service
public class EsSyncService {
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    // 最大重試次數(shù)
    private static final int MAX_RETRY_COUNT = 3;
    
    public void syncWithRetry(ProductIndex index) {
        String key = "es:retry:" + index.getId();
        
        for (int i = 0; i < MAX_RETRY_COUNT; i++) {
            try {
                esClient.index(req -> req
                    .index("product_index")
                    .id(index.getId().toString())
                    .document(index)
                );
                // 成功后刪除重試標(biāo)記
                redisTemplate.delete(key);
                return;
            } catch (Exception e) {
                log.error("第{}次同步失敗: {}", i+1, e.getMessage());
                
                if (i == MAX_RETRY_COUNT - 1) {
                    // 達(dá)到最大重試次數(shù),記錄失敗任務(wù)
                    redisTemplate.opsForValue().set(key, JSON.toJSONString(index), 7, TimeUnit.DAYS);
                    log.error("同步失敗,已記錄到失敗隊列: {}", index.getId());
                } else {
                    // 指數(shù)退避重試
                    try {
                        Thread.sleep((long) (Math.pow(2, i) * 1000));
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    
    // 定時任務(wù)處理失敗的同步任務(wù)
    @Scheduled(cron = "0 0/5 * * * ?")
    public void processFailedTasks() {
        Set<String> keys = redisTemplate.keys("es:retry:*");
        if (keys != null) {
            for (String key : keys) {
                String json = (String) redisTemplate.opsForValue().get(key);
                ProductIndex index = JSON.parseObject(json, ProductIndex.class);
                // 重新嘗試同步
                syncWithRetry(index);
            }
        }
    }
}

3. 全量校驗與修復(fù)

定期全量對比 MySQL 和 ES 數(shù)據(jù),修復(fù)不一致:

@Service
public class DataConsistencyService {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ElasticsearchClient esClient;
    
    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2點執(zhí)行
    public void checkAndRepair() {
        log.info("開始數(shù)據(jù)一致性校驗");
        
        // 分頁查詢 MySQL 數(shù)據(jù)
        int page = 1;
        int pageSize = 1000;
        
        while (true) {
            Page<Product> productPage = productMapper.selectPage(
                new Page<>(page, pageSize), null);
            
            for (Product product : productPage.getRecords()) {
                try {
                    // 查詢 ES 數(shù)據(jù)
                    GetResponse<ProductIndex> response = esClient.get(req -> req
                        .index("product_index")
                        .id(product.getId().toString()),
                        ProductIndex.class
                    );
                    
                    if (!response.found()) {
                        // ES 中不存在,需要插入
                        syncToEs(product);
                        log.warn("修復(fù)缺失數(shù)據(jù): {}", product.getId());
                    } else {
                        // 對比數(shù)據(jù)是否一致
                        ProductIndex esData = response.source();
                        if (!isConsistent(product, esData)) {
                            // 數(shù)據(jù)不一致,更新 ES
                            syncToEs(product);
                            log.warn("修復(fù)不一致數(shù)據(jù): {}", product.getId());
                        }
                    }
                } catch (Exception e) {
                    log.error("校驗商品 {} 失敗: {}", product.getId(), e.getMessage());
                }
            }
            
            if (productPage.hasNext()) {
                page++;
            } else {
                break;
            }
        }
        
        log.info("數(shù)據(jù)一致性校驗完成");
    }
    
    private boolean isConsistent(Product mysql, ProductIndex es) {
        // 比較關(guān)鍵字段
        return Objects.equals(mysql.getTitle(), es.getTitle()) &&
               Objects.equals(mysql.getPrice(), es.getPrice()) &&
               Objects.equals(mysql.getSales(), es.getSales());
    }
}

四、性能優(yōu)化策略

1. ES 批量寫入

public void batchSyncToEs(List<Product> products) {
    if (CollectionUtils.isEmpty(products)) {
        return;
    }
    
    try {
        List<BulkOperation> operations = new ArrayList<>();
        
        for (Product product : products) {
            ProductIndex index = convertToIndex(product);
            operations.add(BulkOperation.of(op -> op
                .index(idx -> idx
                    .index("product_index")
                    .id(product.getId().toString())
                    .document(index)
                )
            ));
        }
        
        BulkRequest request = BulkRequest.of(req -> req.operations(operations));
        BulkResponse response = esClient.bulk(request);
        
        if (response.errors()) {
            // 處理錯誤
            for (BulkResponseItem item : response.items()) {
                if (item.error() != null) {
                    log.error("批量同步失敗: {} - {}", 
                              item.id(), item.error().reason());
                }
            }
        }
    } catch (Exception e) {
        log.error("批量同步異常: {}", e.getMessage());
    }
}

2. 優(yōu)化 Canal 配置

# 增加批處理大小
syncBatchSize = 2000

# 優(yōu)化網(wǎng)絡(luò)參數(shù)
tcp.so.sndbuf = 1048576
tcp.so.rcvbuf = 1048576

# 調(diào)整消費線程數(shù)
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 8

3. MySQL binlog 優(yōu)化

# 增加 binlog 大小限制
binlog-file-size = 1G

# 優(yōu)化 binlog 刷新策略
sync_binlog = 1
innodb_flush_log_at_trx_commit = 1

# 調(diào)整 binlog 保留時間
expire_logs_days = 7

五、最佳實踐建議

1. 方案選型建議

  • 小型系統(tǒng)/快速迭代:異步雙寫(MQ)
  • 大型系統(tǒng)/高可靠:Canal + MQ
  • 實時性要求極高:同步雙寫(權(quán)衡性能影響)
  • 歷史數(shù)據(jù)遷移:Logstash 或 Canal 全量同步

2. 監(jiān)控與告警

@Service
public class SyncMonitorService {
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    // 記錄同步時間戳
    public void recordSyncTimestamp(String tableName, Long id) {
        String key = "sync:timestamp:" + tableName + ":" + id;
        redisTemplate.opsForValue().set(key, System.currentTimeMillis(), 24, TimeUnit.HOURS);
    }
    
    // 檢查同步延遲
    @Scheduled(fixedRate = 60000)
    public void checkSyncDelay() {
        // 查詢最近5分鐘內(nèi)更新的數(shù)據(jù)
        List<Product> recentProducts = productMapper.selectRecentUpdated(5);
        
        for (Product product : recentProducts) {
            String key = "sync:timestamp:tb_product:" + product.getId();
            Long syncTime = (Long) redisTemplate.opsForValue().get(key);
            
            if (syncTime == null) {
                // 未同步
                sendAlarm("數(shù)據(jù)未同步", product.getId());
            } else {
                long delay = System.currentTimeMillis() - syncTime;
                if (delay > 300000) { // 5分鐘
                    // 同步延遲過大
                    sendAlarm("數(shù)據(jù)同步延遲:" + (delay/1000) + "秒", product.getId());
                }
            }
        }
    }
    
    private void sendAlarm(String message, Long productId) {
        // 發(fā)送告警(郵件、短信、釘釘?shù)龋?
        log.error("告警: {} - 商品ID: {}", message, productId);
        // 實際告警邏輯...
    }
}

3. 數(shù)據(jù)同步異常處理流程

  1. 重試機制:指數(shù)退避策略,避免立即重試造成雪崩
  2. 死信隊列:記錄無法通過重試解決的異常
  3. 手動干預(yù):提供管理界面手動觸發(fā)同步
  4. 數(shù)據(jù)校驗:定期全量比對,發(fā)現(xiàn)并修復(fù)不一致

六、總結(jié)

MySQL 和 Elasticsearch 數(shù)據(jù)同步是電商系統(tǒng)中的關(guān)鍵技術(shù)挑戰(zhàn)。選擇合適的同步方案需要綜合考慮實時性要求、系統(tǒng)復(fù)雜度、團(tuán)隊技術(shù)棧等因素。在實際項目中,推薦采用 Canal + 消息隊列 的方案,它提供了良好的實時性、可靠性和擴展性,同時對業(yè)務(wù)代碼零侵入。

無論選擇哪種方案,都需要特別關(guān)注數(shù)據(jù)一致性保障、異常處理、性能優(yōu)化和監(jiān)控告警等方面,確保系統(tǒng)在生產(chǎn)環(huán)境中的穩(wěn)定運行。

到此這篇關(guān)于MySQL和Elasticsearch數(shù)據(jù)同步方案詳解的文章就介紹到這了,更多相關(guān)MySQL和Elasticsearch數(shù)據(jù)同步內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 詳細(xì)深入聊一聊Mysql中的int(1)和int(11)

    詳細(xì)深入聊一聊Mysql中的int(1)和int(11)

    mysql數(shù)據(jù)庫作為當(dāng)前常用的關(guān)系型數(shù)據(jù)庫,肯定會遇到設(shè)計表的需求,下面對設(shè)計表時int類型的設(shè)置進(jìn)行分析,下面這篇文章主要給大家介紹了關(guān)于Mysql中int(1)和int(11)的相關(guān)資料,需要的朋友可以參考下
    2022-08-08
  • Mysql分組排序取每組第一條的2種實現(xiàn)方式

    Mysql分組排序取每組第一條的2種實現(xiàn)方式

    開發(fā)中經(jīng)常會遇到,分組查詢最新數(shù)據(jù)的問題,下面這篇文章主要給大家介紹了關(guān)于Mysql分組排序取每組第一條的2種實現(xiàn)方式,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-02-02
  • mysql無法成功啟動服務(wù)的解決方法(通俗易懂!)

    mysql無法成功啟動服務(wù)的解決方法(通俗易懂!)

    Mysql是我們使用數(shù)據(jù)庫時需要用到的服務(wù),但是在使用過程中常常遇到服務(wù)無法啟動的問題,下面這篇文章主要給大家介紹了關(guān)于mysql無法成功啟動服務(wù)的解決方法,需要的朋友可以參考下
    2023-02-02
  • MySQL四種日志binlog/redolog/relaylog/undolog詳解

    MySQL四種日志binlog/redolog/relaylog/undolog詳解

    undo?log主要存儲的也是邏輯日志,比如我們要insert一條數(shù)據(jù)了,那undo?log會記錄的一條對應(yīng)的delete日志,我們要update一條記錄時,它會記錄一條對應(yīng)相反的update記錄,這篇文章主要介紹了MySQL四種日志binlog/redolog/relaylog/undolog,需要的朋友可以參考下
    2024-08-08
  • MySQL插入數(shù)據(jù)與查詢數(shù)據(jù)

    MySQL插入數(shù)據(jù)與查詢數(shù)據(jù)

    這篇文章主要介紹了 MySQL插入數(shù)據(jù)與查詢數(shù)據(jù),缺省插入、缺省插入、缺省插入等各種數(shù)據(jù)插入分享,需要的小伙伴可以參考一下,希望對你有所幫助
    2022-03-03
  • MySQL多實例管理如何在一臺主機上運行多個mysql

    MySQL多實例管理如何在一臺主機上運行多個mysql

    文章詳解了在Linux主機上通過二進(jìn)制方式安裝MySQL多實例的步驟,涵蓋端口配置、數(shù)據(jù)目錄準(zhǔn)備、初始化與啟動流程,以及排錯方法,適用于構(gòu)建讀寫分離架構(gòu),感興趣的朋友一起看看吧
    2025-07-07
  • MySql中的常用參數(shù)查詢

    MySql中的常用參數(shù)查詢

    這篇文章主要介紹了MySql中的常用參數(shù)查詢,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • MySQL?系統(tǒng)變量(查看,修改)

    MySQL?系統(tǒng)變量(查看,修改)

    MySQL的系統(tǒng)變量是由MySQL服務(wù)器管理的,用于控制服務(wù)器的各種行為和特性,本文主要介紹了MySQL?系統(tǒng)變量(查看,修改),感興趣的可以了解一下
    2024-08-08
  • MySQL?原理優(yōu)化之Group?By的優(yōu)化技巧

    MySQL?原理優(yōu)化之Group?By的優(yōu)化技巧

    這篇文章主要介紹了MySQL?原理優(yōu)化之Group?By的優(yōu)化技巧,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下
    2022-08-08
  • mysql2/promise 中 execute 和 query 的使用最佳實踐

    mysql2/promise 中 execute 和 query 的使

    mysql2/promise中,execute使用預(yù)處理語句,嚴(yán)格類型檢查且防SQL注入,適合頻繁執(zhí)行,query直接拼接SQL,參數(shù)綁定更寬松,適用于簡單查詢,LIMIT/OFFSET在query中不會報錯,execute需拼接數(shù)字常量,本文給大家介紹mysql2/promise中execute和query的使用,感興趣的朋友一起看看吧
    2025-08-08

最新評論