如何根据条件(组中的值)更新列?

Posted

技术标签:

【中文标题】如何根据条件(组中的值)更新列?【英文标题】:How to update column based on a condition (a value in a group)? 【发布时间】:2016-11-19 10:54:15 【问题描述】:

我有以下df:

+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn| blue|
|  3|  fn|green|
+---+----+-----+

如果任何颜色列的值为red,那么我应该将颜色列的所有值更新为red,如下所示:

+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn|  red|
|  3|  fn|  red|
+---+----+-----+

我想不通。请帮忙;我试过以下代码:

val gp=jdbcDF.filter($"dept".contains("fn"))
     //.withColumn("newone",when($"dept"==="fn","RED").otherwise("NULL"))
    gp.show()
gp.map(
  row=>
    val row1=row.getAs[String](1)
    var row2=row.getAs[String](2)
    val make=if(row1 =="fn") row2="red"
    Row(row(0),row(1),make)
  
).collect().foreach(println)

【问题讨论】:

【参考方案1】:

给定:

val df = Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
).toDF("id", "fn", "color")

进行计算:

val redOrNot = df.groupBy("fn")
  .agg(collect_set('color) as "values")
  .withColumn("hasRed", array_contains('values, "red"))

// gives null for no option
val colorPicker = when('hasRed, "red")
val result = df.join(redOrNot, "fn")
  .withColumn("resultColor", colorPicker) 
  .withColumn("color", coalesce('resultColor, 'color)) // skips nulls that leads to the answer
  .select('id, 'fn, 'color)

result 看起来如下(这似乎是一个答案):

scala> result.show
+---+---+-----+
| id| fn|color|
+---+---+-----+
|  1| fn|  red|
|  2| fn|  red|
|  3| fn|  red|
|  4| aa| blue|
|  5| aa|green|
|  6| bb|  red|
|  7| bb|  red|
|  8| aa| blue|
+---+---+-----+

您可以链接when 运算符并使用otherwise 设置默认值。请咨询scaladoc of when operator。

我认为您可以使用窗口运算符或用户定义的聚合函数 (UDAF) 做一些非常相似的事情(也许更有效),但是......嗯......目前不知道该怎么做。在这里留下评论以激励他人;-)

附言学到了很多!感谢您的想法!

【讨论】:

我希望我可以用UDF替换when,这样我就可以根据一些逻辑返回任何颜色。 查看更新。你应该小心使用 UDF,因为 Spark SQL 的查询优化器可能会做不好的优化。 @Shankar:这种语法是可能的df.withColumn("Green_Ind", when($"color" === "Green", 1).when($"color" === "Red", 1).otherwise(0)) @JacekLaskowski:窗口函数呢。虽然你的答案是最好的,但我认为它也可以用 Window 函数来实现不是吗? @JacekLaskowski:仍然想知道,如果没有join,是否无法实现这一点,如果数据很大,是的,虽然可以对第一个 DF 使用广播连接。【参考方案2】:

不需要昂贵分组的高效解决方案:

// All groups with `red`
df.where($"color" === "red").select($"fn".alias("fn_")).distinct
  // Join with input
  .join(df.as("df"), $"fn" === $"fn_", "rightouter")
  // Replace `color`
  .withColumn("color", when($"fn_"isNull, $"color").otherwise(lit("red")))
  .drop("fn_")

【讨论】:

这是一个很好的答案。 Jacek 在接受的答案中也提到了它。【参考方案3】:

如果满足某个属性,您将有条件地更新 DataFrame。在这种情况下,属性是“颜色列包含‘红色’”。表达这一点的惯用方式是使用所需的谓词进行过滤,然后确定是否有任何行满足它。无需加入。

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.DataFrame

def makeAllRedIfAnyAreRed(df: DataFrame) = 
    val containsRed = df.filter(df("color") === "red").count() > 0
    if (containsRed) df.withColumn("color", lit("red")) else df

【讨论】:

【参考方案4】:

Spark 2.2.0: 示例数据框(取自上述示例)

    val df = Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
).toDF("id", "dept", "color")

通过检查条件创建了一个 UDF 来执行更新。

val replace_val = udf((x: String,y:String) => if (Option(x).getOrElse("").equalsIgnoreCase("fn")&&(!y.equalsIgnoreCase("red"))) "red" else y)

val final_df = df.withColumn("color", replace_val($"dept",$"color"))
final_df.show()

输出:

火花 1.6:

val conf = new SparkConf().setMaster("local").setAppName("My app")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

import sqlContext.implicits._
// For implicit conversions like converting RDDs to DataFrames
val df = sc.parallelize(Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
) ).toDF("id","dept","color")


val replace_val = udf((x: String,y:String) => if (Option(x).getOrElse("").equalsIgnoreCase("fn")&&(!y.equalsIgnoreCase("red"))) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept",$"color"))

final_df.show()

【讨论】:

【参考方案5】:

由于过滤后的数据框中的行数可能很少,因此我正在添加具有 isin().withColumn() 组合的解决方案。

示例数据帧

val df = Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
).toDF("id", "dept", "color")

现在让我们只选择至少有一个 red color 行的depts 并将其放在broadcast 变量中,如下所示。

val depts = sc.broadcast(df.filter($"color" === "red").select(collect_set("dept")).first.getSeq[String](0)))

为过滤的depts 记录更新红色颜色。

isin() 采用可变参数,因此将列表转换为可变参数 (depts.value:_*)

//creating new column by giving diff name (clr) to see the diff
val result = df.withColumn("clr", when($"dept".isin(depts.value:_*),lit("red"))
                    .otherwise($"color"))

result.show()

+---+----+-----+-----+
| id|dept|color|  clr|
+---+----+-----+-----+
|  1|  fn|  red|  red|
|  2|  fn| blue|  red|
|  3|  fn|green|  red|
|  4|  aa| blue| blue|
|  5|  aa|green|green|
|  6|  bb|  red|  red|
|  7|  bb|  red|  red|
|  8|  aa| blue| blue|
+---+----+-----+-----+

【讨论】:

以上是关于如何根据条件(组中的值)更新列?的主要内容,如果未能解决你的问题,请参考以下文章

PYSPARK:根据条件用另一个行值更新一行中的值?

如何有条件的更新sql 数据表中的列值?

如何根据列表有条件地更新 Pandas 中的 DataFrame 列

如何根据与另一个表中的值的比较来更新列

如何根据r中的条件(日期)更新data.table中的值

如何根据条件更改考拉数据框中的值