How to use the Spark-BigQuery connector with an existing Spark environment (not with Google Dataproc)

Posted: 2021-12-15 09:33:41

【Question】:

I am building a data pipeline that ingests data from various relational databases and CSV files. The data is preprocessed with PySpark, and the resulting DataFrame is then written to a BigQuery table.

I have copied the jar file gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.1.jar into the $SPARK_HOME/jars folder, and also specified the same artifact in spark.jars.packages when creating the Spark session (either of these alone is enough to put the connector on the classpath).

Please advise, since every blog post I can find uses Dataproc in its examples.

from pyspark.sql import SparkSession

# Pull the connector from Maven when the session starts; this runs
# against an existing YARN cluster rather than Dataproc.
spark = SparkSession.builder \
    .enableHiveSupport() \
    .appName('data_load') \
    .master('yarn') \
    .config('spark.jars.packages',
            'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.1') \
    .getOrCreate()

Scala version = 2.12.15

When writing to BigQuery I get an authentication-related error. The BigQuery Connection API is enabled.

# Write the preprocessed DataFrame to the target BigQuery table.
df.write.format('bigquery') \
  .option('table', big_table_id) \
  .save()
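
(Aside: separate from the authentication error below, the connector's default indirect write path also expects a GCS staging bucket; a minimal sketch, with a placeholder bucket name:)

# Placeholder staging bucket for the default (indirect) write path,
# which stages data in GCS before loading it into BigQuery; the
# connector also documents a 'writeMethod' option ('direct') that
# avoids the bucket.
df.write.format('bigquery') \
  .option('table', big_table_id) \
  .option('temporaryGcsBucket', 'my-staging-bucket') \
  .save()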

Error:

Py4JJavaError: An error occurred while calling o59.save.
: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Request had insufficient authentication scopes.
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:286)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:746)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:743)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.run(RetryHelper.java:76)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:742)
    at com.google.cloud.bigquery.connector.common.BigQueryClient.getTable(BigQueryClient.java:107)
    at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.getTable$lzycompute(BigQueryInsertableRelation.scala:67)
    at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.getTable(BigQueryInsertableRelation.scala:67)
    at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.exists$lzycompute(BigQueryInsertableRelation.scala:53)
    at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.exists(BigQueryInsertableRelation.scala:49)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:114)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
GET https://www.googleapis.com/bigquery/v2/projects/project212/datasets/NYSE_datasets/tables/NYSE_table?prettyPrint=false

  "code" : 403,
  "details" : [ 
    "@type" : "type.googleapis.com/google.rpc.ErrorInfo",
    "reason" : "ACCESS_TOKEN_SCOPE_INSUFFICIENT"
   ],
  "errors" : [ 
    "domain" : "global",
    "message" : "Insufficient Permission",
    "reason" : "insufficientPermissions"
   ],
  "message" : "Request had insufficient authentication scopes.",
  "status" : "PERMISSION_DENIED"

    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:428)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:284)


【Answer 1】:

Outside Dataproc you have to authenticate with a service account, as described in the spark-bigquery-connector documentation:

Use a service account JSON key and GOOGLE_APPLICATION_CREDENTIALS as described here.
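
For example, a minimal sketch assuming a key file at a placeholder path; the variable must be set before the SparkSession (and hence the JVM) is created:

import os

# Placeholder path to a service-account JSON key; set it before
# SparkSession.getOrCreate() launches the JVM so Application Default
# Credentials can pick it up on the driver.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/key.json'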

Credentials can also be provided explicitly, either as a parameter or from the Spark runtime configuration. They can be passed in directly as a base64-encoded string, or as a file path that contains the credentials (but not both).

Example: spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")

spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")

Alternatively, provide the name of a credentials file:

spark.read.format("bigquery").option("credentialsFile","</path/to/key/file>")

spark.conf.set("credentialsFile","</path/to/key/file>")

Another way to pass credentials is to pass an access token used to authenticate API calls to Google Cloud Platform APIs. You can get an access token by running gcloud auth application-default print-access-token.

spark.read.format("bigquery").option("gcpAccessToken","<acccess token>")

spark.conf.set("gcpAccessToken","<access-token>")

