Python:以支持 CPRI 格式的示波器所导出的转储文件作为输入。文件应为 CSV 格式,每行包含一个时间值和一个数据值
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了“Python:以支持 CPRI 格式的示波器所导出的转储文件作为输入。文件应为 CSV 格式,每行包含一个时间值和一个数据值”的相关知识,希望对你有一定的参考价值。
__author__ = 'mep'
import getopt
import sys
from bitstring import BitArray
# Line bit rate labels, indexed by the number of filling bytes (the D16.2 /
# D5.6 characters that print_hf_info counts after the K28.5 character).
# Only the counts listed here have a rate; every other count up to 15 is
# reported as 'invalid'.
# NOTE(review): the values look like Mbit/s -- confirm the unit.
_RATE_BY_FILLING_BYTES = {
    0: '614.4',
    1: '1228.8',
    3: '2457.6',
    4: '3072.0',
    7: '4915.2',
    9: '6144.0',
    15: '9830.4',
}
line_bit_rates = [_RATE_BY_FILLING_BYTES.get(count, 'invalid')
                  for count in range(16)]
# Protocol version labels, indexed by the number print_hf_info decodes from
# the hyper frame. Index 0 does not name a version.
protocol_version = ['invalid'] + ['Version %d' % number for number in (1, 2)]
# HDLC bit rate labels, indexed by the number print_hf_info decodes from the
# hyper frame.
# Every entry needs its own comma outside the quotes: Python silently joins
# adjacent string literals, which drops an entry and shifts all later indices.
hdlc_bit_rates = [
    'no HDLC',
    '240 kbits/s',
    '480 kbits/s',
    '960 kbits/s',
    '1920 kbits/s',
    '2400 kbits/s',
    'highest possible HDLC bit rate',
    'negotiated in higher layers',
]
def __usage():
    """Print the command line help text and terminate the program.

    Never returns: sys.exit() raises SystemExit.
    """
    # A single-argument print(...) behaves the same under Python 2 and 3.
    print('''Usage:
-h, --help\tThis help text
-i, --input\tInput file (default: input.csv)
''')
    # sys.exit() rather than the bare exit(): the latter is only injected by
    # the site module and is missing when the interpreter is started with -S.
    sys.exit()
# The helpers below turn an 8b/10b code-group name into the unsigned integer
# it stands for. Names look like [KD]xx.y[+-], e.g. 'K28.5-': a K/D marker,
# two numbers separated by a dot and a one-character suffix.


def to10b(x):
    """Return *x* (anything int() accepts) as a 5-bit wide BitArray."""
    return BitArray(uint=int(x), length=5)


def _code_group_numbers(x):
    """Return the dot-separated numbers of a code-group name as strings."""
    text = str(x)[1:]  # drop the leading K/D marker
    # Drop the one-character suffix. A name that already ends in a digit has
    # no suffix; cutting anyway would eat the last digit of the second number.
    if text and not text[-1].isdigit():
        text = text[:-1]
    return text.split('.')


def split_data(x):
    """Return the numbers of a code-group name as 5-bit binary strings.

    The result is in reversed order because the second number is the more
    significant one, e.g. 'K28.5-' gives ['00101', '11100'].

    Raises ValueError when a number is missing, is not an integer or does
    not fit into five bits.
    """
    fields = []
    for number in _code_group_numbers(x):
        value = int(number)
        if not 0 <= value < 32:
            raise ValueError('%r does not fit into 5 bits' % (number,))
        fields.append(format(value, '05b'))
    return fields[::-1]


def data2uint(x):
    """Return the unsigned integer encoded by a code-group name.

    The binary strings from split_data() are joined and read as one binary
    number, e.g. 'K28.5-' -> '0010111100' -> 188.
    """
    return int(''.join(split_data(x)), 2)
def findSOHFs(x):
    """Return the indices of all rows whose data value starts with 'K28.5'.

    x is a list of rows, each a list of strings with the data value in the
    second field. K28.5 marks the Start Of a Hyper Frame (SOHF).

    Rows with fewer than two fields (for example a blank line in the input
    file) are skipped instead of raising IndexError.
    """
    return [idx for idx, item in enumerate(x)
            if len(item) > 1 and item[1].startswith('K28.5')]
def calc_bfn(low_byte, high_byte):
    """Combine the two BFN bytes into a single number.

    The BFN is spread over two places: the high byte comes from Z.192.0 and
    the low byte from Z.128.0. Only the low four bits of high_byte are used;
    they are placed directly above the eight bits of low_byte.
    """
    upper_nibble = high_byte & 0xF
    return (upper_nibble << 8) + low_byte
def check_hfs(lst):
    """Check that all hyper frames have the same size and return that size.

    lst holds the positions at which the hyper frames start, in ascending
    order. The size of a hyper frame is the distance between two consecutive
    starts.

    Returns the common size, or 0 when the sizes differ (a message is printed
    in that case) or when fewer than two starts are given, so that no size
    can be measured at all.
    """
    # Distances between each start and the one that follows it.
    sizes = set(following - current
                for current, following in zip(lst, lst[1:]))
    if len(sizes) > 1:
        print('Hyperframes are of invalid size')
        return 0
    if not sizes:
        # Zero or one start: popping from the empty set would raise KeyError.
        return 0
    return sizes.pop()
def print_hf_info(hf, samples_per_bf):
    """Print the information decoded from one hyper frame.

    hf is the list of data value strings of the hyper frame, with the K28.5
    character expected at hf[0]. samples_per_bf is the number of entries in
    hf per basic frame; the value for basic frame N is read from
    hf[N * samples_per_bf].

    Printed fields: HFN (basic frame 64), BFN (basic frames 128 and 192),
    line bit rate (from the filling bytes after K28.5), protocol version
    (basic frame 2) and HDLC bit rate (basic frame 66). A field that cannot
    be looked up is reported as "Not found".
    """
    # Start with line bit rate
    i = 0
    try:
        # Count number of D16.2s' and D5.6s' after the K28.5 character.
        # First character can be either D16.2 or D5.6, after that only
        # D16.2.
        if hf[i + 1].startswith(('D16.2', 'D5.6')):
            i += 1
            while hf[i + 1].startswith('D16.2'):
                i += 1
    except IndexError:
        # Ran off the end of hf while counting: nothing more can be decoded.
        print("Invalid index: " + str(i))
        return
    # We have to try all these in case the last hyper frame is not complete
    try:
        print("HFN: " + str(data2uint(hf[64 * samples_per_bf])))
    except IndexError:
        print("HFN: Not found")
    try:
        print("BFN: " + str(calc_bfn(data2uint(hf[128 * samples_per_bf]),
                                     data2uint(hf[192 * samples_per_bf]))))
    except IndexError:
        print("BFN: Not found")
    # More filling bytes than the table knows about must not crash the report.
    if i < len(line_bit_rates):
        print("Line bit rate: " + str(line_bit_rates[i]))
    else:
        print("Line bit rate: invalid")
    # In the two lookups below IndexError covers both a hyper frame that is
    # too short and a decoded value that has no entry in the label table.
    try:
        print("Protocol version: "
              + protocol_version[data2uint(hf[2 * samples_per_bf])])
    except IndexError:
        print("Protocol version: Not found")
    try:
        print("HDLC bit rate: "
              + hdlc_bit_rates[data2uint(hf[66 * samples_per_bf])])
    except IndexError:
        print("HDLC bit rate: Not found")
    print("-----------")
def readfile(f):
    """Read the whole file object *f* and return its rows.

    The first line (the header) is dropped. Every remaining line is stripped
    of trailing whitespace and split at the commas, so a row is a list of
    strings such as [Time, Data].
    """
    rows = []
    for raw_line in f.readlines()[1:]:
        rows.append(raw_line.rstrip().split(','))
    return rows
def main(argv):
    """Parse the command line, read the CSV dump and report each hyper frame.

    argv is the list of command line arguments without the program name.
    Supported options: -h/--help and -i/--input FILE (default: input.csv).
    """
    inputfile = 'input.csv'  # Default file name
    try:
        # -h/--help have to be declared here; otherwise getopt rejects them
        # as unknown options and the help branch below can never be reached.
        opts, args = getopt.getopt(argv, 'hi:', ['help', 'input='])
    except getopt.GetoptError:
        __usage()
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            __usage()
        if opt in ('-i', '--input'):
            inputfile = arg
    # The context manager closes the file even when reading it fails.
    with open(inputfile, 'r') as f:
        lst = readfile(f)
    sohfs = findSOHFs(lst)
    hfsize = check_hfs(sohfs)
    # 256 basic frames per hyper frame. Floor division keeps the result an
    # integer under Python 3 too; it is used as a slice bound below.
    samples_per_frame = hfsize // 256
    print("Len: " + str(len(lst)))
    print("Filename: " + inputfile)
    print("Samples per frame: " + str(samples_per_frame))
    print("Start of hyper frames found at " + str(sohfs))
    for index, start in enumerate(sohfs):
        end = start + samples_per_frame * 256
        print('')
        print("Hyperframe #" + str(index))
        print("----------")
        print("Starts (K28.5 found) at file position " + str(start))
        print('')
        # Rows without a data field (e.g. blank lines) become '' so that the
        # positions of the remaining values do not shift.
        values = [row[1] if len(row) > 1 else '' for row in lst[start:end]]
        print_hf_info(values, samples_per_frame)
# Run the tool only when the file is executed as a script, not when imported.
if __name__ == '__main__':
    # Pass the arguments without the program name.
    main(sys.argv[1:])
以上是关于“Python:以支持 CPRI 格式的示波器所导出的转储文件作为输入。文件应为 CSV 格式,每行包含一个时间值和一个数据值”的主要内容。如果未能解决你的问题,请参考以下文章。