使用 Spark BigQuery 连接器启动 Dataproc 集群

Posted

技术标签:

【中文标题】使用 Spark BigQuery 连接器启动 Dataproc 集群【英文标题】:Spinning up a Dataproc cluster with Spark BigQuery Connector 【发布时间】:2020-01-31 15:14:03 【问题描述】:

阅读此 repo 下的说明:Google Cloud Storage and BigQuery connectors 我按照以下初始化操作创建了一个安装了特定版本的 Google Cloud Storage 和 BigQuery 连接器的新 Dataproc 集群:

gcloud beta dataproc clusters create christos-test \
--region europe-west1 \
--subnet <a subnet zone> \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway \
--initialization-actions gs://<bucket-name>/init-scripts/v.0.0.1/connectors.sh \
--metadata gcs-connector-version=1.9.16 \
--metadata bigquery-connector-version=0.13.16 \
--zone europe-west1-b \
--master-machine-type n1-standard-4 \
--worker-boot-disk-size 500 \
--image=<an-image> \
--project=<a-project-id> \
--service-account=composer-dev@vf-eng-ca-nonlive.iam.gserviceaccount.com \
--no-address \
--max-age=5h \
--max-idle=1h \
--labels=<owner>=christos,<team>=group \
--tags=allow-internal-dataproc-dev,allow-ssh-from-management-zone,allow-ssh-from-management-zone2 \

--properties=core:fs.gs.implicit.dir.repair.enable=false

正如您应该看到的,我必须将外部依赖项添加到我自己的存储桶中:gs://init-dependencies-big-20824/init-scripts/v.0.0.1/connectors.sh。根据 scipt 的说明(我指的是connector.sh 脚本),我还必须在这个桶中添加以下罐子:

gcs-connector-hadoop2-1.9.16.jar gcs-connector-1.7.0-hadoop2.jar gcs-connector-1.8.0-hadoop2.jar bigquery-connector-hadoop2-0.13.16.jar

脚本运行正常,集群创建成功。但是,通过Jupyter 使用PySpark 笔记本仍然会导致BigQuery“找不到类”异常。当我直接从终端运行PySpark 时也会发生同样的情况。我能够避免该异常的唯一方法是在我的集群的主节点中复制另一个jar(这次是spark-bigquery_2.11-0.8.1-beta-shaded.jar)并以以下方式启动PySpark

pyspark --jars spark-bigquery_2.11-0.8.1-beta-shaded.jar

显然,这超出了目的。

我做错了什么?我考虑过更改connector.sh 脚本以包含另一个copy 函数,因此将spark-bigquery_2.11-0.8.1-beta-shaded.jar 复制到/usr/lib/hadoop/lib 下,所以我尝试手动复制此jar 并启动PySpark,但这仍然不起作用...

【问题讨论】:

【参考方案1】:

连接器初始化操作仅适用于来自 GoogleCloudDataproc/hadoop-connectors 的 Hadoop 的 Cloud Storage 和 BigQuery 连接器。

如果您使用 Spark,通常不应使用适用于 Hadoop 的 BigQuery 连接器,因为您已经使用 --jars 参数添加的 spark-bigquery-connector 存储库中有更新的适用于 Spark 的 BigQuery connector。

要在集群创建期间安装 Spark BigQuery 连接器,您需要编写自己的初始化操作,将其复制到集群节点上的 /usr/lib/spark/jars/ 目录中。请注意,您不需要复制连接器初始化操作中的所有代码,只需将 Spark BigQuery 连接器着色 jar 从您的 Cloud Storage 存储桶复制到 /usr/lib/spark/jars/ 目录:

gsutil cp gs://path/to/spark-bigquery-connector.jar /usr/lib/spark/jars/

更好的方法是将 Spark BigQuery 连接器与其他依赖项一起嵌入到您的应用程序分发中。

更新

Connectors initialization action 现在支持 Spark BigQuery 连接器,可用于在集群创建期间在 Dataproc 集群上安装 Spark BigQuery 连接器:

REGION=<region>
CLUSTER_NAME=<cluster_name>
gcloud dataproc clusters create $CLUSTER_NAME \
    --region $REGION \
    --initialization-actions gs://goog-dataproc-initialization-actions-$REGION/connectors/connectors.sh \
    --metadata spark-bigquery-connector-version=0.15.1-beta

【讨论】:

这是一个明智的回应,但是即使在您提供的路径下复制了相关的 jar 之后,我仍然无法让 pyspark 正常工作...尝试运行读取仍然会导致 @987654333 @. 有趣,我已经用gsutil cp gs://maven-central/repos/central/data/com/google/cloud/spark/spark-bigquery_2.11/0.8.1-beta/spark-bigquery_2.11-0.8.1-beta-shaded.jar /usr/lib/spark/jars/ init 操作和spark.read.format("bigquery").option("table", "publicdata.samples.shakespeare").load().show() PySpark 程序对其进行了测试,它可以工作。【参考方案2】:

使用包含依赖项的 Google 公共 spark-lib

--jars "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar

--jars "gs://spark-lib/bigquery/spark-bigquery-latest.jar

取决于部署 Dataproc 集群的 Scala 版本

对我来说效果很好。

【讨论】:

这适用于作业,但不适用于集群创建。这个特定的问题似乎询问有关集群创建的问题。

以上是关于使用 Spark BigQuery 连接器启动 Dataproc 集群的主要内容,如果未能解决你的问题,请参考以下文章

Google BigQuery Spark 连接器:如何在追加时忽略未知值

Spark BigQuery 连接器,设置欧盟位置

如何通过 Spark SQL 连接 BigQuery?

Google Spark-BigQuery-Connector如何利用BigQuery Storage API?

Spark BigQuery 连接器:写入 ARRAY 类型会导致异常:“”无效值:ARRAY 不是有效值“”

在Apache Spark中使用Bigquery Connector时如何设置分区数?