在 Apache Spark 中加入文件
Posted
技术标签:
【中文标题】在 Apache Spark 中加入文件【英文标题】:Join files in Apache Spark 【发布时间】:2017-05-09 03:22:33 【问题描述】:我有一个这样的文件。 code_count.csv
code,count,year
AE,2,2008
AE,3,2008
BX,1,2005
CD,4,2004
HU,1,2003
BX,8,2004
另一个这样的文件。 details.csv
code,exp_code
AE,Aerogon international
BX,Bloomberg Xtern
CD,Classic Divide
HU,Honololu
我想要每个代码的总和,但在最终输出中,我想要 exp_code。像这样
Aerogon international,5
Bloomberg Xtern,4
Classic Divide,4
这是我的代码
var countData=sc.textFile("C:\path\to\code_count.csv")
var countDataKV=countData.map(x=>x.split(",")).map(x=>(x(0),1))
var sum=countDataKV.foldBykey(0)((acc,ele)=>(acc+ele))
sum.take(2)
给予
Array[(String, Int)] = Array((AE,5), (BX,9))
这里 sum 是 RDD[(String, Int)]。我对如何从另一个文件中提取 exp_code 有点困惑。请指导。
【问题讨论】:
【参考方案1】:您需要在 groupby 之后用代码计算总和,然后加入另一个数据框。下面是类似的例子。
import spark.implicits._
val df1 = spark.sparkContext.parallelize(Seq(("AE",2,2008), ("AE",3,2008), ("BX",1,2005), ("CD",4,2004), ("HU",1,2003), ("BX",8,2004)))
.toDF("code","count","year")
val df2 = spark.sparkContext.parallelize(Seq(("AE","Aerogon international"),
("BX","Bloomberg Xtern"), ("CD","Classic Divide"), ("HU","Honololu"))).toDF("code","exp_code")
val sumdf1 = df1.select("code", "count").groupBy("code").agg(sum("count"))
val finalDF = sumdf1.join(df2, "code").drop("code")
finalDF.show()
【讨论】:
【参考方案2】:如果您使用的是 spark 版本 > 2.0,您可以直接使用以下代码。 com.databricks.spark.csv 默认作为 spark 2.0 的一部分提供
val codeDF = spark
.read
.option("header", "true")
.option("inferSchema", "true")
.csv("hdfs://pathTo/code_count.csv")
val detailsDF = spark
.read
.option("header", "true")
.option("inferSchema", "true")
.csv("hdfs://pathTo/details.csv")
//
//
import org.apache.spark.sql.functions._
val resDF = codeDF.join(detailsDF,codeDF.col("code")===detailsDF.col("code")).groupBy(codeDF.col("code"),detailsDF.col("exp_code")).agg(sum("count").alias("cnt"))
输出: 如果您使用的是 spark
您可以点击此链接使用 com.databricks.spark.csv
https://github.com/databricks/spark-csv
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc);
import hiveContext.implicits._
val codeDF = hiveContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("inferSchema", "true")
.option("delimiter",",")
.load("hdfs://pathTo/code_count.csv")
val detailsDF = hiveContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter",",")
.load("hdfs://pathTo/details.csv")
import org.apache.spark.sql.functions._
val resDF = codeDF.join(detailsDF,codeDF.col("code")===detailsDF.col("code")).groupBy(codeDF.col("code"),detailsDF.col("exp_code")).agg(sum("count").alias("cnt"))
【讨论】:
@aravind 你能告诉我在哪个版本下我会得到这个 val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc);我无法在 sql 包下获取配置单元。 我确信它应该适用于所有版本 > =1.5.2 添加这个import org.apache.spark.sql._
以及程序顶部的所有导入。如果您遇到问题,请告诉我。以上是关于在 Apache Spark 中加入文件的主要内容,如果未能解决你的问题,请参考以下文章