手写了个 BOSS 来了的摸鱼神器!
Posted AI科技大本营
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写了个 BOSS 来了的摸鱼神器!相关的知识,希望对你有一定的参考价值。
作者 |小小明
来源 |杰哥的IT之旅
今天我们的目标是就是将Ping整个网段IP的总耗时降低到5秒以内,这样我们就能够在5秒内知道指定mac地址设备的上下线,例如开发一个BOSS来了的摸鱼神器,只要老板的手机一连上wifi,这边在5秒内收到通知,立马停止摸鱼,就保证了平时放心大胆的摸鱼⚡。
那么如何提速呢?经过我几天的苦思冥想,并在学习了一些网络知识后,自己实现了PING命令,成功的实现了放心大胆的摸鱼。于是,在我看了几本书,写了几千行代码,踩了几百个坑后,终于把相关知识理解透了。下面是我将涉及到的核心知识点总结成了这篇文章,所以这篇文章都是非常精简的干货,强烈❤️建议收藏❤️。
学完本文,你的力量将不仅仅止于此,还能够底层化开发任何基于IP协议的自定义协议,当然这要看你自己是否具有举一反三的能力。甚至你还能继续自己深挖,去研究开发比IP协议更底层的协议。
渴望吗?渴望那就学起来吧⁉️
01
socket 套接字核心知识
socket 简介
进程间通信
指运行的程序之间的数据共享,在1台电脑上可以通过进程号(PID)来唯一标识一个进程进行通信。
在网络中,TCP/IP协议族网络层的“ip地址”可以唯一标识网络中的主机,而传输层的“协议+端口”可以唯一标识主机中的应用进程(进程)。网络中的进程通信就可以通过ip地址,协议,端口
这个标志与其它进程进行交互。
socket(简称 套接字
) 就是实现网络进程间通信的一种方式,网络上各种各样的服务大多都是基于 Socket 来完成通信的。为了建立通信通道,网络通信的每个端点拥有一个socket套接字对象,它们允许程序接受并进行连接,如发送和接受数据。
socket 链接
在 Python 中 使用socket 模块的函数 socket 就可以完成:
import socket
socket.socket(family=-1, type=-1, proto=-1, fileno=None)
参数说明:
family为指定的地址族,主要有三种:
socket.AF_UNIX :用于同一台机器进程间通信
socket.AF_INET :基于ipv4协议的Internet 进程间通信
socket.AF_INET6 :基于ipv6协议的Internet 进程间通信
更多的地址族还包括,socket.AF_BLUETOOTH
蓝牙相关、socket.AF_VSOCK
虚拟机通信、socket.AF_PACKET
直连网络设备底层接口等。
type为指定的套接字类型,主要有三种:
socket.SOCK_STREAM :流式套接字,使用面向连接的TCP协议实现字节流的传输
socket.SOCK_DGRAM :数据报套接字,使用面向非连接的UDP实现数据报套接字
socket.SOCK_RAW:原始套接字,该套接字允许对较低层协议(如 IP或 ICMP)进行直接访问
更多套接字类型还包括socket.SOCK_RDM
和socket.SOCK_SEQPACKET
等。
TCP 与 UDP 通信模型
对于tcp或udp套接字可以直接使用以下方式进行创建:
import socket
# 创建tcp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 创建udp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 不用的时候,关闭套接字
s.close()
UDP通信模型:在通信开始之前,不需要建立相关的链接,只需要发送数据即可。
UDP服务端示例代码:
from socket import *
# 创建套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 绑定本地的相关信息,不绑定系统会随机分配
udp_socket.bind(('0.0.0.0', 8080))
# 等待接收对方发送的数据
recv_data = udp_socket.recvfrom(1024) # 1024表示本次接收的最大字节数
# 显示接收到的数据,第1个元素是对方发送的数据,第2个元素是对方的ip和端口
print(recv_data[0].decode('u8'))
# 关闭套接字
udp_socket.close()
UDP客户端示例代码:
from socket import *
# 创建udp套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 发送数据到指定的电脑上的指定程序中
udp_socket.sendto("你好,服务器~".encode('u8'), ('192.168.1.103', 8080))
# 关闭套接字
udp_socket.close()
TCP通信模型:在通信开始之前,一定要先建立相关的链接,才能发送数据。
TCP服务端示例代码:
from socket import *
# 创建socket
tcp_server_socket = socket(AF_INET, SOCK_STREAM)
# 服务器绑定本机ip和端口
tcp_server_socket.bind(('0.0.0.0', 8080))
# 监听端口,128表示最大同时接收128个客户端链接
tcp_server_socket.listen(128)
# 如果有新的客户端来链接服务器,那么就产生一个新的套接字专门为这个客户端服务
client_socket, clientAddr = tcp_server_socket.accept()
# 接收对方发送过来的数据
recv_data = client_socket.recv(1024) # 接收1024个字节
print('接收到的数据为:', recv_data.decode('u8'))
# 发送一些数据到客户端
client_socket.send("你好客户端!".encode('u8'))
# 关闭为这个客户端服务的套接字
client_socket.close()
TCP客户端示例代码:
from socket import *
# 创建socket
tcp_client_socket = socket(AF_INET, SOCK_STREAM)
# 链接服务器
tcp_client_socket.connect(('192.168.3.31', 8080))
tcp_client_socket.send("测试发送的内容".encode("u8"))
# 接收对方发送过来的数据,最大接收1024个字节
recvData = tcp_client_socket.recv(1024)
print('接收到的数据为:', recvData.decode('u8'))
# 关闭套接字
tcp_client_socket.close()
SOCK_RAW 原始套接字
上述两种套接字是常规的套接字模式,第三个参数省略或为零(IP协议)会自动选择正确的协议(TCP协议和UDP协议)。
当我们指定套接字类型为socket.SOCK_RAW
原始套接字时,第三个参数就需要指定proto协议号。
python的socket库预定义的协议号有:
socket.IPPROTO_TCP:TCP传输协议,值为6
socket.IPPROTO_UDP:UDP传输协议,值为17
socket.IPPROTO_ICMP:ICMP协议,值为1
socket.IPPROTO_IP:IP协议,值为0
socket.IPPROTO_RAW:可自行构建IP头部构建更底层的协议,值为1
也可以通过协议名称获取协议号常量:
import socket
print(socket.IPPROTO_ICMP, socket.getprotobyname("icmp"),
socket.IPPROTO_ICMP == socket.getprotobyname("icmp"))
1 1 True
可以看到两种方式获取协议号均可。
通过原始套接字我们可以使用ICMP或更底层的协议进行通讯从而实现更高级的功能。
我们需要使用ICMP协议进行网络通信就可以使用SOCK_RAW
原始套接字:
icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
socket 模块和对象的其他常用方法
socket模块的其他常用方法:
socket.gethostbyname
:将主机名转换为IPv4地址格式。IPv4地址以字符串形式返回
socket.gethostname
:返回包含Python解释器当前正在执行的机器的主机名的字符串
socket.gethostbyaddr
:根据IP地址获取主机名
socket.getprotobyname
:将Internet协议名称转换为协议号常量
在主机字节顺序与网络字节顺序不相同的机器上,使用以下方法转换:
网络顺序转换为主机字节顺序 | 主机顺序转换为网络字节顺序 | |
---|---|---|
32位正整数 4字节的交换操作 | socket.ntohl | socket.htonl |
16位正整数 2字节的交换操作 | socket.ntohs | socket.htons |
在主机字节顺序与网络字节顺序相同的机器上,执行以上方法是无操作的。
socket.inet_aton
:将字符串格式的IPv4地址打包为32位4字节的字节对象
获取本机ip地址方法1
:先获取本机主机名,再通过主机名获取ip
import socket
ip = socket.gethostbyname(socket.gethostname())
print(ip)
192.168.3.31
获取本机所有网卡的IP:
ips = socket.gethostbyname_ex(socket.gethostname())[-1]
print(ips)
['192.168.3.31']
⚠️注意:如果本机没有正确设置主机名时可能无法获取本机ip地址。
socket套接字对象的公用函数套接字函数:
s.getpeername() :返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)
s.getsockname() :返回套接字自己的地址。通常是一个元组(ipaddr,port)
s.setsockopt(level,optname,value) :设置给定套接字选项的值。
s.getsockopt(level,optname[.buflen]) :返回套接字选项的值。
s.settimeout(timeout) :设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect)
s.gettimeout() :返回当前超时期的值,单位是秒,如果没有设置超时期,则返回None
s.fileno() :返回套接字的文件描述符
s.setblocking(flag) :如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。
s.makefile() :创建一个与该套接字相关联的文件。
获取本机ip地址方法2
:向任意网络地址发送一个无状态的UDP请求后,再通过套接字对象获取自己的地址从而获取本机地址
import socket
def get_local_ip():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('1.1.1.1', 80))
ip, port = s.getsockname()
return ip
# 获取本机IP
ip = get_local_ip()
print(ip)
192.168.3.31
✅即使无法连接Internet目标地址无法访问(发出报文会丢失),也可以使用该方法获取本机ip地址。
struct 二进制数据的转换
Python提供了一个struct模块来解决bytes和其他二进制数据类型的转换。
struct的pack函数把任意数据类型变成bytes。
import struct
print(struct.pack('>I', 10240099))
b'\\x00\\x9c@c'
pack 的第一个参数是处理指令:
>
:表示字节顺序是 big-endian,也就是网络序I
:表示 4 字节无符号整数H
:2 字节无符号整数。
后面的参数字节个数要和处理指令一致。
unpack 把 bytes 变成相应的数据类型:
>>> struct.unpack('>IH', b'\\xf0\\xf0\\xf0\\xf0\\x80\\x80')
(4042322160, 32896)
struct模块定义的数据类型可以参考Python官方文档:
https://docs.python.org/zh-cn/3/library/struct.html#format-characters
格式 | C 类型 | Python 类型 | 标准大小 | 注释 |
---|---|---|---|---|
x | 填充字节 | 无 | ||
c | char | 长度为 1 的字节串 | 1 | |
b | signed char | 整数 | 1 | (1), (2) |
B | unsigned char | 整数 | 1 | -2 |
? | _Bool | bool | 1 | -1 |
h | short | 整数 | 2 | -2 |
H | unsigned short | 整数 | 2 | -2 |
i | int | 整数 | 4 | -2 |
I | unsigned int | 整数 | 4 | -2 |
l | long | 整数 | 4 | -2 |
L | unsigned long | 整数 | 4 | -2 |
q | long long | 整数 | 8 | -2 |
Q | unsigned long long | 整数 | 8 | -2 |
n | ssize_t | 整数 | -3 | |
N | size_t | 整数 | -3 | |
e | -6 | 浮点数 | 2 | -4 |
f | float | 浮点数 | 4 | -4 |
d | double | 浮点数 | 8 | -4 |
s | char [] | 字节串 | ||
p | char [] | 字节串 | ||
P | void * | 整数 | -5 |
02
Ping 的工作原理
ping 基于 ICMP
协议工作的,ICMP 全称是 Internet Control Message Protocol,也就是互联网控制报文协议。ping 发出的ICMP 报文实际上是以侦察网络状态的形式实现了控制,反馈网络状态,从而调整传输策略以此控制整个局面。
ICMP
主要的功能包括:确认 IP 包是否成功送达目标地址、报告发送过程中 IP 包被废弃的原因和改善网络设置等。ICMP 协议主要负责在 IP
通信中通知某个 IP
包未能达到目标地址的原因。
ICMP 报文格式
上述报文格式中,左边的IP头部分不需要太关心,因为我们使用socket的原始套接字模式会自动帮我们封装IP头部分,右边的ICMP报文才是我们需要关心的部分。
⚠️注意:相比原生的 ICMP,Ping命令发出的ICMP报文多出了标识符和序号两个字段。
对于ICMP报文的类型,有两大类:
查询报文类型:用于诊断的查询消息
差错报文类型:通知出错原因的错误消息
不过咱们使用的PING只需要使用查询报文类型中的回送应答和回送请求。
ICMP 查询报文类型
回送消息:0表示回送应答,8表示回送请求。用于进行通信的主机或路由器之间,判断所发送的数据包是否已经成功到达对端的一种消息。
ping
命令是通过ICMP协议的回送消息实现的。
发送端主机向接收端主机发送一个回送请求(ICMP Echo Request Message
,类型 8
),只要正常接收到接收端返回的回送响应(ICMP Echo Reply Message
,类型 0
),则代表发送端主机到接收端主机可达。
ICMP 差错报文类型
对于差错报文类型,在本次编码中不会用到,无需深究,简单了解一下即可。
ICMP 常见差错报文:
目标不可达消息 —— 类型 为
3
原点抑制消息 —— 类型
4
重定向消息 —— 类型
5
超时消息 —— 类型
11
目标不可达消息(Destination Unreachable Message):
IP 路由器无法将 IP 数据包发送给目标地址时,会给发送端主机返回一个目标不可达的 ICMP 消息,并在这个消息中显示不可达的具体原因,原因记录在 ICMP 包头的代码字段。
由此,根据 ICMP 不可达的具体消息,发送端主机也就可以了解此次发送不可达的具体原因。
目标不可达的原因有:
网络不可达代码为
0
主机不可达代码为
1
协议不可达代码为
2
端口不可达代码为
3
需要进行分片但设置了不分片位代码为
4
原点抑制消息(ICMP Source Quench Message):
ICMP
原点抑制消息的目是为了缓和网络拥堵的问题,当路由器向低速线路发送数据时,其发送队列的缓存变为零而无法发送出去时,可以向 IP 包的源地址发送一个 ICMP 原点抑制消息。
但是收到这种 ICMP 消息的主机并不见得真的会增大 IP 包的传输间隔,还可能会引起不公平的网络通信,所以一般不被使用。
重定向消息(ICMP Redirect Message):
在路由器持有更好的路由信息时,发现发送端主机使用了不是最优
的路径发送数据,那么路由器会返回一个 ICMP 重定向消息给这个主机。这个消息中包含了最合适的路由信息和源数据,发送端下次可以发给另外一个更近的路由器。
超时消息(ICMP Time Exceeded Message):
IP 包中有一个8位的字段叫做 TTL
(Time To Live
,生存周期),它的值随着每经过一次路由器就会减 1,直到减到 0 时该 IP 包会被丢弃。
此时,IP 路由器将会发送一个ICMP超时消息给发送端主机,并通知该包已被丢弃。设置 IP 包生存周期的主要目的是为了在路由控制遇到问题发生循环状况时,避免 IP 包无休止地在网络上被转发。
也可以通过设置一个较小的 TTL 值 控制包的到达范围。
03
socket 原始套接字实现 ping 命令
学了这么多基础的网络知识,我们最终为了什么?就是为了能够自己实现PING命令。相关的网络知识还有很多,但对于我们实现PING命令并没有太大关系,就暂不做深究。
下面我们从实战出现,一步步调试继续深挖PING命令的实现原理。
首先我们创建ICMP
协议的原始套接字链接:
import socket
icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
发送回送请求
然后需要向目标发送一个回送请求
下面开始组织报文数据(对于系列号,我们可以自行决定要发送的值):
import os
import time
import struct
# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum, identifier, serial_num = 8, 0, 0, os.getpid() & 0xFFFF, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据,包含当前时间戳,后面用Q补齐到192位
data = struct.pack("d", time.time()).ljust(192, b"Q")
计算校验和的规则这里我已经写成代码,大家可以直接看代码:
def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
⚠️注意:最终返回时通过socket.htons方法将数据从主机序转换为网络序。
然后就可以计算出校验和重新打包header:
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
然后就可以发送了:
# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
target_addr = "192.168.3.31"
icmp_socket.sendto(header + data, (target_addr, 1))
⚠️注意:虽然发送给了1号端口,但其实发送给任意端口都可以。
接收回送响应
回送响应与回送请求结构一致
发送完消息后,我们就可以接收回送相应:
# 接收回送请求
recv_packet, addr = icmp_socket.recvfrom(1024)
# 前20字节是ip协议的ip头
icmp_header = recv_packet[20:28]
data = recv_packet[28:]
ICMP_Echo_Reply, code, checksum, identifier, serial_num = struct.unpack(
"bbHHh", icmp_header
)
time_sent, = struct.unpack("d", data[:struct.calcsize("d")])
⚠️注意:我们接收的回送请求中包含了前20自己的IP头。
从选项数据中可解析出了这个包发送的时间(之前发出时写入的时间)。
完善 ping 命令的开发
虽然标准的PING命令是用以上协议规则实现的,但我们并不需要完全按照上述规范,例如标识符可以发送任何16位的值,序号可以从任意数值开始,选项数据192位的空间也可以用来存放任何数据。
我们在接收回送响应时需要检查包的标识符,确定是自己发出的包才接收。
最终封装出如下方法:
import struct
import time
import os
import socket
import select
def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
icmp_socket.sendto(header + data, (target_addr, 1))
def receive_pong(icmp_socket, identifier=os.getpid() & 0xFFFF, serial_num=0, timeout=2):
icmp_socket.settimeout(timeout)
time_remaining = timeout
while True:
start_time = time.time()
# 接收回送请求
recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
time_received = time.time()
time_spent = time_received-start_time
# 前20字节是ip协议的ip头
icmp_header = recv_packet[20:28]
data = recv_packet[28:]
ICMP_Echo_Reply, code, checksum, identifier_reciver, serial_num_reciver = struct.unpack(
"bbHHh", icmp_header
)
if identifier_reciver != identifier or serial_num != serial_num_reciver:
# 不是当前自己发的包则忽略
time_remaining -= time_spent
if time_remaining <= 0:
raise socket.timeout
continue
time_sent, = struct.unpack("d", data[:struct.calcsize("d")])
return int((time_received - time_sent)*1000), ip
192.168.3.31
是我当前本机的局域网IP地址,测试一下:
icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
ip = '192.168.3.31'
sent_ping(icmp_socket, ip)
try:
delay, ip_received = receive_pong(icmp_socket, timeout=2)
print(f"延迟:delayms,对方ip:ip_received")
except socket.timeout as e:
print("超时")
延迟:0ms,对方ip:192.168.3.31
然后再批量ping一下指定当前网段的所有IP:
def get_local_ip():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(('1.1.1.1', 80))
ip, port = s.getsockname()
return ip
icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
local_ip = get_local_ip()
net_segment = local_ip[:local_ip.rfind(".")]
ips = []
for i in range(1, 255):
ip = f"net_segment.i"
sent_ping(icmp_socket, ip)
print("ping", ip, end=" ")
try:
delay, ip_received = receive_pong(icmp_socket, timeout=0.1)
print(f"延迟:delayms,对方ip:ip_received")
ips.append(ip)
except socket.timeout as e:
print("超时")
print(ips)
icmp_socket.close()
超时时间0.1秒时,总耗时30秒。
超时时间设置为0.01秒时,总耗时则为2.59秒。
借助 arp 表获取当前网段在线设备
如何尽量快的获取到当前在线的设备?经过测试发现,被ping后,ping不通的机器,arp表能够自动删除对应的条目,那么思路1就是快速的向全网段发送回送请求不等待回送响应,然后2秒后去查arp表,即可看到最新的在线设备。
实现思路1:
import struct
import time
import os
import re
import socket
import pandas as pd
def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
icmp_socket.sendto(header + data, (target_addr, 1))
def get_arp_ip_mac():
header = None
with os.popen("arp -a") as res:
for line in res:
line = line.strip()
if not line or line.startswith("接口"):
continue
if header is None:
header = re.split(" 2,", line.strip())
break
df = pd.read_csv(res, sep=" 2,",
names=header, header=0, engine='python')
return df
def ping_net_segment_all(net_segment):
with socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) as icmp_socket:
for i in range(1, 255):
ip = f"net_segment.i"
sent_ping(icmp_socket, ip)
net_segment = "192.168.3"
ping_net_segment_all(net_segment)
# 等待回送响应的到来,预计1秒之内
time.sleep(1)
# 读取最新的arp表
df = get_arp_ip_mac()
df
于是我们获取到了当前网段在线的设备列表。
双线程获取指定网段的在线设备
不过使用arp表查看有个缺陷,只能查看当前网段的,跨网段的在线设备似乎看不到。经分析我使用的台式机通过有线连接到3网段,而手机通过WiFi连接到2网段,所以必须能够分析2网段设备的在线设备才有意义。
思路2:用两个线程一个线程专门发回送请求,一个线程专门接收回送响应,可以通过回送响应获取IP地址,于是就可以得到指定网段的当前在线的设备的ip。
先完成获取在线设备列表:
from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pd
def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
icmp_socket.sendto(header + data, (target_addr, 1))
def receive_pong(icmp_socket, net_segment, timeout=2):
icmp_socket.settimeout(timeout)
ips = set()
while True:
start_time = time.time()
try:
recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
if ip.startswith(net_segment):
ips.add(ip)
except socket.timeout as e:
break
return ips
def ping_net_segment_all(icmp_socket, net_segment):
for i in range(1, 255):
ip = f"net_segment.i"
sent_ping(icmp_socket, ip)
icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
with ThreadPoolExecutor() as p:
p.submit(ping_net_segment_all, icmp_socket, "192.168.2")
future = p.submit(receive_pong, icmp_socket, "192.168.2", 3)
ips = future.result()
ips
运行结果,目前我的手机ip为192.168.2.122
,运行后被顺利检测到:
'192.168.2.1',
'192.168.2.122',
'192.168.2.17',
'192.168.2.18',
'192.168.2.19',
'192.168.2.20',
'192.168.2.21',
'192.168.2.22',
'192.168.2.23',
'192.168.2.49'
关闭手机WiFi后,再次运行,顺利看到该IP的下线。
完成 BOSS 来了的摸鱼神器
在已经将更新时间缩短到5秒以内时,咱们就可以PING指定网段,最后完成分析设备上下线的功能,从而达到最终的目的完成BOSS来了的摸鱼神器。
from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pd
def calc_checksum(src_bytes):
"""用于计算ICMP报文的校验和"""
total = 0
max_count = len(src_bytes)
count = 0
while count < max_count:
val = src_bytes[count + 1]*256 + src_bytes[count]
total = total + val
total = total & 0xffffffff
count = count + 2
if max_count < len(src_bytes):
total = total + ord(src_bytes[len(src_bytes) - 1])
total = total & 0xffffffff
total = (total >> 16) + (total & 0xffff)
total = total + (total >> 16)
answer = ~total
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return socket.htons(answer)
def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,
serial_num=0, data=None):
# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 打包选项数据
if data:
data = data.ljust(192, b"Q")
else:
data = struct.pack("d", time.time()).ljust(192, b"Q")
checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,
checksum, identifier, serial_num)
# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
icmp_socket.sendto(header + data, (target_addr, 1))
def receive_pong(icmp_socket, net_segment, timeout=2):
icmp_socket.settimeout(timeout)
ips = set()
while True:
start_time = time.time()
try:
recv_packet, (ip, port) = icmp_socket.recvfrom(1024)
if ip.startswith(net_segment):
ips.add(ip)
except socket.timeout as e:
break
return ips
def ping_net_segment_all(icmp_socket, net_segment):
for i in range(1, 255):
ip = f"net_segment.i"
sent_ping(icmp_socket, ip)
last = None
while 1:
icmp_socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
with ThreadPoolExecutor() as p:
p.submit(ping_net_segment_all, icmp_socket, "192.168.2")
future = p.submit(receive_pong, icmp_socket, "192.168.2")
ips = future.result()
if last is None:
print("当前在线设备:", ips)
if last:
up = ips-last
if up:
print("\\r新上线设备:", up, end=" "*100)
down = last-ips
if down:
print("\\r刚下线设备:", down, end=" "*100)
last = ips
time.sleep(3)
结果示例:
当前在线设备: '192.168.2.122', '192.168.2.18', '192.168.2.20', '192.168.2.1', '192.168.2.23', '192.168.2.49', '192.168.2.21', '192.168.2.17', '192.168.2.22', '192.168.2.19'
刚下线设备: '192.168.2.122'
经测试,手工关闭或打开手机WiFi能够顺利看到设备IP的打印信息。这种方法虽然无法获取MAC地址,但是经测试,同一台机器都会被分配同一个IP,在我当前的网络下是满足要求的,只需要知道老板手机连接的IP就行了。或者观察一下,老板走之后,到底哪个IP下线了,专门去监控这个IP。
更安全的做法就是每看到有新的IP上线都额外警惕一点,如果你是win10系统可以使用如下方法实现系统通知:
from win10toast import ToastNotifier
toaster = ToastNotifier()
toaster.show_toast("通知标题", "通知内容!", duration=10)
上述三个参数分别是通知标题,通知的内容和通知持续的时间,对于摸鱼这种事持续时间可以调大掉,再手工关闭通知,通过pip install win10toast
安装。
04
总结
总算做成了这个摸鱼神器,不过虽然我上面一本正经的讲的津津有味,但不会真有人打算拿这个代码去应用于实际去对付老板吧⁉️不会吧,不会吧⁉️
真打算做摸鱼神器的童鞋,我个人推荐搞个网络摄像头,写个人物图像识别的代码,发现有人进来了都自动提醒,这样才可以更放心的摸鱼。万一老板没连wifi就过来了,这就有点坑。
开发摸鱼神器不是本文本身的目的,学习网络知识自主实现网络协议,从通过实际例子理解网络协议才是本文真正的目的。
往
期
回
顾
资讯
技术
技术
资讯
分享
点收藏
点点赞
点在看
以上是关于手写了个 BOSS 来了的摸鱼神器!的主要内容,如果未能解决你的问题,请参考以下文章