使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误

Posted

技术标签:

【中文标题】使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误【英文标题】:Loading many jsons into BQ using Apache Beam & Data Flow - json schema errors 【发布时间】:2020-01-22 10:16:37 【问题描述】:

我需要使用 Python 中的 Apache Beam 将大量 json 文件加载到 BQ 中。 jsons 有一个非常复杂的模式(具有多个层次结构),更重要的是 - 它并不一致。 有些字段非常罕见,以至于它们只出现在 0.01% 的 json 中。我不能让 BQ 使用 AUTO_DETECT 推断 WriteToBigQuery 方法中的模式,因为它只检查 100 行 - 还远远不够。我尝试使用 python generate-schema 实用程序针对 0.1% 的数据构建模式 - 但同样,有些字段非常罕见,以至于它仍然失败。

没有这样的字段:FIELD_NAME。

我尝试找到一种方法来上传文件而不考虑任何错误,并将错误保存到错误表中,我可以单独处理。但是,我无论如何都没有在 WriteToBigQuery 模块中找到这样做。 我还尝试在将每个 json 发送到管道之前对其进行验证,但速度非常慢。我还尝试根据指定的模式“过滤”json,但这需要遍历所有 json - 速度也很慢,因为每个 json 大小约为 13 KB。

有没有人遇到任何可以提供帮助的东西?奇怪的是,使用 Apache Beam 写入 BQ 时没有使用任何 max_rejected 属性。 任何有关如何处理此问题的想法将不胜感激。

【问题讨论】:

【参考方案1】:

一种可能性是“手动”计算架构。如果我们将模式表示为一组元组set([field, type]) - 例如set([('name', str), ('age', int)])

class CombineSchemasByDestination(beam.DoFn):
  def __init__(self):
    self.schemas_per_dest = defaultdict(set)

  def process(self, dest_schema):
    destination, schemas = dest_schema
    for s in schemas:
      self.schemas_per_dest[destination].union(s)

  def finish_bundle(self):
    for dest, schema in self.schemas_per_dest.items():
      yield (dest, schema)

schemas_per_dest = (my_data 
                    | beam.Map(lambda row: (get_destination(row), 
                                            [get_row_schema(row)]))
                    | beam.ParDo(CombineSchemasByDestination())
                    | beam.GroupByKey()
                    | beam.CombineSchemasByDestination())

my_data | beam.WriteToBigQuery(....
  schema=lambda dest, schema_map: schema_map.get(dest),
  schema_side_inputs=(beam.pvalue.AsDict(schemas_per_dest,))

我认为这应该有助于解决您的问题。想法?

【讨论】:

如果我理解正确,这会迫使我为每个计算的模式设置不同的目的地。我的意思是所有数据都应该由一个模式表示,因为所有数据都有相同的目的地。当然,我可以稍后合并数据 应该仍然有效。假设get_destination 总是返回相同的值——没关系。这次我们关心的只是提供模式作为辅助输入。【参考方案2】:

我最终做的是根据我不断从 BQ 得到的错误来格式化 JSON。我注意到缺少的字段总是完全嵌套在 JSON 中的 2-3 个字段下,所以我只是将这些字段按原样转换为 JSON——这样我就成功地加载了数据。 尽管如此,在 Apache Beam 中设置一个最大拒绝设置的错误日志表会非常有帮助。

【讨论】:

以上是关于使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam 数据流 BigQuery

使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS

Apache Beam,批处理和流式处理的融合!

数据流管道上的 Apache Beam StatusRuntimeException

Apache Beam - 跳过管道步骤

结合 BigQuery 和 Pub/Sub Apache Beam