使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误
Posted
技术标签:
【中文标题】使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误【英文标题】:Loading many jsons into BQ using Apache Beam & Data Flow - json schema errors 【发布时间】:2020-01-22 10:16:37 【问题描述】:我需要使用 Python 中的 Apache Beam 将大量 json 文件加载到 BQ 中。 jsons 有一个非常复杂的模式(具有多个层次结构),更重要的是 - 它并不一致。 有些字段非常罕见,以至于它们只出现在 0.01% 的 json 中。我不能让 BQ 使用 AUTO_DETECT 推断 WriteToBigQuery 方法中的模式,因为它只检查 100 行 - 还远远不够。我尝试使用 python generate-schema 实用程序针对 0.1% 的数据构建模式 - 但同样,有些字段非常罕见,以至于它仍然失败。
没有这样的字段:FIELD_NAME。
我尝试找到一种方法来上传文件而不考虑任何错误,并将错误保存到错误表中,我可以单独处理。但是,我无论如何都没有在 WriteToBigQuery 模块中找到这样做。 我还尝试在将每个 json 发送到管道之前对其进行验证,但速度非常慢。我还尝试根据指定的模式“过滤”json,但这需要遍历所有 json - 速度也很慢,因为每个 json 大小约为 13 KB。
有没有人遇到任何可以提供帮助的东西?奇怪的是,使用 Apache Beam 写入 BQ 时没有使用任何 max_rejected 属性。 任何有关如何处理此问题的想法将不胜感激。
【问题讨论】:
【参考方案1】:一种可能性是“手动”计算架构。如果我们将模式表示为一组元组set([field, type])
- 例如set([('name', str), ('age', int)])
。
class CombineSchemasByDestination(beam.DoFn):
def __init__(self):
self.schemas_per_dest = defaultdict(set)
def process(self, dest_schema):
destination, schemas = dest_schema
for s in schemas:
self.schemas_per_dest[destination].union(s)
def finish_bundle(self):
for dest, schema in self.schemas_per_dest.items():
yield (dest, schema)
schemas_per_dest = (my_data
| beam.Map(lambda row: (get_destination(row),
[get_row_schema(row)]))
| beam.ParDo(CombineSchemasByDestination())
| beam.GroupByKey()
| beam.CombineSchemasByDestination())
my_data | beam.WriteToBigQuery(....
schema=lambda dest, schema_map: schema_map.get(dest),
schema_side_inputs=(beam.pvalue.AsDict(schemas_per_dest,))
我认为这应该有助于解决您的问题。想法?
【讨论】:
如果我理解正确,这会迫使我为每个计算的模式设置不同的目的地。我的意思是所有数据都应该由一个模式表示,因为所有数据都有相同的目的地。当然,我可以稍后合并数据 应该仍然有效。假设get_destination
总是返回相同的值——没关系。这次我们关心的只是提供模式作为辅助输入。【参考方案2】:
我最终做的是根据我不断从 BQ 得到的错误来格式化 JSON。我注意到缺少的字段总是完全嵌套在 JSON 中的 2-3 个字段下,所以我只是将这些字段按原样转换为 JSON——这样我就成功地加载了数据。 尽管如此,在 Apache Beam 中设置一个最大拒绝设置的错误日志表会非常有帮助。
【讨论】:
以上是关于使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS