包括自定义 PTransform 导致在 GCP 的 Dataflow 作业中找不到依赖项

Posted

技术标签:

【中文标题】包括自定义 PTransform 导致在 GCP 的 Dataflow 作业中找不到依赖项【英文标题】:including custom PTransform causes not found dependencies in the Dataflow job in GCP 【发布时间】:2021-12-24 07:19:54 【问题描述】:

我试图创建一个复合 PTransform 如下(Python):

class LimitVolume(beam.PTransform):
def __init__(self, daily_window, daily_limit):
    super().__init__()
    self.daily_window = daily_window
    self.daily_limit = daily_limit

def expand(self, input_events_pcoll):
    events_with_ts_pcol = (input_events_pcoll
        | 'Timestamp using RECEIVED_TIMESTAMP' >> beam.Map(
            lambda message: beam.window.TimestampedValue(message, message['RECEIVED_TIMESTAMP']))
        )
     ...
    return events_with_ts_pcol

然后在主run()方法中使用如下:

      def run():
          ...
          result_pcol = input_pcol | LimitVolume(daily_window, daily_limit)

run() 和 LimitVolume 都在同一个 main.py 脚本中,然后将其作为作业提交/部署到 GCP 中

当我通过 DirectRunner 在本地运行此作业时 - 一切正常; 如果我在 GCP 中使用 DataflowRunner 提交并运行它 - 它会开始抛出如下错误:

in process NameError: name 'arrow' is not defined [while running 'Parse Json-ptransform-898945'] 
and in <lambda> NameError: name 'time' is not defined [while running 'define schedule-ptransform-899107']

基本上没有找到很多在 requirements.txt 文件中定义并在部署作业时通过 --requirements_file 选项指定的依赖项

请参阅下面的完整错误堆栈跟踪(缩写)。

现在,妙语

如果我将 LimitVolume PTransform 中的相同逻辑放入 run() 方法并直接在我的管道中指定:

def run():
    ...
    events_with_ts_pcol = (input_pcol
                       | 'Timestamp using RECEIVED_TIMESTAMP' >> beam.Map(
            lambda message: beam.window.TimestampedValue(message, message['RECEIVED_TIMESTAMP']))
                       )
    ...

并从 main.py 文件中删除 LimitVolume 类的定义 - 它在本地和 GCP 中都可以正常工作! 依赖项没有问题。

那么,很明显,管道中唯一存在的自定义 PTransform 有一些非常“特别”的地方 - 有人知道那可能是什么吗?

我找不到任何关于自定义 PTransforms 的信息,或者使用它的包装细节,或者像这样的错误 - 这本身就令人担忧......

谢谢!!

这里是更大的错误输出:

 File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs File "/Users/mpopova/Marina/TT_Projects/gcp_inboundconverter/ibc_ingest_dataflow/src/main.py", line 45, in process NameError: name 'arrow' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute response = task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 997, in process_bundle element.data) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 351, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs File "/Users/mpopova/Marina/TT_Projects/gcp_inboundconverter/ibc_ingest_dataflow/src/main.py", line 45, in process NameError: name 'arrow' is not defined [while running 'Parse Json-ptransform-898945'] passed through: ==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:771 
...
line 1234, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process File "/Users/mpopova/Marina/TT_Projects/gcp_inboundconverter/ibc_ingest_dataflow/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1562, in <lambda> wrapper = lambda x: [fn(x)] File "/Users/mpopova/Marina/TT_Projects/gcp_inboundconverter/ibc_ingest_dataflow/src/main.py", line 273, in <lambda> NameError: name 'time' is not defined [while running 'define schedule-ptransform-899107'] passed through: ==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:771

【问题讨论】:

【参考方案1】:

Apache Beam 存在一个未知问题 [1]:

当主模块使用 'super' 调用超类方法时,在 Python 3 上使用 --save_main_session 失败。

这个问题已经存在几年了,但是由于依赖于名为 Dill 的 Beam,它尚未得到修复。 Dill 的 issue 可以在 Github issues [2] 上找到。

正如 Github 问题 [3] 中的一位 cmets 所述,解决方法是:

对于main中声明的类,例如:

class MyClass(SuperClass)

当从父类调用init函数时: super().__init__()

应该使用明确的类名:

SuperClass.__init__()

在您的代码中,更改应为:

class LimitVolume(beam.PTransform):
    def __init__(self, daily_window, daily_limit):
        beam.PTransform.__init__(self)
        ...

同时,错误NameError: name 'time' is not defined也可能与Apache Beam Python依赖项导入机制的另一个问题有关。正如@robertwb 所说,如果问题发生在__main__ 会话中,您可以将--save_main_session 管道选项设置为True

但是,如果错误发生在它之外,您可以通过在本地导入模块来解决这个问题,在哪里使用它。 (此处归功于 Google Dataflow 文档 [4])

例如,而不是:

import re
…
def myfunc():
  # use re module

用途:

def myfunc():
  import re
  # use re module

[1]https://issues.apache.org/jira/browse/BEAM-6158

[2]https://github.com/uqfoundation/dill/issues/300

[3]https://github.com/uqfoundation/dill/issues/300#issuecomment-505011149

[4]https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors

【讨论】:

谢谢!!!将 super().__init__() 更改为 beam.PTransform.__init__(self) 解决了我的问题! 这是一个写得很好的答案!【参考方案2】:

这听起来像是在__main__ 会话中捕获/腌制对象的问题。您可以尝试传递 save_main_session 标志。就个人而言,我更喜欢将主模块中的所有必需对象(导入、定义等)放入 run() 方法中以确保正确捕获它们的解决方案。

另请注意,有一个 effort 可以迁移到 cloudpickle 以避免这些限制。

【讨论】:

谢谢@robertwb!创建管道时,我已经将标志 save_main_sesstion 设置为“true”:options = PipelineOptions(beam_args, save_main_session=True, streaming=True) save_main_session 是一个不能解决所有问题的 hack;我会把东西放在你的 run() 方法中(所以它们是本地人,而不是__main__ globals)或单独的模块(可以通过名称正确引用)。 至于将所有内容放入 run() - 是的,当然,这就是我现在必须做的 - 但它使得无法正确地对单个转换进行单元测试。拥有一个自定义 PTransform 的整个想法是能够对它进行各种场景的单元测试......在 run() 中将它全部作为一个整体让它变得更加困难 如果它打算被重用,你可以把它放在它自己的模块中,这样也可以正常工作。 (我意识到这并不理想,希望 cloudpickle 成为正确的长期解决方案。)

以上是关于包括自定义 PTransform 导致在 GCP 的 Dataflow 作业中找不到依赖项的主要内容,如果未能解决你的问题,请参考以下文章

使用 Terraform 创建 GCP 自定义 IAM 角色

无法使用自定义内存指标自动缩放 GCP 实例

GCP Dataproc 自定义图像 Python 环境

使用自定义服务帐户在 GCP 中创建 VM 时,KMS 权限出现 400 错误

无法使用自定义容器 GCP 进行部署

Colab - 连接到自定义 GCP 虚拟机