在 Apache Spark 中加入文件

Posted

技术标签:

【中文标题】在 Apache Spark 中加入文件【英文标题】:Join files in Apache Spark 【发布时间】:2017-05-09 03:22:33 【问题描述】:

我有一个这样的文件。 code_count.csv

code,count,year
AE,2,2008
AE,3,2008
BX,1,2005
CD,4,2004
HU,1,2003
BX,8,2004

另一个这样的文件。 details.csv

code,exp_code
AE,Aerogon international
BX,Bloomberg Xtern
CD,Classic Divide
HU,Honololu

我想要每个代码的总和,但在最终输出中,我想要 exp_code。像这样

Aerogon international,5
Bloomberg Xtern,4
Classic Divide,4

这是我的代码

var countData=sc.textFile("C:\path\to\code_count.csv")
var countDataKV=countData.map(x=>x.split(",")).map(x=>(x(0),1))
var sum=countDataKV.foldBykey(0)((acc,ele)=>(acc+ele))
sum.take(2)

给予

Array[(String, Int)] = Array((AE,5), (BX,9))

这里 sum 是 RDD[(String, Int)]。我对如何从另一个文件中提取 exp_code 有点困惑。请指导。

【问题讨论】:

【参考方案1】:

您需要在 groupby 之后用代码计算总和,然后加入另一个数据框。下面是类似的例子。

import spark.implicits._
  val df1 = spark.sparkContext.parallelize(Seq(("AE",2,2008), ("AE",3,2008), ("BX",1,2005), ("CD",4,2004), ("HU",1,2003), ("BX",8,2004)))
    .toDF("code","count","year")

  val df2 = spark.sparkContext.parallelize(Seq(("AE","Aerogon international"),
    ("BX","Bloomberg Xtern"), ("CD","Classic Divide"), ("HU","Honololu"))).toDF("code","exp_code")


  val sumdf1 = df1.select("code", "count").groupBy("code").agg(sum("count"))

  val finalDF = sumdf1.join(df2, "code").drop("code")

finalDF.show()

【讨论】:

【参考方案2】:

如果您使用的是 spark 版本 > 2.0,您可以直接使用以下代码。 com.databricks.spark.csv 默认作为 spark 2.0 的一部分提供

val codeDF = spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs://pathTo/code_count.csv")    

val detailsDF = spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs://pathTo/details.csv")    
//
//
import org.apache.spark.sql.functions._
val resDF = codeDF.join(detailsDF,codeDF.col("code")===detailsDF.col("code")).groupBy(codeDF.col("code"),detailsDF.col("exp_code")).agg(sum("count").alias("cnt"))

输出: 如果您使用的是 spark

您可以点击此链接使用 com.databricks.spark.csv

https://github.com/databricks/spark-csv

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc);
 import hiveContext.implicits._
val codeDF = hiveContext.read.format("com.databricks.spark.csv")
                                        .option("header", "true")
                                        .option("treatEmptyValuesAsNulls", "true")
                                        .option("inferSchema", "true")
                                        .option("delimiter",",")
                                        .load("hdfs://pathTo/code_count.csv")
val detailsDF = hiveContext.read.format("com.databricks.spark.csv")
                                        .option("header", "true")                                       
                                        .option("inferSchema", "true")
                                        .option("delimiter",",")
                                        .load("hdfs://pathTo/details.csv")

import org.apache.spark.sql.functions._
val resDF = codeDF.join(detailsDF,codeDF.col("code")===detailsDF.col("code")).groupBy(codeDF.col("code"),detailsDF.col("exp_code")).agg(sum("count").alias("cnt"))

【讨论】:

@aravind 你能告诉我在哪个版本下我会得到这个 val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc);我无法在 sql 包下获取配置单元。 我确信它应该适用于所有版本 > =1.5.2 添加这个 import org.apache.spark.sql._ 以及程序顶部的所有导入。如果您遇到问题,请告诉我。

以上是关于在 Apache Spark 中加入文件的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 中加入数据框

Apache Pig 如何在加载功能中加入白名单或黑名单?

如何在 spark scala 中加入 2 rdd

在 Spark 中加入倾斜的数据集?

使用用户定义的函数在 spark 中加入数据集时需要填充其他信息

Spark:在 UDF 或映射函数中加入