python 获取从了解CPRI格式的示波器转储的文件作为输入。文件应为csv,每行具有时间和数据值

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 获取从了解CPRI格式的示波器转储的文件作为输入。文件应为csv,每行具有时间和数据值相关的知识,希望对你有一定的参考价值。

__author__ = 'mep'

import getopt
import sys
from bitstring import BitArray

line_bit_rates = ['614.4',    # 0 filling bytes
                  '1228.8',   # 1
                  'invalid',  # 2
                  '2457.6',   # 3
                  '3072.0',   # 4
                  'invalid',  # 5
                  'invalid',  # 6
                  '4915.2',   # 7
                  'invalid',  # 8
                  '6144.0',   # 9
                  'invalid',  # 10
                  'invalid',  # 11
                  'invalid',  # 12
                  'invalid',  # 13
                  'invalid',  # 14
                  '9830.4']   # 15

protocol_version = ['invalid',
                    'Version 1',
                    'Version 2']

hdlc_bit_rates = ['no HDLC',
                  '240 kbits/s'
                  '480 kbits/s',
                  '960 kbits/s,'
                  '1920 kbits/s',
                  '2400 kbits/s',
                  'highest possible HDLC bit rate',
                  'negotiated in higher layers']

def __usage():
    print '''Usage:
    -h, --help\tThis help text
    -i, --input\tInput file (default: input.csv)
    '''
    exit()

# Following three lambdas convert an integer in 8b10 format to a "real" unsigned integer.
# Data looks like [KD]xx.x[+-] so we remove the first and last characters and split by the dot.
to10b = lambda x: BitArray(uint=int(x), length=5)
# This creates a list of the two numbers in the 8b10 value in binary, in reversed order (since the second
# number is more significant).
split_data = lambda x: [to10b(c).bin for c in str(x)[1:-1].split('.')][::-1]
# This gets the list of the two binary numbers, joins them and converts to uint.
data2uint = lambda x: BitArray(bin=''.join(split_data(x))).uint

# Gets location of each K28.5 character and returns them as a list (SOHF = Start of Hyper Frame)
findSOHFs = lambda x: [idx for idx, item in enumerate(x) if item[1].startswith('K28.5')]

# Calculates BFN. BFN is located in two places with high byte in Z.192.0 and low byte in Z.128.0
calc_bfn = lambda low_byte, high_byte: ((high_byte & 0xF) << 8) + low_byte

# Checks that the hyper frames are correct and returns the size of them (single value since they
# should be the same size).
def check_hfs(lst):
    tmp_set = set()

    for i in (range(len(lst))):
        if not i:
            # Skip first item
            continue
        tmp_set.add(lst[i]-lst[i-1])

    if len(tmp_set) > 1:
        print 'Hyperframes are of invalid size'
        return 0
    else:
        return tmp_set.pop()


def print_hf_info(hf, samples_per_bf):
    # Start with line bit rate
    i = 0
    try:
        # Count number of D16.2s' and D5.6s' after the K28.5 character.
        # First character can be either D16.2 or D5.6, after that only
        # D16.2.
        if hf[i+1].startswith('D16.2') or hf[i+1].startswith('D5.6'):
            i += 1
            while hf[i+1].startswith('D16.2'):
                i += 1
    except IndexError:
        print "Invalid index: " + str(i)
        return

    # We have to try all these in case the last hyper frame is not complete
    try:
        print "HFN: " + str(data2uint(hf[64*samples_per_bf]))
    except IndexError:
        print "HFN: Not found"
    try:
        print "BFN: " + str(calc_bfn(data2uint(hf[128*samples_per_bf]), data2uint(hf[192*samples_per_bf])))
    except IndexError:
        print "BFN: Not found"
    print "Line bit rate: " + str(line_bit_rates[i])
    try:
        print "Protocol version: " + protocol_version[data2uint(hf[2*samples_per_bf])]
    except IndexError:
        print "Protocol version: Not found"
    try:
        print "HDLC bit rate: " + hdlc_bit_rates[data2uint(hf[66*samples_per_bf])]
    except IndexError:
        print "HDLC bit rate: Not found"
    print "-----------"


# Slurps the file into a list of lists skipping first line (header). Each row is
# represented as a [Time, Data] list with both values as strings.
readfile = lambda f: [x.rstrip().split(',') for x in f.readlines()[1:]]


def main(argv):
    inputfile = 'input.csv'  # Default file name

    try:
        opts, args = getopt.getopt(argv, 'i:', ['input='])
    except getopt.GetoptError:
        __usage()

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            __usage()
        if opt in ('-i', '--input'):
            inputfile = arg

    f = open(inputfile, 'r')
    lst = readfile(f)
    f.close()

    sohfs = findSOHFs(lst)
    hfsize = check_hfs(sohfs)
    samples_per_frame = hfsize / 256  # 256 basic frames per hyper frame

    print "Len: " + str(len(lst))
    print "Filename: " + inputfile
    print "Samples per frame: " + str(samples_per_frame)
    print "Start of hyper frames found at " + str(sohfs)

    for index, item in enumerate(sohfs):
        start = sohfs[index]
        end = sohfs[index] + samples_per_frame*256
        print
        print "Hyperframe #" + str(index)
        print "----------"
        print "Starts (K28.5 found) at file position " + str(item)
        print
        print_hf_info([x[1] for x in lst[start:end]], samples_per_frame)


if __name__ == '__main__':
    main(sys.argv[1:])

以上是关于python 获取从了解CPRI格式的示波器转储的文件作为输入。文件应为csv,每行具有时间和数据值的主要内容,如果未能解决你的问题,请参考以下文章

从 Java 堆转储中获取已使用和释放的内存

如何从核心转储中分析内存使用情况?

如何在python中将光标列表转换为JSON格式?

通过python将弹性数据转储到csv或任何NOSQL中

C++/Python 中的声音监控

转储内存中加载的 Python 模块的内容