Python實現數據庫表的監控警告的項目實踐
簡介
使用Python實現對數據庫表的監控告警功能, 并將告警信息通過釘釘機器人發送到釘釘群。
實現DataWorks中數據質量(DQC)的基本功能。當然, DataWorks(DW)的數據質量規則類型很多, 用起來也比較方便; 這里只簡單實現了其中兩種規則類型(表行數與上周期的差值、表行數與固定值比較), 僅供參考。
初次使用Python, 請多指教
使用工具: MaxCompute
1. 創建表
1. tmp_monitor_tbl_info
-- Monitoring configuration: one row per (table, rule). The check program
-- selects the rows with monitor_flag = 1 from this table.
CREATE TABLE IF NOT EXISTS puture_bigdata.tmp_monitor_tbl_info (
    `id`            STRING COMMENT '表編號id',
    `tbl_name`      STRING COMMENT '表名',
    `pt_format`     STRING COMMENT '分區(qū)格式: yyyy-MM-dd,yyyyMMdd 等',
    `val_type`      STRING COMMENT '值類型: 表行數(shù),周期值等',
    `monitor_flag`  INT    COMMENT '監(jiān)控標識: 0:不監(jiān)控, 1:監(jiān)控;',
    `rule_code`     INT    COMMENT '規(guī)則編碼: 1:表行數(shù),上周期差值, 2:表行數(shù),固定值 等',
    `rule_type`     STRING COMMENT '規(guī)則類型: 表行數(shù),上周期差值; 表行數(shù),固定值; 與固定值比較 等',
    `expect_val`    INT    COMMENT '期望值',
    `tbl_sort_code` INT    COMMENT '表類型編碼: 0:其它(維表類), 1:亞馬遜, 2:中小平臺, 3:市場數(shù)據(jù) 等',
    `tbl_sort_name` STRING COMMENT '表類型名字: 0:其它(維表類), 1:亞馬遜, 2:中小平臺, 3:市場數(shù)據(jù) 等',
    -- Day offset added to the run date to choose the partition to check (e.g. -1 = previous day).
    `pt_num`        INT    COMMENT '分區(qū)日期差值'
)
COMMENT '數(shù)據(jù)監(jiān)控表信息'
TBLPROPERTIES ("transactional" = "true");
-- Seed the monitoring rules. Insert into puture_bigdata: that is the schema
-- the CREATE TABLE above uses and the schema the check program reads
-- tmp_monitor_tbl_info from.
-- Values follow the table's column order; pt_num = -1 makes the check program
-- count the partition one day before the run date.
-- NOTE(review): the check program as published only selects tbl_sort_code = 3;
-- these sample rows use 0/1 - confirm the intended category filter.
INSERT INTO TABLE puture_bigdata.tmp_monitor_tbl_info
SELECT * FROM (
    VALUES (1 , 'ods_amazon_amz_customer_returns_df', 'yyyyMMdd', '表行數(shù)', 1, 1, '表行數(shù),上周期差值', 0, 1, '亞馬遜' , -1)
         , (2 , 'ods_amazon_amz_flat_file_all_orders_df', 'yyyyMMdd', '表行數(shù)', 1, 1, '表行數(shù),上周期差值', 0, 1, '亞馬遜' , -1)
         , (3 , 'dim_sys_salesman_info_df', 'yyyyMMdd', '表行數(shù)', 1, 1, '表行數(shù),上周期差值', 0, 0, '其它' , -1)
) AS table_name(id, tbl_name, pt_format, val_type, monitor_flag, rule_code, rule_type, expect_val, tbl_sort_code, tbl_sort_name, pt_num) ;
2. tmp_monitor_tbl_info_log_di
-- Check log written by the monitoring program: each check run inserts one row
-- per (table, rule) into the partition pt = run date.
-- Created in puture_bigdata so it matches res_tbl_name in the check program,
-- which inserts into puture_bigdata.tmp_monitor_tbl_info_log_di.
CREATE TABLE IF NOT EXISTS puture_bigdata.tmp_monitor_tbl_info_log_di (
`id` STRING COMMENT '監(jiān)控id編碼:md5(表名_分區(qū))_小時'
, `tbl_name` STRING COMMENT '表名'
, `stat_time` STRING COMMENT '統(tǒng)計時間'
, `pt_format` STRING COMMENT '分區(qū)格式: yyyy-MM-dd,yyyyMMdd 等'
, `stat_pt` STRING COMMENT '統(tǒng)計分區(qū)'
, `val_type` STRING COMMENT '值類型: 表行數(shù),周期值等'
, `val` int COMMENT '統(tǒng)計值'
, `rule_code` int COMMENT '規(guī)則編碼: 1:表行數(shù),上周期差值, 2:表行數(shù),固定值 等'
, `rule_type` STRING COMMENT '規(guī)則類型: 表行數(shù),上周期差值; 表行數(shù),固定值; 與固定值比較 等'
, `expect_val` int COMMENT '期望值'
, `is_exc` int COMMENT '是否異常: 0:否,1:是,默認值0'
, `tbl_sort_code` int COMMENT '表類型編碼: 0:其它(維表類), 1:亞馬遜, 2:中小平臺, 3:市場數(shù)據(jù) 等'
, `tbl_sort_name` STRING COMMENT '表類型名字: 0:其它(維表類), 1:亞馬遜, 2:中小平臺, 3:市場數(shù)據(jù) 等'
) COMMENT '數(shù)據(jù)監(jiān)控信息記錄表'
PARTITIONED BY (pt STRING COMMENT '數(shù)據(jù)日期, yyyy-MM-dd') ;
2. 程序開發
1. 數據檢查程序
'''PyODPS 3
請確保不要使用從 MaxCompute下載數(shù)據(jù)來處理。下載數(shù)據(jù)操作常包括Table/Instance的open_reader以及 DataFrame的to_pandas方法。
推薦使用 PyODPS DataFrame(從 MaxCompute 表創(chuàng)建)和MaxCompute SQL來處理數(shù)據(jù)。
更詳細的內(nèi)容可以參考:https://help.aliyun.com/document_detail/90481.html
'''
import os
from odps import ODPS, DataFrame
from datetime import datetime, timedelta
from dateutil import parser
# Read query results through the instance tunnel.
# NOTE(review): `options`, `args` and `o` are never defined or imported in this
# script; presumably the PyODPS node runtime injects them - confirm.
options.tunnel.use_instance_tunnel = True
# Wall-clock time of this run; the check SQL uses it as stat_time and for the
# hour suffix of the log id.
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now_time)
# Run date passed in by the scheduler; also the partition the check log is
# written to. Must be formatted as '%Y-%m-%d'.
pt = args['date']
print(pt)
date = datetime.strptime(pt, "%Y-%m-%d")
# Monitored tables. tbl_sort_code -> 0: other (dimension tables), 1: Amazon,
# 2: small/medium platforms, 3: market data. This node only checks category 3.
sql_tbl_info = """
SELECT * FROM puture_bigdata.tmp_monitor_tbl_info
WHERE monitor_flag = 1 AND tbl_sort_code = 3
"""
# Result (check log) table that the generated SQL inserts into.
res_tbl_name = "puture_bigdata.tmp_monitor_tbl_info_log_di"
def sql_upper_period_diff():
    """Build the check SQL for rule 1: row count vs. the previous period.

    Counts the rows of ``puture_bigdata.{tbl_name}`` in partition
    ``date_format(date_str, pt_format)``. It then compares that count with the
    latest value the previous run logged for the same table.

    The check log is partitioned by *run date* (``pt``), not by the checked
    partition. The previous run's record therefore lives in log partition
    ``pt - 1``, whatever ``pt_num`` is. Looking it up via ``date_str - 1``
    would skip a day whenever ``pt_num != 0``.

    ``is_exc`` is 1 when the current count is 0, or when
    ``count - previous_count < expect_val``. A missing previous value counts
    as 0.

    All inputs come from module-level globals. The main loop assigns them per
    configuration row: tbl_name, pt_format, val_type, rule_code, rule_type,
    expect_val, tbl_sort_code, tbl_sort_name and date_str. The module sets
    pt, now_time and res_tbl_name.

    Returns:
        str: a script-mode SQL string that inserts one row into
        ``res_tbl_name`` partition ``pt``.
    """
    sql = f"""
set odps.sql.hive.compatible=true ;
INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
SELECT
a.id
, a.tbl_name
, a.stat_time
, a.pt_format
, a.stat_pt
, a.val_type
, a.val
, a.rule_code
, a.rule_type
, a.expect_val
, IF (a.val = 0, 1, (IF ((a.val - NVL(b.val,0)) >= {expect_val}, 0, 1 ))) AS is_exc
, a.tbl_sort_code
, a.tbl_sort_name
FROM (
SELECT
concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id
, '{tbl_name}' AS tbl_name
, '{now_time}' AS stat_time
, '{pt_format}' AS pt_format
, date_format('{date_str}' ,'{pt_format}') AS stat_pt
, '{val_type}' AS val_type
, COUNT(1) AS val
, '{rule_code}' AS rule_code
, '{rule_type}' AS rule_type
, {expect_val} AS expect_val
, {tbl_sort_code} AS tbl_sort_code
, '{tbl_sort_name}' AS tbl_sort_name
FROM puture_bigdata.{tbl_name}
WHERE pt = date_format('{date_str}' ,'{pt_format}')
) a
LEFT JOIN
(
SELECT tbl_name, val FROM (
SELECT tbl_name, val
, ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC ) AS rn
FROM {res_tbl_name}
WHERE pt = DATE_ADD('{pt}', -1) AND tbl_name = '{tbl_name}'
) WHERE rn = 1
) b
ON a.tbl_name = b.tbl_name
;
"""
    return sql
def sql_line_fixed_val():
    """Build the check SQL for rule 2: row count vs. a fixed threshold.

    Counts the rows of ``puture_bigdata.{tbl_name}`` in partition
    ``date_format(date_str, pt_format)``. ``is_exc`` is 0 when the count is at
    least ``expect_val`` and 1 otherwise.

    Inputs are module-level globals. The main loop assigns them per
    configuration row; the module sets pt, now_time and res_tbl_name.

    Returns:
        str: a script-mode SQL string that inserts one row into
        ``res_tbl_name`` partition ``pt``.
    """
    # The checked partition appears three times (id, stat_pt, WHERE); build it once.
    part_expr = f"date_format('{date_str}' ,'{pt_format}')"
    return f"""
set odps.sql.hive.compatible=true ;
INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
SELECT
concat( md5(concat('{tbl_name}', '_', {part_expr}) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id
, '{tbl_name}' AS tbl_name
, '{now_time}' AS stat_time
, '{pt_format}' AS pt_format
, {part_expr} AS stat_pt
, '{val_type}' AS val_type
, COUNT(1) AS val
, '{rule_code}' AS rule_code
, '{rule_type}' AS rule_type
, {expect_val} AS expect_val
, IF (COUNT(1) >= {expect_val}, 0, 1 ) AS is_exc
, {tbl_sort_code} AS tbl_sort_code
, '{tbl_sort_name}' AS tbl_sort_name
FROM puture_bigdata.{tbl_name}
WHERE pt = {part_expr} ;
"""
def ex_monitor(sql: str):
    """Submit one monitoring SQL script and print whether it succeeded.

    Any exception is printed, prefixed with the current global ``tbl_name``,
    instead of being raised. The main loop can therefore carry on with the
    remaining tables.
    """
    # Script mode lets the `set ...;` statement and the INSERT run together.
    run_hints = {'odps.sql.hive.compatible': True , "odps.sql.submit.mode":"script"}
    try:
        o.execute_sql(sql, hints=run_hints)
    except Exception as err:
        print(f"{tbl_name}: 運行異常 ======> {err!s}")
    else:
        print(f"{tbl_name}: 運行成功")
if __name__ == '__main__':
    # The SQL builders read the per-row values below as module-level globals,
    # so this loop must stay at module scope (not inside a function).
    try:
        with o.execute_sql(sql_tbl_info, hints={'odps.sql.hive.compatible': True}).open_reader() as reader:
            for row_record in reader:
                tbl_name = row_record.tbl_name
                pt_format = row_record.pt_format
                val_type = row_record.val_type
                rule_code = row_record.rule_code
                rule_type = row_record.rule_type
                expect_val = row_record.expect_val
                tbl_sort_code = row_record.tbl_sort_code
                tbl_sort_name = row_record.tbl_sort_name
                pt_num = row_record.pt_num
                try:
                    # Partition to check = run date shifted by pt_num days (-1 = previous day).
                    date_str = (date + timedelta(days=pt_num)).strftime('%Y-%m-%d')
                except TypeError as e:
                    # pt_num is a nullable INT column; a NULL (None) must not abort
                    # the checks for every remaining table, so report and skip it.
                    print('{}: 異常 ======> '.format(tbl_name) + str(e))
                    continue
                if rule_code == 1:
                    ex_monitor(sql_upper_period_diff())
                elif rule_code == 2:
                    ex_monitor(sql_line_fixed_val())
                else:
                    print("未知規(guī)則!!!")
    except Exception as e:
        print('異常 ======> ' + str(e))
2. 告警信息推送程序
'''PyODPS 3
請確保不要使用從 MaxCompute下載數(shù)據(jù)來處理。下載數(shù)據(jù)操作常包括Table/Instance的open_reader以及 DataFrame的to_pandas方法。
推薦使用 PyODPS DataFrame(從 MaxCompute 表創(chuàng)建)和MaxCompute SQL來處理數(shù)據(jù)。
更詳細的內(nèi)容可以參考:https://help.aliyun.com/document_detail/90481.html
'''
import json
import requests
from datetime import datetime
import os
from odps import ODPS, DataFrame
# Run date passed in by the scheduler; selects the check-log partition to scan.
# NOTE(review): `args` and `o` are not defined in this script; presumably the
# PyODPS node runtime injects them - confirm.
date_str = args['date']
# DingTalk robot webhook URL including the access token (masked here).
url = 'https://oapi.dingtalk.com/robot/send?access_token=***********************'
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print (now_time)
# Latest check result per table for the run date, keeping only failed checks.
# The table must be the one the check program writes to (its res_tbl_name is
# puture_bigdata.tmp_monitor_tbl_info_log_di).
# NOTE(review): this filters tbl_sort_code = 1 while the check program selects
# tbl_sort_code = 3 - confirm which table category this alert node covers.
sql_query = f"""
SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc
FROM (
SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc
, ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC) AS rn
FROM puture_bigdata.tmp_monitor_tbl_info_log_di
WHERE pt = '{date_str}'
AND tbl_sort_code = 1 -- 表種類
) a
WHERE rn = 1 AND is_exc = 1
"""
def dd_robot(url:str, content: str):
    """Post a plain-text message to a DingTalk group via a custom robot webhook.

    Args:
        url: Robot webhook URL including the ``access_token`` query parameter.
        content: Message text. Per the original author's note, it must contain
            the keyword configured in the robot's security settings.

    If DingTalk's reply does not report ``errmsg == 'ok'``, the raw reply is
    printed rather than raised. Network errors and timeouts propagate to the
    caller.
    """
    headers = {"Content-Type": "application/json;charset=utf-8"}
    data_info = {
        "msgtype": "text",
        "text": {
            "content": content,
        },
        # DingTalk's robot API expects @-mention settings inside "at".
        "at": {
            "isAtAll": False,
            # Mention specific members by phone number:
            # "atMobiles": ["15xxxxxx06", "18xxxxxx1"],
        },
    }
    # Bound the request so a stalled connection cannot hang the scheduled node.
    response = requests.post(url, data=json.dumps(data_info), headers=headers, timeout=10)
    # .get() so an unexpected reply body is printed instead of raising KeyError.
    if response.json().get('errmsg') != 'ok':
        print(response.text)
# 主函數(shù)
if __name__ == '__main__': # py3可以省略
try :
with o.execute_sql(sql_query, hints={'odps.sql.hive.compatible': True}).open_reader() as reader:
result_rows = list(reader) # 讀取所有的結(jié)果行
result_count = len(result_rows) # 獲取結(jié)果條數(shù)
#print("結(jié)果條數(shù):", result_count) # 打印結(jié)果條數(shù)
if result_count > 0 :
for row in result_rows:
tbl_name = row.tbl_name
stat_time = row.stat_time
stat_pt = row.stat_pt
val_type = row.val_type
val = row.val
rule_type = row.rule_type
expect_val = row.expect_val
#print (tbl_name)
content = "數(shù)據(jù)質(zhì)量(DQC)校驗告警 \n "
content = content + "【對象名稱】:" + tbl_name + " \n "
content = content + "【實際分區(qū)】:pt=" + stat_pt + " \n "
content = content + "【觸發(fā)規(guī)則】: " + rule_type + " | 當前樣本值: " + val + " | 閾值: " + expect_val + " \n "
content = content + now_time + " \n "
dd_robot(url, content)
else :
print ("無異常情況;")
except Exception as e:
print ('異常 ========>' + str(e) )
3. 告警樣例
數據質量(DQC)校驗告警
【對象名稱】:dws_amazon_market_sales_stat_di
【實際分區】:pt=20240103
【觸發規則】: 表行數,固定值 | 當前樣本值: 617 | 閾值: 650
2024-01-04 02:54:44
到此這篇關于Python實現數據庫表的監控警告的項目實踐的文章就介紹到這了, 更多相關Python數據庫表監控警告內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章, 希望大家以后多多支持腳本之家!
相關文章
在Python中字典根據多項規則排序的方法
今天小編就為大家分享一篇在Python中字典根據多項規則排序的方法, 具有很好的參考價值, 希望對大家有所幫助。一起跟隨小編過來看看吧 2019-01-01
Python實現Windows上氣泡提醒效果的方法
這篇文章主要介紹了Python實現Windows上氣泡提醒效果的方法, 涉及Python針對windows窗口操作的相關技巧, 需要的朋友可以參考下 2015-06-06
OpenCV實現從灰度圖像切出Mask前景區域
本文主要介紹了如何利用OpenCV實現從灰度圖像中根據閾值切出多個前景區域, 並過濾面積太小的圖像。文中的示例代碼講解詳細, 需要的可以參考一下 2022-06-06

