MySQL和Elasticsearch數(shù)據(jù)同步方案詳解
在現(xiàn)代電商系統(tǒng)中,MySQL 作為關(guān)系型數(shù)據(jù)庫負(fù)責(zé)數(shù)據(jù)的持久化存儲,而 Elasticsearch 則作為搜索引擎提供高效的全文檢索能力。保證兩者之間的數(shù)據(jù)一致性是系統(tǒng)設(shè)計的關(guān)鍵挑戰(zhàn)。本文將詳細(xì)介紹主流的同步方案、實現(xiàn)方式及其優(yōu)缺點。
一、同步方案對比
| 同步方案 | 實時性 | 實現(xiàn)復(fù)雜度 | 一致性保證 | 性能影響 | 適用場景 |
|---|---|---|---|---|---|
| 同步雙寫 | 極高 | 簡單 | 強一致性 | 高 | 金融交易、核心訂單 |
| 異步雙寫 | 較高 | 中等 | 最終一致性 | 中 | 電商商品、用戶信息 |
| Canal + MQ | 高 | 較高 | 最終一致性 | 低 | 大規(guī)模數(shù)據(jù)同步 |
| Logstash 定時 | 低 | 簡單 | 弱一致性 | 低 | 報表、分析數(shù)據(jù) |
| Debezium | 高 | 高 | 最終一致性 | 低 | 復(fù)雜數(shù)據(jù)同步 |
二、詳細(xì)實現(xiàn)方案
1. 同步雙寫
原理:在業(yè)務(wù)代碼中同時寫入 MySQL 和 Elasticsearch,確保兩者數(shù)據(jù)同步更新。
實現(xiàn)代碼:
@Service
@Transactional
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductMapper productMapper;
@Autowired
private ElasticsearchClient esClient;
@Override
public void createProduct(Product product) {
// 1. 寫入 MySQL
productMapper.insert(product);
// 2. 同步寫入 ES
try {
ProductIndex productIndex = convertToIndex(product);
IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
.index("product_index")
.id(product.getId().toString())
.document(productIndex)
);
esClient.index(request);
} catch (Exception e) {
// 處理 ES 寫入失敗的情況
log.error("ES同步失敗: {}", e.getMessage());
// 可以選擇拋出異?;貪L事務(wù),或者記錄失敗日志后續(xù)補償
throw new RuntimeException("數(shù)據(jù)同步失敗", e);
}
}
@Override
public void updateProduct(Product product) {
// 類似 createProduct,同時更新 MySQL 和 ES
productMapper.updateById(product);
// ES 更新邏輯...
}
private ProductIndex convertToIndex(Product product) {
// 實體轉(zhuǎn)換邏輯
ProductIndex index = new ProductIndex();
index.setId(product.getId());
index.setTitle(product.getTitle());
index.setPrice(product.getPrice());
// 其他字段轉(zhuǎn)換...
return index;
}
}
優(yōu)缺點:
- 優(yōu)點:實現(xiàn)簡單,數(shù)據(jù)一致性強,實時性最高
- 缺點:
- 代碼耦合度高,業(yè)務(wù)邏輯與數(shù)據(jù)同步混合
- ES 寫入延遲影響主業(yè)務(wù)性能
- 故障處理復(fù)雜,需考慮回滾機制
2. 異步雙寫
原理:通過消息隊列解耦,業(yè)務(wù)代碼只負(fù)責(zé)寫 MySQL,然后發(fā)送消息到 MQ,由消費者異步更新 ES。
實現(xiàn)代碼:
// 生產(chǎn)者端
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductMapper productMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
@Transactional
public void createProduct(Product product) {
// 1. 寫入 MySQL
productMapper.insert(product);
// 2. 發(fā)送消息到隊列
ProductEvent event = new ProductEvent();
event.setType("CREATE");
event.setProductId(product.getId());
rabbitTemplate.convertAndSend("product-event-exchange", "product.create", event);
}
}
// 消費者端
@Component
public class ProductSyncConsumer {
@Autowired
private ProductMapper productMapper;
@Autowired
private ElasticsearchClient esClient;
@RabbitListener(queues = "product-sync-queue")
public void handleProductEvent(ProductEvent event) {
try {
Product product = productMapper.selectById(event.getProductId());
if (product == null) {
// 刪除 ES 中的數(shù)據(jù)
deleteFromEs(event.getProductId());
return;
}
// 同步到 ES
ProductIndex index = convertToIndex(product);
IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
.index("product_index")
.id(product.getId().toString())
.document(index)
);
esClient.index(request);
log.info("產(chǎn)品 {} 同步到 ES 成功", event.getProductId());
} catch (Exception e) {
log.error("同步 ES 失敗: {}", e.getMessage());
// 可以根據(jù)需要進(jìn)行重試或記錄到死信隊列
}
}
}
優(yōu)缺點:
- 優(yōu)點:
- 解耦業(yè)務(wù)與同步邏輯
- 消息隊列提供削峰填谷能力
- ES 故障不影響主業(yè)務(wù)流程
- 缺點:
- 存在短暫的數(shù)據(jù)不一致
- 增加了系統(tǒng)復(fù)雜度
- 需要處理消息丟失、重復(fù)消費等問題
3. Canal + 消息隊列 方案
原理:利用 Canal 監(jiān)聽 MySQL 的 binlog,解析數(shù)據(jù)變更并發(fā)送到消息隊列,再由消費者同步到 ES。
3.1 環(huán)境準(zhǔn)備
MySQL 配置:
# 開啟 binlog log-bin=mysql-bin # 選擇 ROW 模式 binlog-format=ROW # 服務(wù)器唯一ID server-id=1 # 開啟 binlog 實時更新 sync_binlog=1
Canal Server 配置:
# canal-server/conf/example/instance.properties canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset=UTF-8 canal.instance.filter.regex=.*\..*
Canal Adapter 配置:
# canal-adapter/conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: kafka
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/shop?useUnicode=true
username: root
password: root
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: es
hosts: 127.0.0.1:9200
properties:
mode: rest
cluster.name: elasticsearch
表映射配置:
# canal-adapter/conf/es/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId:
topic: example
database: shop
table: tb_product
esMapping:
_index: product_index
_type: _doc
_id: _id
upsert: true
sql: |
select
p.id as _id,
p.title,
p.sub_title as subTitle,
p.price,
p.sales,
c.name as categoryName
from tb_product p
left join tb_category c on p.cid1 = c.id
commitBatch: 3000
3.2 自定義消費者實現(xiàn)
如果需要更靈活的處理,可以自定義 Kafka 消費者:
@Component
public class ProductSyncConsumer {
@Autowired
private ElasticsearchClient esClient;
@KafkaListener(topics = "example")
public void processMessage(String message) {
try {
// 解析 Canal 消息
CanalMessage canalMsg = JSON.parseObject(message, CanalMessage.class);
for (CanalData data : canalMsg.getData()) {
// 根據(jù)操作類型處理
switch (canalMsg.getEventType()) {
case INSERT:
case UPDATE:
syncToEs(data);
break;
case DELETE:
deleteFromEs(data);
break;
}
}
} catch (Exception e) {
log.error("處理 Canal 消息失敗: {}", e.getMessage());
}
}
private void syncToEs(CanalData data) throws IOException {
// 構(gòu)建 ES 文檔
ProductIndex index = new ProductIndex();
index.setId(Long.valueOf(data.get("id").toString()));
index.setTitle(data.get("title").toString());
// 其他字段映射...
// 寫入 ES
esClient.index(i -> i
.index("product_index")
.id(index.getId().toString())
.document(index)
);
}
}
優(yōu)缺點:
- 優(yōu)點:
- 完全解耦,對業(yè)務(wù)代碼零侵入
- 高性能,只同步變更數(shù)據(jù)
- 支持全量和增量同步
- 可靠性高,基于 binlog 保證不丟失
- 缺點:
- 部署復(fù)雜度高
- 配置相對復(fù)雜
- 對 MySQL binlog 有依賴
三、數(shù)據(jù)一致性保障策略
1. 冪等性設(shè)計
確保重復(fù)同步不會導(dǎo)致數(shù)據(jù)異常:
// ES 操作冪等性實現(xiàn)
public void syncProduct(Long productId) {
// 使用文檔ID作為唯一標(biāo)識
IndexRequest<ProductIndex> request = IndexRequest.of(i -> i
.index("product_index")
.id(productId.toString())
.document(buildProductIndex(productId))
// 設(shè)置樂觀鎖版本控制
.versionType(VersionType.EXTERNAL)
.version(getCurrentVersion(productId))
);
try {
esClient.index(request);
} catch (VersionConflictException e) {
// 版本沖突,需要重新獲取最新數(shù)據(jù)
log.warn("版本沖突,重新同步: {}", productId);
// 重試邏輯...
}
}
2. 重試機制
@Service
public class EsSyncService {
@Autowired
private ElasticsearchClient esClient;
@Autowired
private RedisTemplate redisTemplate;
// 最大重試次數(shù)
private static final int MAX_RETRY_COUNT = 3;
public void syncWithRetry(ProductIndex index) {
String key = "es:retry:" + index.getId();
for (int i = 0; i < MAX_RETRY_COUNT; i++) {
try {
esClient.index(req -> req
.index("product_index")
.id(index.getId().toString())
.document(index)
);
// 成功后刪除重試標(biāo)記
redisTemplate.delete(key);
return;
} catch (Exception e) {
log.error("第{}次同步失敗: {}", i+1, e.getMessage());
if (i == MAX_RETRY_COUNT - 1) {
// 達(dá)到最大重試次數(shù),記錄失敗任務(wù)
redisTemplate.opsForValue().set(key, JSON.toJSONString(index), 7, TimeUnit.DAYS);
log.error("同步失敗,已記錄到失敗隊列: {}", index.getId());
} else {
// 指數(shù)退避重試
try {
Thread.sleep((long) (Math.pow(2, i) * 1000));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
}
// 定時任務(wù)處理失敗的同步任務(wù)
@Scheduled(cron = "0 0/5 * * * ?")
public void processFailedTasks() {
Set<String> keys = redisTemplate.keys("es:retry:*");
if (keys != null) {
for (String key : keys) {
String json = (String) redisTemplate.opsForValue().get(key);
ProductIndex index = JSON.parseObject(json, ProductIndex.class);
// 重新嘗試同步
syncWithRetry(index);
}
}
}
}
3. 全量校驗與修復(fù)
定期全量對比 MySQL 和 ES 數(shù)據(jù),修復(fù)不一致:
@Service
public class DataConsistencyService {
@Autowired
private ProductMapper productMapper;
@Autowired
private ElasticsearchClient esClient;
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2點執(zhí)行
public void checkAndRepair() {
log.info("開始數(shù)據(jù)一致性校驗");
// 分頁查詢 MySQL 數(shù)據(jù)
int page = 1;
int pageSize = 1000;
while (true) {
Page<Product> productPage = productMapper.selectPage(
new Page<>(page, pageSize), null);
for (Product product : productPage.getRecords()) {
try {
// 查詢 ES 數(shù)據(jù)
GetResponse<ProductIndex> response = esClient.get(req -> req
.index("product_index")
.id(product.getId().toString()),
ProductIndex.class
);
if (!response.found()) {
// ES 中不存在,需要插入
syncToEs(product);
log.warn("修復(fù)缺失數(shù)據(jù): {}", product.getId());
} else {
// 對比數(shù)據(jù)是否一致
ProductIndex esData = response.source();
if (!isConsistent(product, esData)) {
// 數(shù)據(jù)不一致,更新 ES
syncToEs(product);
log.warn("修復(fù)不一致數(shù)據(jù): {}", product.getId());
}
}
} catch (Exception e) {
log.error("校驗商品 {} 失敗: {}", product.getId(), e.getMessage());
}
}
if (productPage.hasNext()) {
page++;
} else {
break;
}
}
log.info("數(shù)據(jù)一致性校驗完成");
}
private boolean isConsistent(Product mysql, ProductIndex es) {
// 比較關(guān)鍵字段
return Objects.equals(mysql.getTitle(), es.getTitle()) &&
Objects.equals(mysql.getPrice(), es.getPrice()) &&
Objects.equals(mysql.getSales(), es.getSales());
}
}
四、性能優(yōu)化策略
1. ES 批量寫入
public void batchSyncToEs(List<Product> products) {
if (CollectionUtils.isEmpty(products)) {
return;
}
try {
List<BulkOperation> operations = new ArrayList<>();
for (Product product : products) {
ProductIndex index = convertToIndex(product);
operations.add(BulkOperation.of(op -> op
.index(idx -> idx
.index("product_index")
.id(product.getId().toString())
.document(index)
)
));
}
BulkRequest request = BulkRequest.of(req -> req.operations(operations));
BulkResponse response = esClient.bulk(request);
if (response.errors()) {
// 處理錯誤
for (BulkResponseItem item : response.items()) {
if (item.error() != null) {
log.error("批量同步失敗: {} - {}",
item.id(), item.error().reason());
}
}
}
} catch (Exception e) {
log.error("批量同步異常: {}", e.getMessage());
}
}
2. 優(yōu)化 Canal 配置
# 增加批處理大小 syncBatchSize = 2000 # 優(yōu)化網(wǎng)絡(luò)參數(shù) tcp.so.sndbuf = 1048576 tcp.so.rcvbuf = 1048576 # 調(diào)整消費線程數(shù) canal.instance.parser.parallel = true canal.instance.parser.parallelThreadSize = 8
3. MySQL binlog 優(yōu)化
# 增加 binlog 大小限制 binlog-file-size = 1G # 優(yōu)化 binlog 刷新策略 sync_binlog = 1 innodb_flush_log_at_trx_commit = 1 # 調(diào)整 binlog 保留時間 expire_logs_days = 7
五、最佳實踐建議
1. 方案選型建議
- 小型系統(tǒng)/快速迭代:異步雙寫(MQ)
- 大型系統(tǒng)/高可靠:Canal + MQ
- 實時性要求極高:同步雙寫(權(quán)衡性能影響)
- 歷史數(shù)據(jù)遷移:Logstash 或 Canal 全量同步
2. 監(jiān)控與告警
@Service
public class SyncMonitorService {
@Autowired
private RedisTemplate redisTemplate;
// 記錄同步時間戳
public void recordSyncTimestamp(String tableName, Long id) {
String key = "sync:timestamp:" + tableName + ":" + id;
redisTemplate.opsForValue().set(key, System.currentTimeMillis(), 24, TimeUnit.HOURS);
}
// 檢查同步延遲
@Scheduled(fixedRate = 60000)
public void checkSyncDelay() {
// 查詢最近5分鐘內(nèi)更新的數(shù)據(jù)
List<Product> recentProducts = productMapper.selectRecentUpdated(5);
for (Product product : recentProducts) {
String key = "sync:timestamp:tb_product:" + product.getId();
Long syncTime = (Long) redisTemplate.opsForValue().get(key);
if (syncTime == null) {
// 未同步
sendAlarm("數(shù)據(jù)未同步", product.getId());
} else {
long delay = System.currentTimeMillis() - syncTime;
if (delay > 300000) { // 5分鐘
// 同步延遲過大
sendAlarm("數(shù)據(jù)同步延遲:" + (delay/1000) + "秒", product.getId());
}
}
}
}
private void sendAlarm(String message, Long productId) {
// 發(fā)送告警(郵件、短信、釘釘?shù)龋?
log.error("告警: {} - 商品ID: {}", message, productId);
// 實際告警邏輯...
}
}
3. 數(shù)據(jù)同步異常處理流程
- 重試機制:指數(shù)退避策略,避免立即重試造成雪崩
- 死信隊列:記錄無法通過重試解決的異常
- 手動干預(yù):提供管理界面手動觸發(fā)同步
- 數(shù)據(jù)校驗:定期全量比對,發(fā)現(xiàn)并修復(fù)不一致
六、總結(jié)
MySQL 和 Elasticsearch 數(shù)據(jù)同步是電商系統(tǒng)中的關(guān)鍵技術(shù)挑戰(zhàn)。選擇合適的同步方案需要綜合考慮實時性要求、系統(tǒng)復(fù)雜度、團(tuán)隊技術(shù)棧等因素。在實際項目中,推薦采用 Canal + 消息隊列 的方案,它提供了良好的實時性、可靠性和擴展性,同時對業(yè)務(wù)代碼零侵入。
無論選擇哪種方案,都需要特別關(guān)注數(shù)據(jù)一致性保障、異常處理、性能優(yōu)化和監(jiān)控告警等方面,確保系統(tǒng)在生產(chǎn)環(huán)境中的穩(wěn)定運行。
到此這篇關(guān)于MySQL和Elasticsearch數(shù)據(jù)同步方案詳解的文章就介紹到這了,更多相關(guān)MySQL和Elasticsearch數(shù)據(jù)同步內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳細(xì)深入聊一聊Mysql中的int(1)和int(11)
mysql數(shù)據(jù)庫作為當(dāng)前常用的關(guān)系型數(shù)據(jù)庫,肯定會遇到設(shè)計表的需求,下面對設(shè)計表時int類型的設(shè)置進(jìn)行分析,下面這篇文章主要給大家介紹了關(guān)于Mysql中int(1)和int(11)的相關(guān)資料,需要的朋友可以參考下2022-08-08
MySQL四種日志binlog/redolog/relaylog/undolog詳解
undo?log主要存儲的也是邏輯日志,比如我們要insert一條數(shù)據(jù)了,那undo?log會記錄的一條對應(yīng)的delete日志,我們要update一條記錄時,它會記錄一條對應(yīng)相反的update記錄,這篇文章主要介紹了MySQL四種日志binlog/redolog/relaylog/undolog,需要的朋友可以參考下2024-08-08
MySQL插入數(shù)據(jù)與查詢數(shù)據(jù)
這篇文章主要介紹了 MySQL插入數(shù)據(jù)與查詢數(shù)據(jù),缺省插入、缺省插入、缺省插入等各種數(shù)據(jù)插入分享,需要的小伙伴可以參考一下,希望對你有所幫助2022-03-03
MySQL?原理優(yōu)化之Group?By的優(yōu)化技巧
這篇文章主要介紹了MySQL?原理優(yōu)化之Group?By的優(yōu)化技巧,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-08-08
mysql2/promise 中 execute 和 query 的使
mysql2/promise中,execute使用預(yù)處理語句,嚴(yán)格類型檢查且防SQL注入,適合頻繁執(zhí)行,query直接拼接SQL,參數(shù)綁定更寬松,適用于簡單查詢,LIMIT/OFFSET在query中不會報錯,execute需拼接數(shù)字常量,本文給大家介紹mysql2/promise中execute和query的使用,感興趣的朋友一起看看吧2025-08-08

