Pyspark 和 BigQuery 在 Google Dataproc 中使用两个不同的项目 ID

Posted

技术标签:

【中文标题】Pyspark 和 BigQuery 在 Google Dataproc 中使用两个不同的项目 ID【英文标题】:Pyspark and BigQuery using two different project-ids in Google Dataproc 【发布时间】:2016-12-09 15:52:07 【问题描述】:

我想使用具有不同项目 ID 的 Google Dataproc 运行一些 pyspark 作业,但到目前为止没有成功。我是 pyspark 和 Google Cloud 的新手,但我遵循 this 示例并且运行良好(如果 BigQuery 数据集是公共的或属于我的 GCP 项目,即 ProjectA)。输入参数如下所示:

bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
projectA = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory ='gs:///hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = 
# Input Parameters
'mapred.bq.project.id': projectA,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'projectA',
'mapred.bq.input.dataset.id': 'my_dataset',
'mapred.bq.input.table.id': 'my_table',


# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)

但我需要的是从 ProjectB 的 BQ 数据集运行作业(我有查询它的凭据),因此在设置输入参数时,如下所示:

conf = 
# Input Parameters
'mapred.bq.project.id': projectA,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'projectB',
'mapred.bq.input.dataset.id': 'the_datasetB',
'mapred.bq.input.table.id': 'the_tableB',

并尝试从 BQ 加载数据,我的脚本一直在无限运行。我应该如何正确设置它?

仅供参考,在运行我之前提到的example 之后,我可以看到在 Google Storage 中创建了 2 个地毯(shard-0 和 shard-1)并包含相应的 BQ 数据,但我的工作只有 shard-0已创建,但它是空的。

【问题讨论】:

【参考方案1】:

我和我的同事丹尼斯谈过,这是他的建议:


“嗯,不确定,它应该可以工作。他们可能想在主节点内使用“bq”CLI 进行测试,以手动尝试将 projectB 表的一些“bq 提取”作业放入他们的 GCS 存储桶中,因为这就是所有连接器在引擎盖下做。

如果我不得不猜测我会怀疑他们只是意味着他们的个人用户名具有查询 projectB 的凭据,但 projectA 的默认服务帐户可能没有查询权限。 Dataproc 虚拟机中的所有内容都代表分配给虚拟机的计算服务帐户,而不是最终用户。

他们可以

gcloud 计算实例描述 -m

其中某处列出了服务帐户的电子邮件地址。”

【讨论】:

以上是关于Pyspark 和 BigQuery 在 Google Dataproc 中使用两个不同的项目 ID的主要内容,如果未能解决你的问题,请参考以下文章

如何将 PySpark 连接到 Bigquery

如何从 BigQuery 读取分区表到 Spark 数据帧(在 PySpark 中)

通过 Hadoop 输入格式示例用于 pyspark 的 BigQuery 连接器

有没有更好的方法通过 PySpark 集群(dataporc)将 spark df 加载到 BigQuery 中?

pyspark 读取 bigquery 时出错:java.lang.ClassNotFoundException:org.apache.spark.internal.Logging$class

BigQuery - 数据传输作业