Getting an Error while trying to write data from Google PubSub to GCS using Apache Beam in Dataflow
Posted: 2018-12-30 20:06:27
【Problem Description】: I have written the following code to read streaming data from Pub/Sub and write it to Google Cloud Storage:
import apache_beam as beam

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=mypubsubsample40',
        # '--save_main_session',
        '--staging_location=gs://abc/staging/',
        '--temp_location=gs://abc/staging/',
        '--runner=DataflowRunner',
        '--streaming'
    ]
    p = beam.Pipeline(argv=argv)

    # Read messages from the Pub/Sub subscription and write them to GCS.
    lines = (p
             | 'read_stream' >> beam.io.ReadStringsFromPubSub(
                 subscription='projects/myprojectid209306/subscriptions/mysub1',
                 id_label='MESSAGE_ID')
             | 'write to file' >> beam.io.WriteToText('gs://bagdfs2/myout'))

    p.run()
When I run the same program with the DirectRunner, the output files are created in GCS, but when I run it with the DataflowRunner it does not work.
In addition, about a minute after the pipeline starts running in Cloud Dataflow, I get the following error in the logs:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1775: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
self.data_channel_factory)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
for tag, pcoll_id
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
factory, transform_id, transform_proto, consumers, serialized_fn)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
c = base64.b64decode(encoded)
File "/usr/lib/python2.7/base64.py", line 78, in b64decode
raise TypeError(msg)
TypeError: Incorrect padding
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:55)
com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1775: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
self.data_channel_factory)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
for tag, pcoll_id
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
factory, transform_id, transform_proto, consumers, serialized_fn)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
c = base64.b64decode(encoded)
File "/usr/lib/python2.7/base64.py", line 78, in b64decode
raise TypeError(msg)
TypeError: Incorrect padding
I am not even pulling any data from Pub/Sub yet, but the logs above show up as soon as I start running my pipeline in Cloud Dataflow.
Please let me know if I am doing anything wrong here.
【Answer 1】: Writing to GCS from a streaming pipeline is not yet supported.
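For anyone hitting the same limitation: since the Python SDK's beam.io.WriteToText could not be used in a streaming pipeline at the time, a common workaround was to window the unbounded Pub/Sub stream and write each window to GCS yourself via Beam's FileSystems API. The sketch below is only an illustration of that idea, not the questioner's code; the 60-second window size, the gs://my-bucket/output/ prefix, and the subscription placeholders are assumptions.

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.io.filesystems import FileSystems


class WriteWindowToGcs(beam.DoFn):
    """Writes all lines of one window to a single GCS object (sketch only)."""

    def process(self, batch, win=beam.DoFn.WindowParam):
        _, lines = batch  # output of GroupByKey: (key, iterable of lines)
        # Name the output object after the window end; bucket/prefix are placeholders.
        path = 'gs://my-bucket/output/window-{}.txt'.format(win.end.micros)
        f = FileSystems.create(path)
        try:
            f.write('\n'.join(lines).encode('utf-8'))
        finally:
            f.close()


def run(argv=None):
    # argv would carry the same Dataflow/streaming options as in the question.
    with beam.Pipeline(argv=argv) as p:
        (p
         | 'read_stream' >> beam.io.ReadStringsFromPubSub(
             subscription='projects/<project>/subscriptions/<subscription>')
         | 'window' >> beam.WindowInto(window.FixedWindows(60))
         | 'add_key' >> beam.Map(lambda line: (None, line))
         | 'group_per_window' >> beam.GroupByKey()
         | 'write_window' >> beam.ParDo(WriteWindowToGcs()))

If switching SDKs was an option, the Java SDK's TextIO with windowed writes did support writing files from streaming pipelines at the time.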