如何在 Apache Beam 中解析 2 个 json 文件

Posted

技术标签:

【中文标题】如何在 Apache Beam 中解析 2 个 json 文件【英文标题】:How to parse 2 json files in Apache beam 【发布时间】:2021-06-18 08:30:54 【问题描述】:

我有 2 个 json 配置文件要读取并希望将值分配给变量。我正在使用 apache Beam 创建数据流作业,但无法解析这些文件并将值分配给变量。

config1.json - "bucket_name": "mybucket" config2.json - "dataset_name": "mydataset"

这是管道语句 ---- 我首先尝试使用一个 JSON 文件,但即使这样也不起作用

with beam.Pipeline(options=pipeline_options) as pipeline:
        steps = (pipeline
                | "Getdata" >> beam.io.ReadFromText(custom_options.configfile)
                | "CUSTOM JSON PARSE" >> beam.ParDo(custom_json_parser(custom_options.configfile))
                | "write to GCS" >> beam.io.WriteToText('gs://mynewbucket/outputfile.txt')
            )
    result = pipeline.run()
    result.wait_until_finish()

我还尝试创建一个函数来解析至少一个文件。这是我创建的示例方法,但它不起作用。

class custom_json_parser(beam.DoFn):
    import apache_beam as beam
    from apache_beam.io.gcp import gcsio
    import logging
    def __init__(self, configfile):
        self.configfile = configfile
    def process(self, configfile):
        logging.info("JSON PARSING STARTED")
        with beam.io.gcp.gcsio.GcsIO().open(self.configfile, 'r') as f:
            for line in f:
                data = json.loads(line)
                bucket = data.get('bucket_name')
        dataset = data.get('dataset_name') ```
        
  Can someone please suggest the best method to resolve this issue in apache beam?

Thanks in Advance          

【问题讨论】:

管道是否多次读取这些文件?或者您可以在开始时只读取一次并使用这些参数执行整个管道吗? 我只想在开始时读取它们并执行整个管道 【参考方案1】:

如果您只需要在管道中读取一次文件,请不要在管道中读取它们,而是在运行之前读取它们。

从 GCS 读取文件 解析文件并将有用的内容放入管道选项映射中 运行您的管道并使用选项中的数据

编辑 1

您可以在管道之前使用这段代码加载文件并读取它。简单的 Python,标准 GCS 库。

    from google.cloud import storage
    import json

    client = storage.Client()
    bucket = client.get_bucket('your-bucket')
    blob = bucket.get_blob("name.json")

    json_data = blob.download_as_string().decode('UTF-8')
    print(json_data) # print -> "name": "works!!"
    print(json.loads(json_data)["name"]) # print -> works!!

【讨论】:

我只面临第二点的问题 - 解析文件并将有用的内容放入管道选项图中。我无法解析它们【参考方案2】:

你可以试试下面的代码sn-p:-

解析文件的功能

class custom_json_parser(beam.DoFn):
    def process(self, element):
        logging.info(element)
        data = json.loads(element)        
        bucket = data.get('bucket_name')
        dataset = data.get('dataset_name')
        return ["bucket": bucket , "dataset": dataset ]

通过管道你可以调用函数

with beam.Pipeline(options=pipeline_options) as pipeline:
        steps = (pipeline
                | "Getdata" >> beam.io.ReadFromText(custom_options.configfile)
                | "CUSTOM JSON PARSE" >> beam.ParDo(custom_json_parser())
                | "write to GCS" >> beam.io.WriteToText('gs://mynewbucket/outputfile.txt')
            )
    result = pipeline.run()
    result.wait_until_finish()

它会起作用的。

【讨论】:

我在运行上述代码第 64 行时遇到以下错误,正在处理文件“/usr/local/lib/python3.7/json/__init__.py”,第 348 行,加载返回 _default_decoder .decode(s) 文件“/usr/local/lib/python3.7/json/decoder.py”,第 340 行,在 decode 中引发 JSONDecodeError("Extra data", s, end) RuntimeError: json.decoder.JSONDecodeError:额外数据:第 1 行第 16 列(字符 15)[运行“CUSTOM JSON PARSE”时] 在这里您可以看到一个工作示例 ***.com/questions/65850110/… 只需将 'ReadFromPubSub' 替换为 'ReadFromText' 尝试适合您的 sn-p。

以上是关于如何在 Apache Beam 中解析 2 个 json 文件的主要内容,如果未能解决你的问题,请参考以下文章

如何从 PCollection Apache Beam Python 创建 N 个元素组

如何通过 Apache Beam 将文件上传到 Azure blob 存储?

apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python

警告:apache_beam.options.pipeline_options:丢弃不可解析的参数

apache beam入门之 窗口水位线和超时数据概念

Apache Beam:ReadFromText 与 ReadAllFromText