python構(gòu)造icmp echo請(qǐng)求和實(shí)現(xiàn)網(wǎng)絡(luò)探測(cè)器功能代碼分享
python發(fā)送icmp echo requesy請(qǐng)求
import socket
import struct
def checksum(source_string):
sum = 0
countTo = (len(source_string)/2)*2
count = 0
while count<countTo:
thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])
sum = sum + thisVal
sum = sum & 0xffffffff
count = count + 2
if countTo<len(source_string):
sum = sum + ord(source_string[len(source_string) - 1])
sum = sum & 0xffffffff
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
answer = ~sum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def ping(ip):
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)
packet = struct.pack(
"!BBHHH", 8, 0, 0, 0, 0
)
chksum=checksum(packet)
packet = struct.pack(
"!BBHHH", 8, 0, chksum, 0, 0
)
s.sendto(packet, (ip, 1))
if __name__=='__main__':
ping('192.168.41.56')
掃描探測(cè)網(wǎng)絡(luò)功能(網(wǎng)絡(luò)探測(cè)器)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
探測(cè)網(wǎng)絡(luò)主機(jī)存活。
'''
import os
import struct
import array
import time
import socket
import IPy
import threading
class SendPingThr(threading.Thread):
'''
發(fā)送ICMP請(qǐng)求報(bào)文的線程。
參數(shù):
ipPool -- 可迭代的IP地址池
icmpPacket -- 構(gòu)造的icmp報(bào)文
icmpSocket -- icmp套字接
timeout -- 設(shè)置發(fā)送超時(shí)
'''
def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3):
threading.Thread.__init__(self)
self.Sock = icmpSocket
self.ipPool = ipPool
self.packet = icmpPacket
self.timeout = timeout
self.Sock.settimeout( timeout + 3 )
def run(self):
time.sleep(0.01) #等待接收線程啟動(dòng)
for ip in self.ipPool:
try:
self.Sock.sendto(self.packet, (ip, 0))
except socket.timeout:
break
time.sleep(self.timeout)
class Nscan:
'''
參數(shù):
timeout -- Socket超時(shí),默認(rèn)3秒
IPv6 -- 是否是IPv6,默認(rèn)為False
'''
def __init__(self, timeout=3, IPv6=False):
self.timeout = timeout
self.IPv6 = IPv6
self.__data = struct.pack('d', time.time()) #用于ICMP報(bào)文的負(fù)荷字節(jié)(8bit)
self.__id = os.getpid() #構(gòu)造ICMP報(bào)文的ID字段,無實(shí)際意義
@property #屬性裝飾器
def __icmpSocket(self):
'''創(chuàng)建ICMP Socket'''
if not self.IPv6:
Sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
else:
Sock = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.getprotobyname("ipv6-icmp"))
return Sock
def __inCksum(self, packet):
'''ICMP 報(bào)文效驗(yàn)和計(jì)算方法'''
if len(packet) & 1:
packet = packet + '\0'
words = array.array('h', packet)
sum = 0
for word in words:
sum += (word & 0xffff)
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
return (~sum) & 0xffff
@property
def __icmpPacket(self):
'''構(gòu)造 ICMP 報(bào)文'''
if not self.IPv6:
header = struct.pack('bbHHh', 8, 0, 0, self.__id, 0) # TYPE、CODE、CHKSUM、ID、SEQ
else:
header = struct.pack('BbHHh', 128, 0, 0, self.__id, 0)
packet = header + self.__data # packet without checksum
chkSum = self.__inCksum(packet) # make checksum
if not self.IPv6:
header = struct.pack('bbHHh', 8, 0, chkSum, self.__id, 0)
else:
header = struct.pack('BbHHh', 128, 0, chkSum, self.__id, 0)
return header + self.__data # packet *with* checksum
def isUnIP(self, IP):
'''判斷IP是否是一個(gè)合法的單播地址'''
IP = [int(x) for x in IP.split('.') if x.isdigit()]
if len(IP) == 4:
if (0 < IP[0] < 223 and IP[0] != 127 and IP[1] < 256 and IP[2] < 256 and 0 < IP[3] < 255):
return True
return False
def makeIpPool(self, startIP, lastIP):
'''生產(chǎn) IP 地址池'''
IPver = 6 if self.IPv6 else 4
intIP = lambda ip: IPy.IP(ip).int()
ipPool = {IPy.intToIp(ip, IPver) for ip in range(intIP(startIP), intIP(lastIP)+1)}
return {ip for ip in ipPool if self.isUnIP(ip)}
def mPing(self, ipPool):
'''利用ICMP報(bào)文探測(cè)網(wǎng)絡(luò)主機(jī)存活
參數(shù):
ipPool -- 可迭代的IP地址池
'''
Sock = self.__icmpSocket
Sock.settimeout(self.timeout)
packet = self.__icmpPacket
recvFroms = set() #接收線程的來源IP地址容器
sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
sendThr.start()
while True:
try:
recvFroms.add(Sock.recvfrom(1024)[1][0])
except Exception:
pass
finally:
if not sendThr.isAlive():
break
return recvFroms & ipPool
if __name__=='__main__':
s = Nscan()
ipPool = s.makeIpPool('192.168.0.1', '192.168.0.254')
print( s.mPing(ipPool) )
- 使用Python編寫簡(jiǎn)單網(wǎng)絡(luò)爬蟲抓取視頻下載資源
- Python 網(wǎng)絡(luò)編程起步(Socket發(fā)送消息)
- python網(wǎng)絡(luò)編程學(xué)習(xí)筆記(一)
- Python 網(wǎng)絡(luò)編程說明
- python網(wǎng)絡(luò)編程之UDP通信實(shí)例(含服務(wù)器端、客戶端、UDP廣播例子)
- python socket網(wǎng)絡(luò)編程步驟詳解(socket套接字使用)
- python網(wǎng)絡(luò)編程學(xué)習(xí)筆記(三):socket網(wǎng)絡(luò)服務(wù)器
- 以Python的Pyspider為例剖析搜索引擎的網(wǎng)絡(luò)爬蟲實(shí)現(xiàn)方法
- Python使用urllib2獲取網(wǎng)絡(luò)資源實(shí)例講解
- python如何查看系統(tǒng)網(wǎng)絡(luò)流量的信息
相關(guān)文章
python 快速把超大txt文件轉(zhuǎn)存為csv的實(shí)例
今天小編就為大家分享一篇python 快速把超大txt文件轉(zhuǎn)存為csv的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-10-10python查看自己安裝的所有庫(kù)并導(dǎo)出的命令
這篇文章主要介紹了python查看自己安裝的所有庫(kù)并導(dǎo)出,主要包括查看安裝的庫(kù)通過命令查詢,導(dǎo)出庫(kù)安裝文件執(zhí)行命令,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-06-06python3安裝OCR識(shí)別庫(kù)tesserocr過程圖解
這篇文章主要介紹了python3安裝OCR識(shí)別庫(kù)tesserocr過程圖解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04Win8.1下安裝Python3.6提示0x80240017錯(cuò)誤的解決方法
這篇文章主要為大家詳細(xì)介紹了Win8.1下安裝Python3.6提示0x80240017錯(cuò)誤的解決方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07解決Python 異常TypeError: cannot concatenate ''str'' and ''int''
這篇文章主要介紹了解決Python 異常TypeError: cannot concatenate 'str' and 'int' objects,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-04-04PyQt5下拉式復(fù)選框QComboCheckBox的實(shí)例
今天小編就為大家分享一篇PyQt5下拉式復(fù)選框QComboCheckBox的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-06-06Python腳本在Appium庫(kù)上對(duì)移動(dòng)應(yīng)用實(shí)現(xiàn)自動(dòng)化測(cè)試
這篇文章主要介紹了使用Python的Appium庫(kù)對(duì)移動(dòng)應(yīng)用實(shí)現(xiàn)自動(dòng)化測(cè)試的教程,屬于Python腳本的一個(gè)自動(dòng)化應(yīng)用,需要的朋友可以參考下2015-04-04