Dataflow 大型侧输入中的 Apache Beam

Posted

技术标签:

【中文标题】Dataflow 大型侧输入中的 Apache Beam【英文标题】:Apache Beam in Dataflow Large Side Input 【发布时间】:2018-05-11 02:48:32 【问题描述】:

这与this question最相似。

我正在 Dataflow 2.x 中创建一个管道,该管道从 Pubsub 队列获取流输入。传入的每条消息都需要通过来自 Google BigQuery 的非常大的数据集进行流式传输,并在写入数据库之前附加所有相关值(基于键)。

问题在于 BigQuery 的映射数据集非常大 - 任何将其用作辅助输入的尝试都会失败,Dataflow 运行器会抛出错误“java.lang.IllegalArgumentException: ByteString would be too long”。我尝试了以下策略:

1) 侧面输入

如上所述,映射数据(显然)太大而无法执行此操作。如果我在这里错了或者有解决方法,请告诉我,因为这是最简单的解决方案。

2) 键值对映射

在此策略中,我在管道的第一部分读取 BigQuery 数据和 Pubsub 消息数据,然后通过 ParDo 转换运行每个数据,将 PCollections 中的每个值更改为 KeyValue 对。然后,我运行 Merge.Flatten 转换和 GroupByKey 转换以将相关映射数据附加到每条消息。 这里的问题是流数据需要窗口化才能与其他数据合并,因此我还必须将窗口化应用于大型有界 BigQuery 数据。它还要求两个数据集上的窗口策略相同。但是对于有界数据没有任何窗口策略是有意义的,我所做的几次窗口尝试只是在一个窗口中发送所有 BQ 数据,然后再也不发送它。它需要与每条传入的 pubsub 消息一起加入。

3) 在 ParDo (DoFn) 中直接调用 BQ

这似乎是个好主意 - 让每个工作人员声明地图数据的静态实例。如果不存在,则直接调用 BigQuery 以获取它。不幸的是,这每次都会从 BigQuery 引发内部错误(因为在整个消息中只是说“内部错误”)。向 Google 提交支持请求后,他们告诉我,基本上,“你不能那样做”。

看来这个任务并不真正适合“令人尴尬的可并行化”模型,所以我在这里找错树了吗?

编辑:

即使在数据流中使用高内存机器并尝试将侧面输入放入地图视图中,我也会收到错误 java.lang.IllegalArgumentException: ByteString would be too long

这是我正在使用的代码示例(伪):

    Pipeline pipeline = Pipeline.create(options);

    PCollectionView<Map<String, TableRow>> mapData = pipeline
            .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
            .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn())) 
            .apply(View.asMap());

    PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
            .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));

    messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() 
        @ProcessElement
        public void processElement(ProcessContext c) 
            JSONObject data = new JSONObject(new String(c.element().getPayload()));
            String key = getKeyFromData(data);
            TableRow sideInputData = c.sideInput(mapData).get(key);
            if (sideInputData != null) 
                LOG.info("holyWowItWOrked");
                c.output(new TableRow());
             else 
                LOG.info("noSideInputDataHere");
            
        
    ).withSideInputs(mapData));

管道在从ParDo 中记录任何内容之前引发异常并失败。

堆栈跟踪:

java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
        com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
        com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

【问题讨论】:

您使用的是哪种 SideInput 视图?你能分享一个你是如何使用它的例子吗? 您是否考虑过使用有状态 ParDo?如果您在全局窗口中处理,这将允许您将来自 BigQuery 的值存储在状态中,并使用它来处理从另一个流到达的每个值。您需要使用您提到的相同 Merge.Flatten 方法,因为 Stateful DoFn 仅适用于单个输入集合。 对于您首先评论@BenChambers 侧面输入它是一个大映射表。每行都有一个键字符串,它可能与传入的 Pubsub 消息中的数据相匹配。映射数据集每周更改,但目前约为 4000 万行(约 10 GB),并且在一周内是完全静态且不变的。我现在正在查看有状态的 pardo 文档,看看是否可行...... 对于侧输入,您使用的是View.asSingletonView.asMap等吗?例如 -- View.asSingleton 将采用单个元素的 PCollection 并使其对 ParDo 可见。 View.asMap 将采用 PCollection&lt;KV&lt;K, V&gt;&gt; 并将其作为 Map&lt;K, V&gt; 提供,但只会读取您需要的密钥。 有这方面的消息吗?面临同样的问题 【参考方案1】:

查看本文https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-2中名为“模式:流模式大型查找表”的部分(这可能是唯一可行的解​​决方案,因为您的侧输入不适合内存):

说明:

大型(以 GB 为单位)查找表必须准确,并且经常更改或 不适合内存。

示例:

您有来自零售商的销售点信息,需要 将产品项目的名称与数据记录相关联 包含产品ID。有成千上万的项目 存储在一个可以不断变化的外部数据库中。还有,所有 必须使用正确的值处理元素。

解决方案:

使用“Calling external services for data enrichment”模式 但不是调用微服务,而是调用读取优化的 NoSQL 数据库(例如 Cloud Datastore 或 Cloud Bigtable)。

对于每个要查找的值,使用 KV 创建一个键值对 实用程序类。执行 GroupByKey 以创建相同密钥类型的批次 对数据库进行调用。在 DoFn 中,调用 该键的数据库,然后将值应用于所有值 遍历可迭代对象。与客户一起遵循最佳实践 如“为数据调用外部服务”中所述的实例化 丰富”。

其他相关模式在本文中描述:https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1:

模式:缓慢变化的查找缓存 模式:调用外部服务来丰富数据

【讨论】:

以上是关于Dataflow 大型侧输入中的 Apache Beam的主要内容,如果未能解决你的问题,请参考以下文章

Dataflow 中的自定义 Apache Beam Python 版本

防止 Apache Beam / Dataflow 流 (python) 管道中的融合以消除管道瓶颈

我可以在多个键(连接条件)上连接 Dataflow(Apache Beam)中的两个表吗?

Python Apache Beam 侧输入断言错误

Apache Beam/Google Dataflow - 将数据从 Google Datastore 导出到 Cloud Storage 中的文件

Eclipse 上带有 Dataflow Runner 的 Apache Beam MinimalWordcount 示例