欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用Canal和Kafka解決MySQL與緩存的數(shù)據(jù)一致性問題

 更新時(shí)間:2024年07月28日 09:47:02   作者:Master_hl  
這篇文章主要介紹了使用Canal和Kafka解決MySQL與緩存的數(shù)據(jù)一致性問題,文中通過圖文結(jié)合的方式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下

1. 準(zhǔn)備工作

1. 開啟并配置MySQL的 BinLog(MySQL 8.0 默認(rèn)開啟)

修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini

log-bin="HELONG-bin"
binlog_format=ROW     # 只能配置行模式, 因?yàn)?Cannal 不具備將SQL轉(zhuǎn)化成數(shù)據(jù)的能力
binlog-do-db=aicloud    # 監(jiān)控 AI Cloud 項(xiàng)目

如果要同步多個(gè)項(xiàng)目:

binlog-do-db=aicloud
binlog-do-db=aicloud2
binlog-do-db=aicloud3

2. 重啟MySQL服務(wù)

3. 賦值數(shù)據(jù)同步權(quán)限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. 安裝并配置 Canal

下載地址:https://github.com/alibaba/canal/releases

① 修改canal.properties

canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9092

canal 監(jiān)控 binlog 日志,binlog 日志的傳輸默認(rèn)使用 MySQL 的復(fù)制協(xié)議(基于 TCP/IP),

可以使用寫代碼的方式直接從 MySQL 服務(wù)器讀取數(shù)據(jù),此處使用本地 kafka 進(jìn)行存儲(chǔ)。

② 修改instance.properties

canal.instance.mysql.slaveId=100   # 大于 1 即可
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=ai-cloud-canal-to-kafka

slaveId 表示從節(jié)點(diǎn) id,canal 的執(zhí)行原理就是偽裝成一個(gè)從庫(kù)去主庫(kù)同步數(shù)據(jù)

(主節(jié)點(diǎn)的 slaveId = 1)

address 配置連接本地的 MySQL

topic 配置數(shù)據(jù)發(fā)送到 Kafka 的某個(gè)主題下

5. 拷貝 Jar 包到 lib

將 canal 下 plugin 下的所有 jar 包拷貝到 lib 目錄下。

6. 刪除 bin 目錄下 startup.bat 里的參數(shù)

如果啟動(dòng)時(shí)報(bào)錯(cuò):

Unrecognized VM option 'PermSize=128m'

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.

刪除 -XX:PermSize=128m 參數(shù)即可。

7. 啟動(dòng) canal

打開 cmd ,cd 到 bin 目錄下,輸入 startup.bat 回車

2. 將需要緩存的數(shù)據(jù)存儲(chǔ) Redis

此時(shí)我將這個(gè)查詢列表接口的數(shù)據(jù),存儲(chǔ)在 Redis 中:

/**
 * 獲取歷史聊天記錄(對(duì)話/繪圖)
 *
 * @param type
 * @return {@link ResponseEntity }
 */
@RequestMapping("/list")
public ResponseEntity getHistoryList(Integer type, Integer model) {
    String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);
    Object list = redisTemplate.opsForValue().get(listCacheKey);
    if (ObjectUtil.isNull(list)) {
        LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());
        queryWrapper.eq(Answer::getType, type);
        queryWrapper.eq(Answer::getModel, model);
        queryWrapper.orderByDesc(Answer::getAid);
        List<Answer> answerList = answerService.list(queryWrapper);
        List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());
        Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));
        List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());
        // 緩存 1 天
        redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);
        return ResponseEntity.success(answerVoList);
    } else {
        return ResponseEntity.success(list);
    }
}
/**
 * 查詢列表存儲(chǔ) Redis 緩存
 *
 * @param uid
 * @param model
 * @param type
 * @return {@link String }
 */
public static String getListCacheKey(Long uid, Integer model, Integer type) {
    return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
}

3. 監(jiān)聽 Kafka Topic 中數(shù)據(jù)并刪除 Redis 緩存

首先對(duì)數(shù)據(jù)庫(kù)中需要緩存的數(shù)據(jù)進(jìn)行一些修改操作:

此時(shí),使用 kafka ui(下載地址劃到最底下),刷新 kafka 對(duì)應(yīng) topic 下的 message,就可以看到當(dāng)前所作出的修改:

執(zhí)行修改操作:將 “如何學(xué)習(xí)Spring???”修改成 “如何學(xué)習(xí)Spring??”

執(zhí)行刪除操作:

由此可見,對(duì)數(shù)據(jù)庫(kù)的每一個(gè)修改操作,都是對(duì)應(yīng)固定格式的一個(gè)數(shù)據(jù),所以可以監(jiān)聽對(duì)應(yīng)的  topic 并針對(duì) data 中的數(shù)據(jù)進(jìn)行一個(gè)提取,得到一個(gè)  cacheKey,然后刪除對(duì)應(yīng)的緩存,使得下一次的查詢?nèi)ピL問數(shù)據(jù)庫(kù),并同步緩存。

【代碼示例】

/**
 * canal 監(jiān)控 binlog 日志,將修改的數(shù)據(jù)存儲(chǔ) kafka topic 中
 * 監(jiān)聽 kafka topic 中的數(shù)據(jù)
 *
 * @param data
 * @param ack
 * @throws JsonProcessingException
 */
@KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {
    HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);
    if (map.isEmpty()) {
        ack.acknowledge();
        return;
    }
    // 匹配上對(duì)應(yīng)的數(shù)據(jù)庫(kù)和數(shù)據(jù)表
    if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&
            KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {
        // 更新緩存 
        List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);
        if (!CollectionUtils.isEmpty(list)) {
            for (Map<String, Object> answerMap : list) {
                String answerListCacheKey = RedisUtil.getListCacheKey(
                        Long.valueOf(answerMap.get("uid").toString()),
                        Integer.parseInt(answerMap.get("model").toString()),
                        Integer.parseInt(answerMap.get("type").toString()));
                // 刪除緩存,讓下一次查詢走數(shù)據(jù)庫(kù),并同步緩存
                redisTemplate.delete(answerListCacheKey);
            }
        }
    }
    //  手動(dòng)確認(rèn)應(yīng)答
    ack.acknowledge();
}
/**
 * canal 同步數(shù)據(jù)到 kafka
 */
public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";
 
 
/**
 * 數(shù)據(jù)庫(kù),緩存數(shù)據(jù)一致性的
 */
 
public static final String DATABASE_KEY = "database";
 
public static final String TABLE_KEY = "table";
 
public static final String DATA_KEY = "data";
 
public static final String TARGET_DATABASE = "aicloud";
 
public static final String TARGET_TABLE = "answer";

【補(bǔ)充】

kafka ui 下載地址:??????https://github.com/provectus/kafka-ui/tags

修改配置

kafka:
  clusters:
    - name: kafka3_cluster
      bootstrapServers: 127.0.0.1:9092

以上就是使用Canal和Kafka解決MySQL與緩存的數(shù)據(jù)一致性問題的詳細(xì)內(nèi)容,更多關(guān)于MySQL與緩存的數(shù)據(jù)一致性的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • MySQL表約束的實(shí)現(xiàn)

    MySQL表約束的實(shí)現(xiàn)

    本文主要介紹了MySQL表約束的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • mysql基礎(chǔ):mysqld_safe 啟動(dòng)執(zhí)行流程詳解

    mysql基礎(chǔ):mysqld_safe 啟動(dòng)執(zhí)行流程詳解

    本篇文章是對(duì)mysql基礎(chǔ)中的mysqld_safe啟動(dòng)執(zhí)行流程進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下
    2013-06-06
  • MySQL數(shù)據(jù)庫(kù)的InnoDB和MyISAM存儲(chǔ)引擎的區(qū)別及說明

    MySQL數(shù)據(jù)庫(kù)的InnoDB和MyISAM存儲(chǔ)引擎的區(qū)別及說明

    InnoDB是MySQL的默認(rèn)存儲(chǔ)引擎,它支持事務(wù)、外鍵和行級(jí)鎖定,具有更好的并發(fā)控制性能和崩潰恢復(fù)能力,而MyISAM不支持事務(wù)和外鍵,使用表級(jí)鎖定,適合讀操作頻繁的場(chǎng)景
    2024-12-12
  • MySQL的聯(lián)表查詢實(shí)現(xiàn)

    MySQL的聯(lián)表查詢實(shí)現(xiàn)

    數(shù)據(jù)通常分布在多個(gè)表中,為了獲取全面的信息,需要進(jìn)行聯(lián)表查詢,本文主要介紹了MySQL的聯(lián)表查詢實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-08-08
  • 詳談mysql order by in 的字符順序(推薦)

    詳談mysql order by in 的字符順序(推薦)

    下面小編就為大家?guī)硪黄斦刴ysql order by in 的字符順序(推薦)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-04-04
  • MySQL獲取版本號(hào)的N種方法(全網(wǎng)最全以及避坑大全)

    MySQL獲取版本號(hào)的N種方法(全網(wǎng)最全以及避坑大全)

    相信大家在工作學(xué)習(xí)中經(jīng)常有需要查詢 MySQL 數(shù)據(jù)庫(kù)版本的情況,這里給大家介紹幾種在 MySQL 中查詢數(shù)據(jù)庫(kù)版本的方法以及每種方法適用的場(chǎng)景,相信總有一種可以幫到你,需要的朋友可以參考下
    2024-10-10
  • mysql連接查詢中and與where的區(qū)別淺析

    mysql連接查詢中and與where的區(qū)別淺析

    在使用數(shù)據(jù)庫(kù)查詢語句時(shí),單表的查詢有時(shí)候不能滿足項(xiàng)目的業(yè)務(wù)需求,在項(xiàng)目開發(fā)過程中,有很多需求都是要涉及到多表的連接查詢,這篇文章主要給大家介紹了關(guān)于mysql連接查詢中and與where區(qū)別的相關(guān)資料,需要的朋友可以參考下
    2021-07-07
  • MySQL需要關(guān)注的參數(shù)及狀態(tài)變量解讀

    MySQL需要關(guān)注的參數(shù)及狀態(tài)變量解讀

    這篇文章主要介紹了MySQL需要關(guān)注的參數(shù)及狀態(tài)變量解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • 詳解mysql集群:一主多從架構(gòu)實(shí)現(xiàn)

    詳解mysql集群:一主多從架構(gòu)實(shí)現(xiàn)

    這篇文章主要介紹了mysql集群一主多從架構(gòu)實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-05-05
  • Mysql version can not be less than 4.1 出錯(cuò)解決辦法

    Mysql version can not be less than 4.1 出錯(cuò)解決辦法

    這篇文章主要介紹了Mysql version can not be less than 4.1 解決辦法的相關(guān)資料,需要的朋友可以參考下
    2016-10-10

最新評(píng)論