Python:又一个 BMS 解析器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Python「又一个 BMS 解析器」相关的知识,希望对你有一定的参考价值。

import math
from enum import Enum

def lcm(x, y):
    """Return the least common multiple of the integers *x* and *y*.

    The result is 0 when either argument is 0, which also avoids the
    division by ``math.gcd(0, 0) == 0``.
    """
    if x == 0 or y == 0:
        return 0
    return x // math.gcd(x, y) * y

class BMSNodeType(Enum):
    """Tags shared by the parser's AST nodes, slot definitions and chart events."""

    # AST node kinds produced by the command parsers
    DEF_ATTR = 1
    DEF_CHANNEL_SEQ = 2
    DEF_MEASURE_FRAC = 3
    DEF_SLOT = 4

    # slot kinds carried inside a DEF_SLOT node
    SLOT_KEYSOUND = 101
    SLOT_STOP_SEQ = 102
    SLOT_LNOBJ = 103
    SLOT_BPM = 104

    # event kinds stored in the chart's object list by the second pass
    EVT_SAMPLE = 201
    EVT_MEASURE_START = 202
    EVT_BPM_CHANGE = 203
    EVT_STOP = 204
    # end-of-chart marker, appended once after the last measure
    EVT_EOC = 299


class BMSChartObject(object):
    """Plain record holding the type, value and channel of one chart object."""

    def __init__(self, objtype, value, channel):
        self.type, self.value, self.channel = objtype, value, channel


class BMSParser(object):
    def __init__(self, chart):
        self._chart = chart

        self.lcnt = 0
        self.measures = []

        # for recording slots
        self.exbpm = {}
        self.stop_seq = {}
        self.lnobj = set()

        self.verbose = []

    def compile(self, file):
        ast = self._make_ast(file)

        # 1st pass
        if self._traverse_ast(ast) == False:
            return False

        # 2nd pass
        if self._build_object_list() == False:
            return False

        return True

    def _log(self, type, message, line=None):
        entry = (type,
                 self.lcnt if line is None else line,
                 message)
        self.verbose.append(entry)

    def _make_ast(self, file):
        ast = []

        while True:
            c = file.read(1)
            if len(c) == 0:
                break
            if c == ' ' or c == '\t':
                continue
            elif c == '\n' or c == '\r':
                self.lcnt += 1
                continue
            else:
                ln = file.readline().strip()
                self.lcnt += 1
                if c == '#':
                    cmd = self._parse_command(ln)
                    # print('Line {}, command: [{}]'.format(lcnt + 1, cmd))
                    if cmd is None:
                        return False
                    ast.append((self.lcnt, cmd))
        return ast

    def _parse_command(self, cmd):
        if '0' <= cmd[0] <= '9':
            return self._parse_command_channel(cmd)
        else:
            return self._parse_command_definition(cmd)

    def _parse_command_channel(self, cmd):
        idx_sep = cmd.find(':')
        if idx_sep < 0:
            self._log('error', Exception('Expected colon.'))
            return None

        cmd_id, cmd_body = cmd[:idx_sep].rstrip(), cmd[(idx_sep + 1):].lstrip()

        try:
            measure, channel = int(cmd_id[:-2]), cmd_id[-2:]
        except err:
            self._log('error', err)
            return None

        if channel == '02':
            return BMSNodeType.DEF_MEASURE_FRAC, (measure, channel, float(cmd_body))
        else:
            return BMSNodeType.DEF_CHANNEL_SEQ, (measure, channel, cmd_body)

    def _parse_command_definition(self, cmd):
        idx_sep = cmd.find(' ')
        if idx_sep == 0:
            self._log('error', Exception('Expected a command right after `#`.'))
            return None
        elif idx_sep > 0:
            cmd_head, cmd_body = cmd[:idx_sep].rstrip(), cmd[(idx_sep + 1):].lstrip()
        else:
            cmd_head, cmd_body = cmd.rstrip(), ''

        slots_prefix = [
            ('WAV', BMSNodeType.SLOT_KEYSOUND),
            ('BPM', BMSNodeType.SLOT_BPM),
            ('LNOBJ', BMSNodeType.SLOT_LNOBJ),
            ('STOP', BMSNodeType.SLOT_STOP_SEQ)
        ]

        # slot-related definition
        for pfix, slotType in slots_prefix:
            lpf = len(pfix)
            # for partial matching (e.g. BPM and BPMxx)
            if cmd_head[:lpf].upper() == pfix and len(cmd_head) >= lpf + 2:
                slot_id = cmd_head[lpf:(lpf + 2)]
                return BMSNodeType.DEF_SLOT, (slotType, pfix, slot_id, cmd_body)

        # normal definition
        return BMSNodeType.DEF_ATTR, (cmd_head, cmd_body)

    def _traverse_ast(self, ast):
        errOccurred = False
        for lcnt, (nodeType, nodeVal) in ast:
            ret = True
            self.lcnt = lcnt

            if nodeType == BMSNodeType.DEF_ATTR:
                ret = self._process_def_attr(*nodeVal)

            elif nodeType == BMSNodeType.DEF_SLOT:
                ret = self._process_def_slot(nodeVal)

            elif nodeType == BMSNodeType.DEF_CHANNEL_SEQ:
                ret = self._process_def_channel_seq(*nodeVal)

            elif nodeType == BMSNodeType.DEF_MEASURE_FRAC:
                mno, _, frac = nodeVal
                ret = self._process_def_measure_frac(mno, frac)

            else:
                self._log('verbose', 'Unhandled AST node type [{}]'.format(nodeType))
                # raise TypeError('Unhandled AST node type [{}]'.format(nodeType))

            if ret is not None and not ret:
                errOccurred = True
        if errOccurred:
            return False

    def _get_measure(self, mno):
        # (beat cnt, granularity, sequences)
        measure_default = lambda: [None, 1, []]

        if mno < 0:
            raise ValueError('Measure number should be non-negative.')
        if mno >= len(self.measures):
            # extend array to fill the gap from queried measure
            n = mno - len(self.measures) + 1
            self.measures += [ measure_default() for _ in range(n) ]

        return self.measures[mno]

    def _process_def_attr(self, attrName, attrVal):
        meta_key = attrName.lower()
        if meta_key == 'bpm':
            self._chart.bpm = float(attrVal)
        elif meta_key == 'total':
            self._chart.gaugeIncMax = float(attrVal)
        elif meta_key == 'lntype':
            if not attrVal.isdigit() or int(attrVal) != 1:
                self._log('error', Exception('LN type other than 1 is unsupported, got `{}`.'.format(attrVal)))
                return False
        elif meta_key == 'stagefile':
            self._chart.stageFile = attrVal
        elif meta_key in self._chart.metadata:
            # TODO: type conversion for numerical values
            self._chart.metadata[meta_key] = attrVal
            # TODO: emit warning of duplicates
        else:
            self._log('warning', 'Unknown attribute `{}` with value `{}`.'.format(attrName, attrVal))

    def _process_def_slot(self, nodeVal):
        """Record one slot definition ``(slot_type, prefix, slot_id, body)``.

        Keysound slots are forwarded to ``_process_slot_keysound``; BPM and
        stop slots store the body as a float in ``self.exbpm`` /
        ``self.stop_seq`` under the slot id.

        Returns False after logging an error for an unhandled slot type or a
        non-numeric BPM/stop value; otherwise returns None (or the keysound
        handler's result).
        """
        slotType, pfix, sid, *slotTup = nodeVal
        if slotType == BMSNodeType.SLOT_KEYSOUND:
            return self._process_slot_keysound(sid, slotTup[0])

        if slotType == BMSNodeType.SLOT_BPM:
            table = self.exbpm
        elif slotType == BMSNodeType.SLOT_STOP_SEQ:
            table = self.stop_seq
        # TODO: support more `SLOT_*` here
        else:
            self._log('error', Exception('Unhandled slot definition [{}].'.format(slotType)))
            return False

        # a malformed number is a reportable input error, not a crash
        try:
            table[sid] = float(slotTup[0])
        except ValueError:
            self._log('error',
                      Exception('Expected a number for `{}{}`, got `{}`.'.format(pfix, sid, slotTup[0])))
            return False

    def _process_slot_keysound(self, sid, path):
        # TODO: path normalization
        self._chart.sounds[sid] = path

    def _process_def_channel_seq(self, mno, cid, seq):
        measureDef = self._get_measure(mno)

        if len(seq) % 2 != 0:
            self._log('error',
                       Exception('Malformed channel sequence (measure #{}:{}; length is {})'
                                 .format(mno, cid, len(seq))))
            return False

        lst = [ seq[i:(i + 2)] for i in range(0, len(seq), 2) ]

        measureDef[1] = lcm(measureDef[1], len(lst))
        measureDef[2].append((cid, lst))

    def _process_def_measure_frac(self, mno, frac):
        measureDef = self._get_measure(mno)

        if frac <= 0:
            self._log('error',
                       Exception('Measure fraction should be a positive decimal, got {} for measure #{}.'
                                 .format(frac, mno)))
            return False

        if measureDef[0] is not None:
            self._log('warning', 'Measure fraction for measure #{} had already been specified.'.format(mno))

        measureDef[0] = frac * 4.0

    def _build_object_list(self):
        cur_bpm = self._chart.bpm

        # time base within measure
        cur_ts = 0

        objects = self._chart.objects

        for mno, measure in enumerate(self.measures):
            cur_ts_frac = 0

            beat_cnt, tot_frac, mrefs = measure

            if beat_cnt is None:
                beat_cnt = measure[0] = 4.0

            # record a reference to object list
            self._chart.measures.append((beat_cnt, tot_frac, cur_ts, len(objects)))
            objects.append([cur_ts, (BMSNodeType.EVT_MEASURE_START, beat_cnt)])

            object_buf = []

            # sort by channel
            # NOTE that the nature is that stop sequence (cid=09) should be processed at last
            # (i.e. after sounds and bpm changes)
            mrefs.sort(key=lambda x: x[0] if x[0] != '09' else 'zz1')  # sort by cid

            for cid, seq in mrefs:
                ldef = len(seq)
                for idx, slbl in enumerate(seq):
                    if slbl == '00':
                        continue
                    if slbl not in self._chart.sounds:
                        self._log('warning', 'Sound slot for label `{}` is not defined.'.format(slbl), line=-1)
                    frac = tot_frac // ldef * idx
                    object_buf.append((frac, cid, slbl))

            # TODO: channel merging
            # by the fact that the sort is stable (as per Python's spec)
            # a linear scan suffices
            object_buf.sort(key=lambda x: x[0])  # sort by frac

            for frac, cid, slbl in object_buf:
                ts = cur_ts + beat_cnt * 60 * (frac - cur_ts_frac) / (cur_bpm * tot_frac)

                if cid == '03':
                    try:
                        new_bpm = float(int(slbl, 16))
                    except ValueError:
                        # FIXME: line number information
                        self._log('warning', 'Invalid BPM change [{}], ignoring.'.format(slbl), line=-1)
                        new_bpm = cur_bpm

                    obj = BMSNodeType.EVT_BPM_CHANGE, new_bpm

                    # update time base
                    cur_bpm = new_bpm
                    cur_ts = ts
                    cur_ts_frac = frac

                elif cid == '08':
                    new_bpm = self.exbpm[slbl]
                    obj = BMSNodeType.EVT_BPM_CHANGE, new_bpm

                    # update time base
                    cur_bpm = new_bpm
                    cur_ts = ts
                    cur_ts_frac = frac

                elif cid == '09':
                    # stop 1 tick means 1/48 beat (1/192 measure under 4/4 time sig.)
                    duration = (self.stop_seq[slbl] * 60) / (48.0 * cur_bpm)
                    obj = BMSNodeType.EVT_STOP, self.stop_seq[slbl]
                    cur_ts += duration

                elif cid == '04':
                    self._log('warning', 'Ignoring reference of video slot {}: BGA is not implemented yet.'.format(slbl))
                    continue

                else:  # TODO: detect playable channels
                    obj = BMSNodeType.EVT_SAMPLE, slbl, cid

                objects.append([ts, obj])

            cur_ts += beat_cnt * 60 * (1 - cur_ts_frac / tot_frac) / cur_bpm

        objects.append([cur_ts, (BMSNodeType.EVT_EOC, )])


class BMSChart(object):
    """In-memory representation of a BMS chart.

    When *src_file* is given to the constructor it is parsed immediately;
    ``success`` then holds the parser's result and ``verbose`` its
    diagnostics.  Without a source both stay None.
    """

    def __init__(self, src_file=None):
        # textual metadata, all unset until parsed
        self.metadata = dict.fromkeys((
            'title', 'artist', 'genre',
            'preview', 'difficulty', 'playlevel',
            'subtitles', 'subartists',
        ))

        # related constants
        self.bpm = self.gaugeIncMax = 0.0

        # related images
        self.stageFile = None

        # the main structure of the chart
        self.measures, self.sounds, self.objects = [], {}, []

        # for parsing
        self.verbose = self.success = None

        if src_file is None:
            return
        self.success = self._parse_from_file(src_file)

    def _parse_from_file(self, src_file):
        """Run a ``BMSParser`` over *src_file*, keep its log, return its result."""
        parser = BMSParser(self)
        outcome = parser.compile(src_file)

        self.verbose = parser.verbose
        return outcome


if __name__ == '__main__':
    # Demo: parse a sample chart and dump what the parser produced.
    # The chart is parsed inside the constructor, so the file can be closed
    # as soon as `BMSChart` returns; `with` also closes it if parsing raises.
    with open('bms/boku/a7.bms') as f:
        bms = BMSChart(f)

    if bms.verbose:
        print('--- Verbose ---')
        for x in bms.verbose:
            print(x)

    print('--- Objects ---')
    for i, obj in enumerate(bms.objects):
        print(i, obj)

    print('--- Measures ---')
    for i, mea in enumerate(bms.measures):
        print(i, mea)

以上是关于 Python「又一个 BMS 解析器」的主要内容。如果未能解决你的问题,请参考以下文章:

python_day04 函数嵌套 名称空间和作用域 闭包 装饰器 迭代器 生成器 列表解析 三元表达式 生成器表达式

python怎么安装lxml html 解析器

Python中的XML解析[关闭]

用python编写一个快速解析器

Python 之父的解析器系列之三:生成一个 PEG 解析器

Python 之父再发文:构建一个 PEG 解析器