如何使用气流将 bigquery 导出到 bigtable?架构问题

Posted

技术标签:

【中文标题】如何使用气流将 bigquery 导出到 bigtable?架构问题【英文标题】:how to export bigquery to bigtable using airflow? schema issue 【发布时间】:2021-10-10 07:16:00 【问题描述】:

我正在使用 Airflow 以 Avro 格式将 BigQuery 行提取到 Google Cloud Storage。

with models.DAG(
    "bigquery_to_bigtable",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime.now(),
    catchup=False,
    tags=["test"],
) as dag:
    
    data_to_gcs = BigQueryInsertJobOperator(
        task_id="data_to_gcs",
        project_id=project_id,
        location=location,
        configuration=
            "extract": 
                "destinationUri": gcs_uri, "destinationFormat": "AVRO",
                "sourceTable": 
                    "projectId": project_id, "datasetId": dataset_id, 
                    "tableId": table_id)

    gcs_to_bt = DataflowTemplatedJobStartOperator(
        task_id="gcs_to_bt",
        template="gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable",
        location=location,
        parameters=
            'bigtableProjectId': project_id,
            'bigtableInstanceId': bt_instance_id,
            'bigtableTableId': bt_table_id,
            'inputFilePattern': 'gs://export/test.avro-*'
        ,
    )

data_to_gcs >> gcs_to_bt

bigquery 行包含

row_key      | 1_cnt | 2_cnt | 3_cnt
1#2021-08-03 |   1   |   2   |   2 
2#2021-08-02 |   5   |   1   |   5 
.
.
.

我想将 row_key 列用于 bigtable 中的行键,并将其余列用于特定列族中的列,例如 bigtable 中的 my_cf

但是我在使用数据流将 avro 文件加载到 bigtable 时收到错误消息

"java.io.IOException: Failed to start reading from source: gs://export/test.avro-"
Caused by: org.apache.avro.AvroTypeException: Found Root, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key

我读到的docs 告诉我们:

Bigtable 表必须存在并且具有相同的列族 在 Avro 文件中导出。

如何在 Avro 中导出具有相同列族的 BigQuery?

【问题讨论】:

【参考方案1】:

我认为您必须将 AVRO 转换为正确的架构。你提到的Documentation 还说:

Bigtable 需要来自输入 Avro 文件的特定架构。

有一个link 指的是必须使用的特殊数据模式。

如果我理解正确,您只是从表中导入数据,结果虽然是 AVRO 架构,但对架构的要求并不多,因此您需要将数据转换为适合您的 BigTable 架构的适当架构。

【讨论】:

@WytrzymałyWiktor 我会试一试的! 对不起,这么晚才选择你的答案是正确的。现在我可以理解 AVRO 的架构并可以使用它来导入。谢谢!

以上是关于如何使用气流将 bigquery 导出到 bigtable?架构问题的主要内容,如果未能解决你的问题,请参考以下文章

如何使用apache气流调度谷歌云bigquery存储过程

气流 - 脚本更改文件名变量

优化将BigQuery的数据传输到MongoDB的气流任务

将 XLS 文件从 GCS 导入 BigQuery

BigQuery 作业状态已完成,但未使用气流插入任何行

运行 BigQuery 查询并使用气流将数据写入 Parquet 中的云存储桶