How MLSQL Stack Makes Stream Debugging Much Simpler
Preface
A user who was evaluating MLSQL Stack's streaming support mentioned that debugging streams is actually quite hard. After some hands-on work, we wanted the following three things:
- Be able to view the latest N records in a Kafka topic at any time
- Print the debugging output (the sink) in the web console
- Have the streaming program infer the JSON schema automatically (plain Spark cannot do this today)
Once these three points were in place, I found that debugging really did become much simpler.
Workflow
First I created a script named kaf_write.mlsql, which makes it easy for me to write data into Kafka:
set abc='''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
''';

load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;

save append table2 as kafka.`wow` where
kafka.bootstrap.servers="127.0.0.1:9092";
This way, every time I run the script, the data is written into Kafka.
Next, I need to check whether the data really made it in, and what it looks like:
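Later on, once the streaming job is running, you can re-run this script (or a small variant of it) whenever you want to push fresh records and watch the stream react. As a hypothetical example, a single extra record could be appended like this (the record values and the idea of a separate one-record script are mine, not from the original article):

-- a small sketch: push one extra record into the same topic while the stream is running
set one='''
{ "x": 130, "y": 150, "z": 300 ,"dataType":"C group"}
''';
load jsonStr.`one` as oneTable;
select to_json(struct(*)) as value from oneTable as oneTable2;
save append oneTable2 as kafka.`wow` where
kafka.bootstrap.servers="127.0.0.1:9092";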
!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;
This statement samples 10 records from the Kafka cluster at 127.0.0.1:9092, topic wow. The run result looks like this:
Everything looks fine. Next I write a very simple streaming program:
-- the stream name, should be unique.
set streamName="streamExample";

-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;

load kafka.`wow` options
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;

select * from newkafkatable1 as table21;

-- print in webConsole instead of terminal console.
save append table21
as webConsole.``
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";
The run result looks like this:
We can also see the results arriving in real time in the terminal.
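As a side note, if you would rather have this output go only to the server terminal instead of the web console, the same job can presumably be pointed at a plain console sink. The snippet below is only a sketch under that assumption (a console sink mirroring Spark's console output; the original walkthrough only shows webConsole):

-- a minimal sketch (assumption): same stream, but sink to the terminal instead of the web console
set streamName="streamExampleConsole";

!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;

load kafka.`wow` options
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;

select * from newkafkatable1 as table21;

-- console.`` is assumed here to map to Spark's console sink
save append table21
as console.``
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl-console";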
Additional notes
MLSQL Stack has two more streaming features that are particularly handy: you can register HTTP callbacks for stream lifecycle events, and you can post-process the stream's output with batch SQL before persisting it. See the following script:
-- the stream name, should be unique.
set streamName="streamExample";

-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options
stepSizeRange="0-3"
as newkafkatable1;

-- aggregation
select cast(value as string) as k from newkafkatable1
as table21;

!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";

-- output the result to console.
save append table21
as custom.``
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`;
'''
and checkpointLocation="/tmp/cpl15";
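To double-check what the batch SQL inside the custom sink actually wrote, you can read the parquet output back with an ordinary batch query. This is just a small verification sketch; the path /tmp/jack and the table name jack come from the script above, while jack_result is a name I made up:

-- a minimal sketch: read back the parquet files written by the custom sink's batch SQL
load parquet.`/tmp/jack` as jack_result;
select * from jack_result as output;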
Summary
That's all for this article. I hope it offers some useful reference for your study or work, and thank you for supporting 腳本之家.