python+pcap+dpkt 抓包小实例
Posted wangjq_china
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python+pcap+dpkt 抓包小实例相关的知识,希望对你有一定的参考价值。
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*- 3 4 """ 网络数据包捕获与分析程序 """ 5 6 import pcap 7 import dpkt 8 import json 9 import re 10 import time 11 from urllib import unquote 12 13 # 过滤输出目标ip 14 dst_lists = [ 15 \'203.66.1.212\', # nslookup dpdcs.4399sy.com.hk 16 \'52.74.10.186\', # nslookup dpdcs.4399en.com 17 \'52.58.69.212\', # nslookup dpdcs.4399sy.ru 18 \'220.241.11.3\', # nslookup dpdcs.4399th.com 19 \'124.243.195.63\', # nslookup sdkdcs.4399sy.com 20 \'42.62.106.216\', # nslookup udpdcs.4399sy.com 21 \'42.62.106.230\', # nslookup udpdcs.4399sy.com 22 ] 23 24 req_data = "" 25 times = 0 26 27 28 def capt_data(eth_name="eth0", p_type=None): 29 """ 30 捕获网卡数据包 31 :param eth_name 网卡名,eg. eth0,eth3... 32 :param p_type 日志捕获类型 1:sdk日志用例分析 2:目标域名过滤输出 3:原始数据包 33 :return: 34 """ 35 36 pc = pcap.pcap(eth_name) 37 pc.setfilter(\'tcp port 80\') # 设置监听过滤器 38 print \'start capture....\' 39 if pc: 40 for p_time, p_data in pc: # p_time为收到时间,p_data为收到数据 41 anly_capt(p_time, p_data, p_type) 42 43 44 def anly_capt(p_time, p_data, p_type): 45 """ 46 解析数据包 47 :param p_data 收到数据 48 :param p_type 日志捕获类型 1:sdk日志用例分析 2:目标域名过滤输出 3:原始数据包 49 :return: 50 """ 51 52 p = dpkt.ethernet.Ethernet(p_data) 53 if p.data.__class__.__name__ == \'IP\': 54 ip_data = p.data 55 src_ip = \'%d.%d.%d.%d\' % tuple(map(ord, list(ip_data.src))) 56 dst_ip = \'%d.%d.%d.%d\' % tuple(map(ord, list(ip_data.dst))) 57 if p.data.data.__class__.__name__ == \'TCP\': 58 tcp_data = p.data.data 59 if tcp_data.dport == 80: 60 # print tcp_data.data 61 if tcp_data.data: 62 # 调用日志模块,对日志进行处理 63 if p_type == 1: 64 # sdk日志用例分析 65 if dst_ip in dst_lists: 66 tmp = tcp_data.data.strip() 67 global req_data, times 68 if tmp.startswith("POST") or tmp.startswith("GET"): # or times > 0 69 if req_data: 70 haiwai_log_case(req_data) 71 req_data = tmp + "\\n" 72 # times = 0 73 else: 74 req_data = req_data + tmp 75 # times = times + 1 76 77 elif p_type == 2: 78 # 目标域名过滤输出 79 if dst_ip in dst_lists: 80 print "tcp_data:", tcp_data.data 81 82 else: 83 # 无过滤条件输出 84 print "tcp_data:", tcp_data.data 85 86 87 # android 日誌類型,从data中获取 88 log_type_from_data = { 89 90 \'open_game\': u\'[打开游戏]\', 91 \'network_check\': u\'[网络监测]\', 92 \'open_login\': u\'[登录界面前]\', 93 \'select_server\': u\'[选服日志]\', 94 \'create_role\': u\'[创角日志]\', 95 \'role_level_change\': u\'[等级日志]\', 96 97 # 海外,俄语 98 \'activity_open\': u\'[打开游戏]\', 99 \'load_start_before_login\': u\'[加载开始]\', 100 \'load_finish_before_login\': u\'[加载结束]\', 101 \'activity_before_login\': u\'[登录界面前]\', 102 \'click_enter\': u\'[进入游戏]\', 103 \'get_user_server_login\': u\'[选服日志]\', 104 \'user_create_role\': u\'[创角日志]\', 105 \'role_login\': u\'[角色登录]\', 106 \'enter_success\': u\'[成功进入游戏]\', 107 \'role_level\': u\'[等级日志]\', 108 \'user_online\': u\'[在线日志]\', 109 \'exit_success\': u\'[退出游戏]\', 110 111 } 112 113 # ios日誌類型,从请求资源路径获取 114 log_type_from_path = { 115 \'activity_open.php\': u\'[打开游戏]\', 116 \'load_start_before_login.php\': u\'[加载开始]\', 117 \'load_finish_before_login.php\': u\'[加载结束]\', 118 \'activity_before_login.php\': u\'[登录界面前]\', 119 \'click_enter.php\': u\'[进入游戏]\', 120 \'get_user_server_login.php\': u\'[选服日志]\', 121 \'user_create_role.php\': u\'[创角日志]\', 122 \'role_login.php\': u\'[角色登录]\', 123 \'enter_success.php\': u\'[成功进入游戏]\', 124 \'user_online.php\': u\'[在线日志]\', 125 \'role_level.php\': u\'[等级日志]\', 126 \'exit_success.php\': u\'[退出游戏]\', 127 \'share.php\': u\'[分享日志]\', 128 \'init_info.php\': u\'[初始化日志]\', 129 \'event.php\': u\'[事件日志]\', 130 \'user_login.php\': u\'[user_login]\', 131 \'user_server_login.php\': u\'[user_server_login]\', 132 \'enter_game.php\': u\'[enter_game]\', 133 } 134 135 # 过滤path 136 filter_out_list = [ 137 \'u/\', 138 \'plugin/error/check\', 139 \'service/version/get_info\', 140 ] 141 142 # 过滤打印出属于列表中的host的日志。 143 host_list = [ 144 \'dpdcs.4399sy.com.hk\', 145 \'dpdcs.4399en.com\', 146 \'dpdcs.4399sy.ru\', 147 \'dpdcs.4399th.com\', 148 \'sdkdcs.4399sy.com\', 149 \'udpdcs.4399sy.com\', 150 ] 151 152 153 def formattime(t): # 日期字段格式化 154 return time.strftime(\'%c\', time.gmtime(t + 8 * 3600)) 155 156 157 def req_to_dict(req_string): 158 """ 159 将请求数据转换为dic 160 :param req_string: 161 :return: 162 """ 163 req_dict = {} 164 req_string = req_string.strip() 165 if len(req_string) > 0: 166 req_string = unquote(req_string) 167 # print "req_string_after_unquote:",req_string 168 m1 = re.search("(GET|POST)(.*)\\?(.*)HTTP/1.1", req_string) # (method,path,param) 169 m2 = re.search("Host:(.*)", req_string) # (host,) 170 # m3 = re.search("\\sdata=(.*)\\s", req_string) # (body,) 171 m4 = re.search("\\sdata=([\\s\\S]*)", req_string) # (body,) 172 # m5 = re.search("eventTime\\":\\"(\\d+)", req_string) # (eventTime,) 173 m5 = re.search("eventTime\\"\\s*:\\s*\\"(\\d+)", req_string) 174 if m1: 175 req_dict["method"] = m1.group(1).strip() 176 req_dict["path"] = m1.group(2).strip()[1:] 177 param_string = m1.group(3).strip() 178 if param_string: 179 param_string = param_string.split("&") 180 param_dict = {} 181 for item in param_string: 182 tmp_list = item.split("=") 183 if len(tmp_list) > 1: 184 param_dict[tmp_list[0]] = tmp_list[1] 185 req_dict["param"] = param_dict 186 if m2: 187 req_dict["host"] = m2.group(1).strip() 188 if m4: 189 try: 190 body = m4.group(1).replace("\\n", "") 191 body = json.loads(body) 192 except ValueError: 193 print "\\033[1;31;40m" 194 print "m4:Error:body ValueError,req_string-->%s" % req_string 195 print "\\033[0m" 196 body = {} 197 req_dict["body"] = body 198 199 if m5: 200 req_dict["eventTime"] = formattime(int(m5.group(1))) 201 202 return req_dict 203 204 205 def haiwai_log_assert(req_dict): 206 """ 207 日志断言处理,输出分析结果 208 :param req_dict: 209 :return: 210 """ 211 212 # 从 data 中获取日志类型 213 if isinstance(req_dict, dict) and req_dict.get("body"): 214 if req_dict.get("body").get("data"): 215 data_type = req_dict.get("body").get("data").keys() 216 data_type_set = set(data_type) 217 types_key_set = set(log_type_from_data.keys()) 218 intersect = data_type_set.intersection(types_key_set) 219 if intersect: 220 log_type = intersect.pop() 221 print "\\033[1;31;40m %s log pass!--from body data || EventTime:-->[%s] \\033[0m" % ( 222 log_type_from_data.get(log_type), req_dict.get("eventTime")) 223 print req_dict 224 else: 225 if \'common\' in data_type and len(data_type) == 2: 226 data_type.remove(\'common\') 227 print "\\033[1;31;40m %s log not register!--from body data \\033[0m" % data_type 228 229 # 从 path 中获取日志类型 230 path = req_dict.get("path") 231 host = req_dict.get("host") 232 if host in host_list: 233 if path in log_type_from_path.keys(): 234 eventTime = "" 235 if req_dict.get("eventTime"): 236 eventTime = req_dict.get("eventTime") 237 else: 238 if req_dict.get("param"): 239 eventTime = req_dict.get("param").get("time") 240 if eventTime: 241 eventTime = formattime(int(eventTime)) 242 print "\\033[1;31;40m %s log pass--from url || EventTime:-->[%s] \\033[0m" % ( 243 log_type_from_path.get(path), eventTime) 244 print req_dict 245 elif path and path not in filter_out_list: 246 print "\\033[1;31;40m %s log not register!--from url! \\033[0m" % path 247 print req_dict 248 249 250 def client_log_check(log_type, req_dict, platform="sy"): 251 """ 252 检查SDK客户端请求字段,返回测试结果集 253 :param log_type: 日志类型 254 :param req_dict: 日志字典 255 :param platform: 测试平台 256 :return: 257 """ 258 pass 259 260 261 def haiwai_log_case(req_string): 262 """ 263 日志用例集 264 一:将数据包转换为dict 265 二:对日志分析处理,输出测试结果 266 """ 267 268 req_dict = req_to_dict(req_string) 269 haiwai_log_assert(req_dict) 270 271 272 if __name__ == \'__main__\': 273 try: 274 capt_data("eth3", 1) 275 except TypeError: 276 capt_data("eth3", 1)
以上是关于python+pcap+dpkt 抓包小实例的主要内容,如果未能解决你的问题,请参考以下文章
pcap中tcp包的'flags'属性值在python中由dpkt读取时表示什么?