AttributeError: 'function' object has no attribute 'tableId'. Apache Beam Dataflow runner
【Title】: AttributeError: 'function' object has no attribute 'tableId'. Apache Beam Dataflow runner
【Posted】: 2020-06-11 15:15:28
【Question】: I am trying to write to BigQuery with the Apache Beam PTransform WriteToBigQuery(), and I get an error when I pass a lambda as the table argument that reads the value of the "DEVICE" field. I did exactly the same thing in a streaming job and it worked, but for some reason it does not work in this batch job.
My pipeline options:
import apache_beam as beam
from apache_beam.runners import DataflowRunner
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
options = pipeline_options.PipelineOptions(flags=[])
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
options.view_as(GoogleCloudOptions).region = 'europe-west1'
options.view_as(pipeline_options.SetupOptions).sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
    beam.version.__version__)
My code:
import json

p = beam.Pipeline(DataflowRunner(), options=options)
data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://bucket_name/file')
    | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
)
telemetry_data = (data
    | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        # The table name is computed per element from its "DEVICE" field.
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )
)
My full error message:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-12-ae8dd133c81b> in <module>
13 table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
---> 14 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
15 )
16 )
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pvalue.py in __or__(self, ptransform)
138
139 def __or__(self, ptransform):
--> 140 return self.pipeline.apply(ptransform, self)
141
142
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
596 if isinstance(transform, ptransform._NamedPTransform):
597 return self.apply(
--> 598 transform.transform, pvalueish, label or transform.label)
599
600 if not isinstance(transform, ptransform.PTransform):
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
606 try:
607 old_label, transform.label = transform.label, label
--> 608 return self.apply(transform, pvalueish)
609 finally:
610 transform.label = old_label
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
649 transform.type_check_inputs(pvalueish)
650
--> 651 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
652
653 if type_options is not None and type_options.pipeline_type_check:
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply(self, transform, input, options)
151 def apply(self, transform, input, options):
152 self._maybe_add_unified_worker_missing_options(options)
--> 153 return super(DataflowRunner, self).apply(transform, input, options)
154
155 def _get_unique_step_name(self):
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/runner.py in apply(self, transform, input, options)
196 m = getattr(self, 'apply_%s' % cls.__name__, None)
197 if m:
--> 198 return m(transform, input, options)
199 raise NotImplementedError(
200 'Execution of [%s] not implemented in runner %s.' % (transform, self))
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply_WriteToBigQuery(self, transform, pcoll, options)
833 return pcoll | 'WriteToBigQuery' >> beam.io.Write(
834 beam.io.BigQuerySink(
--> 835 transform.table_reference.tableId,
836 transform.table_reference.datasetId,
837 transform.table_reference.projectId,
AttributeError: 'function' object has no attribute 'tableId'
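【Comments】:
【Answer 1】: According to the documentation and this thread https://***.com/a/62146803/5283663, it looks like you need to specify the schema parameter.
Does this solve the problem?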
p = beam.Pipeline(DataflowRunner(), options=options)
data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://bucket_name/file')
    | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
)
telemetry_data = (data
    | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        schema=set_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )
)
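【Comments】:
That is actually my thread. The schema is only needed when WriteToBigQuery creates the table for the first time.
【Answer 2】: This looks like a problem with the DataflowRunner itself. I put together a simple example and got the same error. I tried SDK versions from 2.11.0 to 2.21, and I remember writing a code sample with 2.13.0 about a year ago, so I think what changed is the DataflowRunner itself.
If you use the DirectRunner, it works fine. Sample code: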
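import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery

# pipeline_options and known_args come from the surrounding script (not shown).
with beam.Pipeline(options=pipeline_options) as p:
    elements = [
        {'number': 1, 'table': "table1"},
        {'number': 2, 'table': "table2"},
        {'number': 3, 'table': "table1"},
    ]

    schema = 'number:integer'

    def get_table(element):
        # Route each element to the table named in its 'table' field.
        table = element['table']
        element.pop('table')
        return f"{known_args.project}:{known_args.dataset}.{table}"

    dyn_bq = (
        p
        | beam.Create(elements)
        | WriteToBigQuery(table=get_table,
                          schema=schema,
                          create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                          write_disposition=BigQueryDisposition.WRITE_APPEND)
    )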
I don't see any possible workaround. I will file a public issue and update this answer.
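For reference, the table callable itself can be checked without any runner. The sketch below is plain Python, not Beam: it calls a routing function on each sample element and collects the table specs it would produce. `PROJECT`, `DATASET`, and `route_table` are placeholder names introduced here for illustration, and unlike `get_table` above this version does not mutate the element.

```python
# Hypothetical placeholders; substitute your own project and dataset.
PROJECT = "my-project"
DATASET = "my_dataset"

elements = [
    {"number": 1, "table": "table1"},
    {"number": 2, "table": "table2"},
    {"number": 3, "table": "table1"},
]


def route_table(element):
    """Return a 'project:dataset.table' spec for one element."""
    return f"{PROJECT}:{DATASET}.{element['table']}"


# Apply the callable to every element, as a dynamic-destination sink would.
specs = [route_table(e) for e in elements]
for spec in specs:
    print(spec)
```

Running a check like this first helps separate mistakes in the callable from the runner-side failure described in this answer.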
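【Comments】:
【Answer 3】: If you look at how the table parameter is parsed, when it is given a callable it is returned directly as that callable.
So later on, the code tries to access attributes that do not exist on the callable.
Could you try providing a tuple/TableReference instead of a lambda?
For example: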
table = bigquery_tools.parse_table_reference(f'project:dataset.{element["DEVICE"]}__opdata')
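The failure mode described in this answer can be sketched in plain Python. The code below is an illustrative stand-in, not Beam's implementation: `parse_table_sketch` and `TableRef` are hypothetical names, and the only behaviour taken from the answer and the traceback is that a callable is returned unchanged, while a string is parsed into an object with `projectId`, `datasetId`, and `tableId` attributes.

```python
from collections import namedtuple

# Hypothetical stand-in for a parsed table reference.
TableRef = namedtuple("TableRef", ["projectId", "datasetId", "tableId"])


def parse_table_sketch(table):
    """Return callables unchanged; parse 'project:dataset.table' strings."""
    if callable(table):
        return table
    project, rest = table.split(":", 1)
    dataset, table_id = rest.split(".", 1)
    return TableRef(project, dataset, table_id)


static_ref = parse_table_sketch("project:dataset.device1__opdata")
dynamic_ref = parse_table_sketch(
    lambda element: f'project:dataset.{element["DEVICE"]}__opdata')

# A parsed string has the attributes the runner reads.
print(static_ref.tableId)

# A callable does not, so attribute access fails as in the traceback.
try:
    dynamic_ref.tableId
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
print(error_message)
```

A static table reference avoids the error only because it is resolved once, when the pipeline is built, so it cannot vary per element the way the lambda does.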
【Comments】: