How to use the spark-bigquery-connector with an existing Spark environment (not with Google Dataproc)

Posted: 2021-12-15 09:33:41

Question:

I am building a data pipeline that ingests data from various relational databases and CSV files. The data will be preprocessed with PySpark, and the resulting DataFrame will then be written to a BigQuery table.
I have copied the jar file gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.1.jar into the $SPARK_HOME/jars folder, and I have also specified the same artifact in spark.jars.packages when creating the Spark session.

Please give me some advice, as all the blog posts I can find use Dataproc in their examples.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .enableHiveSupport() \
    .appName('data_load') \
    .master('yarn') \
    .config('spark.jars.packages',
            'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.23.1') \
    .getOrCreate()
Scala version = 2.12.15 (matching the connector artifact's _2.12 suffix).

When writing to BigQuery I get an authentication-related error. The BigQuery Connection API is enabled.
# big_table_id is the fully qualified target table, e.g. 'project.dataset.table'
df.write.format('bigquery') \
    .option('table', big_table_id) \
    .save()
Error:
Py4JJavaError: An error occurred while calling o59.save.
: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Request had insufficient authentication scopes.
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:286)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:746)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$18.call(BigQueryImpl.java:743)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.run(RetryHelper.java:76)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.getTable(BigQueryImpl.java:742)
at com.google.cloud.bigquery.connector.common.BigQueryClient.getTable(BigQueryClient.java:107)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.getTable$lzycompute(BigQueryInsertableRelation.scala:67)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.getTable(BigQueryInsertableRelation.scala:67)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.exists$lzycompute(BigQueryInsertableRelation.scala:53)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.exists(BigQueryInsertableRelation.scala:49)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:114)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
GET https://www.googleapis.com/bigquery/v2/projects/project212/datasets/NYSE_datasets/tables/NYSE_table?prettyPrint=false
"code" : 403,
"details" : [
"@type" : "type.googleapis.com/google.rpc.ErrorInfo",
"reason" : "ACCESS_TOKEN_SCOPE_INSUFFICIENT"
],
"errors" : [
"domain" : "global",
"message" : "Insufficient Permission",
"reason" : "insufficientPermissions"
],
"message" : "Request had insufficient authentication scopes.",
"status" : "PERMISSION_DENIED"
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:428)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.getTable(HttpBigQueryRpc.java:284)
Answer 1:

When running outside Dataproc you have to authenticate with a service account, as described in the spark-bigquery-connector documentation:
Use a service account JSON key and the GOOGLE_APPLICATION_CREDENTIALS environment variable as described here. The credentials can also be provided explicitly, either as a parameter or from the Spark runtime configuration. They can be passed in as a base64-encoded string directly, or as a path to a file that contains the credentials (but not both).
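For example, assuming the service account key has been downloaded to /path/to/key.json (a placeholder path, not a value from the question), the environment variable can be set from Python before the session starts — a minimal sketch:

import os

# GOOGLE_APPLICATION_CREDENTIALS must be set before the SparkSession
# (and its underlying JVM) is created, otherwise the connector's
# application-default-credentials lookup will not see it.
# The path below is a placeholder for your own service account key.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/key.json'

Note that on a multi-node cluster the variable also has to be visible to the executor processes, which is why the explicit credentials / credentialsFile options below are often more convenient.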
Example (credentials passed directly as a base64-encoded string):
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
or
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
Alternatively, specify the credentials file name:
spark.read.format("bigquery").option("credentialsFile","</path/to/key/file>")
or
spark.conf.set("credentialsFile","</path/to/key/file>")
Another alternative to passing the credentials is to pass an access token used for authenticating the API calls to the Google Cloud Platform APIs. You can get the access token by running gcloud auth application-default print-access-token.
spark.read.format("bigquery").option("gcpAccessToken","<acccess token>")
or
spark.conf.set("gcpAccessToken","<access-token>")