如何在 Python 中创建从 Pub/Sub 到 GCS 的数据流管道

Posted

技术标签:

【中文标题】如何在 Python 中创建从 Pub/Sub 到 GCS 的数据流管道【英文标题】:How to create a Dataflow pipeline from Pub/Sub to GCS in Python 【发布时间】:2019-07-11 18:05:40 【问题描述】:

我想使用 Dataflow 将数据从 Pub/Sub 移动到 GCS。 所以基本上我希望 Dataflow 在固定的时间内(例如 15 分钟)积累一些消息,然后在这段时间过去后将这些数据作为文本文件写入 GCS。

我的最终目标是创建一个自定义管道,所以“Pub/Sub to Cloud Storage”模板对我来说是不够的,而且我对 Java 完全不了解,这让我开始在 Python 中进行调整。

这是我目前所拥有的(Apache Beam Python SDK 2.10.0):

import apache_beam as beam

TOPIC_PATH="projects/<my-project>/topics/<my-topic>"

def CombineFn(e):
    return "\n".join(e)

o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
data = ( p | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
       | "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
       | "Combine" >> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()
       | "Output" >> beam.io.WriteToText("<GCS path or local path>"))

res = p.run()
res.wait_until_finish()

我在本地环境中运行这个程序没有问题。

python main.py

它在本地运行,但从指定的 Pub/Sub 主题读取,并在每次 30 秒后写入指定的 GCS 路径,如预期的那样。

但是,现在的问题是,当我在 Google Cloud Platform 即 Cloud Dataflow 上运行它时,它会不断发出神秘的异常。

java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
    bundle_processor.process_bundle(instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 494, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 506, in apache_beam.runners.worker.operations.DoOperation.finish
    def finish(self):
  File "apache_beam/runners/worker/operations.py", line 507, in apache_beam.runners.worker.operations.DoOperation.finish
    with self.scoped_finish_state:
  File "apache_beam/runners/worker/operations.py", line 508, in apache_beam.runners.worker.operations.DoOperation.finish
    self.dofn_runner.finish()
  File "apache_beam/runners/common.py", line 703, in apache_beam.runners.common.DoFnRunner.finish
    self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
  File "apache_beam/runners/common.py", line 697, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 695, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    bundle_method()
  File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    def invoke_finish_bundle(self):
  File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    self.output_processor.finish_bundle_outputs(
  File "apache_beam/runners/common.py", line 832, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 93, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 195, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 953, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 969, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    self._windows_coder.estimate_size(value.windows, nested=True))
  File "apache_beam/coders/coder_impl.py", line 758, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
    self.get_estimated_size_and_observables(value))
  File "apache_beam/coders/coder_impl.py", line 772, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
    self._elem_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 134, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    return self.estimate_size(value, nested), []
  File "apache_beam/coders/coder_impl.py", line 458, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
    typed_value = value
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'generatedPtransform-1090']

        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:280)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:84)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:130)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
    bundle_processor.process_bundle(instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 494, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 506, in apache_beam.runners.worker.operations.DoOperation.finish
    def finish(self):
  File "apache_beam/runners/worker/operations.py", line 507, in apache_beam.runners.worker.operations.DoOperation.finish
    with self.scoped_finish_state:
  File "apache_beam/runners/worker/operations.py", line 508, in apache_beam.runners.worker.operations.DoOperation.finish
    self.dofn_runner.finish()
  File "apache_beam/runners/common.py", line 703, in apache_beam.runners.common.DoFnRunner.finish
    self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
  File "apache_beam/runners/common.py", line 697, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 695, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    bundle_method()
  File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    def invoke_finish_bundle(self):
  File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    self.output_processor.finish_bundle_outputs(
  File "apache_beam/runners/common.py", line 832, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 93, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 195, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 953, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 969, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    self._windows_coder.estimate_size(value.windows, nested=True))
  File "apache_beam/coders/coder_impl.py", line 758, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
    self.get_estimated_size_and_observables(value))
  File "apache_beam/coders/coder_impl.py", line 772, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
    self._elem_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 134, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    return self.estimate_size(value, nested), []
  File "apache_beam/coders/coder_impl.py", line 458, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
    typed_value = value
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'generatedPtransform-1090']

        org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
        org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

每次尝试写入 GCS 时,都会以非阻塞方式显示上述异常。 这导致我遇到一种情况,当它尝试输出时,肯定会生成一个新的文本文件,但文本内容始终与第一个窗口输出相同。这显然是不受欢迎的。

异常在堆栈跟踪中嵌套得如此之深,以至于很难猜测根本原因是什么,我不知道为什么它在 DirectRunner 上运行良好,但在 DataflowRunner 上却完全没有。 似乎它在管道的某个地方说,全局窗口值被转换为非全局窗口值,尽管我在管道的第二阶段使用了非全局窗口变换。添加自定义触发器没有帮助。

【问题讨论】:

这似乎来自谷歌方面。 耸耸肩 【参考方案1】:

我在试图弄清楚这个问题时遇到了很多麻烦

TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'generatedPtransform-1090']

梁 2.9.0 之后的 WriteToText 似乎有些问题(我使用的是梁 2.14.0,python 3.7)

| "Output" >> beam.io.WriteToText("<GCS path or local path>"))

它对我有用的是移除了管道部分并添加了一个自定义 DoFn:

class WriteToGCS(beam.DoFn):
    def __init__(self):
        self.outdir = "gs://<project>/<folder>/<file>"

    def process(self, element):
        from apache_beam.io.filesystems import FileSystems # needed here
        import json
        writer = FileSystems.create(self.outdir + '.csv', 'text/plain')
        writer.write(element)
        writer.close()


并在管道中添加:

| 'Save file' >> beam.ParDo(WriteToGCS())

【讨论】:

【参考方案2】:

我遇到了同样的错误,找到了解决方法,但没有解决方法:

TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'test-file-out/Write/WriteImpl/WriteBundles']

使用DirectRunner 在本地运行,使用DataflowRunner 在数据流上运行。

恢复为 apache-beam[gcp]==2.9.0 允许我的管道按预期运行。

【讨论】:

感谢您的建议。恢复到 2.9.0,问题已经消失,但它在 Stackdriver 上发出了非常多与 grpc 相关的(非阻塞)错误,而且目前似乎不稳定。现在我切换到 Java 并开始学习它,产生了预期的结果。 Streaming-to-streaming 看起来很稳定,但现在不太可能很好地支持 Streaming-to-batch 样式。 根据此页面 (beam.apache.org/documentation/sdks/python-streaming),Python 中的流式支持是实验性的。我建议在 Jira 中创建一个问题:issues.apache.org/jira/projects/BEAM/issues。 您是如何恢复使用 apache-beam[gcp]==2.9.0 的? @IoTuser 只需执行 pip uninstall 然后 pip install apache-beam[gcp]==2.9.0 我在使用 WriteToBigQuery 连接器时遇到了梁 2.25.0 的问题。有人知道如何解决这个问题吗?

以上是关于如何在 Python 中创建从 Pub/Sub 到 GCS 的数据流管道的主要内容,如果未能解决你的问题,请参考以下文章

如何在 EF 中创建从两个表到一个源的一对一映射?

我如何在表格中创建从一行到另一个仪表板的链接

交互式地图,如何在 symfony 中创建从地图标记的弹出窗口到模板的链接

如何在 Android 11 中创建从图库中选择图像的意图?

如何在Django中创建从Abstract User继承的多个用户配置文件?

使用 Python 将 Pub/Sub 消息加载到 BigQuery