如何通过 Spark SQL 连接 BigQuery?

Posted

技术标签:

【中文标题】如何通过 Spark SQL 连接 BigQuery?【英文标题】:How do I connect with BigQuery via Spark SQL? 【发布时间】:2019-04-10 10:37:47 【问题描述】:

我有一个简单的 python 代码,其中包括使用具有我的凭据的 JSON 文件连接 bigQuery。

data = pd.read_gbq(SampleQuery, project_id='XXXXXXXX', private_key='filename.json')

这里的 filename.json 格式如下:


  "type": "service_account",
  "project_id": "projectId",
  "private_key_id": "privateKeyId",
  "private_key": "privateKey",
  "client_email": "clientEmail",
  "client_id": "clientId",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/clientEmail"

现在,我需要将此代码移植到 pyspark。但是我很难找到如何使用 Spark SQL 进行查询。我正在使用 AWS EMR 集群来运行这个查询!

任何帮助将不胜感激!

【问题讨论】:

如果你能给出你尝试连接的示例代码,但它不起作用,那就太好了。 可以使用连接器cloud.google.com/dataproc/docs/tutorials/… BigQuery Storage API 就是为此目的而设计的,并且有一个专门构建的Spark SQL connector。 嗨,Kenneth,这看起来很有趣,您能否提供更多代码示例来说明它是如何工作的以及它是如何解决问题的 README.md 中有一些关于如何将连接器与现有 Spark 集群一起使用的文档,包括 scala 和 pyspark 示例。你能具体告诉我你在寻找什么吗?我帮不了你,但我可能会找到能帮到你的人。 【参考方案1】:

由于使用 Spark SQL 需要 SQLContext 对象,因此需要先配置 SparkContext 以连接到 BigQuery。在我看来,BigQuery Connector(由 sramalingam24 和 Kenneth Jung 解决)可用于在 BigQuery 中查询数据。

请注意,sramalingam24 提供了一个示例链接,以下是代码摘要:

bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs:///hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = 
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',


table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

word_counts = (
    table_data
    .map(lambda record: json.loads(record[1]))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

sql_context = SQLContext(sc)
(word_counts
 .toDF(['word', 'word_count'])
 .write.format('json').save(output_directory))

然后你可以下载connector jar for Other Hadoop clusters。 Kenneth Jung 提供的信息链接表明选项 --jar 可用于包含连接器 (--jars gs://spark-lib/bigquery/spark-bigquery-latest.jar),这是包含的选项驱动程序和执行程序类路径上的 jar。

【讨论】:

【参考方案2】:

您的 submit pyspark 参数之一应该是带有 spark-bigquery-latest.jar 包的 jars,这就是我将其添加到谷歌云上的 dataproc 作业的方式:

gcloud dataproc jobs submit pyspark --cluster $CLUSTER_NAME jars gs://spark-lib/bigquery/spark-bigquery-latest.jar --driver-log-levels root=FATAL script.py

【讨论】:

以上是关于如何通过 Spark SQL 连接 BigQuery?的主要内容,如果未能解决你的问题,请参考以下文章

通过 Spark SQL 将 tableau 与 Elastic 搜索连接起来

通过 Spark 使用 BigQuery Storage API:请求多个分区但仅获得 1 个

如何在数据块中使用 Spark sql 连接 Spark 数据框列

spark sql:如何优化多个巨大的配置单元表连接

我们如何使用 SQL 风格的“LIKE”标准连接两个 Spark SQL 数据帧?

避免 Spark SQL 查询的笛卡尔连接