按字段Scala中的值过滤rdd行

Posted

技术标签:

【中文标题】按字段Scala中的值过滤rdd行【英文标题】:Filter rdd lines by values in fields Scala 【发布时间】:2017-03-13 18:27:17 【问题描述】:

我有一个具有以下结构的 csv:

标题,标题,标题,标题,标题 val1、val2、val3、val4、val5 val1、val2、null、val4、val5 val1, val2, val3, null, val5

我需要做的是过滤掉标题和在特定位置包含空值的数据行(在 val3 处有空值是可以的,但在 val4 处没有空值)。我做了一个 rdd 并用逗号分隔行,我希望像访问数组的索引位置一样访问每一行。但我不知道如何进行比较。我可以提取字段:

rdd.map(values=>(values(2))

您如何进行比较?特别是“不包含”。我认为有一种比较方法可用,或者这个问题是否需要一个元组和!包含?

【问题讨论】:

是否有任何限制,您需要使用 RDD?如果不是我认为,您可以使用 DataFrame。 SPARK的DataFrame API最适合处理CSV文件的操作。 【参考方案1】:

假设您已经定义了包装这些值的类型,假设:

case class Record(val1: String, val2: Option[String], val3: String, val4: Option[String])

val rdd: RDD[Record] = ...
rdd.filter(record => record.val2.isDefined && record.val4.isDefined)

我希望这会有所帮助。

【讨论】:

【参考方案2】:

如果您使用DataFrames 而不是RDDs,您将使用filter 和布尔Column 操作。

假设val4val5 都不应该为空。

如果你的 csv 看起来像这样:

evan@vbox ~ > cat dat_1.csv
header1,header2,header3,header4,header5
val1,val2,val3,val4,val5
val1,val2,,val4,val5
val1,val2,val3,,val5

那么您的代码将如下所示:

scala> val dat_1 = spark.read.option("header", true).csv("dat_1.csv")
dat_1: org.apache.spark.sql.DataFrame = [header1: string, header2: string ... 3 more fields]

scala> dat_1.show
+-------+-------+-------+-------+-------+
|header1|header2|header3|header4|header5|
+-------+-------+-------+-------+-------+
|   val1|   val2|   val3|   val4|   val5|
|   val1|   val2|   null|   val4|   val5|
|   val1|   val2|   val3|   null|   val5|
+-------+-------+-------+-------+-------+


scala> data1.filter($"header4".isNotNull && $"header5".isNotNull).show
+-------+-------+-------+-------+-------+
|header1|header2|header3|header4|header5|
+-------+-------+-------+-------+-------+
|   val1|   val2|   val3|   val4|   val5|
|   val1|   val2|   null|   val4|   val5|
+-------+-------+-------+-------+-------+

否则,如果您的数据如下所示:

evan@vbox ~ > cat dat_2.csv
header1,header2,header3,header4,header5
val1,val2,val3,val4,val5
val1,val2,null,val4,val5
val1,val2,val3,null,val5

那么您的代码将如下所示:

scala> val dat_2 = spark.read.option("header", true).csv("dat_2.csv")
dat_2: org.apache.spark.sql.DataFrame = [header1: string, header2: string ... 3 more fields]

scala> dat_2.show
+-------+-------+-------+-------+-------+
|header1|header2|header3|header4|header5|
+-------+-------+-------+-------+-------+
|   val1|   val2|   val3|   val4|   val5|
|   val1|   val2|   null|   val4|   val5|
|   val1|   val2|   val3|   null|   val5|
+-------+-------+-------+-------+-------+


scala> dat_2.filter($"header4" =!= "null" && $"header5" =!= "null").show
+-------+-------+-------+-------+-------+
|header1|header2|header3|header4|header5|
+-------+-------+-------+-------+-------+
|   val1|   val2|   val3|   val4|   val5|
|   val1|   val2|   null|   val4|   val5|
+-------+-------+-------+-------+-------+

【讨论】:

【参考方案3】:

输入文件中的 Null 值不以它在文件中的表示方式表示:

header,header,header,header,header
val1, val2, val3, val4, val5
val1, val2, null, val4, val5
val1, val2, val3, null, val5

应该是这样的:

header,header,header,header,header
val1, val2, val3, val4, val5
val1, val2, null, val4, val5
val1, val2, val3,, val5

解决方案:使用 mapPartitionsWithIndex 删除第 0 个索引的第一个迭代器将从您的输入文件中过滤标题,而在第 4 个字段上使用 != "" 将过滤掉第 3 行

**scala>** sc.textFile("/User/VJ/testfile").
mapPartitionsWithIndex((x,y) => if (x==0) y.drop(1) else y).
filter(x=>x.split(",")(3) != "" ).
take(5).foreach(println)

所需输出:

val1, val2, val3, val4, val5
val1, val2, null, val4, val5

这里的例子 https://tips-to-code.blogspot.com/2018/08/nulls-in-scala-spark.html

谢谢, 维沙尔。

【讨论】:

以上是关于按字段Scala中的值过滤rdd行的主要内容,如果未能解决你的问题,请参考以下文章

使用 Scala 根据 RDD 中的多个键列对值进行分组的最快方法是啥? [复制]

使用数据框的子集和 spark/scala 中的两个特定字段过滤数据框 [关闭]

按字段名称在新的 JS 过滤数组中移动 JS 数组行

Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

如何在 play 2.4 中的 scala 模板中设置类型列表字段的值?

Spark RDD编程