欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

MySQL特定表全量、增量數(shù)據(jù)同步到消息隊(duì)列-解決方案

 更新時(shí)間:2021年11月27日 17:28:48   作者:李雷  
mysql要同步原始全量數(shù)據(jù),也要實(shí)時(shí)同步MySQL特定庫(kù)的特定表增量數(shù)據(jù),同時(shí)對(duì)應(yīng)的修改、刪除也要對(duì)應(yīng),下面就為大家分享一下

1、原始需求

既要同步原始全量數(shù)據(jù),也要實(shí)時(shí)同步MySQL特定庫(kù)的特定表增量數(shù)據(jù),同時(shí)對(duì)應(yīng)的修改、刪除也要對(duì)應(yīng)。

數(shù)據(jù)同步不能有侵入性:不能更改業(yè)務(wù)程序,并且不能對(duì)業(yè)務(wù)側(cè)有太大性能壓力。

應(yīng)用場(chǎng)景:數(shù)據(jù)ETL同步、降低業(yè)務(wù)服務(wù)器壓力。

2、解決方案

3、canal介紹、安裝

canal是阿里巴巴旗下的一款開(kāi)源項(xiàng)目,純Java開(kāi)發(fā)?;跀?shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi),目前主要支持了MySQL(也支持mariaDB)。

工作原理:mysql主備復(fù)制實(shí)現(xiàn)

從上層來(lái)看,復(fù)制分成三步:

  1. master將改變記錄到二進(jìn)制日志(binary log)中(這些記錄叫做二進(jìn)制日志事件,binary log events,可以通過(guò)show binlog events進(jìn)行查看);
  2. slave將master的binary log events拷貝到它的中繼日志(relay log);
  3. slave重做中繼日志中的事件,將改變反映它自己的數(shù)據(jù)。

canal的工作原理

原理相對(duì)比較簡(jiǎn)單:

  1. canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議
  2. mysql master收到dump請(qǐng)求,開(kāi)始推送binary log給slave(也就是canal)
  3. canal解析binary log對(duì)象(原始為byte流)

架構(gòu)

說(shuō)明:

  • server代表一個(gè)canal運(yùn)行實(shí)例,對(duì)應(yīng)于一個(gè)jvm
  • instance對(duì)應(yīng)于一個(gè)數(shù)據(jù)隊(duì)列 (1個(gè)server對(duì)應(yīng)1..n個(gè)instance)

instance模塊:

  • eventParser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進(jìn)行交互,協(xié)議解析)
  • eventSink (Parser和Store鏈接器,進(jìn)行數(shù)據(jù)過(guò)濾,加工,分發(fā)的工作)
  • eventStore (數(shù)據(jù)存儲(chǔ))
  • metaManager (增量訂閱&消費(fèi)信息管理器)

安裝

1、mysql、kafka環(huán)境準(zhǔn)備

2、canal下載:wget?https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3、解壓:tar -zxvf canal.deployer-1.1.3.tar.gz

4、對(duì)目錄conf里文件參數(shù)配置

對(duì)canal.properties配置:

進(jìn)入conf/example里,對(duì)instance.properties配置:

5、啟動(dòng):bin/startup.sh

6、日志查看:

4、驗(yàn)證

1、開(kāi)發(fā)對(duì)應(yīng)的kafka消費(fèi)者

package org.kafka;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;


/**
 *
 * Title: KafkaConsumerTest
 * Description:
 *  kafka消費(fèi)者 demo
 * Version:1.0.0
 * @author pancm
 * @date 2018年1月26日
 */
public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------開(kāi)始消費(fèi)---------");
        try {
            for (; ; ) {
                msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        //消費(fèi)100條就打印 ,但打印的數(shù)據(jù)不一定是這個(gè)規(guī)律的

                            System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());


//                            String v = decodeUnicode(record.value());

//                            System.out.println(v);

                        //當(dāng)消費(fèi)了1000條就退出
                        if (messageNo % 1000 == 0) {
                            break;
                        }
                        messageNo++;
                    }
                } else {
                    Thread.sleep(11);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String args[]) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }


    /*
     * 中文轉(zhuǎn)unicode編碼
     */
    public static String gbEncoding(final String gbString) {
        char[] utfBytes = gbString.toCharArray();
        String unicodeBytes = "";
        for (int i = 0; i < utfBytes.length; i++) {
            String hexB = Integer.toHexString(utfBytes[i]);
            if (hexB.length() <= 2) {
                hexB = "00" + hexB;
            }
            unicodeBytes = unicodeBytes + "\\u" + hexB;
        }
        return unicodeBytes;
    }

    /*
     * unicode編碼轉(zhuǎn)中文
     */
    public static String decodeUnicode(final String dataStr) {
        int start = 0;
        int end = 0;
        final StringBuffer buffer = new StringBuffer();
        while (start > -1) {
            end = dataStr.indexOf("\\u", start + 2);
            String charStr = "";
            if (end == -1) {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } else {
                charStr = dataStr.substring(start + 2, end);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // 16進(jìn)制parse整形字符串。
            buffer.append(new Character(letter).toString());
            start = end;
        }
        return buffer.toString();

    }
}

2、對(duì)表bak1進(jìn)行增加數(shù)據(jù)

CREATE TABLE `bak1` (
  `vin` varchar(20) NOT NULL,
  `p1` double DEFAULT NULL,
  `p2` double DEFAULT NULL,
  `p3` double DEFAULT NULL,
  `p4` double DEFAULT NULL,
  `p5` double DEFAULT NULL,
  `p6` double DEFAULT NULL,
  `p7` double DEFAULT NULL,
  `p8` double DEFAULT NULL,
  `p9` double DEFAULT NULL,
  `p0` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

show create table bak1;

insert into bak1 select '李雷abcv',
  `p1` ,
  `p2` ,
  `p3` ,
  `p4` ,
  `p5` ,
  `p6` ,
  `p7` ,
  `p8` ,
  `p9` ,
  `p0`  from moci limit 10

3、查看輸出結(jié)果:

到此這篇關(guān)于MySQL特定表全量、增量數(shù)據(jù)同步到消息隊(duì)列-解決方案的文章就介紹到這了,更多相關(guān)MySQL特定表數(shù)據(jù)同步內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論