将 MutationGroup 流式传输到 Spanner

Posted

技术标签:

【中文标题】将 MutationGroup 流式传输到 Spanner【英文标题】:Streaming MutationGroups into Spanner 【发布时间】:2018-12-31 00:32:15 【问题描述】:

我正在尝试使用 SpannerIO 将 MutationGroups 流式传输到扳手中。 目标是每 10 秒编写一次新的 MuationGroup,因为我们将使用 spanner 来查询近期 KPI。

当我不使用任何窗口时,我收到以下错误:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
    at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:120)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1585)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1470)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:868)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:823)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:52)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:20)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:388)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:372)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline.main(EntityBuilderPipeline.java:122)
:entityBuilder FAILED

由于上述错误,我假设输入集合需要窗口化和触发,因为 SpannerIO 使用 GroupByKey(这也是我的用例所需要的):

        ...
        .apply("1-minute windows", Window.<MutationGroup>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(Repeatedly.forever(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))
            ).orFinally(AfterWatermark.pastEndOfWindow()))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(SpannerIO.write()
                    .withProjectId(entityConfig.getSpannerProject())
                    .withInstanceId(entityConfig.getSpannerInstance())
                    .withDatabaseId(entityConfig.getSpannerDb())
                    .grouped());

当我这样做时,我在运行时遇到以下异常:

java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:631)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
        com.google.cloud.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:182)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:71)
        com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
        org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
        org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
        org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:145)

经过进一步调查,这似乎是由于 SpannerIO 中的 .apply(Wait.on(input)):它有一个全局侧面输入,似乎不适用于我的固定窗口,正如 Wait.java 的文档所述:

If signal is globally windowed, main input must also be. This typically would be useful
 *       only in a batch pipeline, because the global window of an infinite PCollection never
 *       closes, so the wait signal will never be ready.

作为临时解决方法,我尝试了以下方法:

添加一个带触发器而不是固定窗口的 GlobalWindow:

    .apply("globalwindow", Window.<MutationGroup>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))
            ).orFinally(AfterWatermark.pastEndOfWindow()))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))

这会导致只有在我耗尽管道时才会写入扳手。我的印象是Wait.on() 信号仅在全局窗口关闭时触发,并且不适用于触发器。

在 SpannerIO 中禁用 .apply(Wait.on(input))

这会导致管道卡在视图创建上 在此 SO 帖子中进行了描述: SpannerIO Dataflow 2.3.0 stuck in CreateDataflowView.

当我检查工作人员日志以获取线索时,我确实收到了以下警告:

logger:  "org.apache.beam.sdk.coders.SerializableCoder"
message:  "Can't verify serialized elements of type SpannerSchema have well defined equals method. This may produce incorrect results on some PipelineRunner
logger:  "org.apache.beam.sdk.coders.SerializableCoder"   
message:  "Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner"

请注意,一切都适用于 DirectRunner,我正在尝试使用 DataflowRunner。

对于我可以尝试使其运行的事情,是否有人有任何其他建议?我很难想象我是唯一一个尝试将 MutationGroups 流式传输到 spanner 中的人。

提前致谢!

【问题讨论】:

【参考方案1】:

目前,Beam Streaming 不支持 SpannerIO 连接器。请关注this Pull Request,它添加了对 spanner IO 连接器的流式支持。

【讨论】:

以上是关于将 MutationGroup 流式传输到 Spanner的主要内容,如果未能解决你的问题,请参考以下文章

将 JSON 流式传输到 Bigquery

将音频流式传输到本地网络

将音频流式传输到多个 Web 浏览器

使用 Java 将 JSON 流式传输到 BigQuery

如何将本地文件流式传输到 Chromecast?

如何将视频从浏览器提交/流式传输到服务器?