如何在本地使用 java 连接到带有 spark 的 Google 大查询?

Posted

技术标签:

【中文标题】如何在本地使用 java 连接到带有 spark 的 Google 大查询?【英文标题】:How to connect to Google big query with spark using java locally? 【发布时间】:2019-12-05 12:56:31 【问题描述】:

我正在尝试使用 java 中的 spark 连接到 Google 大查询,但我无法找到相同的准确文档。

我试过了:https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example

https://github.com/GoogleCloudPlatform/spark-bigquery-connector#compiling-against-the-connector

我的代码:

sparkSession.conf().set("credentialsFile", "/path/OfMyProjectJson.json");
Dataset<Row> dataset = sparkSession.read().format("bigquery").option("table","myProject.myBigQueryDb.myBigQuweryTable")
          .load();
dataset.printSchema();

但这是抛出异常:

Exception in thread "main" java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider com.google.cloud.spark.bigquery.BigQueryRelationProvider could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:614)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at com.mySparkConnector.getDataset(BigQueryFetchClass.java:12)


Caused by: java.lang.IllegalArgumentException: A project ID is required for this service but could not be determined from the builder or the environment.  Please set a project ID using the builder.
    at com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:142)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.ServiceOptions.<init>(ServiceOptions.java:285)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryOptions.<init>(BigQueryOptions.java:91)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryOptions.<init>(BigQueryOptions.java:30)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryOptions$Builder.build(BigQueryOptions.java:86)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryOptions.getDefaultInstance(BigQueryOptions.java:159)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider$.$lessinit$greater$default$2(BigQueryRelationProvider.scala:29)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.<init>(BigQueryRelationProvider.scala:40)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 15 more

我的 json 文件包含 project_id 我尝试搜索可能的解决方案,但找不到任何解决方案,因此请帮助我找到此异常的解决方案,或者任何有关如何使用 spark 连接到大查询的文档。

【问题讨论】:

【参考方案1】:

我在气流中使用 DataProcPySparkOperator 运算符时遇到了完全相同的错误。解决方法是提供

dataproc_pyspark_jars='gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'

而不是

dataproc_pyspark_jars='gs://spark-lib/bigquery/spark-bigquery-latest.jar'

我猜在你的情况下它应该作为命令行参数传递

--jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar

【讨论】:

我要补充一点,这里的问题可能是取决于您的 Spark 版本的 scala 版本(2.11 或 2.12)。对我来说,我正在设置我的 AWS EMR,支持 delta 和 bigquery。现在两者都可以正常工作,因为我将 Spark 3.x 与 Scala 2.12 一起使用。【参考方案2】:

最近一个PR handling this issue 已被合并到 spark-bigquery-connector 中,新版本的连接器即将发布。

目前一个简单的解决方案是将环境变量 GOOGLE_APPLICATION_CREDENTIALS=/path/OfMyProjectJson.json 添加到 spark 运行时。

【讨论】:

我应该如何上传我的 json 凭证文件并将其路径设置为 env 变量? 我假设你在本地有一个 json 文件。如果您不这样做,您可以按照此处cloud.google.com/iam/docs/… 的说明创建一个。如果它是您的开发者机器,可以通过将 export GOOGLE_APPLICATION_CREDENTIALS=/path/OfMyProjectJson.json 添加到您的 ~/.bashrc 来将其添加到您的环境中

以上是关于如何在本地使用 java 连接到带有 spark 的 Google 大查询?的主要内容,如果未能解决你的问题,请参考以下文章

如何从本地安装的 spark 连接到 aws-redshift?

使用java将oracle数据库连接到apache spark时出错

Java spark使用reduceByKey避免嵌套列表将对象连接到一个列表中

将 Java Spark Sql 连接到 Mysql

无法通过 Spark 连接到 Mongo DB

从本地 jupyter notebook 连接到 spark 集群