Example Code for Real-Time MySQL Data Synchronization with Canal
Deploying Canal-Admin
1. Pull the Canal-Admin image
For compatibility with MySQL 8.0+, pull the v1.1.7 image.
docker pull canal/canal-admin:v1.1.7
2. Create the directory
mkdir -p /data/canal-admin/conf/
3. Create the canal_manager database
CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;

USE `canal_manager`;

SET NAMES utf8;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for canal_adapter_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_adapter_config`;
CREATE TABLE `canal_adapter_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `category` varchar(45) NOT NULL,
  `name` varchar(45) NOT NULL,
  `status` varchar(45) DEFAULT NULL,
  `content` text NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_cluster
-- ----------------------------
DROP TABLE IF EXISTS `canal_cluster`;
CREATE TABLE `canal_cluster` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(63) NOT NULL,
  `zk_hosts` varchar(255) NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_config`;
CREATE TABLE `canal_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `cluster_id` bigint(20) DEFAULT NULL,
  `server_id` bigint(20) DEFAULT NULL,
  `name` varchar(45) NOT NULL,
  `status` varchar(45) DEFAULT NULL,
  `content` text NOT NULL,
  `content_md5` varchar(128) NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `sid_UNIQUE` (`server_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_instance_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_instance_config`;
CREATE TABLE `canal_instance_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `cluster_id` bigint(20) DEFAULT NULL,
  `server_id` bigint(20) DEFAULT NULL,
  `name` varchar(45) NOT NULL,
  `status` varchar(45) DEFAULT NULL,
  `content` text NOT NULL,
  `content_md5` varchar(128) DEFAULT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `name_UNIQUE` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_node_server
-- ----------------------------
DROP TABLE IF EXISTS `canal_node_server`;
CREATE TABLE `canal_node_server` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `cluster_id` bigint(20) DEFAULT NULL,
  `name` varchar(63) NOT NULL,
  `ip` varchar(63) NOT NULL,
  `admin_port` int(11) DEFAULT NULL,
  `tcp_port` int(11) DEFAULT NULL,
  `metric_port` int(11) DEFAULT NULL,
  `status` varchar(45) DEFAULT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_user
-- ----------------------------
DROP TABLE IF EXISTS `canal_user`;
CREATE TABLE `canal_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `username` varchar(31) NOT NULL,
  `password` varchar(128) NOT NULL,
  `name` varchar(31) NOT NULL,
  `roles` varchar(31) NOT NULL,
  `introduction` varchar(255) DEFAULT NULL,
  `avatar` varchar(255) DEFAULT NULL,
  `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

SET FOREIGN_KEY_CHECKS = 1;

-- ----------------------------
-- Records of canal_user
-- ----------------------------
BEGIN;
INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
4. Start Canal-Admin
If you are deploying on a server, remember to open the corresponding port, 8089 (mapped as 8089:8089).
docker run -d --name canal-admin -p 8089:8089 canal/canal-admin:v1.1.7
5. Copy the configuration file
docker cp canal-admin:/home/admin/canal-admin/conf/application.yml /data/canal-admin/conf/
6. Remove the Canal-Admin container
docker rm -f canal-admin
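Steps 4 to 6 follow a pattern that recurs for every component in this guide: start a throwaway container, copy its default configuration out with docker cp, then remove the container. A minimal sketch of that pattern as a reusable function is shown here. The function name extract_conf, the temporary container name, and the DOCKER override variable are assumptions introduced for illustration; they are not part of Docker or Canal.

```shell
#!/usr/bin/env bash
# DOCKER is a hypothetical override: set DOCKER="echo docker" for a dry run
# that prints the copy commands instead of executing them.
DOCKER=${DOCKER:-docker}

# extract_conf IMAGE DEST_DIR SRC_PATH...
# Starts a temporary container from IMAGE, copies each SRC_PATH (a path
# inside the container) into DEST_DIR on the host, then removes the container.
extract_conf() {
  local image=$1 dest=$2
  shift 2
  local name="tmp-extract-$$" src rc=0
  mkdir -p "$dest" || return 1
  $DOCKER run -d --name "$name" "$image" >/dev/null || return 1
  for src in "$@"; do
    $DOCKER cp "$name:$src" "$dest/" || rc=1
  done
  # Always clean up the temporary container, even if a copy failed
  $DOCKER rm -f "$name" >/dev/null
  return $rc
}

# Usage, equivalent to steps 4 to 6 above:
# extract_conf canal/canal-admin:v1.1.7 /data/canal-admin/conf \
#   /home/admin/canal-admin/conf/application.yml
```

No port mapping is needed here because the container only exists long enough for the files to be copied out.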
7. Edit the configuration file
(Change the values at the commented positions; there are 3 changes in total.)
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: xxx.xxx.xxx.xxx:3306 # address and port of the MySQL server hosting canal_manager
  database: canal_manager
  username: root # database account
  password: xxxxxx # database password
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
8. Deploy Canal-Admin
docker run --name canal-admin -p 8089:8089 \
  -v /data/canal-admin/conf/application.yml:/home/admin/canal-admin/conf/application.yml \
  -v /data/canal-admin/logs/:/home/admin/canal-admin/logs/ \
  -d canal/canal-admin:v1.1.7
9. Access Canal-Admin
Open xxx.xxx.xxx.xxx:8089 in a browser, replacing the address with that of your own server or virtual machine.
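If the page does not load, one quick check is whether the port is reachable at all, which separates firewall problems from application problems. The sketch below polls a TCP port using bash's /dev/tcp pseudo-device. The helper name wait_for_port is an assumption made for this example, and an open port only shows that something is listening, not that Canal-Admin has finished starting.

```shell
#!/usr/bin/env bash
# wait_for_port HOST PORT [TRIES]
# Polls a TCP port once per second until it accepts a connection or the
# attempts run out. wait_for_port is a hypothetical helper name.
wait_for_port() {
  local host=$1 port=$2 tries=${3:-30} i=0
  while [ "$i" -lt "$tries" ]; do
    # bash opens a TCP connection when redirecting to /dev/tcp/HOST/PORT;
    # timeout caps each attempt at 2 seconds in case packets are dropped.
    if timeout 2 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}

# Usage (replace the placeholder with your own server address):
# wait_for_port xxx.xxx.xxx.xxx 8089 && echo "port 8089 is reachable"
```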
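Deploying Canal-Server
0. Prerequisites
Check whether MySQL has the binary log (log_bin) enabled and whether the binlog format is ROW. Both are usually already set this way; you can verify with the following statements in Navicat or any other database tool.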
show variables like 'log_bin';
show variables like 'binlog_format';
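The same check can be scripted so that it fails loudly when either setting is wrong. This is a minimal sketch, assuming the mysql command-line client is available; check_binlog is a hypothetical helper name, and it only inspects the tab-separated name/value lines that the client prints in batch mode.

```shell
#!/usr/bin/env bash
# check_binlog reads "name<TAB>value" lines on stdin, the format printed by
# `mysql -N -B`, and exits non-zero unless log_bin=ON and binlog_format=ROW.
check_binlog() {
  awk -F'\t' '
    BEGIN { bad = 0 }
    $1 == "log_bin"       { log_bin = toupper($2) }
    $1 == "binlog_format" { fmt = toupper($2) }
    END {
      if (log_bin != "ON") { print "log_bin is not ON"; bad = 1 }
      if (fmt != "ROW")    { print "binlog_format is not ROW"; bad = 1 }
      if (!bad) print "binlog settings OK"
      exit bad
    }'
}

# Usage (host and account are placeholders):
# mysql -h xxx.xxx.xxx.xxx -u root -p -N -B \
#   -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format')" \
#   | check_binlog
```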
Create the canal account and grant it the required privileges by running the following statements:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
1. Pull the Canal-Server image
docker pull canal/canal-server:v1.1.7
2. Create the directory
mkdir -p /data/canal-server/conf/
3. Start Canal-Server
If you are deploying on a server, remember to open the corresponding port, 11111 (mapped as 11111:11111).
docker run -d --name canal-server -p 11111:11111 canal/canal-server:v1.1.7
4. Copy the configuration files
docker cp canal-server:/home/admin/canal-server/conf/canal.properties /data/canal-server/conf/
docker cp canal-server:/home/admin/canal-server/conf/example/instance.properties /data/canal-server/conf/
5. Remove the Canal-Server container
docker rm -f canal-server
6. Edit the configuration files
canal.properties: 1 change in total (the canal.admin.manager key appears twice in this file; set both occurrences to the same value). Comments are kept on their own lines because a trailing "# ..." after a value in a .properties file is read as part of the value.
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal admin config
# Change this to the address of your own Canal-Admin
canal.admin.manager = xxx.xxx.xxx.xxx:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

# canal admin config
# Change this to the address of your own Canal-Admin
canal.admin.manager = xxx.xxx.xxx.xxx:8089
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =
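The canal.admin.passwd value is not plain text. It appears to be the legacy MySQL PASSWORD() digest, that is, SHA-1 applied twice, written as uppercase hex without the leading asterisk: the value above matches the digest of "admin", and the canal_user row inserted earlier matches the digest of "123456". The following sketch reproduces that digest so a different password can be used consistently. It assumes openssl, od, and tr are installed; canal_password_hash is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# canal_password_hash PLAINTEXT
# Prints SHA1(SHA1(PLAINTEXT)) as 40 uppercase hex characters, the format
# used by the legacy MySQL PASSWORD() function minus its leading "*".
canal_password_hash() {
  printf '%s' "$1" \
    | openssl dgst -sha1 -binary \
    | openssl dgst -sha1 -binary \
    | od -An -v -tx1 \
    | tr -d ' \n' \
    | tr 'a-f' 'A-F'
  echo
}

# Usage:
# canal_password_hash admin
```

The first digest has to stay in binary form (the -binary flag) because the second round hashes the raw 20 bytes, not their hex representation.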
instance.properties: 2 changes in total
#################################################
## mysql serverId , v1.0.26+ will autoGen
# This ID only needs to differ from the MySQL server's own server_id; 10 works
canal.instance.mysql.slaveId=10

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# Change this to the address of the database you want to monitor
canal.instance.master.address=xxx.xxx.xxx.xxx:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*

# mq config
canal.mq.topic=example
canal.mq.partition=0
canal.instance.multi.stream.on=false
#################################################
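When the same edits have to be repeated across machines, the two changes can be applied with a small key/value setter rather than by hand. This is a sketch only: set_prop is a hypothetical helper, it handles simple key=value lines, and the address 10.0.0.5:3306 in the usage lines is a made-up placeholder.

```shell
#!/usr/bin/env bash
# set_prop FILE KEY VALUE
# Replaces the value of KEY in a .properties file, or appends KEY=VALUE if
# the key is absent. Commented-out lines ("#key = ...") are left untouched.
set_prop() {
  local file=$1 tmp
  tmp=$(mktemp) || return 1
  # Pass key and value through the environment so awk does not interpret
  # backslashes in them (as it would with -v assignments).
  PROP_KEY=$2 PROP_VALUE=$3 awk '
    BEGIN { k = ENVIRON["PROP_KEY"]; v = ENVIRON["PROP_VALUE"]; done = 0 }
    {
      pos = index($0, "=")
      if (pos > 0) {
        name = substr($0, 1, pos - 1)
        gsub(/^[ \t]+|[ \t]+$/, "", name)
        if (name == k) { print k "=" v; done = 1; next }
      }
      print
    }
    END { if (!done) print k "=" v }
  ' "$file" > "$tmp" || { rm -f "$tmp"; return 1; }
  # Write back in place so the file keeps its inode and permissions, which
  # matters when the file is bind-mounted into a container.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Usage (the address is a placeholder):
# set_prop /data/canal-server/conf/instance.properties canal.instance.mysql.slaveId 10
# set_prop /data/canal-server/conf/instance.properties canal.instance.master.address 10.0.0.5:3306
```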
7. Deploy Canal-Server
docker run --name canal-server -p 11111:11111 \
  -v /data/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
  -v /data/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
  -v /data/canal-server/logs/:/home/admin/canal-server/logs/ \
  -d canal/canal-server:v1.1.7
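Deploying Canal-Adapter
The Canal-Adapter author has not published a Docker image, but has published a tar package, so we can build the image ourselves.
Canal-Adapter download link: Canal-Adapter
1. Create the required files: Dockerfile, startup.sh, and the downloaded canal.adapter-*.tar.gz package
Try to use the same path as this guide, /opt/canal/, because all the files used later live under it.
2. The file contents are as follows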
Dockerfile
FROM openjdk:11
COPY canal.adapter-*.tar.gz /tmp/
RUN \
    mkdir -p /opt/canal-adapter && \
    tar -zxf /tmp/canal.adapter-*.tar.gz -C /opt/canal-adapter && \
    rm -r /tmp/canal.adapter-*.tar.gz
COPY startup.sh /opt/canal-adapter/bin/startup.sh
WORKDIR /opt/canal-adapter
CMD ["sh", "-c", "sh /opt/canal-adapter/bin/startup.sh && tail -F logs/adapter/adapter.log"]
startup.sh
#!/bin/bash

current_path=`pwd`
# Resolve the absolute path of the bin directory
case "`uname`" in
    Linux)
        bin_abs_path=$(readlink -f $(dirname $0))
        ;;
    *)
        bin_abs_path=`cd $(dirname $0); pwd`
        ;;
esac
base=${bin_abs_path}/..
export LANG=en_US.UTF-8
export BASE=$base

# Refuse to start if a previous instance left a pid file behind
if [ -f $base/bin/adapter.pid ] ; then
    echo "found adapter.pid , Please run stop.sh first ,then startup.sh" >&2
    exit 1
fi

if [ ! -d $base/logs ] ; then
    mkdir -p $base/logs
fi

## set java path
if [ -z "$JAVA" ] ; then
    JAVA=$(which java)
fi

ALIBABA_JAVA="/usr/alibaba/java/bin/java"
TAOBAO_JAVA="/opt/taobao/java/bin/java"
if [ -z "$JAVA" ]; then
    if [ -f $ALIBABA_JAVA ] ; then
        JAVA=$ALIBABA_JAVA
    elif [ -f $TAOBAO_JAVA ] ; then
        JAVA=$TAOBAO_JAVA
    else
        echo "Cannot find a Java JDK. Please either set JAVA or put java (>=1.5) in your PATH." >&2
        exit 1
    fi
fi

# Optional arguments: "debug <port>" enables remote debugging
case "$#" in
0 )
    ;;
2 )
    if [ "$1" = "debug" ]; then
        DEBUG_PORT=$2
        DEBUG_SUSPEND="n"
        JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
    fi
    ;;
* )
    echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
    exit;;
esac

# Pick JVM memory settings depending on whether the JVM is 64-bit
str=`file -L $JAVA | grep 64-bit`
if [ -n "$str" ]; then
    JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -Xss256k -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError"
else
    JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
fi

JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
ADAPTER_OPTS="-DappName=canal-adapter"

# Build the classpath from every file under lib, plus the conf directory
for i in $base/lib/*;
    do CLASSPATH=$i:"$CLASSPATH";
done

CLASSPATH="$base/conf:$CLASSPATH";

echo "cd to $bin_abs_path for workaround relative path"
cd $bin_abs_path

echo CLASSPATH :$CLASSPATH
# exec replaces the shell with the JVM, so the adapter runs in the foreground
exec $JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication
3. Build the Canal-Adapter image
cd /opt/canal
docker build -t canal/canal-adapter:v1.1.7 .
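The build fails partway through if any file referenced by the Dockerfile is missing from the build directory, so it can be worth checking first. The sketch below looks for the three inputs the Dockerfile uses: the Dockerfile itself, startup.sh, and an archive matching canal.adapter-*.tar.gz. The function name check_build_context is an assumption made for this example.

```shell
#!/usr/bin/env bash
# check_build_context [DIR]
# Returns 0 if DIR (default /opt/canal) holds everything the Dockerfile
# copies into the image, otherwise lists what is missing and returns 1.
check_build_context() {
  local dir=${1:-/opt/canal} missing=0 f
  for f in Dockerfile startup.sh; do
    [ -f "$dir/$f" ] || { echo "missing: $dir/$f" >&2; missing=1; }
  done
  # Expand the same glob the Dockerfile's COPY instruction uses; if nothing
  # matches, $1 stays the literal pattern and the -f test fails.
  set -- "$dir"/canal.adapter-*.tar.gz
  [ -f "$1" ] || { echo "missing: $dir/canal.adapter-*.tar.gz" >&2; missing=1; }
  return $missing
}

# Usage:
# check_build_context /opt/canal && docker build -t canal/canal-adapter:v1.1.7 /opt/canal
```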
4. Create the directory
mkdir -p /data/canal-adapter/conf/es7
5. Start Canal-Adapter
docker run -d --name canal-adapter -p 8081:8081 canal/canal-adapter:v1.1.7
6. Copy the configuration files
docker cp canal-adapter:/opt/canal-adapter/conf/application.yml /data/canal-adapter/conf/
docker cp canal-adapter:/opt/canal-adapter/conf/bootstrap.yml /data/canal-adapter/conf/
docker cp canal-adapter:/opt/canal-adapter/conf/es7/mytest_user.yml /data/canal-adapter/conf/es7
7. Remove the Canal-Adapter container
docker rm -f canal-adapter
8. Edit the configuration files
application.yml: 7 places in total
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: xxx.xxx.xxx.xxx:11111 # change to the address where canal-server is deployed
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://xxx.xxx.xxx:3341/infusion-xxxxx?useUnicode=true # address of the database being monitored
      username: root # database account
      password: xxxxxxx # database password
  canalAdapters:
  - instance: example # keep the default if you have not changed it
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://xxx.xxx.xxx:9200 # address of the server running Elasticsearch
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: es # Elasticsearch cluster name (the author uses the name of the ES container)
bootstrap.yml
canal:
  manager:
    jdbc:
      url: jdbc:mysql://xxxxxx:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
      username: root
      password: xxxxxxx
coupon_record.yml (an es7 mapping file, placed under /data/canal-adapter/conf/es7): 3 places in total
dataSourceKey: defaultDS
destination: example # keep the default "example" if canal-server was not changed
groupId: g1
esMapping:
  _index: coupon_record # name of the target Elasticsearch index
  _id: _id
  sql: "select id as _id,coupon_id,create_time,use_state,openid,user_type,user_name,coupon_title,start_time,end_time,order_id,price,condition_price,del_flag from sys_coupon_record"
  commitBatch: 3000
9. Deploy Canal-Adapter
docker run --name canal-adapter -p 8081:8081 \
  -v /data/canal-adapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
  -v /data/canal-adapter/conf/bootstrap.yml:/opt/canal-adapter/conf/bootstrap.yml \
  -v /data/canal-adapter/conf/es7:/opt/canal-adapter/conf/es7 \
  -v /data/canal-adapter/logs:/opt/canal-adapter/logs \
  -d canal/canal-adapter:v1.1.7
That completes the deployment of the three Canal components. There is a lot to take in, so work through it carefully.