Airflow - BigQuery 架构字段中的值无效

Posted

技术标签:

【中文标题】Airflow - BigQuery 架构字段中的值无效【英文标题】:Airflow - BigQuery Invalid value at schema fields 【发布时间】:2020-10-16 07:26:02 【问题描述】:

我有一个 GCP 云编辑器可以将数据从 GCS 加载到 BQ。我使用schema_fields 选项来传递源模式。我在变量中传递源模式。我从xcom pull 得到这个。请参阅下面的print(schema)

["mode": "NULLABLE", "name": "id", "type": "INTEGER", "mode": "NULLABLE", "name": "c1", "type": "DATE", "mode": "NULLABLE", "name": "c2", "type": "TIME", "mode": "NULLABLE", "name": "c3", "type": "DATETIME", "mode": "NULLABLE", "name": "c4", "type": "TIMESTAMP"]

在 BQ 运算符中,我使用 schema_fields=schema

但是当我运行 dag 时,它的抛出错误。

ERROR - <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/xxx-xxx/jobs?
alt=json returned "Invalid value at 'job.configuration.load.schema.fields' (type.googleapis.com/google.cloud.bigquery.v2.TableFieldSchema), 
"["mode": "NULLABLE", "name": "id", "type": "INTEGER", "mode": "NULLABLE", "name": "c1", "type": "DATE", "mode": "NULLABLE", "name": "c2", "type": "TIME", "mode": "NULLABLE", "name": "c3", "type": "DATETIME", "mode": "NULLABLE", "name": "c4", "type": "TIMESTAMP"]"">

但是当我将此模式保存为 GCS 中的文件并尝试使用 schema_object 时,它就起作用了。但同样的事情通过变量没有工作。

【问题讨论】:

【参考方案1】:

Schema_fields 参数接受一个有效的列表。目前还没有模板化。

我已经使用了这个示例字段并且它正在工作。请检查类似的格式,看看它是否适合您。

schema_fields=[
                 'name': 'Col1', 'type': 'STRING', 'mode': 'NULLABLE',
                 'name': 'Col2', 'type': 'STRING', 'mode': 'NULLABLE',
             ]

【讨论】:

【参考方案2】:

我可以通过使用 json.loads 来解决这个问题

schema=json.loads(xcom_pull commands)

【讨论】:

以上是关于Airflow - BigQuery 架构字段中的值无效的主要内容,如果未能解决你的问题,请参考以下文章

Airflow 中是不是有操作员可以根据 BigQuery 中的查询创建表?

如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?

GA 到 BigQuery 导出架构中的“hits.publisher.adsRevenue”字段是啥意思?

Airflow - BigQuery 作业状态检查失败。最终错误是:%s'

如何使用气流将 bigquery 导出到 bigtable?架构问题

BigQuery 中的部分 JSON 架构