如何从 Python 中的文件/流中懒惰地读取多个 JSON 值?

Posted

技术标签:

【中文标题】如何从 Python 中的文件/流中懒惰地读取多个 JSON 值?【英文标题】:How I can I lazily read multiple JSON values from a file/stream in Python? 【发布时间】:2011-10-16 17:00:51 【问题描述】:

我想从 Python 中的文件/流中读取多个 JSON 对象,一次一个。不幸的是json.load() 只是.read()s 直到文件结束;似乎没有任何方法可以使用它来读取单个对象或懒惰地迭代对象。

有没有办法做到这一点?使用标准库是理想的,但如果有第三方库,我会使用它。

目前我将每个对象放在单独的行上并使用json.loads(f.readline()),但我真的不想这样做。

使用示例

example.py

import my_json as json
import sys

for o in json.iterload(sys.stdin):
    print("Working on a", type(o))

in.txt

"foo": ["bar", "baz"] 1 2 [] 4 5 6

示例会话

$ python3.2 example.py < in.txt
Working on a dict
Working on a int
Working on a int
Working on a list
Working on a int
Working on a int
Working on a int

【问题讨论】:

能否添加一个嵌套对象的行为示例? @TimMcNamara:嵌套对象的行为不应该改变。但是,一旦我们到达第一个***对象的末尾(在我的示例中为"foo": ["bar", "baz"]),它应该yield 它然后继续到下一个(1)。 为什么要避免“json 行”?总是可以将一个对象序列化为 json,这样它的 json 表示中就没有'\n'(一个换行符,而不是两个字符),因为 '\n' 必须在 json 字符串中转义,并且因此'\n' 只能用于格式化,例如,我相信json.dumps() 默认不会引入'\n'。请注意,Unicode 换行符(例如 U+0085)可能在 json 字符串中未转义。 ijson 库在这种情况下可能很有用。 pypi.python.org/pypi/ijsongithub.com/isagalaev/ijson 标题不应该是“我如何才能在 Python 中从文件/流中懒惰地读取多个 JSON ?”因为一个对象也是一个值,一个 json int、字符串等也是一个值,而反过来不一定是真的? 【参考方案1】:

JSON 通常不适合这种增量使用;没有标准的方法可以序列化多个对象,这样就可以轻松地一次加载一个对象,而无需解析整个对象。

您正在使用的每行对象解决方案也可以在其他地方看到。 Scrapy 称其为“JSON 行”:

https://docs.scrapy.org/en/latest/topics/exporters.html?highlight=exporters#jsonitemexporter http://www.enricozini.org/2011/tips/python-stream-json/

你可以稍微用 Python 来做:

for jsonline in f:
    yield json.loads(jsonline)   # or do the processing in this loop

我认为这是最好的方法——它不依赖任何第三方库,而且很容易理解发生了什么。我也在自己的一些代码中使用过它。

【讨论】:

re: "no standard way": 我看不出问题,只要你有一个单字符的缓冲区,语法似乎就可以使多个连续的对象明确。感谢您指出其他人使用“JSON 行”,我现在使用它感觉不那么糟糕。【参考方案2】:

也许有点晚了,但我确实遇到了这个问题(嗯,或多或少)。我对这些问题的标准解决方案通常是对一些众所周知的根对象进行正则表达式拆分,但在我的情况下这是不可能的。一般而言,唯一可行的方法是实施适当的分词器

在没有找到足够通用且性能相当好的解决方案后,我自己结束了,编写了splitstream 模块。它是一个预分词器,可以理解 JSON 和 XML,并将连续的流拆分为多个块进行解析(但实际解析由您决定)。为了从中获得某种性能,它被编写为 C 模块。

例子:

from splitstream import splitfile

for jsonstr in splitfile(sys.stdin, format="json")):
    yield json.loads(jsonstr)

【讨论】:

太棒了。感谢分享。 这是最终的解决方案。希望大家继续更新。 它很简单。感谢提供这么有用的模块。 你能上传一个编译好的 .py 版本吗?我尝试构建和安装该模块,但是......它会产生一堆关于重新定义常量等的错误。 该模块是用 C 语言编写的。将其移植到纯 Python 中作为练习留给准备完成任务的人 :)。但是,对于编写它的目的来说,它可能太慢了。如果您在编译时遇到问题,您可能需要安装 python-dev 包。【参考方案3】:

当然可以。您只需直接前往raw_decode。此实现将整个文件加载到内存中并对该字符串进行操作(就像json.load 所做的那样);如果您有大文件,您可以将其修改为仅在必要时从文件中读取。

import json
from json.decoder import WHITESPACE

def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    if isinstance(string_or_fp, file):
        string = string_or_fp.read()
    else:
        string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()

用法:正如你所要求的,它是一个生成器。

【讨论】:

似乎棘手的部分是确保流式读取带来足够的文件,以便您有整个对象要解码。所以这是一个简单的方法,如果你例如假设对象中没有换行符。但是除非你在文件上强加那种额外的结构,OP 试图避免这种结构,否则你似乎需要像 @Benedict 那样的解决方案【参考方案4】:

这实际上是一个非常讨厌的问题,因为您必须按行进行流式传输,但是要针对大括号跨多行进行模式匹配,还要进行模式匹配 json。这是一种 json-preparse,然后是 json 解析。 Json 相比其他格式,解析起来比较容易,所以并不总是需要去解析库,但是,我们应该如何解决这些矛盾的问题呢?

发电机来救援!

像这样的问题生成器的美妙之处在于,您可以将它们堆叠在一起,逐渐抽象出问题的难度,同时保持惰性。我也考虑过使用将值传回生成器的机制(send()),但幸运的是我不需要使用它。

要解决第一个问题,您需要某种流式查找器,作为 re.finditer 的流式版本。我在下面的尝试会根据需要插入行(取消注释调试语句以查看),同时仍然返回匹配项。我实际上然后稍微修改它以产生不匹配的行以及匹配(在产生的元组的第一部分标记为 0 或 1)。

import re

def streamingfinditer(pat,stream):
  for s in stream:
#    print "Read next line: " + s
    while 1:
      m = re.search(pat,s)
      if not m:
        yield (0,s)
        break
      yield (1,m.group())
      s = re.split(pat,s,1)[1]

这样,就可以匹配直到大括号,每次都考虑大括号是否平衡,然后根据需要返回简单对象或复合对象。

braces='[]'
whitespaceesc=' \t'
bracesesc='\\'+'\\'.join(braces)
balancemap=dict(zip(braces,[1,-1,1,-1]))
bracespat='['+bracesesc+']'
nobracespat='[^'+bracesesc+']*'
untilbracespat=nobracespat+bracespat

def simpleorcompoundobjects(stream):
  obj = ""
  unbalanced = 0
  for (c,m) in streamingfinditer(re.compile(untilbracespat),stream):
    if (c == 0): # remainder of line returned, nothing interesting
      if (unbalanced == 0):
        yield (0,m)
      else:
        obj += m
    if (c == 1): # match returned
      if (unbalanced == 0):
        yield (0,m[:-1])
        obj += m[-1]
      else:
        obj += m
      unbalanced += balancemap[m[-1]]
      if (unbalanced == 0):
        yield (1,obj)
        obj="" 

返回元组如下:

(0,"String of simple non-braced objects easy to parse")
(1," 'Compound' : 'objects' ")

基本上这就是完成的讨厌的部分。我们现在只需要按照我们认为合适的方式进行最后一层的解析。例如,我们可以使用 Jeremy Roman 的 iterload 函数(谢谢!)对单行进行解析:

def streamingiterload(stream):
  for c,o in simpleorcompoundobjects(stream):
    for x in iterload(o):
      yield x 

测试一下:

of = open("test.json","w") 
of.write("""[ "hello" ]  "goodbye" : 1  1 2 
 2
9 78
 4 5  "animals" : [ "dog" , "lots of mice" ,
 "cat" ] 
""")
of.close()
// open & stream the json
f = open("test.json","r")
for o in streamingiterload(f.readlines()):
  print o
f.close()

我得到了这些结果(如果你打开那条调试线,你会看到它根据需要拉入这些线):

[u'hello']
u'goodbye': 1
1
2

2
9
78
4
5
u'animals': [u'dog', u'lots of mice', u'cat']

这不适用于所有情况。由于json 库的实现,如果不自己重新实现解析器,不可能完全正确地工作。

【讨论】:

如果你想正确地做到这一点,你还需要注意字符串中的大括号和方括号。然后还要注意转义的引号。在不知不觉中,“预解析器”将变得几乎与完整的 JSON 解析器一样复杂。 谢谢杰里米。这是一个很好的挑战!是的,彼得 - 你当然是绝对正确的 :) 做得很好。如果像 """]" 这样的字符出现在 JSON 字符串中,这是否会正确运行?我认为这是使用正则表达式解析的一般限制。 当我四处寻找时,我发现主解析函数的构建方式是 不可能 正确地懒惰地使用它,所以你不会得到完美的结果没有自己实现一个完整的解析器。这个答案展示了几个有用的相关内容,并且很好地处理了简单的案例。 这个答案是可怕的,我不知道为什么它被赞成。作者承认它实际上并不适用于所有输入,因此根据定义它甚至不是正确答案,并且它使用了一个复杂的正则表达式 computed,所以我们甚至无法阅读这是什么。有时会给出正确结果的函数有什么用?【参考方案5】:

这是一个非常简单的解决方案。秘诀是尝试、失败并使用异常中的信息来正确解析。唯一的限制是文件必须是可搜索的。

def stream_read_json(fn):
    import json
    start_pos = 0
    with open(fn, 'r') as f:
        while True:
            try:
                obj = json.load(f)
                yield obj
                return
            except json.JSONDecodeError as e:
                f.seek(start_pos)
                json_str = f.read(e.pos)
                obj = json.loads(json_str)
                start_pos += e.pos
                yield obj

编辑:刚刚注意到这仅适用于 Python >=3.5。之前,失败返回 ValueError,你必须从字符串中解析出位置,例如

def stream_read_json(fn):
    import json
    import re
    start_pos = 0
    with open(fn, 'r') as f:
        while True:
            try:
                obj = json.load(f)
                yield obj
                return
            except ValueError as e:
                f.seek(start_pos)
                end_pos = int(re.match('Extra data: line \d+ column \d+ .*\(char (\d+).*\)',
                                    e.args[0]).groups()[0])
                json_str = f.read(end_pos)
                obj = json.loads(json_str)
                start_pos += end_pos
                yield obj

【讨论】:

欢迎来到 Stack Overflow 并感谢您的回答!这与我希望找到的更接近。我应该能够根据我正在考虑的案例类型调整它,即使它们不直接提供搜索。 re 不起作用 - 反斜杠需要转义。考虑一个原始字符串r'...' 我自己的工作需要这个,所以我创建了一个小的 python 库来做这个,或多或少地使用你的技术和一些细节,它在这里:pypi.python.org/pypi/Streamy 如果你使用ujson而不是json,你将获得巨大的加速 在实现这个答案时,您宁愿相信f.tell() 而不是错误位置【参考方案6】:

我相信更好的方法是使用状态机。下面是一个示例代码,我通过将下面链接上的 NodeJS 代码转换为 Python 3(使用的 nonlocal 关键字仅在 Python 3 中可用,代码在 Python 2 上不起作用)

Edit-1:更新代码并使其与 Python 2 兼容

Edit-2:更新并添加了仅限 Python3 的版本

https://gist.github.com/creationix/5992451

仅 Python 3 版本

# A streaming byte oriented JSON parser.  Feed it a single byte at a time and
# it will emit complete objects as it comes across them.  Whitespace within and
# between objects is ignored.  This means it can parse newline delimited JSON.
import math


def json_machine(emit, next_func=None):
    def _value(byte_data):
        if not byte_data:
            return

        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _value  # Ignore whitespace

        if byte_data == 0x22:  # "
            return string_machine(on_value)

        if byte_data == 0x2d or (0x30 <= byte_data < 0x40):  # - or 0-9
            return number_machine(byte_data, on_number)

        if byte_data == 0x7b:  #:
            return object_machine(on_value)

        if byte_data == 0x5b:  # [
            return array_machine(on_value)

        if byte_data == 0x74:  # t
            return constant_machine(TRUE, True, on_value)

        if byte_data == 0x66:  # f
            return constant_machine(FALSE, False, on_value)

        if byte_data == 0x6e:  # n
            return constant_machine(NULL, None, on_value)

        if next_func == _value:
            raise Exception("Unexpected 0x" + str(byte_data))

        return next_func(byte_data)

    def on_value(value):
        emit(value)
        return next_func

    def on_number(number, byte):
        emit(number)
        return _value(byte)

    next_func = next_func or _value
    return _value


TRUE = [0x72, 0x75, 0x65]
FALSE = [0x61, 0x6c, 0x73, 0x65]
NULL = [0x75, 0x6c, 0x6c]


def constant_machine(bytes_data, value, emit):
    i = 0
    length = len(bytes_data)

    def _constant(byte_data):
        nonlocal i
        if byte_data != bytes_data[i]:
            i += 1
            raise Exception("Unexpected 0x" + str(byte_data))

        i += 1
        if i < length:
            return _constant
        return emit(value)

    return _constant


def string_machine(emit):
    string = ""

    def _string(byte_data):
        nonlocal string

        if byte_data == 0x22:  # "
            return emit(string)

        if byte_data == 0x5c:  # \
            return _escaped_string

        if byte_data & 0x80:  # UTF-8 handling
            return utf8_machine(byte_data, on_char_code)

        if byte_data < 0x20:  # ASCII control character
            raise Exception("Unexpected control character: 0x" + str(byte_data))

        string += chr(byte_data)
        return _string

    def _escaped_string(byte_data):
        nonlocal string

        if byte_data == 0x22 or byte_data == 0x5c or byte_data == 0x2f:  # " \ /
            string += chr(byte_data)
            return _string

        if byte_data == 0x62:  # b
            string += "\b"
            return _string

        if byte_data == 0x66:  # f
            string += "\f"
            return _string

        if byte_data == 0x6e:  # n
            string += "\n"
            return _string

        if byte_data == 0x72:  # r
            string += "\r"
            return _string

        if byte_data == 0x74:  # t
            string += "\t"
            return _string

        if byte_data == 0x75:  # u
            return hex_machine(on_char_code)

    def on_char_code(char_code):
        nonlocal string
        string += chr(char_code)
        return _string

    return _string


# Nestable state machine for UTF-8 Decoding.
def utf8_machine(byte_data, emit):
    left = 0
    num = 0

    def _utf8(byte_data):
        nonlocal num, left
        if (byte_data & 0xc0) != 0x80:
            raise Exception("Invalid byte in UTF-8 character: 0x" + byte_data.toString(16))

        left = left - 1

        num |= (byte_data & 0x3f) << (left * 6)
        if left:
            return _utf8
        return emit(num)

    if 0xc0 <= byte_data < 0xe0:  # 2-byte UTF-8 Character
        left = 1
        num = (byte_data & 0x1f) << 6
        return _utf8

    if 0xe0 <= byte_data < 0xf0:  # 3-byte UTF-8 Character
        left = 2
        num = (byte_data & 0xf) << 12
        return _utf8

    if 0xf0 <= byte_data < 0xf8:  # 4-byte UTF-8 Character
        left = 3
        num = (byte_data & 0x07) << 18
        return _utf8

    raise Exception("Invalid byte in UTF-8 string: 0x" + str(byte_data))


# Nestable state machine for hex escaped characters
def hex_machine(emit):
    left = 4
    num = 0

    def _hex(byte_data):
        nonlocal num, left

        if 0x30 <= byte_data < 0x40:
            i = byte_data - 0x30
        elif 0x61 <= byte_data <= 0x66:
            i = byte_data - 0x57
        elif 0x41 <= byte_data <= 0x46:
            i = byte_data - 0x37
        else:
            raise Exception("Expected hex char in string hex escape")

        left -= 1
        num |= i << (left * 4)

        if left:
            return _hex
        return emit(num)

    return _hex


def number_machine(byte_data, emit):
    sign = 1
    number = 0
    decimal = 0
    esign = 1
    exponent = 0

    def _mid(byte_data):
        if byte_data == 0x2e:  # .
            return _decimal

        return _later(byte_data)

    def _number(byte_data):
        nonlocal number
        if 0x30 <= byte_data < 0x40:
            number = number * 10 + (byte_data - 0x30)
            return _number

        return _mid(byte_data)

    def _start(byte_data):
        if byte_data == 0x30:
            return _mid

        if 0x30 < byte_data < 0x40:
            return _number(byte_data)

        raise Exception("Invalid number: 0x" + str(byte_data))

    if byte_data == 0x2d:  # -
        sign = -1
        return _start

    def _decimal(byte_data):
        nonlocal decimal
        if 0x30 <= byte_data < 0x40:
            decimal = (decimal + byte_data - 0x30) / 10
            return _decimal

        return _later(byte_data)

    def _later(byte_data):
        if byte_data == 0x45 or byte_data == 0x65:  # E e
            return _esign

        return _done(byte_data)

    def _esign(byte_data):
        nonlocal esign
        if byte_data == 0x2b:  # +
            return _exponent

        if byte_data == 0x2d:  # -
            esign = -1
            return _exponent

        return _exponent(byte_data)

    def _exponent(byte_data):
        nonlocal exponent
        if 0x30 <= byte_data < 0x40:
            exponent = exponent * 10 + (byte_data - 0x30)
            return _exponent

        return _done(byte_data)

    def _done(byte_data):
        value = sign * (number + decimal)
        if exponent:
            value *= math.pow(10, esign * exponent)

        return emit(value, byte_data)

    return _start(byte_data)


def array_machine(emit):
    array_data = []

    def _array(byte_data):
        if byte_data == 0x5d:  # ]
            return emit(array_data)

        return json_machine(on_value, _comma)(byte_data)

    def on_value(value):
        array_data.append(value)

    def _comma(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _comma  # Ignore whitespace

        if byte_data == 0x2c:  # ,
            return json_machine(on_value, _comma)

        if byte_data == 0x5d:  # ]
            return emit(array_data)

        raise Exception("Unexpected byte: 0x" + str(byte_data) + " in array body")

    return _array


def object_machine(emit):
    object_data = 
    key = None

    def _object(byte_data):
        if byte_data == 0x7d:  #
            return emit(object_data)

        return _key(byte_data)

    def _key(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _object  # Ignore whitespace

        if byte_data == 0x22:
            return string_machine(on_key)

        raise Exception("Unexpected byte: 0x" + str(byte_data))

    def on_key(result):
        nonlocal key
        key = result
        return _colon

    def _colon(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _colon  # Ignore whitespace

        if byte_data == 0x3a:  # :
            return json_machine(on_value, _comma)

        raise Exception("Unexpected byte: 0x" + str(byte_data))

    def on_value(value):
        object_data[key] = value

    def _comma(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _comma  # Ignore whitespace

        if byte_data == 0x2c:  # ,
            return _key

        if byte_data == 0x7d:  #
            return emit(object_data)

        raise Exception("Unexpected byte: 0x" + str(byte_data))

    return _object

Python 2 兼容版本

# A streaming byte oriented JSON parser.  Feed it a single byte at a time and
# it will emit complete objects as it comes across them.  Whitespace within and
# between objects is ignored.  This means it can parse newline delimited JSON.
import math


def json_machine(emit, next_func=None):
    def _value(byte_data):
        if not byte_data:
            return

        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _value  # Ignore whitespace

        if byte_data == 0x22:  # "
            return string_machine(on_value)

        if byte_data == 0x2d or (0x30 <= byte_data < 0x40):  # - or 0-9
            return number_machine(byte_data, on_number)

        if byte_data == 0x7b:  #:
            return object_machine(on_value)

        if byte_data == 0x5b:  # [
            return array_machine(on_value)

        if byte_data == 0x74:  # t
            return constant_machine(TRUE, True, on_value)

        if byte_data == 0x66:  # f
            return constant_machine(FALSE, False, on_value)

        if byte_data == 0x6e:  # n
            return constant_machine(NULL, None, on_value)

        if next_func == _value:
            raise Exception("Unexpected 0x" + str(byte_data))

        return next_func(byte_data)

    def on_value(value):
        emit(value)
        return next_func

    def on_number(number, byte):
        emit(number)
        return _value(byte)

    next_func = next_func or _value
    return _value


TRUE = [0x72, 0x75, 0x65]
FALSE = [0x61, 0x6c, 0x73, 0x65]
NULL = [0x75, 0x6c, 0x6c]


def constant_machine(bytes_data, value, emit):
    local_data = "i": 0, "length": len(bytes_data)

    def _constant(byte_data):
        # nonlocal i, length
        if byte_data != bytes_data[local_data["i"]]:
            local_data["i"] += 1
            raise Exception("Unexpected 0x" + byte_data.toString(16))

        local_data["i"] += 1

        if local_data["i"] < local_data["length"]:
            return _constant
        return emit(value)

    return _constant


def string_machine(emit):
    local_data = "string": ""

    def _string(byte_data):
        # nonlocal string

        if byte_data == 0x22:  # "
            return emit(local_data["string"])

        if byte_data == 0x5c:  # \
            return _escaped_string

        if byte_data & 0x80:  # UTF-8 handling
            return utf8_machine(byte_data, on_char_code)

        if byte_data < 0x20:  # ASCII control character
            raise Exception("Unexpected control character: 0x" + byte_data.toString(16))

        local_data["string"] += chr(byte_data)
        return _string

    def _escaped_string(byte_data):
        # nonlocal string

        if byte_data == 0x22 or byte_data == 0x5c or byte_data == 0x2f:  # " \ /
            local_data["string"] += chr(byte_data)
            return _string

        if byte_data == 0x62:  # b
            local_data["string"] += "\b"
            return _string

        if byte_data == 0x66:  # f
            local_data["string"] += "\f"
            return _string

        if byte_data == 0x6e:  # n
            local_data["string"] += "\n"
            return _string

        if byte_data == 0x72:  # r
            local_data["string"] += "\r"
            return _string

        if byte_data == 0x74:  # t
            local_data["string"] += "\t"
            return _string

        if byte_data == 0x75:  # u
            return hex_machine(on_char_code)

    def on_char_code(char_code):
        # nonlocal string
        local_data["string"] += chr(char_code)
        return _string

    return _string


# Nestable state machine for UTF-8 Decoding.
def utf8_machine(byte_data, emit):
    local_data = "left": 0, "num": 0

    def _utf8(byte_data):
        # nonlocal num, left
        if (byte_data & 0xc0) != 0x80:
            raise Exception("Invalid byte in UTF-8 character: 0x" + byte_data.toString(16))

        local_data["left"] -= 1

        local_data["num"] |= (byte_data & 0x3f) << (local_data["left"] * 6)
        if local_data["left"]:
            return _utf8
        return emit(local_data["num"])

    if 0xc0 <= byte_data < 0xe0:  # 2-byte UTF-8 Character
        local_data["left"] = 1
        local_data["num"] = (byte_data & 0x1f) << 6
        return _utf8

    if 0xe0 <= byte_data < 0xf0:  # 3-byte UTF-8 Character
        local_data["left"] = 2
        local_data["num"] = (byte_data & 0xf) << 12
        return _utf8

    if 0xf0 <= byte_data < 0xf8:  # 4-byte UTF-8 Character
        local_data["left"] = 3
        local_data["num"] = (byte_data & 0x07) << 18
        return _utf8

    raise Exception("Invalid byte in UTF-8 string: 0x" + str(byte_data))


# Nestable state machine for hex escaped characters
def hex_machine(emit):
    local_data = "left": 4, "num": 0

    def _hex(byte_data):
        # nonlocal num, left
        i = 0  # Parse the hex byte
        if 0x30 <= byte_data < 0x40:
            i = byte_data - 0x30
        elif 0x61 <= byte_data <= 0x66:
            i = byte_data - 0x57
        elif 0x41 <= byte_data <= 0x46:
            i = byte_data - 0x37
        else:
            raise Exception("Expected hex char in string hex escape")

        local_data["left"] -= 1
        local_data["num"] |= i << (local_data["left"] * 4)

        if local_data["left"]:
            return _hex
        return emit(local_data["num"])

    return _hex


def number_machine(byte_data, emit):
    local_data = "sign": 1, "number": 0, "decimal": 0, "esign": 1, "exponent": 0

    def _mid(byte_data):
        if byte_data == 0x2e:  # .
            return _decimal

        return _later(byte_data)

    def _number(byte_data):
        # nonlocal number
        if 0x30 <= byte_data < 0x40:
            local_data["number"] = local_data["number"] * 10 + (byte_data - 0x30)
            return _number

        return _mid(byte_data)

    def _start(byte_data):
        if byte_data == 0x30:
            return _mid

        if 0x30 < byte_data < 0x40:
            return _number(byte_data)

        raise Exception("Invalid number: 0x" + byte_data.toString(16))

    if byte_data == 0x2d:  # -
        local_data["sign"] = -1
        return _start

    def _decimal(byte_data):
        # nonlocal decimal
        if 0x30 <= byte_data < 0x40:
            local_data["decimal"] = (local_data["decimal"] + byte_data - 0x30) / 10
            return _decimal

        return _later(byte_data)

    def _later(byte_data):
        if byte_data == 0x45 or byte_data == 0x65:  # E e
            return _esign

        return _done(byte_data)

    def _esign(byte_data):
        # nonlocal esign
        if byte_data == 0x2b:  # +
            return _exponent

        if byte_data == 0x2d:  # -
            local_data["esign"] = -1
            return _exponent

        return _exponent(byte_data)

    def _exponent(byte_data):
        # nonlocal exponent
        if 0x30 <= byte_data < 0x40:
            local_data["exponent"] = local_data["exponent"] * 10 + (byte_data - 0x30)
            return _exponent

        return _done(byte_data)

    def _done(byte_data):
        value = local_data["sign"] * (local_data["number"] + local_data["decimal"])
        if local_data["exponent"]:
            value *= math.pow(10, local_data["esign"] * local_data["exponent"])

        return emit(value, byte_data)

    return _start(byte_data)


def array_machine(emit):
    local_data = "array_data": []

    def _array(byte_data):
        if byte_data == 0x5d:  # ]
            return emit(local_data["array_data"])

        return json_machine(on_value, _comma)(byte_data)

    def on_value(value):
        # nonlocal array_data
        local_data["array_data"].append(value)

    def _comma(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _comma  # Ignore whitespace

        if byte_data == 0x2c:  # ,
            return json_machine(on_value, _comma)

        if byte_data == 0x5d:  # ]
            return emit(local_data["array_data"])

        raise Exception("Unexpected byte: 0x" + str(byte_data) + " in array body")

    return _array


def object_machine(emit):
    local_data = "object_data": , "key": ""

    def _object(byte_data):
        # nonlocal object_data, key
        if byte_data == 0x7d:  #
            return emit(local_data["object_data"])

        return _key(byte_data)

    def _key(byte_data):
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _object  # Ignore whitespace

        if byte_data == 0x22:
            return string_machine(on_key)

        raise Exception("Unexpected byte: 0x" + byte_data.toString(16))

    def on_key(result):
        # nonlocal object_data, key
        local_data["key"] = result
        return _colon

    def _colon(byte_data):
        # nonlocal object_data, key
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _colon  # Ignore whitespace

        if byte_data == 0x3a:  # :
            return json_machine(on_value, _comma)

        raise Exception("Unexpected byte: 0x" + str(byte_data))

    def on_value(value):
        # nonlocal object_data, key
        local_data["object_data"][local_data["key"]] = value

    def _comma(byte_data):
        # nonlocal object_data
        if byte_data == 0x09 or byte_data == 0x0a or byte_data == 0x0d or byte_data == 0x20:
            return _comma  # Ignore whitespace

        if byte_data == 0x2c:  # ,
            return _key

        if byte_data == 0x7d:  #
            return emit(local_data["object_data"])

        raise Exception("Unexpected byte: 0x" + str(byte_data))

    return _object

测试它

if __name__ == "__main__":
    test_json = """[1,2,"3"] "name": 
    "tarun" 1 2 
    3 ["name":"a", 
    "data": [1,
    null,2]]
"""
    def found_json(data):
        print(data)

    state = json_machine(found_json)

    for char in test_json:
        state = state(ord(char))

同样的输出是

[1, 2, '3']
'name': 'tarun'
1
2
3
['name': 'a', 'data': [1, None, 2]]

【讨论】:

不错的解决方案!稍后我会仔细研究,但这很有希望。但就其价值而言,我更喜欢仅 Python 3 的版本。对所有局部变量使用 dicts 有点尴尬,我很高兴过去离开 Python 2。 ;) @JeremyBanks,当然我不知道您针对的是哪个版本。现在,我添加了一个仅 Python3 的版本和兼容 Py2 的版本,也为可能仍在 Python 2 上的其他人添加了答案 @JeremyBanks,赏金仅剩 1 天,希望您可以查看并提供有关答案的反馈 看来唯一真正理解问题的人是Tarun。解析的效率取决于输入发生的传递次数。大多数答案使用正则表达式或预先读取一行(这也可能很危险),或者更糟糕的是,解析失败的次数未知。太糟糕了,这不是 Python 的一部分。【参考方案7】:

我想提供一个解决方案。关键思想是“尝试”解码:如果失败,给它更多的feed,否则使用偏移信息准备下一次解码。

但是当前的json模块不能容忍字符串头部的SPACE被解码,所以我必须把它们去掉。

import sys
import json

def iterload(file):
    buffer = ""
    dec = json.JSONDecoder()
    for line in file:         
        buffer = buffer.strip(" \n\r\t") + line.strip(" \n\r\t")
        while(True):
            try:
                r = dec.raw_decode(buffer)
            except:
                break
            yield r[0]
            buffer = buffer[r[1]:].strip(" \n\r\t")


for o in iterload(sys.stdin):
    print("Working on a", type(o),  o)

========================== 我已经测试了几个 txt 文件,它工作正常。 (in1.txt)

"foo": ["bar", "baz"]

 1 2 [
  ]  4
"foo1": ["bar1", "foo2":"A":1, "B":3, "DDD":4]

 5   6

(in2.txt)

"foo"
: ["bar",
  "baz"]
   
1 2 [
] 4 5 6

(in.txt,您的姓名首字母)

"foo": ["bar", "baz"] 1 2 [] 4 5 6

(Benedict 测试用例的输出)

python test.py < in.txt
('Working on a', <type 'list'>, [u'hello'])
('Working on a', <type 'dict'>, u'goodbye': 1)
('Working on a', <type 'int'>, 1)
('Working on a', <type 'int'>, 2)
('Working on a', <type 'dict'>, )
('Working on a', <type 'int'>, 2)
('Working on a', <type 'int'>, 9)
('Working on a', <type 'int'>, 78)
('Working on a', <type 'int'>, 4)
('Working on a', <type 'int'>, 5)
('Working on a', <type 'dict'>, u'animals': [u'dog', u'lots of mice', u'cat'])

【讨论】:

【参考方案8】:

这是我的:

import simplejson as json
from simplejson import JSONDecodeError
class StreamJsonListLoader():
    """
    When you have a big JSON file containint a list, such as

    [
        ...
    ,
    
        ...
    ,
    
        ...
    ,
    ...
    ]

    And it's too big to be practically loaded into memory and parsed by json.load,
    This class comes to the rescue. It lets you lazy-load the large json list.
    """

    def __init__(self, filename_or_stream):
        if type(filename_or_stream) == str:
            self.stream = open(filename_or_stream)
        else:
            self.stream = filename_or_stream

        if not self.stream.read(1) == '[':
            raise NotImplementedError('Only JSON-streams of lists (that start with a [) are supported.')

    def __iter__(self):
        return self

    def next(self):
        read_buffer = self.stream.read(1)
        while True:
            try:
                json_obj = json.loads(read_buffer)

                if not self.stream.read(1) in [',',']']:
                    raise Exception('JSON seems to be malformed: object is not followed by comma (,) or end of list (]).')
                return json_obj
            except JSONDecodeError:
                next_char = self.stream.read(1)
                read_buffer += next_char
                while next_char != '':
                    next_char = self.stream.read(1)
                    if next_char == '':
                        raise StopIteration
                    read_buffer += next_char

【讨论】:

嗨,这非常有用,但你能告诉我如何使用该类来加载 json 文件吗?【参考方案9】:

我使用了@wuilang 的优雅解决方案。简单的方法 -- 读取一个字节,尝试解码,读取一个字节,尝试解码,...... -- 有效,但不幸的是它非常慢。

就我而言,我试图从文件中读取相同对象类型的“漂亮打印”JSON 对象。这使我能够优化方法;我可以逐行读取文件,只有在找到包含完全“”的行时才解码:

def iterload(stream):
    buf = ""
    dec = json.JSONDecoder()
    for line in stream:
        line = line.rstrip()
        buf = buf + line
        if line == "":
            yield dec.raw_decode(buf)
            buf = ""

如果您碰巧使用每行一个紧凑的 JSON 来转义字符串文字中的换行符,那么您可以安全地进一步简化这种方法:

def iterload(stream):
    dec = json.JSONDecoder()
    for line in stream:
        yield dec.raw_decode(line)

显然,这些简单的方法仅适用于非常特定类型的 JSON。但是,如果这些假设成立,则这些解决方案可以正确且快速地运行。

【讨论】:

【参考方案10】:

如果您使用 json.JSONDecoder 实例,您可以使用raw_decode 成员函数。它返回 JSON 值的 python 表示元组和解析停止位置的索引。这使得对剩余的 JSON 值进行切片(或在流对象中查找)变得容易。我对跳过输入中不同 JSON 值之间的空格的额外 while 循环不太满意,但我认为它可以完成工作。

import json

def yield_multiple_value(f):
    '''
    parses multiple JSON values from a file.
    '''
    vals_str = f.read()
    decoder = json.JSONDecoder()
    try:
        nread = 0
        while nread < len(vals_str):
            val, n = decoder.raw_decode(vals_str[nread:])
            nread += n
            # Skip over whitespace because of bug, below.
            while nread < len(vals_str) and vals_str[nread].isspace():
                nread += 1
            yield val
    except json.JSONDecodeError as e:
        pass
    return

下一个版本要短得多,并且会吃掉已经解析的字符串部分。似乎由于某种原因,当字符串中的第一个字符是空格时,第二次调用 json.JSONDecoder.raw_decode() 似乎失败了,这也是我在上面的 whileloop 中跳过空格的原因......

def yield_multiple_value(f):
    '''
    parses multiple JSON values from a file.
    '''
    vals_str = f.read()
    decoder = json.JSONDecoder()
    while vals_str:
        val, n = decoder.raw_decode(vals_str)
        #remove the read characters from the start.
        vals_str = vals_str[n:]
        # remove leading white space because a second call to decoder.raw_decode()
        # fails when the string starts with whitespace, and
        # I don't understand why...
        vals_str = vals_str.lstrip()
        yield val
    return

在有关 json.JSONDecoder 类的文档中,方法 raw_decode https://docs.python.org/3/library/json.html#encoders-and-decoders 包含以下内容:

这可用于从可能具有的字符串中解码 JSON 文档 最后是无关的数据。

而且这些无关数据很容易成为另一个 JSON 值。换句话说,编写该方法时可能会考虑到这个目的。

使用上面的函数输入.txt,我获得了原始问题中给出的示例输出。

【讨论】:

【参考方案11】:

您可以使用 https://pypi.org/project/json-stream-parser/ 来达到这个目的。

import sys
from json_stream_parser import load_iter
for obj in load_iter(sys.stdin):
    print(obj)

输出

'foo': ['bar', 'baz']
1
2
[]
4
5
6

【讨论】:

【参考方案12】:

如果您可以控制数据的生成方式,您可能希望切换到另一种格式,例如 ndjson,它代表 Newline Delimited JSON,并允许流式传输增量 JSON 格式的数据.每行本身都是有效的 JSON。有两个 Python 包可用:ndjson 和 jsonlines。

还有 json-stream 允许您在阅读 JSON 时对其进行处理,从而避免必须预先加载整个 JSON。您应该能够使用它从流中读取 JSON 数据,还可以在任何其他 I/O 操作之前和之后使用相同的流,或者从同一流中读取多个 JSON 对象。

【讨论】:

以上是关于如何从 Python 中的文件/流中懒惰地读取多个 JSON 值?的主要内容,如果未能解决你的问题,请参考以下文章

如何在从 NodeJS 中的多个输入流中读取时写入单个文件

Clojure 懒惰地从文件中读取随机行

如何标记多个资源从属性中懒惰地读取它们?

用于从流中读取多个 protobuf 消息的 python 示例

使用 IPC C# 时如何有效地从管道流中读取

java如何直接从流中读取excel的文件内容?Stream