如何获得字符串匹配的行数并将其添加为 Scala 中的新列?
Posted
技术标签:
【中文标题】如何获得字符串匹配的行数并将其添加为 Scala 中的新列?【英文标题】:How would I get the row wise count of a string match and add it as a new column in Scala? 【发布时间】:2020-09-24 18:24:43 【问题描述】:这是我的数据框:
val new_df = Seq(("mike","A","B","B","C","A"),
("bob","A","A","B","A","C")).toDF("name","math","science","english","history","art")
这是我试图生成的结果数据框:
val new_df2 = Seq(("mike","A","B","B","C","A",2,2,1),
("bob","A","A","B","A","C",3,1,1)).toDF("name","math","science","english","history","art","A_count","B_count","C_count")
以下是表格视图中的前后快照:
所以我想添加一个列来获取每一行的 A (或任何字符串匹配)的计数。我该怎么办?我知道使用 withColumn 会起作用,但我不确定如何在一行上进行字符串匹配。
非常感谢,祝您有美好的一天!
【问题讨论】:
【参考方案1】:这可以通过对Dataframe
的行和列进行平面映射,然后对结果执行.pivot().count()
来动态解决。
val grades = Array[String]("A", "B", "C")
val cols = new_df.columns
val gcount = new_df.flatMap( row =>
val name = row.getAs[String]("name")
cols.flatMap( c =>
val grade = row.getAs[String](c)
if (grades.contains(grade))
Some(name, grade)
else
None
)
).toDF("name", "grade")
.groupBy("name")
.pivot("grade")
.count()
.withColumnRenamed("A", "A_count")
.withColumnRenamed("B", "B_count")
.withColumnRenamed("C", "C_count")
new_df.join(gcount, "name").show()
这会导致:
+----+----+-------+-------+-------+---+-------+-------+-------+
|name|math|science|english|history|art|A_count|B_count|C_count|
+----+----+-------+-------+-------+---+-------+-------+-------+
|mike| A| B| B| C| A| 2| 2| 1|
| bob| A| A| B| A| C| 3| 1| 1|
+----+----+-------+-------+-------+---+-------+-------+-------+
--- 编辑
我可以完成每个主要步骤,并解释它们的作用:
val gcount = new_df.flatMap( row =>
val name = row.getAs[String]("name")
cols.flatMap( c =>
val grade = row.getAs[String](c)
if (grades.contains(grade))
Some(name, grade)
else
None
)
).toDF("name", "grade")
.flatmap()
与 .map()
类似,只是输入记录与输出记录的比例不是 1:1,而是 1:n。换句话说,它可以为它处理的每条记录发出 0 条或更多条记录。在这种情况下,我们遍历每一行,然后遍历每一列,为每个人的每个等级发出一条记录(只要单元格的值存在于我们感兴趣的等级的初始数组中)。 .toDF()
方法只是将我们得到的Dataset[(String, String)]
转换回Dataframe
。此代码块生成的Dataframe
如下所示:
+----+-----+
|name|grade|
+----+-----+
|mike| A|
|mike| B|
|mike| B|
|mike| C|
|mike| A|
| bob| A|
| bob| A|
| bob| B|
| bob| A|
| bob| C|
+----+-----+
有了那个中间 Dataframe
我们可以做一个
groupBy("name").pivot("grade").count()
groupBy()
根据提供的列中包含的值创建记录组。然后,.pivot()
将在其提供的列中获取不同的值,并为每个值创建一个新列。最后,.count()
方法确定如何聚合不同列组的值。
.withColumnRenamed()
方法只是为了使最终的Dataframe
“看起来”就像您通过实际重命名列所请求的那样(因为实际的先前成绩值成为列名)。将Some(name, grade)
更改为Some(name, grade+"_count")
以避免静态列重命名会更好地解决这个问题。
关于您遇到的 CSV 错误,我必须查看实际代码和 CSV 标头才能了解可能导致该错误的原因。
--- 替代解决方案
我还设计了一个 hacky 替代解决方案,它需要修复源 Dataframe
中的列,这可能不是最有效的方法。它与上述更好的解决方案大多无关,但我提供它以防万一:
var df = new_df
val grades = Array[String]("A", "B", "C")
grades.foreach(g =>
df = df.withColumn(g +"_count", ($"math" === lit(g)).cast("Int") + ($"science" === lit(g)).cast("Int") + ($"english" === lit(g)).cast("Int") + ($"history" === lit(g)).cast("Int") + ($"art" === lit(g)).cast("Int"))
)
同样,它是一个 hacky 解决方案,但可以适用于少数“等级”、固定列集,并且如果您不太关心效率。
【讨论】:
您是否有一个更易于理解并分成更小步骤的分步解决方案?您的代码在此示例数据帧上完美运行,但是当我将其应用于我的巨大数据帧时,我收到 CSV 标头不符合架构的错误。是的,我仔细检查了我是否适当地更改了所有变量名称等。 我在“--- Edit”之后添加了一些主要步骤的解释。【参考方案2】:最好的方法是使用 Spark SQL。虽然您可以使用 flatMap 解决这个问题,但它要慢得多,最好尝试在线完成所有操作!我写了这篇文章,以便您可以轻松扩展到更多年级,并且样板更少。
import org.apache.spark.sql.Column
def colIsGrade(col:Column, grade:String) = when(col === lit(grade), lit(1)).otherwise(lit(0))
def countOccurenceOf(grade:String) = (List($"math", $"science", $"english", $"history", $"art").foldLeft(lit(0))
case (count, subject) => colIsGrade(subject, grade) + count
).as(s"$grade_count")
val grades = List("A","B","C","D","E","F")
val gradesColumnStatement = grades.map(countOccurenceOf)
new_df.select(col("*") +: gradesColumnStatement :_*)
【讨论】:
以上是关于如何获得字符串匹配的行数并将其添加为 Scala 中的新列?的主要内容,如果未能解决你的问题,请参考以下文章
ComputeStatistics 返回错误的行数值。如何在 MS Word 中获得实际的行数?