ValueError:必须使用 beam.io.gcp.bigquery.ReadFromBigQuery 指定 BigQuery 表或查询

Posted

技术标签:

【中文标题】ValueError:必须使用 beam.io.gcp.bigquery.ReadFromBigQuery 指定 BigQuery 表或查询【英文标题】:ValueError: A BigQuery table or a query must be specified with beam.io.gcp.bigquery.ReadFromBigQuery 【发布时间】:2021-09-23 11:45:02 【问题描述】:

我正在尝试将 BigQuery 表名作为 apache 梁管道模板的值提供程序传递。根据their documentation 和这个*** answer,可以将值提供者传递给apache_beam.io.gcp.bigquery.ReadFromBigQuery

这就是我的管道代码

class UserOptions(PipelineOptions):
    """Define runtime argument"""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input', type=str)
        parser.add_value_provider_argument('--output', type=str)

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
user_options = pipeline_options.view_as(UserOptions)

(p | 'Read from BQ Table' >> beam.io.gcp.bigquery.ReadFromBigQuery(
             user_options.input
         )

当我在本地运行代码时,命令行传递的user_options.input 的值为--input projectid.dataset_id.table

但是,我遇到了错误:

ValueError: A BigQuery table or a query must be specified

我试过了:

通过projectid:dataset_id.table

使用bigquery.TableReference -> 不可能

使用f'user_options.input'

传递查询 -> 在本地运行时有效,但在 GCP 上调用模板时无效。错误说明:

在请求中未设置默认数据集时缺少数据集。", "errors": [ "message": "Table name "RuntimeValueProvider(option: input, type: str, default_value: None)" 缺少数据集而没有请求中设置了默认数据集。", "domain": "global", "reason": "invalid" ], "status": "INVALID_ARGUMENT" >

我错过了什么?

【问题讨论】:

【参考方案1】:

table 参数必须按名称传递给 ReadFromBigQuery

BigQuerySource(已弃用)接受 table 作为第一个参数,因此您可以按位置传递一个参数 (docs)。但是ReadFromBigQuery 期望gcs_location 作为第一个参数(docs)。因此,如果您将代码从使用 BigQuerySource 移植到使用 ReadFromBigQuery 并且您没有明确地按名称传递表,它将失败并出现您收到的错误。

这里有两个有效的例子和一个无效的例子:

import apache_beam as beam

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

if __name__ == "__main__":
    args = [
        '--temp_location=gs://my_temp_bucket',
    ]
    # This works:
    with beam.Pipeline(argv=args) as pipeline:
        query_results = (
          pipeline
          | 'Read from BigQuery' 
          >> beam.io.ReadFromBigQuery(table=f"project_id:dataset_id.table_id")
        )
    # So does this:
    with beam.Pipeline(argv=args) as pipeline:
        query_results = (
          pipeline
          | 'Read from BigQuery' 
          >> beam.io.ReadFromBigQuery(table=f"dataset_id.table_id", project=project_id)
        )
    # But this doesn't work becuase the table argument is not passed in by name.
    # The f"project_id:dataset_id.table_id" string is interpreted as the gcs_location.
    with beam.Pipeline(argv=args) as pipeline:
        query_results = (
          pipeline
          | 'Read from BigQuery'
          >> beam.io.ReadFromBigQuery(f"project_id:dataset_id.table_id")
        )

【讨论】:

以上是关于ValueError:必须使用 beam.io.gcp.bigquery.ReadFromBigQuery 指定 BigQuery 表或查询的主要内容,如果未能解决你的问题,请参考以下文章

ValueError:索引必须单调递增或递减

ValueError:类的数量必须大于一;得到 1

ValueError:必须使用 beam.io.gcp.bigquery.ReadFromBigQuery 指定 BigQuery 表或查询

使用 isin(list) 过滤数据帧时出现“ValueError:列必须与键长度相同”

出现错误:“ValueError:如果使用所有标量值,则必须传递索引”将 ndarray 转换为 pandas Dataframe

ValueError:您必须包含至少一个标签和至少一个序列