python学习之udp扫描内网存活主机

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python学习之udp扫描内网存活主机相关的知识,希望对你有一定的参考价值。

一:UDP协议是非面向连接的协议,不同于TCP扫描依赖于建立连接过程,因此UDP扫描是不可靠的 
udp主机扫描原理:利用ICMP端口不可达报文进行扫描    

 当发送一个udp数据包到主机的某个关闭端口上时,目的主机会返回一个ICMP包指示目标端口不可达,这样意味着主机是存活的
 优点:可以完成对UDP端口的探测。  
 缺点:需要系统管理员的权限。扫描结果的可靠性不高。因为当发出一个UDP数据报而没有收到任何的应答时,有可能因为这个UDP端口是开放的,也有可能是因为这个数据报在传输过程中丢失了,所以要挑选一个不太可能被使用的端口进行探测。另外,扫描的速度很慢。原因是在RFC1812的中对ICMP错误报文的生成速度做出了限制。例如Linux就将ICMP报文的生成速度限制为每4秒钟80个,当超出这个限制的时候,还要暂停1/4秒。

代码如下:

import socket
import os

#监听的主机
host="192.168.1.11"
#创建原始套接字 然后绑定在公开接口上
if os.name=="nt":
    socket_protocol=socket.IPPROTO_IP
else:
    socket_protocol=socket.IPPROTO_TCMP

sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)

sniffer.bind((host,0))
#设置在捕获的数据包中包含ip头
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

#如果操作系统是windows 需要将IOCTL设置为混杂模式 混杂模式即为抓取流经网卡的所有数据包
if os.name=="nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

#读取单个数据包
print sniffer.recvfrom(65565)

if os.name=="nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF) 

 

  先读取单个数据包 看返回的是什么

技术分享

当前模式下,可以嗅探到任何高层协议例如UDP TCP ICMP 的所有IP头信息,但是很明显,完全看不懂,因此需要对返回的数据包进行解码

UDP是基于IP协议之上的,ICMP同样是基于IP协议之上,所以要先对IP头进行解析 下图所示为IP头部分

IP头结构:

技术分享

wireshark抓包结果 图中所示部分为ping www.baidu.com时抓到的icmp包的IP头部

技术分享

IP头的数据结构为:

class IP(Structure):
    _fields_ = [
        ("ihl",                c_ubyte, 4),    #4位首部长度
        ("version",            c_ubyte, 4),    #4位IP版本号
        ("tos",                c_ubyte),        #8位服务类型
        ("len",                c_ushort),        #16位IP包总长度
        ("id",                c_ushort),        #16位标识,用于辅助IP包的拆装
        ("offset",            c_ushort),        #3位标志位加13位偏移位
        ("ttl",                c_ubyte),        #8位IP包生存时间
        ("protocol_num",    c_ubyte),        #8位协议号
        ("sum",                c_ushort),        #16位IP首部校验和
        ("src",                c_uint),        #32位源地址
        ("dst",                c_uint),        #32位目的地址
    ]

对IP头进行解码:

import socket
import os
import struct
from ctypes import *

host="192.168.1.11"
class IP(Structure):
    _fields_ = [
        ("ihl",                c_ubyte, 4),
        ("version",            c_ubyte, 4),
        ("tos",                c_ubyte),
        ("len",                c_ushort),
        ("id",                c_ushort),
        ("offset",            c_ushort),
        ("ttl",                c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",                c_ushort),
        ("src",                c_uint),
        ("dst",                c_uint),
    ]

    #使用from_buffer_copy方法在__new__方法将收到的数据生成一个IP class的实例
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self,socket_buffer=None):

        #协议字段与协议名称相对应
        self.protocol_map={1:"ICMP",6:"TCP",17:"UDP"}
        
        #使用了python struct库的pack方法 用指定的格式化参数将src 和dst的long型数值转换为字符串,然后使用socket.inet_ntoa方法将字符串的一串数字转换为对应的ip格式
        self.src_address=socket.inet_ntoa(struct.pack("<L",self.dst))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))

        #协议类型
        try:
            self.protocol=self.protocol_map[self.protocol_num]
        except Exception,e:
            self.protocol=str(self.protocol_num)


#创建一个socket并绑定到公共接口
if os.name=="nt":
    socket.protocol=socket.IPPROTO_IP
else:
    socket.protocol=socket.IPPROTO_ICMP

sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.protocol)
sniffer.bind((host,0))
#设置捕获的数据包中包含ip头
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)
#如果操作系统是windows 需要将IOCTL设置为混杂模式 混杂模式即为捕获经过网卡的所有数据包
if os.name=="nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)
#获取数据包并对ip头进行解码
try:
    while True:
        #读取单个数据包  数据包为数组 第一个元素为要解码的数据
        raw_buffer=sniffer.recvfrom(65565)[0]
        #缓冲区前20个字节作为ip头解码
        ip_head=IP(raw_buffer[0:20])
        #输出协议类型和通信双方ip地址
        print "Protocol:%s %s->%s" % (ip_head.protocol,ip_head.src_address,ip_head.dst_address)


except KeyboardInterrupt:
    # 如果运行在windows 则关闭混杂模式
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

IP协议并不是一个可靠的协议,它不保证数据被送达,所以,保证数据送达的工作应该由其他的模块来完成。其中一个重要的模块就是ICMP(网络控制报文)协议。
当传送IP数据包发生错误--比如主机不可达,路由不可达等等,ICMP协议将会把错误信息封包,然后传送回给主机。给主机一个处理错误的机会,这 也就是为什

么说建立在IP层以上的协议是可能做到安全的原因。ICMP数据包由8bit的错误类型和8bit的代码和16bit的校验和组成。而前 16bit就组成了ICMP所要传递的信息。ICMP

数据报报文被封装在IP数据报内部进行传输

ICMP头部数据结构

技术分享

图下所示为wires hark抓包ping www.baidu.com的icmp包头部

技术分享

#icmp头部数据结构
class
ICMP(Structure): _fields_=[ ("type", c_ubyte), #类型 ("code", c_ubyte), #代码值 ("checksum", c_ushort), #校验和 ("unused", c_ushort), #未使用 ("next_hop_mtu", c_ushort) #下一跳 ]

对ICMP头部进行解析:

icmp头部type值所代表的含义:

技术分享

8bits类型和8bits代码字段:一起决定了ICMP报文的类型。常见的有:
  
  类型8、代码0:回射请求。
  
  类型0、代码0:回射应答。
  
  类型11、代码0:超时。
常见的不可到达类型还有网络不可到达(Code=0)、主机不可到达(Code=1)、协议不可到达(Code=2)等

需要找到type 为3 (这种数据包意味着目标不可达)code为3的ICMP数据包  (代码值为3则代表目标主机产生了端口不可达的错误)

import socket
import os
import struct
from ctypes import *

host="192.168.1.11"
class IP(Structure):
    _fields_ = [
        ("ihl",                c_ubyte, 4),
        ("version",            c_ubyte, 4),
        ("tos",                c_ubyte),
        ("len",                c_ushort),
        ("id",                c_ushort),
        ("offset",            c_ushort),
        ("ttl",                c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",                c_ushort),
        ("src",                c_uint),
        ("dst",                c_uint)
    ]

    #使用from_buffer_copy方法在__new__方法将收到的数据生成一个IP class的实例
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self,socket_buffer=None):

        #协议字段与协议名称相对应
        self.protocol_map={1:"ICMP",6:"TCP",17:"UDP"}
        
        #使用了python struct库的pack方法 用指定的格式化参数将src 和dst的long型数值转换为字符串,然后使用socket.inet_ntoa方法将字符串的一串数字转换为对应的ip格式
        self.src_address=socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))

        #协议类型
        try:
            self.protocol=self.protocol_map[self.protocol_num]
        except Exception,e:
            self.protocol=str(self.protocol_num)


#解码icmp
class ICMP(Structure):
    _fields_=[
        ("type",            c_ubyte),   #类型
        ("code",            c_ubyte),    #代码值
        ("checksum",        c_ushort),    #校验和
        ("unused",            c_ushort),    #未使用
        ("next_hop_mtu",    c_ushort)    #下一跳
    ]

    #使用from_buffer_copy方法在__new__方法将收到的数据生成一个IP class的实例
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self,socket_buffer=None):
        pass


#创建一个socket并绑定到公共接口
if os.name=="nt":
    socket.protocol=socket.IPPROTO_IP
else:
    socket.protocol=socket.IPPROTO_ICMP

sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.protocol)
sniffer.bind((host,0))
#设置捕获的数据包中包含ip头
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)
#如果操作系统是windows 需要将IOCTL设置为混杂模式 混杂模式即为捕获经过网卡的所有数据包
if os.name=="nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)
#获取数据包并对ip头进行解码
try:
    while True:
        #读取单个数据包  数据包为数组 第一个元素为要解码的数据
        raw_buffer=sniffer.recvfrom(65565)[0]
        #缓冲区前20个字节作为ip头解码
        ip_head=IP(raw_buffer[0:20])
        # print ip_head.ihl
        #输出协议类型和通信双方ip地址
        print "Protocol:%s %s->%s" % (ip_head.protocol,ip_head.src_address,ip_head.dst_address)
        
        #如果协议类型为icmp 
        if ip_head.protocol=="ICMP":
            #计算ICMP包起始位置
            offset=ip_head.ihl * 4
            buf=raw_buffer[offset:offset+sizeof(ICMP)]

            #解析icmp
            icmp_header=ICMP(buf)
            # print icmp_header
            print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code)

#处理异常 ctrl+c
except KeyboardInterrupt:
    # 如果运行在windows 则关闭混杂模式
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

解析结果:

技术分享

 

下面实现对整个内网进行主机扫描   需要用到netaddr模块

windows下安装netaddr:pip install netaddr

实现代码:

import threading
import time
import socket
import os
import struct

from ctypes import *
from netaddr import IPNetwork,IPAddress 

#监听的主机
host="192.168.1.11"

#扫描的子网
subnet="192.168.1.0/24"
#自定义字符串 在icmp包中进行核对
message="PYTHONTEST"

#批量发送udp数据
def udp_sender(subnet,message):
    time.sleep(1)
    sender=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
    for ip in IPNetwork(subnet):
        try:
            sender.sendto(message,("%s" % ip,32000))
        except Exception,e:
            print e

class IP(Structure):
    _fields_ = [
        ("ihl",                c_ubyte, 4),
        ("version",            c_ubyte, 4),
        ("tos",                c_ubyte),
        ("len",                c_ushort),
        ("id",                c_ushort),
        ("offset",            c_ushort),
        ("ttl",                c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",                c_ushort),
        ("src",                c_uint),
        ("dst",                c_uint)
    ]

    #使用from_buffer_copy方法在__new__方法将收到的数据生成一个IP class的实例
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self,socket_buffer=None):

        #协议字段与协议名称相对应
        self.protocol_map={1:"ICMP",6:"TCP",17:"UDP"}
        
        #使用了python struct库的pack方法 用指定的格式化参数将src 和dst的long型数值转换为字符串,然后使用socket.inet_ntoa方法将字符串的一串数字转换为对应的ip格式
        self.src_address=socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))

        #协议类型
        try:
            self.protocol=self.protocol_map[self.protocol_num]
        except Exception,e:
            self.protocol=str(self.protocol_num)


#解码icmp
class ICMP(Structure):
    _fields_=[
        ("type",            c_ubyte),   #类型
        ("code",            c_ubyte),    #代码值
        ("checksum",        c_ushort),    #校验和
        ("unused",            c_ushort),    #未使用
        ("next_hop_mtu",    c_ushort)    #下一跳
    ]

    #使用from_buffer_copy方法在__new__方法将收到的数据生成一个IP class的实例
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self,socket_buffer=None):
        pass


#创建一个socket并绑定到公共接口
if os.name=="nt":
    socket.protocol=socket.IPPROTO_IP
else:
    socket.protocol=socket.IPPROTO_ICMP

sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.protocol)
sniffer.bind((host,0))
#设置捕获的数据包中包含ip头
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)
#如果操作系统是windows 需要将IOCTL设置为混杂模式 混杂模式即为捕获经过网卡的所有数据包
if os.name=="nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)
#开始发包
t=threading.Thread(target=udp_sender,args=(subnet,message))
t.start()

#处理数据
try:
    i=1
    while True:
        #读取单个数据包  数据包为数组 第一个元素为要解码的数据
        raw_buffer=sniffer.recvfrom(65565)[0]
        #缓冲区前20个字节作为ip头解码
        ip_head=IP(raw_buffer[0:20])
        # print ip_head.ihl
        #输出协议类型和通信双方ip地址
        # print "Protocol:%s %s->%s" % (ip_head.protocol,ip_head.src_address,ip_head.dst_address)
        
        #如果协议类型为icmp 
        if ip_head.protocol=="ICMP":
            #计算ICMP包起始位置
            offset=ip_head.ihl * 4
            buf=raw_buffer[offset:offset+sizeof(ICMP)]

            #解析icmp
            icmp_header=ICMP(buf)
            # print icmp_header
            # print "ICMP -> Type: %d Code: %d" % (icmp_header.checksum, icmp_header.unused)
            #检查类型和代码值是否为3
            if icmp_header.type==3 and icmp_header.code==3:
                #确认响的主机在目标子网内
                if IPAddress(ip_head.src_address) in IPNetwork(subnet):
                    #确认ICMP包中包含我们发送的自定义的字符串
                    if raw_buffer[len(raw_buffer)-len(message):] == message:
                        #输出存活主机
                        print "%d Host Up: %s" % (i,ip_head.src_address)
#处理异常 ctrl+c
except KeyboardInterrupt:
    # 如果运行在windows 则关闭混杂模式
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

 扫描结果:由于我直接在本机测试的 所以只有一个存活主机

技术分享

 


 





























以上是关于python学习之udp扫描内网存活主机的主要内容,如果未能解决你的问题,请参考以下文章

fscan默认扫描端口

扫描神器--nmap

Python 使用Scapy模块编写ARP主机存活扫描

Python 扫描存活主机

利用已控的标边界一台机器的 beacon对目标内网进行各种存活探测

python学习之TCP/UDP