作为数据流运行器运行时,apache beam.io.BigQuerySource use_standard_sql 不起作用

Posted

技术标签:

【中文标题】作为数据流运行器运行时,apache beam.io.BigQuerySource use_standard_sql 不起作用【英文标题】:apache beam.io.BigQuerySource use_standard_sql not working when running as dataflow runner 【发布时间】:2021-04-02 19:14:44 【问题描述】:

我有一个数据流作业,我将首先从 bigquery 查询中读取数据(在标准 sql 中)。它在直接运行器模式下完美运行。但是我尝试在数据流运行器模式下运行此数据流并遇到此错误:

response: , content

显然 use_standard_sql 参数在数据流运行器模式下不起作用。 版本: 阿帕奇光束:2.24.0 蟒蛇:3.8

last_update_date = pipeline | 'Read last update date' >> beam.io.Read(beam.io.BigQuerySource(
    query='''
        SELECT
            MAX(date) AS date
        FROM
            GoogleSearchConsole.search_query_analytics_log
    ''',
    use_standard_sql=True
))

【问题讨论】:

【参考方案1】:

尝试以下从 Bigquery 读取数据并写入 Bigquery 的代码。 代码是一个 apache beam 数据流运行器代码:-

#------------Import Lib-----------------------#
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam, os, sys, argparse, logging
from apache_beam.options.pipeline_options import SetupOptions

#------------Set up BQ parameters-----------------------#
# Replace with Project Id
project = 'xxxxx'
#plitting Of Records----------------------#

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
          '--cur_suffix',
          dest='cur_suffix',
          help='Input table suffix to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)


    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)


    logging.info('***********')
    logging.info(known_args.cur_suffix)
    data_loading = (
        p1
        | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(query='''SELECT SUBSTR(_time, 1, 19) as _time, dest FROM `project.dataset.table`''', use_standard_sql=True))
    )

    project_id = "xxxxxxx"
    dataset_id = 'AAAAAAAA'
    table_schema_Audit = ('_time:DATETIME, dest:STRING')

#---------------------Type = audit----------------------------------------------------------------------------------------------------------------------
    result = (
    data_loading
        | 'Write-Audit' >> beam.io.WriteToBigQuery(
                                                    table='YYYYYYY',
                                                    dataset=dataset_id,
                                                    project=project_id,
                                                    schema=table_schema_Audit,
                                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
                                                    ))



    result = p1.run()
    result.wait_until_finish()


if __name__ == '__main__':
  path_service_account = 'ABGFDfc927.json'
  os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
  run()

【讨论】:

感谢您的回复。我仍然不明白为什么我在数据流运行器模式下失败了。你能帮忙解释一下吗? 不幸的是,我可以在你的描述中看到完整的代码,如果你能分享完整的代码会更好,然后我可以调查问题。您在描述中给出的代码部分对我来说看起来不错。我假设可能是其他问题。

以上是关于作为数据流运行器运行时,apache beam.io.BigQuerySource use_standard_sql 不起作用的主要内容,如果未能解决你的问题,请参考以下文章

AttributeError:“function”对象没有属性“tableId”。 Apache Beam 数据流运行器

如何为 Gitlab 运行器启用通过 SSH 克隆?

是否可以在运行时使用测试运行器将测试用例注入soapui?

当使用面向以下异常的testng并行运行自动生成的测试运行器时

XCTest 测试运行器在完成运行测试之前以代码 -1 退出

GitLab 多个运行器,交换工件