Python使用PyShark分析網(wǎng)絡(luò)流量的腳本分享
作為一名網(wǎng)絡(luò)安全工程師,是常常需要自己編寫一些實(shí)用的腳本的。下面是一個(gè)使用 PyShark 庫編寫的網(wǎng)絡(luò)流量分析腳本,專為網(wǎng)絡(luò)安全工程師設(shè)計(jì)。這個(gè)腳本可以分析 HTTP 和 TCP 流量,并提取關(guān)鍵信息用于安全分析。
#!/usr/bin/env python3
"""
網(wǎng)絡(luò)流量分析工具 - 使用 PyShark 分析 pcap 文件
適用于網(wǎng)絡(luò)安全工程師進(jìn)行流量分析和安全審計(jì)
"""
import pyshark
import argparse
from collections import defaultdict
import json
import sys
class TrafficAnalyzer:
def __init__(self, pcap_file):
self.pcap_file = pcap_file
self.http_flows = defaultdict(list)
self.tcp_flows = defaultdict(list)
self.suspicious_activities = []
def analyze_http_traffic(self):
"""分析 HTTP 流量"""
print("[+] 分析 HTTP 流量...")
try:
# 創(chuàng)建顯示過濾器,只捕獲 HTTP 流量
cap = pyshark.FileCapture(self.pcap_file, display_filter='http')
for packet in cap:
try:
if hasattr(packet, 'http'):
http_layer = packet.http
ip_layer = packet.ip
# 提取 HTTP 請(qǐng)求基本信息
flow_key = f"{ip_layer.src}:{packet.tcp.srcport} -> {ip_layer.dst}:{packet.tcp.dstport}"
http_info = {
'timestamp': packet.sniff_time,
'method': getattr(http_layer, 'request_method', 'N/A'),
'uri': getattr(http_layer, 'request_uri', 'N/A'),
'host': getattr(http_layer, 'host', 'N/A'),
'user_agent': getattr(http_layer, 'user_agent', 'N/A'),
'status_code': getattr(http_layer, 'response_code', 'N/A'),
'content_type': getattr(http_layer, 'content_type', 'N/A'),
'length': packet.length
}
self.http_flows[flow_key].append(http_info)
# 檢測(cè)可疑的 HTTP 活動(dòng)
self._detect_suspicious_http(http_info, flow_key)
except AttributeError as e:
# 忽略沒有 HTTP 層的包
continue
except Exception as e:
print(f"[-] 分析 HTTP 流量時(shí)出錯(cuò): {e}")
def analyze_tcp_traffic(self):
"""分析 TCP 流量"""
print("[+] 分析 TCP 流量...")
try:
# 創(chuàng)建顯示過濾器,只捕獲 TCP 流量
cap = pyshark.FileCapture(self.pcap_file, display_filter='tcp')
for packet in cap:
try:
if hasattr(packet, 'tcp'):
tcp_layer = packet.tcp
ip_layer = packet.ip
# 提取 TCP 流基本信息
flow_key = f"{ip_layer.src}:{tcp_layer.srcport} -> {ip_layer.dst}:{tcp_layer.dstport}"
tcp_info = {
'timestamp': packet.sniff_time,
'flags': getattr(tcp_layer, 'flags', 'N/A'),
'seq': getattr(tcp_layer, 'seq', 'N/A'),
'ack': getattr(tcp_layer, 'ack', 'N/A'),
'window_size': getattr(tcp_layer, 'window_size_value', 'N/A'),
'length': packet.length
}
self.tcp_flows[flow_key].append(tcp_info)
# 檢測(cè)可疑的 TCP 活動(dòng)
self._detect_suspicious_tcp(tcp_info, flow_key)
except AttributeError as e:
# 忽略沒有 TCP 層的包
continue
except Exception as e:
print(f"[-] 分析 TCP 流量時(shí)出錯(cuò): {e}")
def _detect_suspicious_http(self, http_info, flow_key):
"""檢測(cè)可疑的 HTTP 活動(dòng)"""
# 檢測(cè)可能的目錄遍歷攻擊
if '..' in http_info['uri'] or '/etc/passwd' in http_info['uri']:
self.suspicious_activities.append({
'type': 'HTTP - 可能的目錄遍歷攻擊',
'flow': flow_key,
'details': http_info,
'timestamp': http_info['timestamp']
})
# 檢測(cè) SQL 注入特征
sql_injection_patterns = ['union select', 'select *', 'insert into', 'drop table', '1=1']
if any(pattern in http_info['uri'].lower() for pattern in sql_injection_patterns):
self.suspicious_activities.append({
'type': 'HTTP - 可能的 SQL 注入嘗試',
'flow': flow_key,
'details': http_info,
'timestamp': http_info['timestamp']
})
# 檢測(cè)可疑用戶代理
suspicious_user_agents = ['sqlmap', 'nmap', 'nessus', 'nikto', 'w3af']
if any(agent in http_info['user_agent'].lower() for agent in suspicious_user_agents):
self.suspicious_activities.append({
'type': 'HTTP - 可疑用戶代理',
'flow': flow_key,
'details': http_info,
'timestamp': http_info['timestamp']
})
def _detect_suspicious_tcp(self, tcp_info, flow_key):
"""檢測(cè)可疑的 TCP 活動(dòng)"""
# 檢測(cè)可能的端口掃描 (多個(gè) SYN 包而沒有完整握手)
if tcp_info['flags'] == '0x00000002': # SYN 標(biāo)志
self.suspicious_activities.append({
'type': 'TCP - 可能的端口掃描 (SYN)',
'flow': flow_key,
'details': tcp_info,
'timestamp': tcp_info['timestamp']
})
# 檢測(cè)可能的網(wǎng)絡(luò)偵察 (FIN 掃描)
if tcp_info['flags'] == '0x00000001': # FIN 標(biāo)志
self.suspicious_activities.append({
'type': 'TCP - 可能的 FIN 掃描',
'flow': flow_key,
'details': tcp_info,
'timestamp': tcp_info['timestamp']
})
def generate_report(self, output_file=None):
"""生成分析報(bào)告"""
report = {
'http_flows': dict(self.http_flows),
'tcp_flows': dict(self.tcp_flows),
'suspicious_activities': self.suspicious_activities,
'summary': {
'total_http_flows': len(self.http_flows),
'total_tcp_flows': len(self.tcp_flows),
'total_suspicious_activities': len(self.suspicious_activities)
}
}
if output_file:
with open(output_file, 'w') as f:
json.dump(report, f, indent=4, default=str)
print(f"[+] 報(bào)告已保存到: {output_file}")
else:
print(json.dumps(report, indent=4, default=str))
return report
def main():
parser = argparse.ArgumentParser(description='網(wǎng)絡(luò)流量分析工具')
parser.add_argument('-f', '--file', required=True, help='PCAP 文件路徑')
parser.add_argument('-o', '--output', help='輸出報(bào)告文件路徑 (JSON 格式)')
parser.add_argument('--http', action='store_true', help='只分析 HTTP 流量')
parser.add_argument('--tcp', action='store_true', help='只分析 TCP 流量')
args = parser.parse_args()
if not args.http and not args.tcp:
# 如果沒有指定協(xié)議,默認(rèn)分析所有
args.http = True
args.tcp = True
analyzer = TrafficAnalyzer(args.file)
if args.http:
analyzer.analyze_http_traffic()
if args.tcp:
analyzer.analyze_tcp_traffic()
report = analyzer.generate_report(args.output)
# 打印簡(jiǎn)要摘要
print("\n[+] 分析完成!")
print(f" - HTTP 流數(shù)量: {report['summary']['total_http_flows']}")
print(f" - TCP 流數(shù)量: {report['summary']['total_tcp_flows']}")
print(f" - 可疑活動(dòng)數(shù)量: {report['summary']['total_suspicious_activities']}")
if report['summary']['total_suspicious_activities'] > 0:
print("\n[!] 檢測(cè)到可疑活動(dòng):")
for activity in report['suspicious_activities']:
print(f" - {activity['type']} ({activity['flow']})")
if __name__ == '__main__':
main()
使用示例
安裝依賴:
pip install pyshark
運(yùn)行腳本:
# 分析所有流量 python traffic_analyzer.py -f capture.pcap # 只分析 HTTP 流量 python traffic_analyzer.py -f capture.pcap --http # 只分析 TCP 流量 python traffic_analyzer.py -f capture.pcap --tcp # 保存報(bào)告到文件 python traffic_analyzer.py -f capture.pcap -o report.json
(.venv) (base) liuxiaowei@localhost 內(nèi)網(wǎng)滲透 % python pyshark分析流量.py -f /Users/liuxiaowei/attacker_2025/cap1.pcap --http
[+] 分析 HTTP 流量...
{
"http_flows": {
"192.168.1.61:65094 -> 140.207.56.109:80": [
{
"timestamp": "2025-08-21 07:10:13.445231",
"method": "POST",
"uri": "/mmtls/004d48a0",
"host": "extshort.weixin.qq.com",
"user_agent": "MicroMessenger Client",
"status_code": "N/A",
"content_type": "application/octet-stream",
"length": "1003"
}
],
"140.207.56.109:80 -> 192.168.1.61:65094": [
{
"timestamp": "2025-08-21 07:10:13.602393",
"method": "N/A",
"uri": "N/A",
"host": "N/A",
"user_agent": "N/A",
"status_code": "200",
"content_type": "application/octet-stream",
"length": "1236"
}
],
"192.168.1.61:65095 -> 140.207.56.109:80": [
{
"timestamp": "2025-08-21 07:10:13.634277",
"method": "POST",
"uri": "/mmtls/004d48a0",
"host": "extshort.weixin.qq.com",
"user_agent": "MicroMessenger Client",
"status_code": "N/A",
"content_type": "application/octet-stream",
"length": "745"
}
],
"140.207.56.109:80 -> 192.168.1.61:65095": [
{
"timestamp": "2025-08-21 07:10:13.749410",
"method": "N/A",
"uri": "N/A",
"host": "N/A",
"user_agent": "N/A",
"status_code": "200",
"content_type": "application/octet-stream",
"length": "1437"
}
],
"192.168.1.61:65096 -> 140.207.56.109:80": [
{
"timestamp": "2025-08-21 07:10:16.744114",
"method": "POST",
"uri": "/mmtls/004e0d95",
"host": "extshort.weixin.qq.com",
"user_agent": "MicroMessenger Client",
"status_code": "N/A",
"content_type": "application/octet-stream",
"length": "785"
}
],
"140.207.56.109:80 -> 192.168.1.61:65096": [
{
"timestamp": "2025-08-21 07:10:16.805098",
"method": "N/A",
"uri": "N/A",
"host": "N/A",
"user_agent": "N/A",
"status_code": "200",
"content_type": "application/octet-stream",
"length": "401"
}
]
},
"tcp_flows": {},
"suspicious_activities": [],
"summary": {
"total_http_flows": 6,
"total_tcp_flows": 0,
"total_suspicious_activities": 0
}
}
[+] 分析完成!
- HTTP 流數(shù)量: 6
- TCP 流數(shù)量: 0
- 可疑活動(dòng)數(shù)量: 0
功能說明
HTTP 流量分析:
- 提取請(qǐng)求方法、URI、主機(jī)、用戶代理等信息
- 檢測(cè)目錄遍歷攻擊
- 識(shí)別 SQL 注入嘗試
- 發(fā)現(xiàn)可疑用戶代理(如掃描工具)
TCP 流量分析:
- 分析 TCP 標(biāo)志位、序列號(hào)、確認(rèn)號(hào)等
- 檢測(cè)端口掃描活動(dòng)(SYN 掃描)
- 識(shí)別網(wǎng)絡(luò)偵察活動(dòng)(FIN 掃描)
安全檢測(cè):
- 自動(dòng)識(shí)別多種常見攻擊模式
- 生成詳細(xì)的安全報(bào)告
- 輸出 JSON 格式的報(bào)告便于進(jìn)一步處理
注意事項(xiàng)
- 確保已安裝 Wireshark/tshark,因?yàn)?PyShark 依賴于這些工具
- 處理大型 pcap 文件可能需要較長(zhǎng)時(shí)間和大量?jī)?nèi)存
- 腳本中的檢測(cè)規(guī)則是基礎(chǔ)示例,實(shí)際環(huán)境中可能需要根據(jù)具體需求調(diào)整和擴(kuò)展
這個(gè)腳本為網(wǎng)絡(luò)安全工程師提供了一個(gè)起點(diǎn),可以根據(jù)實(shí)際需求進(jìn)一步擴(kuò)展功能,如添加更多協(xié)議的解析、實(shí)現(xiàn)更復(fù)雜的安全檢測(cè)規(guī)則等。
到此這篇關(guān)于Python使用PyShark分析網(wǎng)絡(luò)流量的腳本分享的文章就介紹到這了,更多相關(guān)Python網(wǎng)絡(luò)流量分析內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
pygame實(shí)現(xiàn)俄羅斯方塊游戲(基礎(chǔ)篇3)
這篇文章主要介紹了pygame實(shí)現(xiàn)俄羅斯方塊游戲基礎(chǔ)的第3篇,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-10-10
PyQt5使用mimeData實(shí)現(xiàn)拖拽事件教程示例解析上
這篇文章主要為大家介紹了PyQt中如何使用mimeData實(shí)現(xiàn)拖拽事件的示例解析過程,有需要的朋友可以借鑒參考下希望能夠有所幫助,祝大家多多進(jìn)步2021-10-10
Python實(shí)現(xiàn)線程狀態(tài)監(jiān)測(cè)簡(jiǎn)單示例
這篇文章主要介紹了Python實(shí)現(xiàn)線程狀態(tài)監(jiān)測(cè),結(jié)合簡(jiǎn)單實(shí)例形式分析了Python線程start啟動(dòng)、sleep推遲運(yùn)行、isAlive判斷等方法使用技巧,需要的朋友可以參考下2018-03-03
Tensorflow tf.dynamic_partition矩陣拆分示例(Python3)
今天小編就為大家分享一篇Tensorflow tf.dynamic_partition矩陣拆分示例(Python3) ,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-02-02
python 實(shí)現(xiàn)關(guān)聯(lián)規(guī)則算法Apriori的示例
這篇文章主要介紹了python 實(shí)現(xiàn)關(guān)聯(lián)規(guī)則算法Apriori的示例,幫助大家更好的理解和學(xué)習(xí)python,感興趣的朋友可以了解下2020-09-09
Python Django簡(jiǎn)單實(shí)現(xiàn)session登錄注銷過程詳解
這篇文章主要介紹了Python Django簡(jiǎn)單實(shí)現(xiàn)session登錄注銷過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08

