ValueError:必须使用 beam.io.gcp.bigquery.ReadFromBigQuery 指定 BigQuery 表或查询
Posted
技术标签:
【中文标题】ValueError:必须使用 beam.io.gcp.bigquery.ReadFromBigQuery 指定 BigQuery 表或查询【英文标题】:ValueError: A BigQuery table or a query must be specified with beam.io.gcp.bigquery.ReadFromBigQuery 【发布时间】:2021-09-23 11:45:02 【问题描述】:我正在尝试将 BigQuery 表名作为 apache 梁管道模板的值提供程序传递。根据their documentation 和这个*** answer,可以将值提供者传递给apache_beam.io.gcp.bigquery.ReadFromBigQuery
。
这就是我的管道代码
class UserOptions(PipelineOptions):
"""Define runtime argument"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--input', type=str)
parser.add_value_provider_argument('--output', type=str)
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
user_options = pipeline_options.view_as(UserOptions)
(p | 'Read from BQ Table' >> beam.io.gcp.bigquery.ReadFromBigQuery(
user_options.input
)
当我在本地运行代码时,命令行传递的user_options.input
的值为--input projectid.dataset_id.table
但是,我遇到了错误:
ValueError: A BigQuery table or a query must be specified
我试过了:
通过projectid:dataset_id.table
使用bigquery.TableReference
-> 不可能
使用f'
user_options.input'
传递查询 -> 在本地运行时有效,但在 GCP 上调用模板时无效。错误说明:
在请求中未设置默认数据集时缺少数据集。", "errors": [ "message": "Table name "RuntimeValueProvider(option: input, type: str, default_value: None)" 缺少数据集而没有请求中设置了默认数据集。", "domain": "global", "reason": "invalid" ], "status": "INVALID_ARGUMENT" >
我错过了什么?
【问题讨论】:
【参考方案1】:table
参数必须按名称传递给 ReadFromBigQuery
。
BigQuerySource
(已弃用)接受 table
作为第一个参数,因此您可以按位置传递一个参数 (docs)。但是ReadFromBigQuery
期望gcs_location
作为第一个参数(docs)。因此,如果您将代码从使用 BigQuerySource
移植到使用 ReadFromBigQuery
并且您没有明确地按名称传递表,它将失败并出现您收到的错误。
这里有两个有效的例子和一个无效的例子:
import apache_beam as beam
project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'
if __name__ == "__main__":
args = [
'--temp_location=gs://my_temp_bucket',
]
# This works:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(table=f"project_id:dataset_id.table_id")
)
# So does this:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(table=f"dataset_id.table_id", project=project_id)
)
# But this doesn't work becuase the table argument is not passed in by name.
# The f"project_id:dataset_id.table_id" string is interpreted as the gcs_location.
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(f"project_id:dataset_id.table_id")
)
【讨论】:
以上是关于ValueError:必须使用 beam.io.gcp.bigquery.ReadFromBigQuery 指定 BigQuery 表或查询的主要内容,如果未能解决你的问题,请参考以下文章
ValueError:必须使用 beam.io.gcp.bigquery.ReadFromBigQuery 指定 BigQuery 表或查询
使用 isin(list) 过滤数据帧时出现“ValueError:列必须与键长度相同”
出现错误:“ValueError:如果使用所有标量值,则必须传递索引”将 ndarray 转换为 pandas Dataframe