Dataproc + BigQuery 示例 - 有可用的吗?
Posted
技术标签:
【中文标题】Dataproc + BigQuery 示例 - 有可用的吗?【英文标题】:Dataproc + BigQuery examples - any available? 【发布时间】:2016-01-02 19:46:54 【问题描述】:根据 Dataproc docos,它具有“与 BigQuery 的原生和自动集成”。
我在 BigQuery 中有一个表。我想使用我创建的 Dataproc 集群(使用 PySpark 作业)读取该表并对其执行一些分析。然后将此分析的结果写回 BigQuery。您可能会问“为什么不直接在 BigQuery 中进行分析!?” - 原因是因为我们正在创建复杂的统计模型,而 SQL 级别太高,无法开发它们。我们需要 Python 或 R 之类的东西,因此 Dataproc。
是否有任何可用的 Dataproc + BigQuery 示例?我找不到。
【问题讨论】:
【参考方案1】:首先,如 this question 中所述,BigQuery 连接器已预安装在 Cloud Dataproc 集群上。
以下是有关如何将数据从 BigQuery 读取到 Spark 的示例。在此示例中,我们将从 BigQuery 读取数据以执行字数统计。
您使用 SparkContext.newAPIHadoopRDD
从 Spark 中的 BigQuery 读取数据。 Spark documentation 有更多关于使用SparkContext.newAPIHadoopRDD
的信息。 '
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
val projectId = "<your-project-id>"
val fullyQualifiedInputTableId = "publicdata:samples.shakespeare"
val fullyQualifiedOutputTableId = "<your-fully-qualified-table-id>"
val outputTableSchema =
"['name': 'Word','type': 'STRING','name': 'Count','type': 'INTEGER']"
val jobName = "wordcount"
val conf = sc.hadoopConfiguration
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
val systemBucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket)
// Configure input and output for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
BigQueryConfiguration.configureBigQueryOutput(conf,
fullyQualifiedOutputTableId, outputTableSchema)
val fieldName = "word"
val tableData = sc.newAPIHadoopRDD(conf,
classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject])
tableData.cache()
tableData.count()
tableData.map(entry => (entry._1.toString(),entry._2.toString())).take(10)
您需要使用您的设置自定义此示例,包括<your-project-id>
中的 Cloud Platform 项目 ID 和 <your-fully-qualified-table-id>
中的输出表 ID。
最后,如果您最终将 BigQuery 连接器与 MapReduce 结合使用,this page 提供了有关如何使用 BigQuery 连接器编写 MapReduce 作业的示例。
【讨论】:
这个有c#版本吗? 我只想指出,在full documentation 中也有关于如何进行清理的说明,否则中间导出文件将保留在 GCS 中。【参考方案2】:上面的示例没有显示如何将数据写入输出表。你需要这样做:
.saveAsNewAPIHadoopFile(
hadoopConf.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY),
classOf[String],
classOf[JsonObject],
classOf[BigQueryOutputFormat[String, JsonObject]], hadoopConf)
其中键:字符串实际上被忽略了
【讨论】:
以上是关于Dataproc + BigQuery 示例 - 有可用的吗?的主要内容,如果未能解决你的问题,请参考以下文章
将数据从 BigQuery 表加载到 Dataproc 集群时出错
从 Dataproc 写入 BigQuery 时在哪里可以找到错误?
使用 Spark BigQuery 连接器启动 Dataproc 集群
Google Cloud Dataproc 删除 BigQuery 表不起作用
Dataproc 笔记本无法导入或导出到 BigQuery:找不到类异常
如果使用 python 计算 kmeans 聚类,使用 BigQuery 作为 Dataproc 的 ndarrays 的数据源是不是有优势