python實(shí)現(xiàn)skywalking的trace模塊過(guò)濾和報(bào)警(實(shí)例代碼)
??skywalking本身的報(bào)警功能,用起來(lái)視乎不是特別好用,目前想實(shí)現(xiàn)對(duì)skywalking的trace中的錯(cuò)誤接口進(jìn)行過(guò)濾并報(bào)警通知管理員和開(kāi)發(fā)。所以自己就用python對(duì)skywalking做了二次數(shù)據(jù)清洗實(shí)現(xiàn)。項(xiàng)目方在了自己了github(https://github.com/shygit-dev/skywalking-cli-python)上了,有興趣的同學(xué)可以做二次改造,共同學(xué)習(xí)。下面簡(jiǎn)單列出了代碼內(nèi)容:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Tile:
# Author:shy
import requests
import time
import smtplib
from email.mime.text import MIMEText
import re
def interface_content_filter(trace_id):
'''
對(duì)詳細(xì)日志內(nèi)容(業(yè)務(wù)邏輯報(bào)錯(cuò))進(jìn)行過(guò)濾
:param trace_id:
:return: 【1|0】
'''
url = "http://172.16.53.232:50001/query"
params = {
"trace_id": trace_id
}
detail_trace_id_log = requests.request(method="GET",url=url,params=params)
detail_trace_id_log = detail_trace_id_log.text
print(detail_trace_id_log)
print(type(detail_trace_id_log))
with open("blackname_keyword_list","r",encoding="utf-8") as f:
for line in f:
print(line)
result = re.search(line.strip(),detail_trace_id_log)
print(result)
if result != None:
print("哥們匹配到日志黑名單關(guān)鍵字了:%s" % line)
return 0
print("提示:%s不在關(guān)鍵字黑名單中" % trace_id)
return 1
def interface_filter(endpointName):
"""
設(shè)置接口黑名單
:param endpointName:
:return: 【1|0】
"""
endpointName = re.sub("\(|\)",".",endpointName)
with open("blackname_list","r",encoding="utf-8") as f:
bn_list = f.read()
match_result = re.search(endpointName.strip(),bn_list)
if match_result == None:
print("提示:接口不存在黑名單中")
return 1
print("提示:接口在黑名單中")
return 0
def trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr):
"""
skywalking trace功能對(duì)錯(cuò)誤接口進(jìn)行過(guò)濾,默認(rèn)最大一次獲取2000條數(shù)據(jù),每分鐘執(zhí)行一次
:param start_time:
:param end_time:
:return:
"""
url = sw_url
data = {
"query": "query queryTraces($condition: TraceQueryCondition) {\n data: queryBasicTraces(condition: $condition) {\n traces {\n key: segmentId\n endpointNames\n duration\n start\n isError\n traceIds\n }\n total\n }}",
"variables": {
"condition": {
"queryDuration": {
"start": start_time, #"2021-12-07 1734"
"end": end_time,
"step": "MINUTE"
},
"traceState": "ERROR",
"paging": {
"pageNum": 1,
"pageSize": per_page_size,
"needTotal": "true"
},
"queryOrder": "BY_START_TIME"
# "traceId": "b669d0069be84fce82261901de412e7c.430.16388637511348105"
}
}
}
result = requests.request(method="post",url=url,json=data)
i = 0
# print(result.content)
# print(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:])))))
with open("mail.html","w",encoding="utf-8") as f:
f.write('<head><meta charset="UTF-8"><title>Title</title><style>.t {border-right: 2px solid black;border-bottom: 2px solid black;}.t th,td {border-top: 2px solid black;border-left: 2px solid black;font-size: 10px;}</style></head><body><div style="color:red;font-size=15px;">最近15分鐘統(tǒng)計(jì):</div><table class="t" border="0" cellspacing="0" cellpadding="10px"><thead><tr style="<th style="width: 100px;">時(shí)間</th><th>持續(xù)時(shí)長(zhǎng)</th><th>接口名稱</th><th>追蹤ID</th></tr></thead><tbody>')
for trace in result.json()["data"]["data"]["traces"]:
# print(trace["endpointNames"])
print("時(shí)間:%s\n" % time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:])))),
"持續(xù)時(shí)長(zhǎng):%s\n" % trace["duration"],
"接口名稱:%s\n" % trace["endpointNames"][0],
"跟蹤ID:%s" % trace["traceIds"][0])
# print(time.localtime(1638869640.194))
i+=1
print(i)
s_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(float("%s.%s" % (trace["start"][0:10],trace["start"][10:]))))
dur_time = trace["duration"]
endpointName = trace["endpointNames"][0]
trace_id = trace["traceIds"][0]
# 調(diào)用接口黑名單過(guò)濾功能
result = interface_filter(endpointName)
if result == 0:
print("哥們進(jìn)入黑名單了!",endpointName)
continue
# 調(diào)用關(guān)鍵字黑名單過(guò)濾功能
keyword_result = interface_content_filter(trace_id)
if keyword_result == 0:
print("哥們進(jìn)入關(guān)鍵字黑名單了!", trace_id)
continue
with open("mail.html","a",encoding="utf-8") as f:
f.write('<tr><td>%s</td><td>%s</td><td>%s</td><td><a href="http://%s/query?trace_id=%s" rel="external nofollow" >%s</a></td></tr>' %(s_time,dur_time,endpointName,trace_detail_addr,trace_id,trace_id))
with open("mail.html","a",encoding="utf-8") as f:
f.write('</tbody></table></body>')
def send_mail(receiver):
"""
發(fā)送報(bào)錯(cuò)接口郵件
:return:
"""
server = "mail.test.com"
sender = "sa@test.com"
sender_pwd = "1qaz@WSX"
send_addr = "sa@test.com"
receiver = receiver
with open("mail.html","r",encoding="utf-8") as f:
content = f.read()
if re.search("<td>",content) == None:
print("無(wú)報(bào)錯(cuò)接口!",content)
return 0
print("郵件前",content)
msg_mail = MIMEText(content,"html","utf-8")
msg_mail["Subject"] = "Skywalking報(bào)錯(cuò)接口統(tǒng)計(jì)"
msg_mail["From"] = sender
msg_mail["To"] = receiver
server_obj = smtplib.SMTP_SSL(server)
server_obj.connect(server,465)
server_obj.login(sender,sender_pwd)
server_obj.sendmail(send_addr,receiver,msg_mail.as_string())
if __name__ == "__main__":
# 設(shè)定查詢時(shí)間間隔,默認(rèn)900s(15min)
end_time = time.time()
start_time = end_time - 900
start_time=time.strftime("%Y-%m-%d %H%M",time.localtime(start_time))
end_time = time.strftime("%Y-%m-%d %H%M", time.localtime(end_time))
print(start_time)
print(end_time)
sw_url = "http://172.16.53.232:9412/graphql" # skywalking的前端服務(wù)的地址和端口
per_page_size = 5000 #指定一次獲取endpoint接口的數(shù)目
trace_detail_addr = "127.0.0.1:5000" #指定查詢指定trace_id詳細(xì)日志
receiver = "shy@test.com" #報(bào)警郵件接收人地址
trace_erro_interface(start_time,end_time,sw_url,per_page_size,trace_detail_addr)
send_mail(receiver)
# interface_filter()
# interface_content_filter("3c4212dd2dd548d394ba312c4619405d.104.16390380592724487")
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Tile:
# Author:shy
import requests
import time
from flask import Flask,request
app = Flask(__name__)
@app.route("/query",methods=["get"])
def trace_id_query():
"""
查詢指定trace_id詳細(xì)日志信息
:return: f.read()
"""
trace_id = request.args.get("trace_id")
url="http://172.16.53.232:9412/graphql"
# url="http://skywalking.roulw.com/graphql"
data = {
"query": "query queryTrace($traceId: ID!) {\n trace: queryTrace(traceId: $traceId) {\n spans {\n traceId\n segmentId\n spanId\n parentSpanId\n refs {\n traceId\n parentSegmentId\n parentSpanId\n type\n }\n serviceCode\n serviceInstanceName\n startTime\n endTime\n endpointName\n type\n peer\n component\n isError\n layer\n tags {\n key\n value\n }\n logs {\n time\n data {\n key\n value\n }\n }\n }\n }\n }",
"variables": {
"traceId": trace_id
}
}
result = requests.request(method="post",url=url,json=data)
with open("detail_log", "w", encoding="utf-8") as f:
f.write("<div style='color: red;font-size: 30px;'>生產(chǎn)Skywalking報(bào)錯(cuò)接口跟蹤日志日志:<br /></div>")
for trace_id in result.json()["data"]["trace"]["spans"]:
if trace_id["isError"]:
# print(trace_id)
print("服務(wù)名稱:%s\n" % trace_id["serviceCode"],
"開(kāi)始時(shí)間:%s\n" % trace_id["startTime"],
"接口名稱:%s\n" % trace_id["endpointName"],
"peer名稱:%s\n" % trace_id["peer"],
"tags名稱:%s\n" % trace_id["tags"],
"詳細(xì)日志:%s" % trace_id["logs"])
content = "服務(wù)名稱:%s<br />開(kāi)始時(shí)間:%s<br />接口名稱:%s<br />peer名稱:%s<br />tags名稱:%s" % (trace_id["serviceCode"],trace_id["startTime"],trace_id["endpointName"],trace_id["peer"],trace_id["tags"])
with open("detail_log","a",encoding="utf-8") as f:
f.write(content)
f.write("<br />********詳細(xì)日志**********<br />")
for logs in trace_id["logs"]:
for log in logs["data"]:
if log["key"] == "message":
print(log["value"])
with open("detail_log","a",encoding="utf-8") as f:
f.write(log["value"])
# return log["value"]
elif log["key"] == "stack":
print(log["value"])
with open("detail_log","a",encoding="utf-8") as f:
f.write(log["value"])
with open("detail_log", "a", encoding="utf-8") as f:
f.write("<div style='color: red;font-size: 20px;'><br />========下一個(gè)接口信息=========<br /></div>")
with open("detail_log","r",encoding="utf-8") as f:
return f.read()
if __name__ == "__main__":
# trace_id = "14447ae7199c40a2b9862411daba180b.2142.16388920322367785"
# trace_id_query(trace_id)
app.run()
到此這篇關(guān)于python實(shí)現(xiàn)skywalking的trace模塊過(guò)濾和報(bào)警的文章就介紹到這了,更多相關(guān)python trace模塊過(guò)濾和報(bào)警內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
TensorFlow自定義損失函數(shù)來(lái)預(yù)測(cè)商品銷(xiāo)售量
這篇文章主要介紹了TensorFlow自定義損失函數(shù)——預(yù)測(cè)商品銷(xiāo)售量,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02
Python巧用SnowNLP實(shí)現(xiàn)生成srt字幕文件
SnowNLP是一個(gè)可以方便的處理中文文本內(nèi)容的python類庫(kù),本文主要為大家詳細(xì)介紹了Python如何巧用SnowNLP實(shí)現(xiàn)將一段話一鍵生成srt字幕文件,感興趣的可以了解下2024-01-01
pandas使用fillna函數(shù)填充N(xiāo)aN值的代碼實(shí)例
最近在工作中遇到一個(gè)問(wèn)題,pandas讀取的數(shù)據(jù)中nan在保存后變成空字符串,所以下面這篇文章主要給大家介紹了關(guān)于pandas使用fillna函數(shù)填充N(xiāo)aN值的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07
Python容錯(cuò)的前綴樹(shù)實(shí)現(xiàn)中文糾錯(cuò)
本文使用 Python 實(shí)現(xiàn)了前綴樹(shù),并且支持編輯距離容錯(cuò)的查詢。文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07
使用?OpenCV?開(kāi)發(fā)虛擬鍵盤(pán)的方法
OpenCV是一個(gè)強(qiáng)大的圖像處理工具,用于機(jī)器學(xué)習(xí)、圖像處理等的跨平臺(tái)開(kāi)源庫(kù),用于開(kāi)發(fā)實(shí)時(shí)計(jì)算機(jī)視覺(jué)應(yīng)用程序,本文重點(diǎn)給大家介紹使用?OpenCV?開(kāi)發(fā)虛擬鍵盤(pán)的方法,感興趣的朋友一起看看吧2021-11-11
使用 Python 處理 JSON 格式的數(shù)據(jù)
JSON 是一個(gè)很好的選擇。如果你對(duì) Python 有所了解,就更加事半功倍了。下面就來(lái)介紹一下如何使用 Python 處理 JSON 數(shù)據(jù)。感興趣的朋友跟隨小編一起看看吧2019-07-07
利用python解決mysql視圖導(dǎo)入導(dǎo)出依賴的問(wèn)題
這篇文章主要給大家介紹了關(guān)于利用python解決mysql視圖導(dǎo)入導(dǎo)出依賴的問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用python具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-12-12
Django REST Swagger實(shí)現(xiàn)指定api參數(shù)
這篇文章主要介紹了Django REST Swagger實(shí)現(xiàn)指定api參數(shù),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-07-07

