数据流管道上的 Apache Beam StatusRuntimeException

Posted

技术标签:

【中文标题】数据流管道上的 Apache Beam StatusRuntimeException【英文标题】:Apache Beam StatusRuntimeException on Dataflow pipeline 【发布时间】:2021-06-27 19:49:19 【问题描述】:

我正在使用 apache_beam==2.24.0 编写用 python2.7 编写的数据流管道。管道的工作是使用 beam 的 ReadFromPubSub 批量消费来自订阅的 pubsub 消息,对消息进行一些处理,然后将结果数据持久化到两个不同的 bigquery 表中。我正在消耗大量数据。 Google-cloud-pubsub 版本是 1.7.0 。运行管道后一切正常,但几个小时后我开始收到异常:

org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: CANCELLED: call has been cancelled

在 gcp 数据流控制台上,日志显示此错误,但工作本身似乎工作正常。它使用订阅中的数据并将其写入 bigquery。 CANCELLED: call 在这里被提及,为什么我会收到此错误?我该如何解决这个问题?

完整的堆栈跟踪:

Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
    org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
    org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:100)
    org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.shouldWait(RemoteGrpcPortWriteOperation.java:124)
    org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:167)
    org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:57)
    org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowReshuffleFn.processElement(StreamingGroupAlsoByWindowReshuffleFn.java:39)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1085)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)

【问题讨论】:

流式传输管道无限期地重试失败的元素。只要系统延迟和数据新鲜度正常,就不用担心低级错误。这似乎是一些常见的 grpc 错误:***.com/questions/57110811/…。您提到使用 Python SDK,堆栈跟踪是在 Java 中的。您是否使用了一些 xlang 功能? 我只是在使用 apache-beam 的 python sdk。 sdk 可能在内部使用了一些 xlang 功能。 这些错误应该不会造成太大的麻烦。另外,您能否尝试使用 Python3 和更新版本的 Beam?现在可能存在一些已修复的 grpc 问题。 该项目仅使用 python 的 2.7 版本,而 beam==2,24 是 python2.7 的最后支持版本。虽然管道正在使用“Dataflow”运行器,但 bash 进程(用于运行 python 管道)是否可能会导致问题?昨天我连续监控了 10 个小时的管道并没有收到错误,但通常在启动管道后 3 小时内出现错误。 bash 脚本不应导致此问题,因为它在 Dataflow 上运行。这里报告了一个类似的问题,但标记为不是错误:issues.apache.org/jira/browse/BEAM-9630。这似乎不是问题,您可以忽略它。我还在该票中添加了一条评论。 【参考方案1】:

我工作的客户可以选择为 Google Cloud 支持提出请求票。来自 Google Cloud 支持的确切回复:

您发现的这个错误是无害的。数据流是一个大规模并行数据处理平台,当有自动缩放事件可以移动工作虚拟机时。当虚拟机关闭时,grpc 通道在运行程序进程之前关闭,正在处理的工作项将在另一个新启动的运行程序上重试。这些错误可以忽略。

【讨论】:

以上是关于数据流管道上的 Apache Beam StatusRuntimeException的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam,BigQueryIO.WriteTableRows() 上的 NoSuchMethodError?

使用 Python 处理 Apache Beam 管道中的异常

Apache Beam - 跳过管道步骤

请求的身份验证范围不足 - GCP 上的 Dataflow/Apache Beam

Apache Beam实战指南 | 大数据管道(pipeline)设计及实践

Apache Beam 管道中的连续状态