案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型

Posted xena

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型相关的知识,希望对你有一定的参考价值。

硬件:XENA Valkyrie 或 Vantage主机,测试板卡不限,本方法适用于其100M~400G所有速率端口

环境配置:Python 3

实现功能:

1.控制流量仪进行流量测试,预定配置的流量发送,报文统计,丢包率,延迟等信息的统计

2.单个Python脚本配合不同的JSON对象文件,来实现不同端口数量,不同拓扑模型下的流量测试

3.在发包测试前及运行过程中都对发包端口的链路状态进行监控100M/1G/2.5G/5G/10G

4.嵌入ARP探测,获取DUT的MAC信息,并记录在测试日志中

5.统计端口流量的Tx,Rx信息,丢包率,延迟信息

设计思路:main文件控制主流程,嵌入Statistics模块进行数据统计,CreateClass模块进行命令预定义

运行效果:

技术图片

 

 技术图片

 

 

主要实现代码:

  1 import threading, time, json, random, queue, types, sys, socket
  2 from binascii import hexlify
  3 from TestUtilsL23 import XenaScriptTools
  4 from CreateClass import StreamCreate
  5 from Statistics import CollectTestresult
  6 
  7 def BuildDict(ports, configvalue, watching, watching_2):
  8     #Generate ports Dictionary
  9     a = 1 
 10     for port in ports:
 11         ip_address_b = configvalue[‘IP‘][0:10]
 12         ip_address_c = ip_address_b + str(a)
 13         mac_a = configvalue[‘mac‘][0:10]
 14         if a > 9:
 15             mac_address = mac_a + str(a)
 16         else:
 17             mac_address = mac_a + ‘0‘ + str(a)
 18         ip_address_a = hexlify(socket.inet_aton(ip_address_c)).decode()
 19         mac_ip = [str(mac_address), str(ip_address_a)]
 20         watching[port] = mac_ip
 21         a += 1
 22 
 23     watching_2[‘type_eth‘] = ‘FFFF‘,
 24     watching_2[‘type_vlan‘] = ‘8100‘,
 25     watching_2[‘type_ip‘] = ‘08004500006A000000007F11BA2D‘,
 26     watching_2[‘type_udp‘] = ‘00000000‘,
 27     watching_2[‘type_tcp‘] = ‘000000000000000050000000E5A40000‘,
 28     watching_2[‘tcpid‘] = [hex(int(configvalue[‘tcpPort‘][0]))[2:].zfill(4) + hex(int(configvalue[‘tcpPort‘][1]))[2:].zfill(4)]
 29 
 30 
 31 def PrintMes(ports, configvalue, ip_address):
 32     localtime = time.asctime( time.localtime(time.time()) )
 33     print("\\n##############################################")
 34     print("                   测试配置                   ")
 35     print("##############################################\\n")
 36     print("测试时间     :  " + str(localtime))
 37     print("设备IP地址   :  " + ip_address)
 38     print("测试端口     :  " + str(ports))
 39     print("测试模式     :  " + configvalue[‘Stype‘])
 40     print("持续时间     :  " + configvalue[‘testtime‘])
 41     print("报文类型     :  " + configvalue[‘header_type‘])
 42     print("报文长度     :  " + configvalue[‘packet‘])
 43     if configvalue[‘Stype‘] == ‘Aggregation‘:
 44         print("上行VLAN     :  " + str(configvalue[‘uvlan‘]))
 45         print("下行VLAN     :  " + str(configvalue[‘uvlan‘]))
 46         print("WAN速率      :  " + str(configvalue[‘wanrate‘]))
 47         print("LAN速率      :  " + str(configvalue[‘lanrate‘]))
 48     else:
 49         print("VLAN         :  " + str(configvalue[‘uvlan‘]))
 50         print("速率设置     :  " + str(configvalue[‘wanrate‘]))
 51     print("端口速率     :  " + str(configvalue[‘portrate‘]))
 52     print("学习时间     :  " + configvalue[‘learntime‘])
 53     print("丢包率设置   :  " + configvalue[‘threshold‘])
 54     if configvalue[‘snlearnenable‘] == ‘1‘:
 55         print("自动学习SN   :  Enable")
 56     else:
 57         print("自动学习SN   :  Disable")
 58     if configvalue[‘portspeedcheck‘] == ‘1‘:
 59         print("端口速率检测 :  Enable" )
 60     else:
 61         print("端口速率检测 :  Disable" )
 62 
 63 
 64 def Maclearning(xm, ports, configvalue):
 65     if configvalue[‘snlearnenable‘] == ‘1‘:
 66         xm.SendExpectOK(ports [1] + " PS_CREATE [0]")
 67         xm.SendExpectOK(ports [1] + " ps_tpldid [0] 0")
 68         xm.SendExpectOK(ports [1] + " PS_PACKETLENGTH [0] FIXED 70 1518")
 69         xm.SendExpectOK(ports [1] + " PS_HEADERPROTOCOL [0] ETHERNET ARP")
 70         learnIP_hex = hexlify(socket.inet_aton(configvalue[‘learnIP‘])).decode()
 71         xm.SendExpectOK(ports [1] + " PS_PACKETHEADER [0] 0xFFFFFFFFFFFF00000000000108060001080006040001000000000001C0A80164FFFFFFFFFFFF" + learnIP_hex)
 72         xm.SendExpectOK(ports [1] + " PS_RATEPPS [0] 2")
 73         xm.SendExpectOK(ports [1] + " PS_ENABLE [0] on")
 74         xm.SendExpectOK(ports [1] + " P_TRAFFIC ON")
 75         xm.SendExpectOK(ports [1] + " P_CAPTURE ON")
 76         time.sleep(1)
 77         xm.SendExpectOK(ports [1] + " P_CAPTURE OFF")
 78         xm.SendExpectOK(ports [1] + " P_TRAFFIC OFF")
 79         SN = xm.Send(ports[1] + " PC_PACKET [0] ?")
 80         SN = SN.split(‘  ‘)[-1]
 81         xm.SendExpectOK(ports [1] + " PS_DELETE [0]")
 82         Serial = str(SN)
 83         if SN == ‘<BADINDEX>‘:
 84             print ("\\nTest result :Failed;[ARP learning failed!!]")
 85             sys.exit()
 86         print ("DUT SN       :  " + str(Serial[16:28]))
 87 
 88 
 89 def PortSpeed(xm, ports, PSC, configvalue):
 90     if configvalue[‘PRcheck‘] == ‘1‘:
 91         xm.PortSyncCheck(ports)
 92     i = 0
 93     for port in ports:
 94         PS = xm.Send(port + ‘ p_speed ?‘)
 95         PS = PS.split(‘  ‘)[-1]
 96         PSC.append(str(PS))
 97         if configvalue[‘portspeedcheck‘] == ‘1‘:
 98             if str(PS) != configvalue[‘portrate‘][i]:
 99                 print (‘Test Result:Failed;[‘ + port + ‘ 端口速率不匹配,请检查配置...]‘)
100                 sys.exit()
101         i += 1
102 
103 
104 def runtest(xm, ports, configvalue):
105     xm.PortTrafficStart(ports)
106     time.sleep(float(configvalue[‘learntime‘]))
107     xm.PortTrafficStop(ports)
108     time.sleep(0.5)
109     xm.Statisticsclear(ports)
110     time.sleep(0.5)
111     xm.Porttimelimit(ports, configvalue[‘testtime‘])
112     time.sleep(0.2)
113     xm.PortTrafficStart(ports)
114     count = 0
115     print ("\\n开始打流,请耐心等候, 测试时间约为" + configvalue[‘testtime‘] + "秒......\\n")
116     while True:
117         print (">>>>>>>>>", end=‘‘)
118         xm.Send(ports[0] + " P_TRAFFIC ?")
119         sys.stdout.flush()
120         count += 1
121         time.sleep(1)
122         if count > int(configvalue[‘testtime‘]):
123             print (‘\\n\\n测试结束,正在收集测试数据......‘)
124             break
125     print()
126     xm.PortTrafficStop(ports)
127     time.sleep(1)
128 
129 
130 def main(argv):
131     a = sys.argv
132     configvalue = 
133     filename = str(a[1])
134     with open(filename,‘rb‘) as f:
135         configs=json.loads(f.read())
136 
137         ip_address = configs.get(‘ip_address‘)
138         ports = configs.get(‘ports‘, ‘‘)
139         configvalue[‘uvlan‘] = configs.get(‘uvlan‘, ‘‘)
140         configvalue[‘dvlan‘] = configs.get(‘dvlan‘, ‘‘)
141         configvalue[‘tcpPort‘] = configs.get(‘tcpUdpPort‘, ‘‘)
142         configvalue[‘packet‘] = configs.get(‘packet‘, ‘‘)
143         configvalue[‘testtime‘] = configs.get(‘testtime‘, ‘‘)
144         configvalue[‘threshold‘] = configs.get(‘threshold‘, ‘‘)
145         configvalue[‘header_type‘] = configs.get(‘headertype‘, ‘‘)
146         configvalue[‘learntime‘] = configs.get(‘learntime‘, ‘‘)
147         configvalue[‘payload‘] = configs.get(‘payload‘, ‘‘)
148         configvalue[‘wanrate‘] = configs.get(‘wanrate‘, ‘‘)
149         configvalue[‘lanrate‘] = configs.get(‘lanrate‘, ‘‘)
150         configvalue[‘portrate‘] = configs.get(‘portrate‘, ‘‘)
151         configvalue[‘PRcheck‘] = configs.get(‘PRcheck‘, ‘‘)
152         configvalue[‘IP‘] = configs.get(‘IP‘, ‘‘)
153         configvalue[‘mac‘] = configs.get(‘mac‘, ‘‘)
154         configvalue[‘snlearnenable‘] = configs.get(‘snlernenable‘, ‘‘)
155         configvalue[‘learnIP‘] = configs.get(‘learnIP‘, ‘‘)
156         configvalue[‘Stype‘] = configs.get(‘Stype‘, ‘‘)
157         configvalue[‘portspeedcheck‘] = configs.get(‘PScheck‘, ‘‘)
158 
159     PSC = []
160     FCS = []
161     watching = 
162     watching_2 = 
163     testresult = 
164 
165     PrintMes(ports, configvalue, ip_address)
166 
167     xm = XenaScriptTools(ip_address)
168     xm.LogonSetOwner("xena", "python_test_1")
169     xm.PortRelinquish(ports)
170     xm.PortReserve(ports)
171     xm.PortTrafficStop(ports)
172     xm.StreamsDelete(ports)
173     sc = StreamCreate(xm, ports, watching, watching_2, configvalue)
174     cr = CollectTestresult(xm, ports, PSC, configvalue, testresult, FCS)
175 
176     
177     BuildDict(ports, configvalue, watching, watching_2)
178     PortSpeed(xm, ports, PSC, configvalue)
179     if configvalue[‘snlearnenable‘] == ‘1‘:
180         Maclearning(xm, ports, configvalue)
181     if configvalue[‘Stype‘] == ‘Loopback‘:
182         sc.Loopback(xm, ports, watching, watching_2, configvalue)
183         runtest(xm, ports, configvalue)
184         cr.LoopbackStatistics(xm, ports, testresult)
185     if configvalue[‘Stype‘] == ‘Eachother‘:
186         sc.Eachother(xm, ports, watching, watching_2, configvalue)
187         runtest(xm, ports, configvalue)
188         cr.EachotherStatistics(xm, ports, testresult)
189     if configvalue[‘Stype‘] == ‘Aggregation‘:
190         sc.Aggregation(xm, ports, watching, watching_2, configvalue)
191         runtest(xm, ports, configvalue)
192         cr.AggregationStatistics(xm, ports, testresult)
193     cr.FCSGetValue(xm, ports, FCS)
194     cr.OutputStatistics(ports, PSC, configvalue, testresult, FCS)
195     xm.PortRelease(ports)
196 
197 if __name__ == ‘__main__‘:
198     sys.exit(main(sys.argv))

 

JSON对象配置文件


"##常用配置修改":"",

"ports": ["0/0","0/1","0/2","0/3","0/4","0/5"],
"packet": "FIXED 64 1518",
"testtime": "10",

"##注意:dvlan和lanrate只有在Aggregation模式生效,其他模式仅需uvlan和wanrate":"",
"##其中vlan设置为-1则表示不添加vlan":"",
"uvlan": "-1",
"dvlan": "-1",
"wanrate": ["100", "100", "100","100", "100", "100"],
"lanrate": ["21", "22", "23","24"],

"portrate": ["1000","1000","1000","1000","1000","1000"],

"##Stype为测试模式,分别可以配置为:Loopback, Eachother, Aggregation":"",
"##其中Loopback为环回测试,Eachother为两两互打测试,Aggregation为汇聚测试":"",
"Stype":"Eachother",

"##headertype为报文类型,分别可以配置为:TCP, UDP, IP, Ethernet":"",
"headertype": "Ethernet",

"##threshold为丢包率设置":"",
"threshold": "0",




"##不常用配置修改":"",

"##发送学习报文的时间":"",
"learntime": "3",

"##payload类型分别有: Pattern, Random":"",
"payload": "Pattern",

"##PRcheck为使能端口状态检测,1为开启,0为关闭;使能后端口为连接的时候会报错":"",
"PRcheck": "1",

"##PScheck为检测端口速率是否匹配,1为开启,0为关闭":"",
"PScheck":"1",

"tcpUdpPort": ["1024", "2048"],
"IP": "192.168.163.1",
"mac": "000000033333",

"##学习DUT的MAC地址":"",
"snlernenable": "0",
"learnIP": "192.168.2.1",

"##测试仪IP地址":"",
"ip_address": "192.168.1.200"

 

以上是关于案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型的主要内容,如果未能解决你的问题,请参考以下文章

Python面向对象案例汇总

Python面向对象案例汇总

Python面向对象案例汇总

代理设计模式

代理设计模式

Python面向对象