有没有办法在 bigquery 中使用动态数据集名称

Posted

技术标签:

【中文标题】有没有办法在 bigquery 中使用动态数据集名称【英文标题】:Is there a way to use dynamic dataset name in bigquery 【发布时间】:2020-02-25 14:50:02 【问题描述】:

问题陈述:

我正在尝试在气流中使用 BigqueryOperator。目的是通过动态更改数据集名称多次读取相同的查询,即数据集名称将作为参数传递。

示例: project.dataset1_layer1.tablename1, project.dataset2_layer1.tablename1

预期: 我想维护一个 SQL 副本,其中我可以将数据集名称作为参数传递,这些参数可以被该特定数据集替换。

错误消息:

我尝试将动态数据集名称作为 query_params 的一部分传递。但它失败了,出现以下错误消息。

查询被解析为 INFO - Executing: [u'SELECT col1, col2 FROM project.@partner_layer1.tablename']

ERROR - BigQuery job failed. Final error was: u'reason': u'invalidQuery', u'message': u'Query parameters cannot be used in place of table names at [1:37]', u'location': u'query'. u'CREATE_IF_NEEDED', u'query': u'SELECT col1, col2 FROM project.@partner_layer1.tablename', u'jobType': u'QUERY' `

到目前为止我尝试过的事情

查询模板temp.sql如下:

SELECT col1, col2 FROM `project.@partner_layer1.tablename`;

Airflow BigqueryOperator 使用如下:

query_template_dict = 
    'partner_list' = ['val1', 'val2', 'val3', 'val4']
    'google_project': 'project_name',
    'queries': 
        'layer3': 
            'template':             'temp.sql',
            'output_dataset':       '_layer3',
            'output_tbl':           'table_'.format(table_date),
            'output_tbl_schema':    'temp.txt'
        
    ,
    'applicable_tasks': 
        'val1': 
            'table_layer3': []
        ,
        'val2': 
            'table_layer3': []
        ,
        'val3': 
            'table_layer3': []
        ,
        'val4': 
            'table_layer3': []
        

    



for partner in query_template_dict['partner_list']:
    # Loop over applicable report queries for a partner
    applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
    for task in applicable_tasks:
        destination_tbl = '..'.format(query_template_dict['google_project'], partner,
                                              query_template_dict['queries'][task]['output_dataset'] , 
                                              query_template_dict['queries'][task]['output_tbl'])
                                              
        #Actual destination table structure
        #destination_tbl = 'project.partner_layer3.table_20200223'  
run_bq_cmd = BigQueryOperator (
                        task_id                                 =partner + '-' + task,
                        sql                                     =[query_template_dict['queries'][task]['template']],
                        destination_dataset_table               =destination_tbl,
                        use_legacy_sql                          =False,
                        write_disposition                       ='WRITE_APPEND',
                        create_disposition                      ='CREATE_IF_NEEDED',
                        allow_large_results                     =True,
                        query_params=[
                                
                                        "name":                 "partner",
                                        "parameterType":         "type": "STRING" ,
                                        "parameterValue":        "value": partner
                                ,

                                
                                         "name":             "batch_date",
                                         "parameterType":     "type": "STRING" ,
                                         "parameterValue":    "value": batch_date
                                
                        ],
                        dag=dag,

有人可以帮我解决这个问题吗? BigQuery 在动态传递数据集名称方面是否存在限制?

【问题讨论】:

【参考方案1】:

在 Airflow 中替换数据集名称,而不是在 BigQuery 中。

所以在将查询发送到 BigQuery 之前执行此操作 - 在 Airflow 中使用 Python 字符串替换。

【讨论】:

以上是关于有没有办法在 bigquery 中使用动态数据集名称的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法使用 BigQuery 视图作为数据流的输入?

有没有办法根据 Bigquery 查询中的标签选择表?

有没有办法在 BigQuery 中保存的视图中使用脚本方法?

BigQuery 中的动态 TABLE_DATE_RANGE

使用自动检测的动态 BigQuery 架构:错误架构没有字段

有没有办法在仅附加模式下使用 Google Apps 脚本将数据从 BigQuery 加载到 Google 表格?