Dataproc + BigQuery 示例 - 有可用的吗?

Posted

技术标签:

【中文标题】Dataproc + BigQuery 示例 - 有可用的吗?【英文标题】:Dataproc + BigQuery examples - any available? 【发布时间】:2016-01-02 19:46:54 【问题描述】:

根据 Dataproc docos,它具有“与 BigQuery 的原生和自动集成”。

我在 BigQuery 中有一个表。我想使用我创建的 Dataproc 集群(使用 PySpark 作业)读取该表并对其执行一些分析。然后将此分析的结果写回 BigQuery。您可能会问“为什么不直接在 BigQuery 中进行分析!?” - 原因是因为我们正在创建复杂的统计模型,而 SQL 级别太高,无法开发它们。我们需要 Python 或 R 之类的东西,因此 Dataproc。

是否有任何可用的 Dataproc + BigQuery 示例?我找不到。

【问题讨论】:

【参考方案1】:

首先,如 this question 中所述,BigQuery 连接器已预安装在 Cloud Dataproc 集群上。

以下是有关如何将数据从 BigQuery 读取到 Spark 的示例。在此示例中,我们将从 BigQuery 读取数据以执行字数统计。 您使用 SparkContext.newAPIHadoopRDD 从 Spark 中的 BigQuery 读取数据。 Spark documentation 有更多关于使用SparkContext.newAPIHadoopRDD 的信息。 '

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat
import com.google.gson.JsonObject

import org.apache.hadoop.io.LongWritable

val projectId = "<your-project-id>"
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>"
val outputTableSchema =
    "['name': 'Word','type': 'STRING','name': 'Count','type': 'INTEGER']"
val jobName = "wordcount"

val conf = sc.hadoopConfiguration

// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)

// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
val systemBucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket)

// Configure input and output for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
BigQueryConfiguration.configureBigQueryOutput(conf,
    fullyQualifiedOutputTableId, outputTableSchema)

val fieldName = "word"

val tableData = sc.newAPIHadoopRDD(conf,
    classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject])
tableData.cache()
tableData.count()
tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)

您需要使用您的设置自定义此示例,包括&lt;your-project-id&gt; 中的 Cloud Platform 项目 ID 和 &lt;your-fully-qualified-table-id&gt; 中的输出表 ID。

最后,如果您最终将 BigQuery 连接器与 MapReduce 结合使用,this page 提供了有关如何使用 BigQuery 连接器编写 MapReduce 作业的示例。

【讨论】:

这个有c#版本吗? 我只想指出,在full documentation 中也有关于如何进行清理的说明,否则中间导出文件将保留在 GCS 中。【参考方案2】:

上面的示例没有显示如何将数据写入输出表。你需要这样做:

.saveAsNewAPIHadoopFile(
hadoopConf.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY), 
classOf[String], 
classOf[JsonObject], 
classOf[BigQueryOutputFormat[String, JsonObject]], hadoopConf)

其中键:字符串实际上被忽略了

【讨论】:

以上是关于Dataproc + BigQuery 示例 - 有可用的吗?的主要内容,如果未能解决你的问题,请参考以下文章

将数据从 BigQuery 表加载到 Dataproc 集群时出错

从 Dataproc 写入 BigQuery 时在哪里可以找到错误?

使用 Spark BigQuery 连接器启动 Dataproc 集群

Google Cloud Dataproc 删除 BigQuery 表不起作用

Dataproc 笔记本无法导入或导出到 BigQuery:找不到类异常

如果使用 python 计算 kmeans 聚类,使用 BigQuery 作为 Dataproc 的 ndarrays 的数据源是不是有优势