Python實(shí)現(xiàn)數(shù)據(jù)庫(kù)表的監(jiān)控警告的項(xiàng)目實(shí)踐
簡(jiǎn)介
使用Python 實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)表的監(jiān)控告警功能, 并將告警信息通過(guò)釘釘機(jī)器人發(fā)送到釘釘群
實(shí)現(xiàn)DataWorks中數(shù)據(jù)質(zhì)量的基本功能, 當(dāng)然 DW的數(shù)據(jù)質(zhì)量的規(guī)則類(lèi)型很多, 用起來(lái)比較方便, 這里只簡(jiǎn)單實(shí)現(xiàn)了其中兩個(gè)規(guī)則類(lèi)型的功能, 僅供參考;
初次使用Python, 請(qǐng)多指教
使用工具: MaxCompute
1. 創(chuàng)建表
1. tmp_monitor_tbl_info
CREATE TABLE IF NOT EXISTS puture_bigdata.tmp_monitor_tbl_info ( `id` STRING COMMENT '表編號(hào)id' , `tbl_name` STRING COMMENT '表名' , `pt_format` STRING COMMENT '分區(qū)格式: yyyy-MM-dd,yyyyMMdd 等' , `val_type` STRING COMMENT '值類(lèi)型: 表行數(shù),周期值等' , `monitor_flag` int COMMENT '監(jiān)控標(biāo)識(shí): 0:不監(jiān)控, 1:監(jiān)控;' , `rule_code` int COMMENT '規(guī)則編碼: 1:表行數(shù),上周期差值, 2:表行數(shù),固定值 等' , `rule_type` STRING COMMENT '規(guī)則類(lèi)型: 表行數(shù),上周期差值; 表行數(shù),固定值; 與固定值比較 等' , `expect_val` int COMMENT '期望值' , `tbl_sort_code` int COMMENT '表類(lèi)型編碼: 0:其它(維表類(lèi)), 1:亞馬遜, 2:中小平臺(tái), 3:市場(chǎng)數(shù)據(jù) 等' , `tbl_sort_name` STRING COMMENT '表類(lèi)型名字: 0:其它(維表類(lèi)), 1:亞馬遜, 2:中小平臺(tái), 3:市場(chǎng)數(shù)據(jù) 等' , `pt_num` INT COMMENT '分區(qū)日期差值' ) COMMENT '數(shù)據(jù)監(jiān)控表信息' tblproperties ("transactional"="true") ;
-- 插入數(shù)據(jù) INSERT INTO TABLE puture_bigdata_dev.tmp_monitor_tbl_info SELECT * FROM ( VALUES (1 , 'ods_amazon_amz_customer_returns_df', 'yyyyMMdd', '表行數(shù)', 1, 1, '表行數(shù),上周期差值', 0, 1, '亞馬遜' , -1) , (2 , 'ods_amazon_amz_flat_file_all_orders_df', 'yyyyMMdd', '表行數(shù)', 1, 1, '表行數(shù),上周期差值', 0, 1, '亞馬遜' , -1) , (3 , 'dim_sys_salesman_info_df', 'yyyyMMdd', '表行數(shù)', 1, 1, '表行數(shù),上周期差值', 0, 0, '其它' , -1) ) AS table_name(id, tbl_name, pt_format, val_type, monitor_flag, rule_code, rule_type, expect_val, tbl_sort_code, tbl_sort_name, pt_num) ;
2. tmp_monitor_tbl_info_log_di
CREATE TABLE IF NOT EXISTS puture_bigdata_dev.tmp_monitor_tbl_info_log_di ( `id` STRING COMMENT '監(jiān)控id編碼:md5(表名_分區(qū))_小時(shí)' , `tbl_name` STRING COMMENT '表名' , `stat_time` STRING COMMENT '統(tǒng)計(jì)時(shí)間' , `pt_format` STRING COMMENT '分區(qū)格式: yyyy-MM-dd,yyyyMMdd 等' , `stat_pt` STRING COMMENT '統(tǒng)計(jì)分區(qū)' , `val_type` STRING COMMENT '值類(lèi)型: 表行數(shù),周期值等' , `val` int COMMENT '統(tǒng)計(jì)值' , `rule_code` int COMMENT '規(guī)則編碼: 1:表行數(shù),上周期差值, 2:表行數(shù),固定值 等' , `rule_type` STRING COMMENT '規(guī)則類(lèi)型: 表行數(shù),上周期差值; 表行數(shù),固定值; 與固定值比較 等' , `expect_val` int COMMENT '期望值' , `is_exc` int COMMENT '是否異常: 0:否,1:是,默認(rèn)值0' , `tbl_sort_code` int COMMENT '表類(lèi)型編碼: 0:其它(維表類(lèi)), 1:亞馬遜, 2:中小平臺(tái), 3:市場(chǎng)數(shù)據(jù) 等' , `tbl_sort_name` STRING COMMENT '表類(lèi)型名字: 0:其它(維表類(lèi)), 1:亞馬遜, 2:中小平臺(tái), 3:市場(chǎng)數(shù)據(jù) 等' ) COMMENT '數(shù)據(jù)監(jiān)控信息記錄表' PARTITIONED BY (pt STRING COMMENT '數(shù)據(jù)日期, yyyy-MM-dd') ;
2. 程序開(kāi)發(fā)
1. 數(shù)據(jù)檢查程序
'''PyODPS 3 請(qǐng)確保不要使用從 MaxCompute下載數(shù)據(jù)來(lái)處理。下載數(shù)據(jù)操作常包括Table/Instance的open_reader以及 DataFrame的to_pandas方法。 推薦使用 PyODPS DataFrame(從 MaxCompute 表創(chuàng)建)和MaxCompute SQL來(lái)處理數(shù)據(jù)。 更詳細(xì)的內(nèi)容可以參考:https://help.aliyun.com/document_detail/90481.html ''' import os from odps import ODPS, DataFrame from datetime import datetime, timedelta from dateutil import parser options.tunnel.use_instance_tunnel = True # 獲取當(dāng)前時(shí)間 now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(now_time) pt = args['date'] print(pt) date = datetime.strptime(pt, "%Y-%m-%d") # 監(jiān)控表列表 tbl_sort_code -> 0:其它(維表類(lèi)), 1:亞馬遜, 2:中小平臺(tái), 3:市場(chǎng)數(shù)據(jù) sql_tbl_info = """ SELECT * FROM puture_bigdata.tmp_monitor_tbl_info WHERE monitor_flag = 1 AND tbl_sort_code = 3 """ # 結(jié)果表 res_tbl_name = "puture_bigdata.tmp_monitor_tbl_info_log_di" # 統(tǒng)計(jì)sql代碼 -- 表行數(shù),上周期差值 def sql_upper_period_diff(): sql = f""" set odps.sql.hive.compatible=true ; INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}') SELECT a.id , a.tbl_name , a.stat_time , a.pt_format , a.stat_pt , a.val_type , a.val , a.rule_code , a.rule_type , a.expect_val , IF (a.val = 0, 1, (IF ((a.val - NVL(b.val,0)) >= {expect_val}, 0, 1 ))) AS is_exc , a.tbl_sort_code , a.tbl_sort_name FROM ( SELECT concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id , '{tbl_name}' AS tbl_name , '{now_time}' AS stat_time , '{pt_format}' AS pt_format , date_format('{date_str}' ,'{pt_format}') AS stat_pt , '{val_type}' AS val_type , COUNT(1) AS val , '{rule_code}' AS rule_code , '{rule_type}' AS rule_type , {expect_val} AS expect_val , {tbl_sort_code} AS tbl_sort_code , '{tbl_sort_name}' AS tbl_sort_name FROM puture_bigdata.{tbl_name} WHERE pt = date_format('{date_str}' ,'{pt_format}') ) a LEFT JOIN ( SELECT tbl_name, val FROM ( SELECT tbl_name, val , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC ) AS rn FROM {res_tbl_name} WHERE pt = DATE_ADD('{date_str}', -1) ) WHERE rn = 1 ) b ON a.tbl_name = b.tbl_name ; """ return sql # 表行數(shù), 固定值 def sql_line_fixed_val(): sql = f""" set odps.sql.hive.compatible=true ; INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}') SELECT concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id , '{tbl_name}' AS tbl_name , '{now_time}' AS stat_time , '{pt_format}' AS pt_format , date_format('{date_str}' ,'{pt_format}') AS stat_pt , '{val_type}' AS val_type , COUNT(1) AS val , '{rule_code}' AS rule_code , '{rule_type}' AS rule_type , {expect_val} AS expect_val , IF (COUNT(1) >= {expect_val}, 0, 1 ) AS is_exc , {tbl_sort_code} AS tbl_sort_code , '{tbl_sort_name}' AS tbl_sort_name FROM puture_bigdata.{tbl_name} WHERE pt = date_format('{date_str}' ,'{pt_format}') ; """ return sql # 執(zhí)行監(jiān)控統(tǒng)計(jì)代碼 def ex_monitor(sql: str): try : # print (sql) o.execute_sql(sql, hints={'odps.sql.hive.compatible': True , "odps.sql.submit.mode":"script"}) print("{}: 運(yùn)行成功".format(tbl_name) ) except Exception as e: print('{}: 運(yùn)行異常 ======> '.format(tbl_name) + str(e)) if __name__ == '__main__': try : with o.execute_sql(sql_tbl_info, hints={'odps.sql.hive.compatible': True}).open_reader() as reader: for row_record in reader: # print(row_record) # 打印一條數(shù)據(jù)值 tbl_name = row_record.tbl_name pt_format = row_record.pt_format val_type = row_record.val_type monitor_flag = row_record.monitor_flag rule_code = row_record.rule_code rule_type = row_record.rule_type expect_val = row_record.expect_val tbl_sort_code = row_record.tbl_sort_code tbl_sort_name = row_record.tbl_sort_name pt_num = row_record.pt_num date_str = (date + timedelta(days=pt_num)).strftime('%Y-%m-%d') if rule_code == 1 : ex_monitor(sql_upper_period_diff()) elif rule_code == 2 : ex_monitor(sql_line_fixed_val()) else : print("未知規(guī)則!!!") except Exception as e: print('異常 ======> ' + str(e))
2. 告警信息推送程序
'''PyODPS 3 請(qǐng)確保不要使用從 MaxCompute下載數(shù)據(jù)來(lái)處理。下載數(shù)據(jù)操作常包括Table/Instance的open_reader以及 DataFrame的to_pandas方法。 推薦使用 PyODPS DataFrame(從 MaxCompute 表創(chuàng)建)和MaxCompute SQL來(lái)處理數(shù)據(jù)。 更詳細(xì)的內(nèi)容可以參考:https://help.aliyun.com/document_detail/90481.html ''' import json import requests from datetime import datetime import os from odps import ODPS, DataFrame date_str = args['date'] # 接口地址和token信息 url = 'https://oapi.dingtalk.com/robot/send?access_token=***********************' now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print (now_time) sql_query = f""" SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc FROM ( SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC) AS rn FROM puture_bigdata_dev.tmp_monitor_tbl_info_log_di WHERE pt = '{date_str}' AND tbl_sort_code = 1 -- 表種類(lèi) ) a WHERE rn = 1 AND is_exc = 1 """ # 釘釘機(jī)器人,發(fā)送消息 def dd_robot(url:str, content: str): HEADERS = {"Content-Type": "application/json;charset=utf-8"} #content里面要設(shè)置關(guān)鍵字 data_info = { "msgtype": "text", "text": { "content": content }, "isAtAll": False #這是配置需要@的人 # ,"at": {"atMobiles": ["15xxxxxx06",'18xxxxxx1']} } value = json.dumps(data_info) response = requests.post(url,data=value,headers=HEADERS) if response.json()['errmsg']!='ok': print(response.text) # 主函數(shù) if __name__ == '__main__': # py3可以省略 try : with o.execute_sql(sql_query, hints={'odps.sql.hive.compatible': True}).open_reader() as reader: result_rows = list(reader) # 讀取所有的結(jié)果行 result_count = len(result_rows) # 獲取結(jié)果條數(shù) #print("結(jié)果條數(shù):", result_count) # 打印結(jié)果條數(shù) if result_count > 0 : for row in result_rows: tbl_name = row.tbl_name stat_time = row.stat_time stat_pt = row.stat_pt val_type = row.val_type val = row.val rule_type = row.rule_type expect_val = row.expect_val #print (tbl_name) content = "數(shù)據(jù)質(zhì)量(DQC)校驗(yàn)告警 \n " content = content + "【對(duì)象名稱(chēng)】:" + tbl_name + " \n " content = content + "【實(shí)際分區(qū)】:pt=" + stat_pt + " \n " content = content + "【觸發(fā)規(guī)則】: " + rule_type + " | 當(dāng)前樣本值: " + val + " | 閾值: " + expect_val + " \n " content = content + now_time + " \n " dd_robot(url, content) else : print ("無(wú)異常情況;") except Exception as e: print ('異常 ========>' + str(e) )
3. 告警樣例
數(shù)據(jù)質(zhì)量(DQC)校驗(yàn)告警
【對(duì)象名稱(chēng)】:dws_amazon_market_sales_stat_di
【實(shí)際分區(qū)】:pt=20240103
【觸發(fā)規(guī)則】: 表行數(shù),固定值 | 當(dāng)前樣本值: 617 | 閾值: 650
2024-01-04 02:54:44
到此這篇關(guān)于Python實(shí)現(xiàn)數(shù)據(jù)庫(kù)表的監(jiān)控警告的項(xiàng)目實(shí)踐的文章就介紹到這了,更多相關(guān)Python 數(shù)據(jù)庫(kù)表監(jiān)控警告內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python使用pymupdf實(shí)現(xiàn)PDF加密
這篇文章主要介紹了如何使用 Python 和 wxPython 庫(kù)創(chuàng)建一個(gè)簡(jiǎn)單的圖形用戶(hù)界面(GUI)應(yīng)用程序,用于對(duì) PDF 文件進(jìn)行加密,感興趣的小伙伴可以了解下2023-08-08selenium切換標(biāo)簽頁(yè)解決get超時(shí)問(wèn)題的完整代碼
這篇文章主要給大家介紹了關(guān)于selenium切換標(biāo)簽頁(yè)解決get超時(shí)問(wèn)題的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08在Python中字典根據(jù)多項(xiàng)規(guī)則排序的方法
今天小編就為大家分享一篇在Python中字典根據(jù)多項(xiàng)規(guī)則排序的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-01-01Python實(shí)現(xiàn)Windows上氣泡提醒效果的方法
這篇文章主要介紹了Python實(shí)現(xiàn)Windows上氣泡提醒效果的方法,涉及Python針對(duì)windows窗口操作的相關(guān)技巧,需要的朋友可以參考下2015-06-06pycharm 使用心得(五)斷點(diǎn)調(diào)試
PyCharm 作為IDE,斷點(diǎn)調(diào)試是必須有的功能。否則,我們還真不如用純編輯器寫(xiě)的快。2014-06-06OpenCV實(shí)現(xiàn)從灰度圖像切出Mask前景區(qū)域
本文主要介紹了如何利用OpenCV實(shí)現(xiàn)從灰度圖像,根據(jù)閾值,切出多個(gè)前景區(qū)域,過(guò)濾面積太小的圖像。文中的示例代碼講解詳細(xì),需要的可以參考一下2022-06-06Python實(shí)現(xiàn)服務(wù)端渲染SSR的示例代碼
服務(wù)端渲染是一種常見(jiàn)的技術(shù)策略,特別是在需要改善網(wǎng)站的搜索引擎優(yōu)化(SEO)和首屏加載時(shí)間的場(chǎng)景下,本文將介紹如何利用?Python?實(shí)現(xiàn)?SSR,感興趣的可以了解下2024-02-02