How to update a column based on a condition (a value in a group)?
Posted: 2016-11-19 10:54:15
Question: I have the following df:
+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn| blue|
|  3|  fn|green|
+---+----+-----+
If any value in the color column is red, then I should update all values of the color column to red, as shown below:
+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn|  red|
|  3|  fn|  red|
+---+----+-----+
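"Any value" can be read globally or per group; the title (and most answers below) read it per dept. A minimal plain-Scala sketch of the per-dept reading, without Spark, may help pin down the rule. The Rec case class, the redIfAnyRedInDept helper and the extra "aa" row are hypothetical illustrations, not part of the question.

```scala
// Hypothetical row type mirroring the sno/dept/color columns
case class Rec(sno: Int, dept: String, color: String)

// Per-group rule: if any row of a dept is "red", every row of that dept becomes "red"
def redIfAnyRedInDept(rows: Seq[Rec]): Seq[Rec] = {
  // Depts that contain at least one red row
  val redDepts: Set[String] = rows.collect { case r if r.color == "red" => r.dept }.toSet
  // Rewrite only rows whose dept is in that set; input order is preserved
  rows.map(r => if (redDepts.contains(r.dept)) r.copy(color = "red") else r)
}

val input = Seq(Rec(1, "fn", "red"), Rec(2, "fn", "blue"), Rec(3, "fn", "green"), Rec(4, "aa", "blue"))
redIfAnyRedInDept(input).foreach(println)
```

Dept "aa" has no red row, so row 4 keeps its blue color while all "fn" rows become red.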
I can't figure it out. Please help; I tried the following code:
import org.apache.spark.sql.Row
val gp = jdbcDF.filter($"dept".contains("fn"))
//.withColumn("newone", when($"dept" === "fn", "RED").otherwise("NULL"))
gp.show()
gp.rdd.map { row =>
  val row1 = row.getAs[String](1)
  val row2 = row.getAs[String](2)
  // the if must yield a value in both branches; assigning inside it produced Unit
  val make = if (row1 == "fn") "red" else row2
  Row(row(0), row(1), make)
}.collect().foreach(println)
Answer 1: Given:
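import org.apache.spark.sql.functions._
import spark.implicits._ // already in scope in spark-shell
val df = Seq(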
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
).toDF("id", "fn", "color")
Then compute:
val redOrNot = df.groupBy("fn")
  .agg(collect_set('color) as "values")
  .withColumn("hasRed", array_contains('values, "red"))
// when without otherwise yields null for groups that have no red
val colorPicker = when('hasRed, "red")
val result = df.join(redOrNot, "fn")
  .withColumn("resultColor", colorPicker)
  .withColumn("color", coalesce('resultColor, 'color)) // coalesce skips the nulls, which yields the answer
  .select('id, 'fn, 'color)
result
The result looks like this (which appears to be the answer):
scala> result.show
+---+---+-----+
| id| fn|color|
+---+---+-----+
|  1| fn|  red|
|  2| fn|  red|
|  3| fn|  red|
|  4| aa| blue|
|  5| aa|green|
|  6| bb|  red|
|  7| bb|  red|
|  8| aa| blue|
+---+---+-----+
You can chain when operators and use otherwise to set a default value. Consult the scaladoc of the when operator.
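A minimal sketch of chaining, assuming Spark 2.x and a local SparkSession; the colors DataFrame and the colorCode column are hypothetical. Branches are evaluated in order and otherwise supplies the fallback; without otherwise, unmatched rows get null, which is what colorPicker above relies on.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

// Local session for a standalone run; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("when-chain").getOrCreate()
import spark.implicits._

// Hypothetical sample data
val colors = Seq((1, "fn", "red"), (2, "fn", "blue"), (3, "aa", "green")).toDF("id", "fn", "color")

// The first matching branch wins; otherwise(...) covers every remaining row
val mapped = colors.withColumn("colorCode",
  when(col("color") === "red", lit(1))
    .when(col("color") === "blue", lit(2))
    .otherwise(lit(0)))

mapped.orderBy("id").show()
```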
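I think you could do something very similar (and perhaps more efficiently) with window operators or a user-defined aggregate function (UDAF), but... well... I don't know how to do it at the moment. Leaving this comment here to inspire others ;-)
P.S. I learned a lot! Thanks for the idea!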
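One way to sketch the window-function idea, assuming Spark 2.x: compute a per-group flag with max(...).over(Window.partitionBy("fn")) and use it in when, which replaces the separate groupBy plus join. It still partitions the data by fn, so it is not free. The byGroup, groupHasRed and windowed names are illustrative; the data is the df from this answer.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, max, when}

// Local session for a standalone run; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("window-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "fn", "red"), (2, "fn", "blue"), (3, "fn", "green"),
  (4, "aa", "blue"), (5, "aa", "green"),
  (6, "bb", "red"), (7, "bb", "red"), (8, "aa", "blue")
).toDF("id", "fn", "color")

// No orderBy on the window, so each frame spans the whole group
val byGroup = Window.partitionBy("fn")

// 1 if any row in the same group is "red", else 0
val groupHasRed = max(when(col("color") === "red", 1).otherwise(0)).over(byGroup)

// Replace color only in groups flagged by the window aggregate
val windowed = df
  .withColumn("color", when(groupHasRed === 1, lit("red")).otherwise(col("color")))
  .orderBy("id")

windowed.show()
```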
Comments:
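I wish I could replace when with a UDF, so that I could return any color based on some logic.
See the update. You should be careful with UDFs, because Spark SQL's query optimizer may not be able to optimize them well.
@Shankar: this syntax is possible: df.withColumn("Green_Ind", when($"color" === "Green", 1).when($"color" === "Red", 1).otherwise(0))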
@JacekLaskowski: What about window functions? Your answer is the best, but I think this could also be done with a Window function, couldn't it?
@JacekLaskowski: I'm still wondering whether this can be done without a join when the data is large; though yes, a broadcast join could be used for the first DF.
Answer 2:
An efficient solution that does not need an expensive grouping:
import org.apache.spark.sql.functions.{lit, when}
// All groups with `red`
val result = df.where($"color" === "red").select($"fn".alias("fn_")).distinct
  // Join with input
  .join(df.as("df"), $"fn" === $"fn_", "rightouter")
  // Replace `color`
  .withColumn("color", when($"fn_".isNull, $"color").otherwise(lit("red")))
  .drop("fn_")
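Following up on the broadcast-join comment, here is a sketch of the same approach with an explicit broadcast hint, assuming Spark 2.x. The broadcast function in org.apache.spark.sql.functions marks the small "red groups" side to be copied to every executor, which usually spares the large side a shuffle for this join. Putting df on the left with "left_outer" is equivalent to the "rightouter" join above. The redGroups and joinedResult names are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col, lit, when}

// Local session for a standalone run; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("broadcast-join-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "fn", "red"), (2, "fn", "blue"), (3, "fn", "green"),
  (4, "aa", "blue"), (5, "aa", "green"),
  (6, "bb", "red"), (7, "bb", "red"), (8, "aa", "blue")
).toDF("id", "fn", "color")

// Small side: one row per group that contains at least one "red"
val redGroups = df.where(col("color") === "red").select(col("fn").alias("fn_")).distinct()

// Hint Spark to broadcast the small side; the left outer join keeps every input row
val joinedResult = df
  .join(broadcast(redGroups), col("fn") === col("fn_"), "left_outer")
  .withColumn("color", when(col("fn_").isNull, col("color")).otherwise(lit("red")))
  .drop("fn_")
  .orderBy("id")

joinedResult.show()
```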
Comments:
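This is a good answer. Jacek also mentioned it in the accepted answer.
Answer 3: You want to update a DataFrame conditionally, if some property holds. In this case the property is "the color column contains 'red'". The idiomatic way to express this is to filter with the desired predicate and then check whether any rows satisfy it. No join is needed.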
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.DataFrame
def makeAllRedIfAnyAreRed(df: DataFrame): DataFrame = {
  // Is there at least one red row anywhere in the DataFrame?
  val containsRed = df.filter(df("color") === "red").count() > 0
  if (containsRed) df.withColumn("color", lit("red")) else df
}
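A possible refinement, assuming Spark 2.x: Dataset.head(1) only has to produce a single matching row, whereas count() counts all of them. The helper name makeAllRedIfAnyAreRedViaHead and the sample DataFrames are hypothetical. Like the function above, this is all-or-nothing over the whole DataFrame, not per dept.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

// Local session for a standalone run; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("head-sketch").getOrCreate()
import spark.implicits._

// Hypothetical variant: stop looking once one red row has been found
def makeAllRedIfAnyAreRedViaHead(df: DataFrame): DataFrame = {
  val containsRed = df.filter(col("color") === "red").head(1).nonEmpty
  if (containsRed) df.withColumn("color", lit("red")) else df
}

// Hypothetical sample data
val someRed = Seq((1, "blue"), (2, "red")).toDF("id", "color")
val noRed = Seq((1, "blue"), (2, "green")).toDF("id", "color")

makeAllRedIfAnyAreRedViaHead(someRed).show()
makeAllRedIfAnyAreRedViaHead(noRed).show()
```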
Answer 4: Spark 2.2.0: example DataFrame (taken from the examples above)
val df = Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
).toDF("id", "dept", "color")
I created a UDF that checks the condition and performs the update.
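import org.apache.spark.sql.functions.udf
// "red" when dept is "fn" (case-insensitive) and color is not already "red"; otherwise keep color
val replace_val = udf((x: String, y: String) =>
  if (Option(x).getOrElse("").equalsIgnoreCase("fn") && !Option(y).exists(_.equalsIgnoreCase("red"))) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept", $"color"))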
final_df.show()
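Given the comment about UDFs and the query optimizer, the same rule can be sketched with built-in column expressions, assuming Spark 2.x (=!= is the 2.x inequality operator). The isFn, notRed and builtIn names are illustrative. A null dept makes the condition null, which when treats as not matched, just as the UDF falls through to y.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, lower, when}

// Local session for a standalone run; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("builtin-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "fn", "red"), (2, "fn", "blue"), (3, "fn", "green"),
  (4, "aa", "blue"), (5, "aa", "green"),
  (6, "bb", "red"), (7, "bb", "red"), (8, "aa", "blue")
).toDF("id", "dept", "color")

// Same rule as replace_val, expressed with built-ins the optimizer can inspect
val isFn = lower(col("dept")) === "fn"     // case-insensitive dept check
val notRed = lower(col("color")) =!= "red" // case-insensitive "not already red"
val builtIn = df
  .withColumn("color", when(isFn && notRed, lit("red")).otherwise(col("color")))
  .orderBy("id")

builtIn.show()
```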
Output:
Spark 1.6:
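import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf
val conf = new SparkConf().setMaster("local").setAppName("My app")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// For implicit conversions like converting RDDs to DataFrames
import sqlContext.implicits._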
val df = sc.parallelize(Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
) ).toDF("id","dept","color")
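// "red" when dept is "fn" (case-insensitive) and color is not already "red"; otherwise keep color
val replace_val = udf((x: String, y: String) =>
  if (Option(x).getOrElse("").equalsIgnoreCase("fn") && !Option(y).exists(_.equalsIgnoreCase("red"))) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept", $"color"))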
final_df.show()
Answer 5: Because the filtered DataFrame will probably have only a few rows, I am adding a solution that combines isin() and .withColumn().
Example DataFrame
val df = Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
).toDF("id", "dept", "color")
Now select only the depts that have at least one row with red color and put them in a broadcast variable, as follows.
import org.apache.spark.sql.functions.collect_set
val depts = sc.broadcast(df.filter($"color" === "red").select(collect_set("dept")).first.getSeq[String](0))
Update the color to red for the records in the filtered depts. isin() takes varargs, so convert the list to varargs (depts.value:_*).
//creating new column by giving diff name (clr) to see the diff
val result = df.withColumn("clr", when($"dept".isin(depts.value:_*),lit("red"))
.otherwise($"color"))
result.show()
+---+----+-----+-----+
| id|dept|color| clr|
+---+----+-----+-----+
|  1|  fn|  red|  red|
|  2|  fn| blue|  red|
|  3|  fn|green|  red|
|  4|  aa| blue| blue|
|  5|  aa|green|green|
|  6|  bb|  red|  red|
|  7|  bb|  red|  red|
|  8|  aa| blue| blue|
+---+----+-----+-----+
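In this answer depts.value is read on the driver when the isin expression is built, and isin embeds the values as literals in the query plan, so a plain driver-side Seq should work equally well without SparkContext.broadcast. A sketch under that reading, assuming Spark 2.x; redDepts and withClr are illustrative names, and collecting to the driver only makes sense while the set of depts stays small, which is this answer's premise.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

// Local session for a standalone run; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("isin-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "fn", "red"), (2, "fn", "blue"), (3, "fn", "green"),
  (4, "aa", "blue"), (5, "aa", "green"),
  (6, "bb", "red"), (7, "bb", "red"), (8, "aa", "blue")
).toDF("id", "dept", "color")

// Small driver-side list of depts that contain at least one "red" row
val redDepts: Seq[String] = df.filter(col("color") === "red")
  .select("dept").distinct().as[String].collect().toSeq

// isin(...) turns the values into literals inside the query plan
val withClr = df
  .withColumn("clr", when(col("dept").isin(redDepts: _*), lit("red")).otherwise(col("color")))
  .orderBy("id")

withClr.show()
```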