Combine BigQuery and Pub/Sub with Apache Beam


Posted: 2018-09-05 10:52:31

Question:

I am trying to do the following with the DataflowRunner:

- Read from a partitioned BigQuery table (a lot of data, but only fetching the last two days)
- Read JSON messages from a Pub/Sub subscription
- Join the two collections on a common key
- Insert the joined collection into another BigQuery table

I am very new to Apache Beam, so I am not 100% sure whether what I want to do is possible.

My problem appears when I try to join the two collections: after applying the CoGroupByKey transform, the rows never seem to arrive in the same pane, even though the windowing strategy is the same (30-second fixed windows, triggering at the end of the window, and discarding fired panes).
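To see why this fails, it helps to remember that CoGroupByKey only brings elements together when they share both the key and the window. The toy model below simulates that semantics with plain dictionaries (the window names and values are made up for illustration; this is not Beam API):

```python
from collections import defaultdict

def co_group_by_key(bq_side, pubsub_side):
    """Each side is a list of (key, window, value) tuples.
    Returns {(key, window): ([bq values], [pubsub values])}."""
    grouped = defaultdict(lambda: ([], []))
    for key, window, value in bq_side:
        grouped[(key, window)][0].append(value)
    for key, window, value in pubsub_side:
        grouped[(key, window)][1].append(value)
    return dict(grouped)

bq = [(1, "w0", {"name": "pack-1"})]
pubsub = [(1, "w0", {"qty": 5}), (1, "w1", {"qty": 7})]
result = co_group_by_key(bq, pubsub)
# Only the (key=1, "w0") pane has both sides; the "w1" element never joins,
# which is exactly the symptom described above when the two branches are
# windowed into different panes.
```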

Some relevant fragments of my code:

    /* Getting the data and windowing */
    PCollection<PubsubMessage> pubsub = p.apply("ReadPubSub sub",PubsubIO.readMessages().fromSubscription(SUB_ALIM_REC));

    String query = /* The query */
    PCollection<TableRow> bqData = p.apply("Reading BQ",BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
            .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardSeconds(30)))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .withAllowedLateness(Duration.standardSeconds(0)).accumulatingFiredPanes());        

    PCollection<TableRow> tableRow = pubsub.apply(Window.<PubsubMessage>into(FixedWindows.of(Duration.standardSeconds(120)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.standardSeconds(0)).accumulatingFiredPanes())
            .apply("JSON to TableRow",ParDo.of(new ToTableRow()));



    /* Join code */
    PCollection<TableRow> finalResultCollection =
            kvpCollection.apply("Join TableRows", ParDo.of(
                    new DoFn<KV<Long, CoGbkResult>, TableRow>() {
                        private static final long serialVersionUID = 6627878974147676533L;

                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            KV<Long, CoGbkResult> e = c.element();
                            Long idPaquete = e.getKey();
                            Iterable<TableRow> itPaq = e.getValue().getAll(packTag);
                            Iterable<TableRow> itAlimRec = e.getValue().getAll(alimTag);
                            for (TableRow t1 : itPaq) {
                                for (TableRow t2 : itAlimRec) {
                                    TableRow joinedRow = new TableRow();
                                    /* set the required fields from each collection */
                                    c.output(joinedRow);
                                }
                            }
                        }
                    }));

For the last two days I have also been getting this error:

java.io.IOException: Failed to start reading from source: org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2808d228
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: BigQuery source must be split before being read
        org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.createReader(BigQuerySourceBase.java:153)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:463)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:442)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:293)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:286)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

Any guidance on whether what I am trying to do is feasible, or whether there is another way to approach this scenario, would be greatly appreciated.

Comments:

I am not sure your BigQuery results need a timed window, do they? Maybe you should look at side inputs: beam.apache.org/documentation/programming-guide/#side-inputs

Answer 1:

I have tried to do the same thing. As far as I understand from this question, it is currently not possible. I tried doing it myself with PeriodicImpulse, following this example (although I did not want a side input). I wrote something like the code below and got ValueError: BigQuery source is not currently available for use in streaming pipelines.

segments = p | 'triggering segments fetch' >> PeriodicImpulse() \
       | "loading segments" >> beam.io.Read(beam.io.BigQuerySource(
            use_standard_sql=True,
        query=f'''
            SELECT 
                id,
                segment
            FROM `some_table`''')) \
       | "windowing info" >> beam.WindowInto(window.FixedWindows(5))

info = p | "reading info" >> beam.io.ReadFromPubSub(
    topic='my_test_topic') \
       | "parsing info" >> beam.Map(message_to_json) \
       | "mapping info" >> beam.Map(lambda x: (x['id'], x['username'])) \
       | "windowing info" >> beam.WindowInto(window.FixedWindows(5))

results = ({'segments': segments, 'info': info} | beam.CoGroupByKey()) | "printing" >> beam.Map(print_out)

I think the best solution at the moment is to use an external store such as Datastore. I used this approach in another production pipeline and it works very well. You can find an explanation here.
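The external-store approach can be sketched without Beam: instead of a windowed join, each streaming element does a keyed lookup against a table that is reloaded periodically. The sketch below uses only the standard library; `SegmentCache`, its `loader`, and `enrich` are hypothetical names (a real pipeline would perform the fetch against Datastore or BigQuery inside a DoFn):

```python
import time

class SegmentCache:
    """Hypothetical in-memory copy of the slowly-changing table, reloaded on TTL expiry."""
    def __init__(self, loader, ttl_s=300):
        self._loader = loader      # e.g. a Datastore/BigQuery fetch returning {id: segment}
        self._ttl = ttl_s
        self._data = {}
        self._loaded_at = float("-inf")

    def get(self, key):
        now = time.monotonic()
        if now - self._loaded_at > self._ttl:
            self._data = self._loader()   # refresh the whole table
            self._loaded_at = now
        return self._data.get(key)

def enrich(event, cache):
    # "Join" each streaming event against the cached table by id.
    return {**event, "segment": cache.get(event["id"])}

cache = SegmentCache(loader=lambda: {1: "premium", 2: "free"})
print(enrich({"id": 1, "username": "ana"}, cache))
# {'id': 1, 'username': 'ana', 'segment': 'premium'}
```

Because the lookup happens per element, no windowing agreement between the two sources is needed, which is why this pattern sidesteps the CoGroupByKey problem from the question.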
