Redis和數(shù)據(jù)庫的一致性(Canal+MQ) 的實現(xiàn)
想要保證緩存與數(shù)據(jù)庫的雙寫一致,一共有4種方式,即4種同步策略:
- 先更新緩存,再更新數(shù)據(jù)庫;
- 先更新數(shù)據(jù)庫,再更新緩存;
- 先刪除緩存,再更新數(shù)據(jù)庫;
- 先更新數(shù)據(jù)庫,再刪除緩存
首先說好結(jié)論,這4種同步策略無論是哪一種,都無法保證數(shù)據(jù)庫和redis的強一致性,只能保證最終一致性,如要保證強一致,那么只能通過加鎖來實現(xiàn),那么就會造成性能問題,即CAP理論中的AP(強一致)和CP(高可用性)進行取舍,絕大多數(shù)場景是確保高可用(CP)。
更新緩存還是刪除緩存
下面,我們來分析一下,應(yīng)該采用更新緩存還是刪除緩存的方式。
1、更新緩存
優(yōu)點:每次數(shù)據(jù)變化都及時更新緩存,所以查詢時不容易出現(xiàn)未命中的情況。
缺點:更新緩存的消耗比較大。如果數(shù)據(jù)需要經(jīng)過復(fù)雜的計算再寫入緩存,那么頻繁的更新緩存,就會影響服務(wù)器的性能。如果是寫入數(shù)據(jù)頻繁的業(yè)務(wù)場景,那么可能頻繁的更新緩存時,卻沒有業(yè)務(wù)讀取該數(shù)據(jù)。
2、刪除緩存
優(yōu)點:操作簡單,無論更新操作是否復(fù)雜,都是將緩存中的數(shù)據(jù)直接刪除。
缺點:刪除緩存后,下一次查詢緩存會出現(xiàn)未命中,這時需要重新讀取一次數(shù)據(jù)庫。從上面的比較來看,一般情況下,刪除緩存是更優(yōu)的方案。
先操作數(shù)據(jù)庫還是更新緩存
1.先更新數(shù)據(jù)庫再刪除緩存
- 線程A更新數(shù)據(jù)庫成功,線程A刪除緩存失敗;
- 線程B讀取緩存成功,由于緩存刪除失敗,所以線程B讀取到的是緩存中舊的數(shù)據(jù)。
- 最后線程A刪除緩存成功,有別的線程訪問緩存同樣的數(shù)據(jù),與數(shù)據(jù)庫中的數(shù)據(jù)是一樣。
- 最終,緩存和數(shù)據(jù)庫的數(shù)據(jù)是一致的,但是會有一些線程讀到舊的數(shù)據(jù)。
1.2正常情況下沒有出現(xiàn)失敗場景
在并發(fā)場景下,也許會有些許線程像線程b一樣讀的是舊數(shù)據(jù),但在刪除緩存后,最終緩存與數(shù)據(jù)庫的數(shù)據(jù)是一致的,并且都是最新的數(shù)據(jù)。但線程B在這個過程里讀到了舊的數(shù)據(jù),可能還有其他線程也像線程B一樣,在這兩步之間讀到了緩存中舊的數(shù)據(jù),但因為這兩步的執(zhí)行速度會比較快,所以影響不大。對于這兩步之后,其他進程再讀取緩存數(shù)據(jù)的時候,就不會出現(xiàn)類似于進程B的問題了。
2.先刪除緩存再更新數(shù)據(jù)庫
- 線程A刪除緩存成功,線程A更新數(shù)據(jù)庫失敗;
- 線程B從緩存中讀取數(shù)據(jù);由于緩存被刪,進程B無法從緩存中得到數(shù)據(jù),進而從數(shù)據(jù)庫讀取數(shù)據(jù);此時數(shù)據(jù)庫中的數(shù)據(jù)更新失敗,線程B從數(shù)據(jù)庫成功獲取舊的數(shù)據(jù),然后將數(shù)據(jù)更新到了緩存。
- 最終,如果沒有異步重試的話緩存和數(shù)據(jù)庫的數(shù)據(jù)是一致的,但仍然是舊的數(shù)據(jù)。
2.2正常情況下沒有出現(xiàn)失敗場景
進程A的兩步操作均成功,但由于存在并發(fā),在這兩步之間,進程B訪問了緩存。最終結(jié)果是,緩存中存儲了舊的數(shù)據(jù),而數(shù)據(jù)庫中存儲了新的數(shù)據(jù),二者數(shù)據(jù)不一致。
這種方式的解決方案也就是在第2步更新數(shù)據(jù)庫后,延遲一會再刪一次Redis,也就是延遲雙刪,這樣就可以保證最終數(shù)據(jù)一致性。
最終結(jié)論:
經(jīng)過對比你會發(fā)現(xiàn),先更新數(shù)據(jù)庫、再刪除緩存是影響更小的方案。如果第二步出現(xiàn)失敗的情況,則可以采用重試機制解決問題。
最終解決方案
利用(MQ)消息隊列和Canal中間件進行刪除的補償
Canal目前在大型企業(yè)中熱度下降,使用flinkcdc是目前的趨勢,而目前主流CDC(變更數(shù)據(jù)獲?。┦?strong>flink cdc 而flinkcdc插件是基于flink平臺(大數(shù)據(jù)平臺)此處只需要簡單理解Canal作用并簡單實現(xiàn)即可。目前企業(yè)中常見的數(shù)據(jù)同步方案就是CDC中間件+MQ的方案,大型公司一般是有大數(shù)據(jù)業(yè)務(wù),所以使用大數(shù)據(jù)平臺和kafka,此處使用的是Canal+Rabbitmq
Canal安裝與部署
Mysql前置準備
在服務(wù)中找到Mysql配置文件對應(yīng)目錄
其中一共需要注意四個配置項
server-id=1 #master端的ID號【必須是唯一的】; log-bin=D:\MySQL\binlog\mysql-bin.log #同步的日志路徑,一定注意這個目錄要是mysql有權(quán)限寫入的 binlog_format=row #行級,記錄每次操作后每行記錄的變化。 binlog-do-db=db_xiaomi #指定庫,縮小監(jiān)控的范圍。
1.查看端口號配置對應(yīng)主要用于集群環(huán)境下區(qū)分id
2.創(chuàng)建binlog文件存放目錄
3.數(shù)據(jù)的保存格式(一共有三種)
4.指定需要監(jiān)控的庫名(如果該項不指定配置,那么默認所有數(shù)據(jù)庫開啟binlog)
設(shè)置好后啟動服務(wù)。
啟動后看到在binlog文件目錄中看到log文件
??Mysql的binlog日志三種格式:
Canal默認選擇的是ROW
- STATEMENT:基于SQL語句的復(fù)制(statement-based replication, SBR)
- ROW:基于行的復(fù)制(row-based replication, RBR)
- MIXED:混合模式復(fù)制(mixed-based replication, MBR)
官網(wǎng)下載地址:Release v1.1.7 · alibaba/canal · GitHub
修改Mysql示例配置文件
修改連接數(shù)據(jù)庫授權(quán)的用戶和密碼
配置好后在bin目錄中執(zhí)行啟動命令文件
到此完成
JAVA項目整合Canal
引入依賴
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
測試demo
public static void main(String[] args) throws InvalidProtocolBufferException { CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", ""); while (true) { //2.獲取連接 canalConnector.connect(); //3.指定要監(jiān)控的數(shù)據(jù)庫 canalConnector.subscribe("db.xiaomi.*"); //4.獲取 Message Message message = canalConnector.get(100); List<CanalEntry.Entry> entries = message.getEntries(); if (entries.size() <= 0) { System.out.println("沒有數(shù)據(jù),休息一會"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { for (CanalEntry.Entry entry : entries) { // 獲取表名 String tableName = entry.getHeader().getTableName(); // Entry 類型 CanalEntry.EntryType entryType = entry.getEntryType(); // 判斷 entryType 是否為 ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { // 序列化數(shù)據(jù) ByteString storeValue = entry.getStoreValue(); // 反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 獲取事件類型 CanalEntry.EventType eventType = rowChange.getEventType(); // 獲取具體的數(shù)據(jù) List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); // 遍歷并打印數(shù)據(jù) for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); Map<String, Object> bMap = new HashMap<>(); for (CanalEntry.Column column : beforeColumnsList) { bMap.put(column.getName(), column.getValue()); } Map<String, Object> afMap = new HashMap<>(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afMap.put(column.getName(), column.getValue()); } System.out.println("表名:" + tableName + ",操作類型:" + eventType); System.out.println("改前:" + bMap); System.out.println("改后:" + afMap); } } } } } }
JAVA項目整合RabbitMQ
?最終解決思路
即先更新數(shù)據(jù)庫,然后在刪除緩存,更新數(shù)據(jù)庫后通過Canal拉去MySQL的binlog日志,將更新消息放入MQ,由MQ異步執(zhí)行刪除操作。
業(yè)務(wù)思路:
在下訂后將庫存數(shù)據(jù)庫更新,根據(jù)商品的id值進行更新緩存和es;
業(yè)務(wù)思路圖
?具體實現(xiàn)
那么Canal類的代碼如上圖所示,因為他是監(jiān)聽功能,那么就要一直啟動保持運行,目前是將Canal類放在頁面訪問服務(wù)實例中,那么在SpringBoot的Application啟動時應(yīng)該也要將Canal啟動。Spring提供兩種方式實現(xiàn)CommandLineRunner接口或@PostConstruct注解來實現(xiàn)。
此處以實現(xiàn)CommandLineRunner接口為例,只適合類似于初始化一些數(shù)據(jù)
??此處不適合使用上面的類,應(yīng)為其中使用了while(true)中寫了個死循環(huán)一直運行,這樣就會導(dǎo)致啟動類啟動后執(zhí)行這個類而導(dǎo)致一直阻塞在這里。如果要使用那么應(yīng)該是在一個單獨的服務(wù)模塊中就可以這樣使用。
/**Canal監(jiān)聽類 * @author 12547 * @version 1.0 * @Date 2024/3/19 20:44 */ @Component public class CanalRunner implements CommandLineRunner { @Override public void run(String... args) throws Exception { System.out.println(">>>>>>>此處并不適用Canal的運行方式<<<<<<<<<<"); System.out.println(">>>>>>>應(yīng)該是單獨起一個線程<<<<<<<<<<"); } }
可以看到在啟動SpringBoot實例后,執(zhí)行了該方法。
??bug解決
但在編寫其他測試類的時候發(fā)現(xiàn)其一直阻塞在這里而不執(zhí)行測試代碼,推測其一直阻塞線程(因為有死循環(huán))。
解決方案
將其改為異步形式執(zhí)行。將其改為@Async異步執(zhí)行。
?@Async的使用
應(yīng)為Async用到線程池相關(guān),所以先自定義一個用于異步的線程池
/** 自定義線程池 bean 用于Async異步調(diào)用 * @author 12547 * @version 1.0 * @Date 2024/3/20 15:45 */ @Configuration @EnableAsync public class AsyncConfig { /** * 自定義線程 * @return */ @Bean("asyncPoll") public ThreadPoolTaskExecutor asyncOperationExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 設(shè)置核心線程數(shù) executor.setCorePoolSize(8); // 設(shè)置最大線程數(shù) executor.setMaxPoolSize(20); // 設(shè)置隊列大小 executor.setQueueCapacity(Integer.MAX_VALUE); // 設(shè)置線程活躍時間(秒) executor.setKeepAliveSeconds(60); // 設(shè)置線程名前綴+分組名稱 executor.setThreadNamePrefix("AsyncOperationThread-"); executor.setThreadGroupName("AsyncOperationGroup"); // 所有任務(wù)結(jié)束后關(guān)閉線程池 executor.setWaitForTasksToCompleteOnShutdown(true); // 初始化 executor.initialize(); return executor; } }
將Canal的運行代碼改為異步執(zhí)行
在啟動類后通過調(diào)用使其異步執(zhí)行即可。并經(jīng)過測試后不再影響測試類的使用。
那么在Canal監(jiān)聽類中,當監(jiān)聽到數(shù)據(jù)變化后,將變化發(fā)送給MQ消息
消費者監(jiān)聽類
/**異步數(shù)據(jù)更新Redis類 * @author 12547 * @version 1.0 * @Date 2024/3/20 15:49 */ @Component public class RedisDataListenerService { @Autowired private CacheService cacheService; @Autowired private StringRedisTemplate redisTemplate; /** * Redis數(shù)據(jù)更新消費者監(jiān)聽方法 */ @RabbitListener(queues = MqConstants.QUEUE_NAME) public void updateRedisDataByAsync(Map<String,Object> msg){ System.out.println("監(jiān)聽到數(shù)據(jù)變化:"); System.out.println("數(shù)據(jù)變化商品id:"+msg.get("id")); //正常情況Redis應(yīng)該每個商品id一個key TODO 需要改造詳情緩存查詢將List<phone>改為單獨的一個phone對象 redisTemplate.opsForHash().putAll(msg.get("id").toString(),msg); System.out.println(msg.get("id").toString()); System.out.println(cacheService.getHashCache(msg.get("id").toString(), "num")); } }
與Redis實現(xiàn)數(shù)據(jù)同步基本demo到這差不多了已經(jīng),后續(xù)可以結(jié)合項目進一步優(yōu)化
到此這篇關(guān)于Redis和數(shù)據(jù)庫的一致性(Canal+MQ)的文章就介紹到這了,更多相關(guān)Redis和數(shù)據(jù)庫一致性內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Win10配置redis服務(wù)實現(xiàn)過程詳解
這篇文章主要介紹了Win10配置redis服務(wù)實現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-07-07redis發(fā)布和訂閱_動力節(jié)點Java學院整理
這篇文章主要為大家詳細介紹了redis發(fā)布和訂閱的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-08-08