AttributeError: 'function' object has no attribute 'tableId'. Apache Beam Dataflow runner

Posted: 2020-06-11 15:15:28

Question:

I am trying to write to BigQuery from the Apache Beam PTransform WriteToBigQuery(), and I get an error when I give the table argument a lambda function that reads the value of the field "DEVICE". I did exactly the same thing in a streaming job and it worked, but for some reason it does not work in this batch job.

My pipeline options:

import apache_beam as beam
from apache_beam.runners import DataflowRunner
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

options = pipeline_options.PipelineOptions(flags=[])
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
options.view_as(GoogleCloudOptions).region = 'europe-west1'
options.view_as(pipeline_options.SetupOptions).sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' % 
    beam.version.__version__)

My code:

p = beam.Pipeline(DataflowRunner(), options=options)

data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://bucket_name/file')
    | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
       )

telemetry_data = (data
    | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                        )
                 )
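As a sanity check, the table-naming callable can be exercised outside the pipeline. Note that the f-string must interpolate the field with braces, otherwise the literal text is returned. The element values below are illustrative, and the project/dataset names are placeholders:

```python
# Hypothetical element, mimicking one parsed telemetry record.
element = {'DEVICE': 'sensor1', 'type_MQTT': 'telemetry_data'}

# The same callable passed to WriteToBigQuery's `table` argument.
table_fn = lambda element: f'project:dataset.{element["DEVICE"]}__opdata'

print(table_fn(element))  # project:dataset.sensor1__opdata
```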

My full error message:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-12-ae8dd133c81b> in <module>
     13             table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
---> 14             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
     15                                                             )
     16                      )

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pvalue.py in __or__(self, ptransform)
    138 
    139   def __or__(self, ptransform):
--> 140     return self.pipeline.apply(ptransform, self)
    141 
    142 

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    596     if isinstance(transform, ptransform._NamedPTransform):
    597       return self.apply(
--> 598           transform.transform, pvalueish, label or transform.label)
    599 
    600     if not isinstance(transform, ptransform.PTransform):

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    606       try:
    607         old_label, transform.label = transform.label, label
--> 608         return self.apply(transform, pvalueish)
    609       finally:
    610         transform.label = old_label

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    649         transform.type_check_inputs(pvalueish)
    650 
--> 651       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
    652 
    653       if type_options is not None and type_options.pipeline_type_check:

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply(self, transform, input, options)
    151   def apply(self, transform, input, options):
    152     self._maybe_add_unified_worker_missing_options(options)
--> 153     return super(DataflowRunner, self).apply(transform, input, options)
    154 
    155   def _get_unique_step_name(self):

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/runner.py in apply(self, transform, input, options)
    196       m = getattr(self, 'apply_%s' % cls.__name__, None)
    197       if m:
--> 198         return m(transform, input, options)
    199     raise NotImplementedError(
    200         'Execution of [%s] not implemented in runner %s.' % (transform, self))

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply_WriteToBigQuery(self, transform, pcoll, options)
    833       return pcoll | 'WriteToBigQuery' >> beam.io.Write(
    834           beam.io.BigQuerySink(
--> 835               transform.table_reference.tableId,
    836               transform.table_reference.datasetId,
    837               transform.table_reference.projectId,

AttributeError: 'function' object has no attribute 'tableId'

Comments:

Answer 1:

According to the documentation and this thread https://***.com/a/62146803/5283663, it looks like you need to specify the schema parameter.

Does this fix it?

p = beam.Pipeline(DataflowRunner(), options=options)

data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://bucket_name/file')
    | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
       )

telemetry_data = (data
    | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        schema=set_schema,            
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                        )
                 )
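For reference, WriteToBigQuery accepts the schema either as a comma-separated 'name:TYPE' string or as a dict with a 'fields' list. The field names below are assumptions about the telemetry payload, and `set_schema` stands in for the variable referenced (but not shown) in the snippet above:

```python
# A hypothetical schema string for the telemetry rows (field names assumed).
set_schema = 'DEVICE:STRING,TIMESTAMP:TIMESTAMP,VALUE:FLOAT'

# The equivalent dict form that WriteToBigQuery also accepts.
schema_dict = {
    'fields': [
        {'name': name, 'type': btype, 'mode': 'NULLABLE'}
        for name, btype in (field.split(':') for field in set_schema.split(','))
    ]
}
```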

Comments:

This is actually my thread. The schema is only needed the first time WriteToBigQuery creates the table.

Answer 2:

This seems to be an issue with the DataflowRunner itself. I made a minimal example and got the same error. I tried SDKs from 2.11.0 to 2.21, and I remember making a code sample with 2.13.0 about a year ago, so I think what changed is the DataflowRunner itself.

It works fine if you use the DirectRunner. Sample code:

    with beam.Pipeline(options=pipeline_options) as p:
        elements = [
            {'number': 1, 'table': "table1"},
            {'number': 2, 'table': "table2"},
            {'number': 3, 'table': "table1"},
        ]

        schema = 'number:integer'

        def get_table(element):
            table = element['table']
            element.pop('table')
            return f"{known_args.project}:{known_args.dataset}.{table}"

        dyn_bq = (
                p
                | beam.Create(elements)
                | WriteToBigQuery(table=get_table,
                                   schema=schema,
                                   create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                   write_disposition=BigQueryDisposition.WRITE_APPEND)
        )

I don't see any possible workaround. I will file a public issue and update this answer.
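The per-element routing in `get_table` above can be checked in isolation. The `Args` class here is a stand-in for the `known_args` namespace in the snippet, and the project/dataset values are placeholders:

```python
# Stand-in for the known_args namespace used in the snippet above.
class Args:
    project = 'my-project'
    dataset = 'my_dataset'

known_args = Args()

def get_table(element):
    # Pop the routing key so it is not written to BigQuery as a column.
    table = element.pop('table')
    return f"{known_args.project}:{known_args.dataset}.{table}"

row = {'number': 1, 'table': 'table1'}
print(get_table(row))  # my-project:my_dataset.table1
print(row)             # {'number': 1} -- routing key removed
```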

Comments:

Answer 3:

If you look at how the table parameter is parsed, when it is given a callable the value is kept as that callable directly. So later on, the code tries to access attributes on the callable that don't exist.

Could you try providing a tuple/TableReference instead of a lambda?

For example,

table = bigquery_tools.parse_table_reference(f'project:dataset.{element["DEVICE"]}__opdata')
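For reference, the 'project:dataset.table' spec format that parse_table_reference expects can be illustrated with a simplified pure-Python split. This is a sketch of the format only, not Beam's actual implementation:

```python
def split_table_spec(spec):
    """Split a 'project:dataset.table' spec into its three parts.

    Simplified sketch: Beam's bigquery_tools.parse_table_reference instead
    returns a TableReference with projectId/datasetId/tableId attributes.
    """
    project, rest = spec.split(':', 1)
    dataset, table = rest.split('.', 1)
    return project, dataset, table

print(split_table_spec('project:dataset.device1__opdata'))
# ('project', 'dataset', 'device1__opdata')
```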

Comments:
