解析SQL?Server?CDC配合Kafka?Connect監(jiān)聽(tīng)數(shù)據(jù)變化的問(wèn)題
寫(xiě)在前面
好久沒(méi)更新Blog了,從CRUD Boy轉(zhuǎn)型大數(shù)據(jù)開(kāi)發(fā),拉寬了不少的知識(shí)面,從今年年初開(kāi)始籌備、組建、招兵買(mǎi)馬,到現(xiàn)在穩(wěn)定開(kāi)搞中,期間踏過(guò)無(wú)數(shù)的火坑,也許除了這篇還很寫(xiě)上三四篇。
進(jìn)入主題,通常企業(yè)為了實(shí)現(xiàn)數(shù)據(jù)統(tǒng)計(jì)、數(shù)據(jù)分析、數(shù)據(jù)挖掘、解決信息孤島等全局?jǐn)?shù)據(jù)的系統(tǒng)化運(yùn)作管理 ,為BI、經(jīng)營(yíng)分析、決策支持系統(tǒng)等深度開(kāi)發(fā)應(yīng)用奠定基礎(chǔ),挖掘數(shù)據(jù)價(jià)值 ,企業(yè)會(huì)開(kāi)始著手建立數(shù)據(jù)倉(cāng)庫(kù),數(shù)據(jù)中臺(tái)。而這些數(shù)據(jù)來(lái)源則來(lái)自于企業(yè)的各個(gè)業(yè)務(wù)系統(tǒng)的數(shù)據(jù)或爬取外部的數(shù)據(jù),從業(yè)務(wù)系統(tǒng)數(shù)據(jù)到數(shù)據(jù)倉(cāng)庫(kù)的過(guò)程就是一個(gè)ETL(Extract-Transform-Load)行為,包括了采集、清洗、數(shù)據(jù)轉(zhuǎn)換等主要過(guò)程,通常異構(gòu)數(shù)據(jù)抽取轉(zhuǎn)換使用Sqoop、DataX等,日志采集Flume、Logstash、Filebeat等。
數(shù)據(jù)抽取分為全量抽取和增量抽取,全量抽取類似于數(shù)據(jù)遷移或數(shù)據(jù)復(fù)制,全量抽取很好理解;增量抽取在全量的基礎(chǔ)上做增量,只監(jiān)聽(tīng)、捕捉動(dòng)態(tài)變化的數(shù)據(jù)。如何捕捉數(shù)據(jù)的變化是增量抽取的關(guān)鍵,一是準(zhǔn)確性,必須保證準(zhǔn)確的捕捉到數(shù)據(jù)的動(dòng)態(tài)變化,二是性能,不能對(duì)業(yè)務(wù)系統(tǒng)造成太大的壓力。
增量抽取方式
通常增量抽取有幾種方式,各有優(yōu)缺點(diǎn)。
1. 觸發(fā)器
在源數(shù)據(jù)庫(kù)上的目標(biāo)表創(chuàng)建觸發(fā)器,監(jiān)聽(tīng)增、刪、改操作,捕捉到數(shù)據(jù)的變更寫(xiě)入臨時(shí)表。
優(yōu)點(diǎn):操作簡(jiǎn)單、規(guī)則清晰,對(duì)源表不影響;
缺點(diǎn):對(duì)源數(shù)據(jù)庫(kù)有侵入,對(duì)業(yè)務(wù)系統(tǒng)有一定的影響;
2. 全表比對(duì)
在ETL過(guò)程中,抽取方建立臨時(shí)表待全量抽取存儲(chǔ),然后在進(jìn)行比對(duì)數(shù)據(jù)。
優(yōu)點(diǎn):對(duì)源數(shù)據(jù)庫(kù)、源表都無(wú)需改動(dòng),完全交付ETL過(guò)程處理,統(tǒng)一管理;
缺點(diǎn):ETL效率低、設(shè)計(jì)復(fù)雜,數(shù)據(jù)量越大,速度越慢,時(shí)效性不確定;
3. 全表刪除后再插入
在抽取數(shù)據(jù)之前,先將表中數(shù)據(jù)清空,然后全量抽取。
優(yōu)點(diǎn):ETL 操作簡(jiǎn)單,速度快。
缺點(diǎn):全量抽取一般采取T+1的形式,抽取數(shù)據(jù)量大的表容易對(duì)數(shù)據(jù)庫(kù)造成壓力;
4. 時(shí)間戳
時(shí)間戳的方式即在源表上增加時(shí)間戳列,對(duì)發(fā)生變更的表進(jìn)行更新,然后根據(jù)時(shí)間戳進(jìn)行提取。
優(yōu)點(diǎn):操作簡(jiǎn)單,ELT邏輯清晰,性能比較好;
缺點(diǎn):對(duì)業(yè)務(wù)系統(tǒng)有侵入,數(shù)據(jù)庫(kù)表也需要額外增加字段。對(duì)于老的業(yè)務(wù)系統(tǒng)可能不容易做變更。
5. CDC方式
變更數(shù)據(jù)捕獲Change Data Capture(簡(jiǎn)稱CDC),SQLServer為實(shí)時(shí)更新數(shù)據(jù)同步提供了CDC機(jī)制,類似于Mysql的binlog,將數(shù)據(jù)更新操作維護(hù)到一張CDC表中。開(kāi)啟CDC的源表在插入INSERT、更新UPDATE和刪除DELETE活動(dòng)時(shí)會(huì)插入數(shù)據(jù)到日志表中。cdc通過(guò)捕獲進(jìn)程將變更數(shù)據(jù)捕獲到變更表中,通過(guò)cdc提供的查詢函數(shù),可以捕獲這部分?jǐn)?shù)據(jù)。詳情可以查看官方介紹:關(guān)于變更數(shù)據(jù)捕獲 (SQL Server)
優(yōu)點(diǎn):提供易于使用的API 來(lái)設(shè)置CDC 環(huán)境,縮短ETL 的時(shí)間,無(wú)需修改業(yè)務(wù)系統(tǒng)表結(jié)構(gòu)。
缺點(diǎn):受數(shù)據(jù)庫(kù)版本的限制,實(shí)現(xiàn)過(guò)程相對(duì)復(fù)雜。
CDC增量抽取
先決條件
1. 已搭建好Kafka集群,Zookeeper集群;
2. 源數(shù)據(jù)庫(kù)支持CDC,版本采用開(kāi)發(fā)版或企業(yè)版。
案例環(huán)境:
Ubuntu 20.04
Kafka2.13-2.7.0
Zookeeper 3.6.2
SQL Server 2012
步驟
除了數(shù)據(jù)庫(kù)開(kāi)啟CDC支持以外,主要還是要將變更的數(shù)據(jù)通過(guò)Kafka Connect傳輸數(shù)據(jù),Debezium是目前官方推薦的連接器,它支持絕大多數(shù)主流數(shù)據(jù)庫(kù):MySQL、PostgreSQL、SQL Server、Oracle等等,詳情查看Connectors。
1. 數(shù)據(jù)庫(kù)步驟
開(kāi)啟數(shù)據(jù)庫(kù)CDC支持
在源數(shù)據(jù)庫(kù)執(zhí)行以下命令:
EXEC sys.sp_cdc_enable_db GO
附上關(guān)閉語(yǔ)句:
exec sys.sp_cdc_disable_db
查詢是否啟用
select * from sys.databases where is_cdc_enabled = 1
創(chuàng)建測(cè)試數(shù)據(jù)表:(已有表則跳過(guò)此步驟)
create table T_LioCDC ( ID int identity(1,1) primary key , Name nvarchar(16), Sex bit, CreateTime datetime, UpdateTime datetime );
對(duì)源表開(kāi)啟CDC支持:
exec sp_cdc_enable_table @source_schema='dbo', @source_name='T_LioCDC', @role_name=null, @supports_net_changes = 1;
確認(rèn)是否有權(quán)限訪問(wèn)CDC Table:
EXEC sys.sp_cdc_help_change_data_capture
確認(rèn)SQL Server Agent已開(kāi)啟:
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
以上則完成對(duì)數(shù)據(jù)庫(kù)的CDC操作。
2. Kafka步驟
Kafka Connect的工作模式分為兩種,分別是standalone模式和distributed模式。standalone用于單機(jī)測(cè)試,本文用distributed模式,用于生產(chǎn)環(huán)境。(Kafka必須先運(yùn)行啟動(dòng),再進(jìn)行以下步驟進(jìn)行配置。)
下載Sql Server Connector
下載連接器后,創(chuàng)建一個(gè)文件夾來(lái)存放,解壓到該目錄下即可,例子路徑:/usr/soft/kafka/kafka_2.13_2.7.0/plugins(記住這個(gè)路徑,配置中要用到)
下載地址:debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz
編輯connect-distributed.properties配置
修改Kafka connect配置文件,$KAFKA_HOME/config/connect-distributed.properties,變更內(nèi)容如下:
//kafka集群ip+portbootstrap.servers=172.192.10.210:9092,172.192.10.211:9092,172.192.10.212:9092 key.converter.schemas.enable=false value.converter.schemas.enable=false offset.storage.topic=connect-offsets offset.storage.replication.factor=1 offset.storage.partitions=3 offset.storage.cleanup.policy=compact config.storage.topic=connect-configs config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 status.storage.partitions=3 //剛剛下載連接器解壓的路徑 plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins
看到配置中有三個(gè)Topic,分別是
config.storage.topic:用以保存connector和task的配置信息,需要注意的是這個(gè)主題的分區(qū)數(shù)只能是1,而且是有多副本的。
offset.storage.topic:用以保存offset信息。
status.storage.topic:用以保存connetor的狀態(tài)信息。
這些Topic可以不用創(chuàng)建,啟動(dòng)后會(huì)默認(rèn)創(chuàng)建。
啟動(dòng)Kafka集群
保存配置之后,將connect-distributed.properties分發(fā)到集群中,然后啟動(dòng):
bin/connect-distributed.sh config/connect-distributed.properties
檢查是否啟動(dòng)
connector支持REST API的方式進(jìn)行管理,所以用Post man或者Fiddler可以調(diào)用相關(guān)接口進(jìn)行管理。檢查是否啟動(dòng):
不用奇怪,上面配置集群的IP是172段,這里的192.168.1.177仍是我的集群中的一個(gè)服務(wù)器,因?yàn)榉?wù)器都使用了雙網(wǎng)卡。因?yàn)檫€沒(méi)有連接器相關(guān)配置,所以接口返回是一個(gè)空數(shù)組,接下來(lái)將新增一個(gè)連接器。
編寫(xiě)sqlserver-cdc-source.json
{ "name": "sqlserver-cdc-source", "config": { "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector", "database.server.name" : "JnServer", "database.hostname" : "172.192.20.2", --目標(biāo)數(shù)據(jù)庫(kù)的ip "database.port" : "1433", --目標(biāo)數(shù)據(jù)庫(kù)的端口 "database.user" : "sa", --目標(biāo)數(shù)據(jù)庫(kù)的賬號(hào) "database.password" : "123456", --密碼 "database.dbname" : "Dis", --目標(biāo)數(shù)據(jù)庫(kù)的數(shù)據(jù)庫(kù)名稱 "table.whitelist": "dbo.T_LioCDC", --監(jiān)聽(tīng)表名 "schemas.enable" : "false", "mode":"incrementing", --增量模式 "incrementing.column.name": "ID", --增量列名 "database.history.kafka.bootstrap.servers" : "172.192.10.210:9092,172.192.10.211:9092,172.192.10.212", --kafka集群 "database.history.kafka.topic": "TopicTLioCDC", --kafka topic內(nèi)部使用,不是由消費(fèi)者使用 "value.converter.schemas.enable":"false", "value.converter":"org.apache.kafka.connect.json.JsonConverter" } } //源文地址:?https://www.cnblogs.com/EminemJK/p/14688907.html
還有其他額外的配置,可以參考官方文檔。然后執(zhí)行
繼續(xù)執(zhí)行檢查,就發(fā)現(xiàn)連接器已經(jīng)成功配置了:
其他API
GET /connectors – 返回所有正在運(yùn)行的connector名。 POST /connectors – 新建一個(gè)connector; 請(qǐng)求體必須是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必須包含你的connector的配置信息。 GET /connectors/{name} – 獲取指定connetor的信息。 GET /connectors/{name}/config – 獲取指定connector的配置信息。 PUT /connectors/{name}/config – 更新指定connector的配置信息。 GET /connectors/{name}/status – 獲取指定connector的狀態(tài),包括它是否在運(yùn)行、停止、或者失敗,如果發(fā)生錯(cuò)誤,還會(huì)列出錯(cuò)誤的具體信息。 GET /connectors/{name}/tasks – 獲取指定connector正在運(yùn)行的task。 GET /connectors/{name}/tasks/{taskid}/status – 獲取指定connector的task的狀態(tài)信息。 PUT /connectors/{name}/pause – 暫停connector和它的task,停止數(shù)據(jù)處理知道它被恢復(fù)。 PUT /connectors/{name}/resume – 恢復(fù)一個(gè)被暫停的connector。 POST /connectors/{name}/restart – 重啟一個(gè)connector,尤其是在一個(gè)connector運(yùn)行失敗的情況下比較常用 POST /connectors/{name}/tasks/{taskId}/restart – 重啟一個(gè)task,一般是因?yàn)樗\(yùn)行失敗才這樣做。 DELETE /connectors/{name} – 刪除一個(gè)connector,停止它的所有task并刪除配置。//源文地址:?https://www.cnblogs.com/EminemJK/p/14688907.html
查看Topic
/usr/soft/kafka/kafka_2.13_2.7.0# bin/kafka-topics.sh --list --zookeeper localhost:2000
TopicJnServer.dbo.T_LioCDC則是供我們消費(fèi)的主題,啟動(dòng)一個(gè)消費(fèi)者進(jìn)行監(jiān)聽(tīng)測(cè)試:
bin/kafka-console-consumer.sh --bootstrap-server 172.192.10.210:9092? --consumer-property group.id=group1 --consumer-property client.id=consumer-1? --topic JnServer.dbo.T_LioCDC
然后再源表進(jìn)行一些列增刪改操作,
--測(cè)試代碼 insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('A',1,getdate(),getdate()) insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('B',0,getdate(),getdate()) insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('C',1,getdate(),getdate()) insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('D',0,getdate(),getdate()) insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('E',1,getdate(),getdate()) insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('F',1,getdate(),getdate()) insert into T_LioCDC(name, sex, createtime,UpdateTime) values ('G',0,getdate(),getdate()) update T_LioCDC set Name='Lio.Huang',UpdateTime=getdate() where ID=7
已經(jīng)成功捕捉到數(shù)據(jù)的變更,對(duì)比幾個(gè)操作Json,依次是insert、update、delete:
到此這篇關(guān)于SQL?Server?CDC配合Kafka?Connect監(jiān)聽(tīng)數(shù)據(jù)變化的文章就介紹到這了,更多相關(guān)SQL?Server?CDC監(jiān)聽(tīng)數(shù)據(jù)變化內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SQL Server 數(shù)據(jù)庫(kù)備份和還原認(rèn)識(shí)和總結(jié)(二)
本文將針對(duì)上文繼續(xù)進(jìn)行數(shù)據(jù)備份和還原講解,主要講解備份和還原的一些關(guān)鍵選項(xiàng)2012-08-08SQLServer 優(yōu)化SQL語(yǔ)句 in 和not in的替代方案
用IN寫(xiě)出來(lái)的SQL的優(yōu)點(diǎn)是比較容易寫(xiě)及清晰易懂,這比較適合現(xiàn)代軟件開(kāi)發(fā)的風(fēng)格。2010-04-04存儲(chǔ)過(guò)程實(shí)現(xiàn)訂單號(hào),流水單號(hào)(8位)的詳細(xì)思路
存儲(chǔ)過(guò)程實(shí)現(xiàn)訂單號(hào),流水單號(hào)是一個(gè)比較不錯(cuò)的功能,在處理訂單方面起到一個(gè)不錯(cuò)的作用;此文章是借鑒園中的各位大神的,本人只是略作修改。有不好的地方,歡迎吐槽2013-01-01數(shù)據(jù)庫(kù)清除日志文件(LDF文件過(guò)大)
數(shù)據(jù)庫(kù)清除日志文件,(LDF文件過(guò)大),一般情況下,有更簡(jiǎn)單的方法,需要在sqlserver查詢執(zhí)行。2009-11-11java 連接sql server2008數(shù)據(jù)庫(kù)配置
本篇文章給大家分享java連接sql server2008數(shù)據(jù)庫(kù)配置的相關(guān)資料,需要的朋友可以參考下2015-09-09在SQL觸發(fā)器或存儲(chǔ)過(guò)程中獲取在程序登錄的用戶
每個(gè)用戶可以登錄系統(tǒng),在程序中操作數(shù)據(jù)(添加,更新和刪除)需要實(shí)現(xiàn)記錄操作跟蹤。是誰(shuí)添加,更新和刪除的,這些信息將會(huì)插入至AuditLog表中2012-01-01一道關(guān)于數(shù)據(jù)庫(kù)(經(jīng)典父子級(jí) ID 關(guān)聯(lián))更新題
這篇文章主要介紹了一道關(guān)于數(shù)據(jù)庫(kù)(經(jīng)典父子級(jí) ID 關(guān)聯(lián))更新題,大家?guī)兔ο胂脒€有其它解決思路沒(méi)有?2015-06-06sql2005 存儲(chǔ)過(guò)程分頁(yè)示例代碼
sql2005分頁(yè)存儲(chǔ)過(guò)程示例2010-03-03