如何在 Apache Beam 中解析 2 个 json 文件
Posted
技术标签:
【中文标题】如何在 Apache Beam 中解析 2 个 json 文件【英文标题】:How to parse 2 json files in Apache beam 【发布时间】:2021-06-18 08:30:54 【问题描述】:我有 2 个 json 配置文件要读取并希望将值分配给变量。我正在使用 apache Beam 创建数据流作业,但无法解析这些文件并将值分配给变量。
config1.json - "bucket_name": "mybucket"
config2.json - "dataset_name": "mydataset"
这是管道语句 ---- 我首先尝试使用一个 JSON 文件,但即使这样也不起作用
with beam.Pipeline(options=pipeline_options) as pipeline:
steps = (pipeline
| "Getdata" >> beam.io.ReadFromText(custom_options.configfile)
| "CUSTOM JSON PARSE" >> beam.ParDo(custom_json_parser(custom_options.configfile))
| "write to GCS" >> beam.io.WriteToText('gs://mynewbucket/outputfile.txt')
)
result = pipeline.run()
result.wait_until_finish()
我还尝试创建一个函数来解析至少一个文件。这是我创建的示例方法,但它不起作用。
class custom_json_parser(beam.DoFn):
import apache_beam as beam
from apache_beam.io.gcp import gcsio
import logging
def __init__(self, configfile):
self.configfile = configfile
def process(self, configfile):
logging.info("JSON PARSING STARTED")
with beam.io.gcp.gcsio.GcsIO().open(self.configfile, 'r') as f:
for line in f:
data = json.loads(line)
bucket = data.get('bucket_name')
dataset = data.get('dataset_name') ```
Can someone please suggest the best method to resolve this issue in apache beam?
Thanks in Advance
【问题讨论】:
管道是否多次读取这些文件?或者您可以在开始时只读取一次并使用这些参数执行整个管道吗? 我只想在开始时读取它们并执行整个管道 【参考方案1】:如果您只需要在管道中读取一次文件,请不要在管道中读取它们,而是在运行之前读取它们。
从 GCS 读取文件 解析文件并将有用的内容放入管道选项映射中 运行您的管道并使用选项中的数据编辑 1
您可以在管道之前使用这段代码加载文件并读取它。简单的 Python,标准 GCS 库。
from google.cloud import storage
import json
client = storage.Client()
bucket = client.get_bucket('your-bucket')
blob = bucket.get_blob("name.json")
json_data = blob.download_as_string().decode('UTF-8')
print(json_data) # print -> "name": "works!!"
print(json.loads(json_data)["name"]) # print -> works!!
【讨论】:
我只面临第二点的问题 - 解析文件并将有用的内容放入管道选项图中。我无法解析它们【参考方案2】:你可以试试下面的代码sn-p:-
解析文件的功能
class custom_json_parser(beam.DoFn):
def process(self, element):
logging.info(element)
data = json.loads(element)
bucket = data.get('bucket_name')
dataset = data.get('dataset_name')
return ["bucket": bucket , "dataset": dataset ]
通过管道你可以调用函数
with beam.Pipeline(options=pipeline_options) as pipeline:
steps = (pipeline
| "Getdata" >> beam.io.ReadFromText(custom_options.configfile)
| "CUSTOM JSON PARSE" >> beam.ParDo(custom_json_parser())
| "write to GCS" >> beam.io.WriteToText('gs://mynewbucket/outputfile.txt')
)
result = pipeline.run()
result.wait_until_finish()
它会起作用的。
【讨论】:
我在运行上述代码第 64 行时遇到以下错误,正在处理文件“/usr/local/lib/python3.7/json/__init__.py”,第 348 行,加载返回 _default_decoder .decode(s) 文件“/usr/local/lib/python3.7/json/decoder.py”,第 340 行,在 decode 中引发 JSONDecodeError("Extra data", s, end) RuntimeError: json.decoder.JSONDecodeError:额外数据:第 1 行第 16 列(字符 15)[运行“CUSTOM JSON PARSE”时] 在这里您可以看到一个工作示例 ***.com/questions/65850110/… 只需将 'ReadFromPubSub' 替换为 'ReadFromText' 尝试适合您的 sn-p。以上是关于如何在 Apache Beam 中解析 2 个 json 文件的主要内容,如果未能解决你的问题,请参考以下文章
如何从 PCollection Apache Beam Python 创建 N 个元素组
如何通过 Apache Beam 将文件上传到 Azure blob 存储?
apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python