Getting an Error while trying to write data from Google PubSub to GCS using Apache Beam in Dataflow

Posted: 2018-12-30 20:06:27

Question:

I have written the following code to read streaming data from Pub/Sub and write it to Google Cloud Storage:

import apache_beam as beam

def run():
    argv = [
        # The original '--project=0'.format(PROJECT) has no {0} placeholder,
        # so PROJECT was never substituted into the flag.
        '--project={0}'.format(PROJECT),
        '--job_name=mypubsubsample40',
        # '--save_main_session',
        '--staging_location=gs://abc/staging/',
        '--temp_location=gs://abc/staging/',
        '--runner=DataflowRunner',
        '--streaming'
    ]

    p = beam.Pipeline(argv=argv)
    lines = (p
        | 'read_stream' >> beam.io.ReadStringsFromPubSub(
            subscription='projects/myprojectid209306/subscriptions/mysub1',
            id_label='MESSAGE_ID')
        | 'write to file' >> beam.io.WriteToText('gs://bagdfs2/myout'))
    p.run()

When I execute the same program with the 'DirectRunner', the output files are created in GCS, but when I run it with the 'DataflowRunner' it does not work.

In addition, about a minute after the pipeline starts running in Cloud Dataflow, I get the following error in the logs:

    java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1775: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
    self.data_channel_factory)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
    factory, transform_id, transform_proto, consumers, serialized_fn)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
    c = base64.b64decode(encoded)
  File "/usr/lib/python2.7/base64.py", line 78, in b64decode
    raise TypeError(msg)
TypeError: Incorrect padding

        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:55)
        com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1775: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
    self.data_channel_factory)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
    factory, transform_id, transform_proto, consumers, serialized_fn)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
    c = base64.b64decode(encoded)
  File "/usr/lib/python2.7/base64.py", line 78, in b64decode
    raise TypeError(msg)
TypeError: Incorrect padding

I am not even pulling any data from Pub/Sub yet, but the error above appears as soon as the pipeline starts running in Cloud Dataflow.

Please let me know if I am doing something wrong here.


Answer 1:

Writing to GCS from a streaming pipeline is not yet supported; in the Python SDK, beam.io.WriteToText only works in batch pipelines.
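For what it's worth, newer Beam Python SDK releases (well after the one used in the question) added apache_beam.io.fileio.WriteToFiles, which can write windowed streaming data to GCS. A minimal sketch under that assumption, reusing the subscription and bucket names from the question as placeholders and an arbitrary 60-second window:

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

# Placeholder names reused from the question; replace with your own.
SUBSCRIPTION = 'projects/myprojectid209306/subscriptions/mysub1'
OUTPUT_PATH = 'gs://bagdfs2/myout'

# --streaming must be on; project/runner/staging flags omitted for brevity.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (p
     | 'read_stream' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
     | 'decode' >> beam.Map(lambda msg: msg.decode('utf-8'))
     # Windowing is required so each pane can be finalized into a file.
     | 'window' >> beam.WindowInto(window.FixedWindows(60))
     | 'write' >> fileio.WriteToFiles(
         path=OUTPUT_PATH,
         sink=lambda dest: fileio.TextSink()))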

