python構(gòu)造icmp echo請求和實現(xiàn)網(wǎng)絡(luò)探測器功能代碼分享
python發(fā)送icmp echo requesy請求
import socket
import struct
def checksum(source_string):
sum = 0
countTo = (len(source_string)/2)*2
count = 0
while count<countTo:
thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])
sum = sum + thisVal
sum = sum & 0xffffffff
count = count + 2
if countTo<len(source_string):
sum = sum + ord(source_string[len(source_string) - 1])
sum = sum & 0xffffffff
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
answer = ~sum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def ping(ip):
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)
packet = struct.pack(
"!BBHHH", 8, 0, 0, 0, 0
)
chksum=checksum(packet)
packet = struct.pack(
"!BBHHH", 8, 0, chksum, 0, 0
)
s.sendto(packet, (ip, 1))
if __name__=='__main__':
ping('192.168.41.56')
掃描探測網(wǎng)絡(luò)功能(網(wǎng)絡(luò)探測器)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
探測網(wǎng)絡(luò)主機存活。
'''
import os
import struct
import array
import time
import socket
import IPy
import threading
class SendPingThr(threading.Thread):
'''
發(fā)送ICMP請求報文的線程。
參數(shù):
ipPool -- 可迭代的IP地址池
icmpPacket -- 構(gòu)造的icmp報文
icmpSocket -- icmp套字接
timeout -- 設(shè)置發(fā)送超時
'''
def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3):
threading.Thread.__init__(self)
self.Sock = icmpSocket
self.ipPool = ipPool
self.packet = icmpPacket
self.timeout = timeout
self.Sock.settimeout( timeout + 3 )
def run(self):
time.sleep(0.01) #等待接收線程啟動
for ip in self.ipPool:
try:
self.Sock.sendto(self.packet, (ip, 0))
except socket.timeout:
break
time.sleep(self.timeout)
class Nscan:
'''
參數(shù):
timeout -- Socket超時,默認3秒
IPv6 -- 是否是IPv6,默認為False
'''
def __init__(self, timeout=3, IPv6=False):
self.timeout = timeout
self.IPv6 = IPv6
self.__data = struct.pack('d', time.time()) #用于ICMP報文的負荷字節(jié)(8bit)
self.__id = os.getpid() #構(gòu)造ICMP報文的ID字段,無實際意義
@property #屬性裝飾器
def __icmpSocket(self):
'''創(chuàng)建ICMP Socket'''
if not self.IPv6:
Sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
else:
Sock = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.getprotobyname("ipv6-icmp"))
return Sock
def __inCksum(self, packet):
'''ICMP 報文效驗和計算方法'''
if len(packet) & 1:
packet = packet + '\0'
words = array.array('h', packet)
sum = 0
for word in words:
sum += (word & 0xffff)
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
return (~sum) & 0xffff
@property
def __icmpPacket(self):
'''構(gòu)造 ICMP 報文'''
if not self.IPv6:
header = struct.pack('bbHHh', 8, 0, 0, self.__id, 0) # TYPE、CODE、CHKSUM、ID、SEQ
else:
header = struct.pack('BbHHh', 128, 0, 0, self.__id, 0)
packet = header + self.__data # packet without checksum
chkSum = self.__inCksum(packet) # make checksum
if not self.IPv6:
header = struct.pack('bbHHh', 8, 0, chkSum, self.__id, 0)
else:
header = struct.pack('BbHHh', 128, 0, chkSum, self.__id, 0)
return header + self.__data # packet *with* checksum
def isUnIP(self, IP):
'''判斷IP是否是一個合法的單播地址'''
IP = [int(x) for x in IP.split('.') if x.isdigit()]
if len(IP) == 4:
if (0 < IP[0] < 223 and IP[0] != 127 and IP[1] < 256 and IP[2] < 256 and 0 < IP[3] < 255):
return True
return False
def makeIpPool(self, startIP, lastIP):
'''生產(chǎn) IP 地址池'''
IPver = 6 if self.IPv6 else 4
intIP = lambda ip: IPy.IP(ip).int()
ipPool = {IPy.intToIp(ip, IPver) for ip in range(intIP(startIP), intIP(lastIP)+1)}
return {ip for ip in ipPool if self.isUnIP(ip)}
def mPing(self, ipPool):
'''利用ICMP報文探測網(wǎng)絡(luò)主機存活
參數(shù):
ipPool -- 可迭代的IP地址池
'''
Sock = self.__icmpSocket
Sock.settimeout(self.timeout)
packet = self.__icmpPacket
recvFroms = set() #接收線程的來源IP地址容器
sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
sendThr.start()
while True:
try:
recvFroms.add(Sock.recvfrom(1024)[1][0])
except Exception:
pass
finally:
if not sendThr.isAlive():
break
return recvFroms & ipPool
if __name__=='__main__':
s = Nscan()
ipPool = s.makeIpPool('192.168.0.1', '192.168.0.254')
print( s.mPing(ipPool) )
- 使用Python編寫簡單網(wǎng)絡(luò)爬蟲抓取視頻下載資源
- Python 網(wǎng)絡(luò)編程起步(Socket發(fā)送消息)
- python網(wǎng)絡(luò)編程學習筆記(一)
- Python 網(wǎng)絡(luò)編程說明
- python網(wǎng)絡(luò)編程之UDP通信實例(含服務(wù)器端、客戶端、UDP廣播例子)
- python socket網(wǎng)絡(luò)編程步驟詳解(socket套接字使用)
- python網(wǎng)絡(luò)編程學習筆記(三):socket網(wǎng)絡(luò)服務(wù)器
- 以Python的Pyspider為例剖析搜索引擎的網(wǎng)絡(luò)爬蟲實現(xiàn)方法
- Python使用urllib2獲取網(wǎng)絡(luò)資源實例講解
- python如何查看系統(tǒng)網(wǎng)絡(luò)流量的信息
相關(guān)文章
python 快速把超大txt文件轉(zhuǎn)存為csv的實例
今天小編就為大家分享一篇python 快速把超大txt文件轉(zhuǎn)存為csv的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-10-10Win8.1下安裝Python3.6提示0x80240017錯誤的解決方法
這篇文章主要為大家詳細介紹了Win8.1下安裝Python3.6提示0x80240017錯誤的解決方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-07-07解決Python 異常TypeError: cannot concatenate ''str'' and ''int''
這篇文章主要介紹了解決Python 異常TypeError: cannot concatenate 'str' and 'int' objects,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-04-04Python腳本在Appium庫上對移動應(yīng)用實現(xiàn)自動化測試
這篇文章主要介紹了使用Python的Appium庫對移動應(yīng)用實現(xiàn)自動化測試的教程,屬于Python腳本的一個自動化應(yīng)用,需要的朋友可以參考下2015-04-04