数据流 - 未调用函数 - 错误 - 未定义名称

Posted

技术标签:

【中文标题】数据流 - 未调用函数 - 错误 - 未定义名称【英文标题】:Dataflow - Function not being called - Error - name not defined 【发布时间】:2020-08-01 12:06:21 【问题描述】:

我正在使用 Google Dataflow 上的 Apache Beam,我正在通过 lambda 函数调用函数情感,但我收到一个错误,即未定义函数名称。

output_tweets = (lines
                     | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
                     | 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
                     | 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
                     | 'sentiment analysis' >> beam.FlatMap(lambda x: sentiment(x))
                     )

这是我的 Apache Beam 调用,在最后一行中提到了函数情绪,这给我带来了问题。

函数代码如下(我觉得应该不重要):

def sentiment(messages):
    if not isinstance(messages, list):
        messages = [messages]

    instances = list(map(lambda message: json.loads(message), messages))
    lservice = discovery.build('language', 'v1beta1', developerKey = APIKEY)
    for instance in instances['text']:
        response = lservice.documents().analyzeSentiment(
            body =
                'document': 
                    'type': 'PLAIN_TEXT',
                    'content': instance
                
            
        ).execute()
        instance['polarity'] = response['documentSentiment']['polarity']
        instance['magnitude'] = response['documentSentiment']['magnitude']

    return instances

我得到以下回溯

  File "stream.py", line 97, in <lambda>
NameError: name 'sentiment' is not defined [while running 'generatedPtransform-441']

有什么想法吗?

【问题讨论】:

【参考方案1】:

这个问题的发生可能有几个原因

    函数sentiment 定义是否存在于与光束管道相同的python 文件中。 函数sentiment的定义是否在beam pipeline中被调用之前?

我做了一个如下的快速测试,如果同时遵循以上两个,它就可以正常工作

def testing(messages):
    return messages.lower()

windowed_lower_word_counts = (windowed_words
                              | beam.Map(lambda word: testing(word))
                              | "count" >> beam.combiners.Count.PerElement())

ib.show(windowed_lower_word_counts, include_window_info=True)

0   b'have'     3   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0
1   b'ransom'   1   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0
2   b'let'      1   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0
3   b'me'       1   2020-04-19 06:04:39.999999+0000 2020-04-19 06:04:30.000000+0000 (10s)   Pane 0

如果函数是在调用后定义的,那么我们会得到如下所示的错误

windowed_lower_word_counts = (windowed_words
                              | beam.Map(lambda word: testing_after(word))
                              | "count" >> beam.combiners.Count.PerElement())

ib.show(windowed_lower_word_counts, include_window_info=True)

ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7f478f344820>, due to an exception.
 Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 954, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 552, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/core.py", line 1482, in <lambda>
    wrapper = lambda x: [fn(x)]
  File "<ipython-input-19-f34e29a17836>", line 2, in <lambda>
    | beam.Map(lambda word: testing_after_new(word))
NameError: name 'testing_after' is not defined

def testing_after(messages):
    return messages.lower()

更新

不要将函数传递为beam.FlatMap(lambda x : fn(x)),而是将函数传递为beam.FlatMap(x)

我相信在第一种情况下,Beam 会尝试在工作机器中查找 fn,但它无法找到它。实现细节可以在这里找到 - https://github.com/apache/beam/blob/fa4f4183a315f061e035d38ba2c5d4b837b371e0/sdks/python/apache_beam/transforms/core.py#L780

【讨论】:

函数情感在同一个脚本中,它的定义是在管道运行之前。我观察到的一件事是,如果我在没有 lambda 的情况下调用该函数而只使用 beam.Map,它就会到达该函数。知道为什么吗? 如果您查看github.com/apache/beam/blob/…,它表明用户定义的函数应该是types.BuiltinFunctionType, types.MethodType, types.FunctionType 的三种类型之一。因此,仅传递函数名称是正确的方法,我们的示例中也提到了这一点。在您提到的示例中,它无法在您可能必须在类中传递的工作人员中找到情绪。如果这有帮助,请接受答案。 嘿@jayadeep-jayaraman,这行得通。但是如果呼叫只是 beam.Map(x),则 beam.Map 也可以工作。事实证明,即使导入了库,它也会给我一个discovery.build 的错误。不知道这是否是问这个问题的正确地方。

以上是关于数据流 - 未调用函数 - 错误 - 未定义名称的主要内容,如果未能解决你的问题,请参考以下文章

VB中子程序或函数未定义是啥意思

调用内联函数时未定义的引用

FirebaseError:使用无效数据调用函数 DocumentReference.set()。不支持的字段值:未定义(在字段名称中找到)

未捕获的类型错误:无法读取未定义的属性“名称”

多重通知:调用函数时出现未定义的变量错误[重复]

为啥调用未定义函数时没有 PHP 错误?