通过 Hadoop 输入格式示例用于 pyspark 的 BigQuery 连接器

Posted

技术标签:

【中文标题】通过 Hadoop 输入格式示例用于 pyspark 的 BigQuery 连接器【英文标题】:BigQuery connector for pyspark via Hadoop Input Format example 【发布时间】:2015-10-02 18:32:07 【问题描述】:

我有一个存储在 BigQuery 表中的大型数据集,我想将其加载到 pypark RDD 中以进行 ETL 数据处理。

我意识到 BigQuery 支持 Hadoop 输入/输出格式

https://cloud.google.com/hadoop/writing-with-bigquery-connector

并且 pyspark 应该能够使用此接口,以便通过使用“newAPIHadoopRDD”方法创建 RDD。

http://spark.apache.org/docs/latest/api/python/pyspark.html

不幸的是,两端的文档似乎很少,超出了我对 Hadoop/Spark/BigQuery 的了解。有没有人知道如何做到这一点?

【问题讨论】:

【参考方案1】:

Google 现在有一个example,介绍如何将 BigQuery 连接器与 Spark 结合使用。

使用 GsonBigQueryInputFormat 似乎确实存在问题,但我得到了一个简单的莎士比亚字数统计示例

import json
import pyspark
sc = pyspark.SparkContext()

hadoopConf=sc._jsc.hadoopConfiguration()
hadoopConf.get("fs.gs.system.bucket")

conf = "mapred.bq.project.id": "<project_id>", "mapred.bq.gcs.bucket": "<bucket>", "mapred.bq.input.project.id": "publicdata", "mapred.bq.input.dataset.id":"samples", "mapred.bq.input.table.id": "shakespeare"  

tableData = sc.newAPIHadoopRDD("com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat", "org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject", conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"], int(x["word_count"]))).reduceByKey(lambda x,y: x+y)
print tableData.take(10)

【讨论】:

嗨,是的。其实我早就想通了。但是他们只发布了 scala 的连接器。有时间我会使用 pyspark 解决。

以上是关于通过 Hadoop 输入格式示例用于 pyspark 的 BigQuery 连接器的主要内容,如果未能解决你的问题,请参考以下文章

Spark学习之数据读取与保存总结

Hadoop的可移植键值数据文件格式?

Hadoop TextInputFormat源码分析

格式化集群与启动集群

hadoop基本操作

Mapreduce Hadoop 的 PDF 输入格式