Scala Spark DataFrame SQL withColumn - 如何使用函数(x:字符串)进行转换

Posted

技术标签:

【中文标题】Scala Spark DataFrame SQL withColumn - 如何使用函数(x:字符串)进行转换【英文标题】:Scala Spark DataFrame SQL withColumn - how to use function(x:String) for transformations 【发布时间】:2018-04-03 04:30:25 【问题描述】:

我的目标是向现有 DataFrame 添加列,并使用 DF 中现有列的转换填充列。

我发现的所有示例都使用 withColumn 添加列,使用 when().otherwise() 进行转换。

我希望使用带有匹配大小写的已定义函数(x:String),它允许我使用字符串函数并应用更复杂的转换。

示例数据帧

val etldf = Seq(   
            ("Total, 20 to 24 years            "),
            ("Men, 20 to 24 years              "),
            ("Women, 20 to 24 years            ")).toDF("A")

使用 when().otherwise() 应用一个简单的转换。我可以将一堆这些嵌套在一起,但很快就会变得混乱。

val newcol = when($"A".contains("Men"), "Male").
  otherwise(when($"A".contains("Women"), "Female").
  otherwise("Both"))
val newdf = etldf.withColumn("NewCol", newcol)      
newdf.select("A","NewCol").show(100, false)

输出如下:

+---------------------------------+------+
|A                                |NewCol|
+---------------------------------+------+
|Total, 20 to 24 years            |Both  |
|Men, 20 to 24 years              |Male  |
|Women, 20 to 24 years            |Female|
+---------------------------------+------+

但是假设我想要一个稍微复杂一点的转换:

val newcol = when($"A".contains("Total") && $"A".contains("years"), $"A".indexOf("to").toString())

它不喜欢这样,因为 indexOf 是一个字符串函数,而不是 ColumnName 的成员。

我真正想做的是定义一个可以实现非常复杂的转换并将其传递给 withColumn() 的函数:

 def AtoNewCol( A : String): String = A match 
   case a if a.contains("Men") => "Male"
   case a if a.contains("Women") => "Female"
   case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString()
   case other => "Both"
 
 AtoNewCol("Total, 20 to 24 years            ")  

输出结果为 10(“to”的位置)

但我面临同样的类型不匹配:withColumn() 想要一个 ColumnName 对象:

scala> val newdf = etldf.withColumn("NewCol", AtoNewCol($"A"))
<console>:33: error: type mismatch;
found   : org.apache.spark.sql.ColumnName
required: String
val newdf = etldf.withColumn("NewCol", AtoNewCol($"A"))
                                                    ^

如果我更改 AtoNewCol(A: org.apache.spark.sql.ColumnName) 的签名,我会在实现中遇到同样的问题:

scala>  def AtoNewCol( A : org.apache.spark.sql.ColumnName): String = A 
match 
 |     case a if a.contains("Men") => "Male"
 |     case a if a.contains("Women") => "Female"
 |     case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString()
 |     case other => "Both"
 |   
<console>:30: error: type mismatch;
found   : org.apache.spark.sql.Column
required: Boolean
       case a if a.contains("Men") => "Male"
                           ^
.
.
.
etc.  

我希望有一种语法允许将列的值绑定到函数。

或者也许有一个除了 withColum() 之外的函数可以为转换定义更复杂的函数。

接受所有建议。

【问题讨论】:

你需要一个 udf 函数 【参考方案1】:

您只需要一个udf 函数

import org.apache.spark.sql.functions._
def AtoNewCol = udf(( A : String) => A match 
  case a if a.contains("Men") => "Male"
  case a if a.contains("Women") => "Female"
  case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString()
  case other => "Both"
)

etldf.withColumn("NewCol", AtoNewCol($"A")).show(false)

你应该得到

+---------------------------------+------+
|A                                |NewCol|
+---------------------------------+------+
|Total, 20 to 24 years            |10    |
|Men, 20 to 24 years              |Male  |
|Women, 20 to 24 years            |Female|
+---------------------------------+------+

udf 函数逐行工作,数据操作发生在原始数据类型上不像其他内置函数那样逐列

【讨论】:

嗨 Ramesh :) 我不想让你这么难过。分数真的那么重要。我喜欢获得积分,但它们并不那么重要。这里的目的不就是让学员在需要的时候来社区寻求帮助吗?好的问题和好的答案为搜索未来参考提供了良好的知识库。我们都从中受益。这就是我这样做的原因。 如果你没有说“所以为了客观和公平,我使用订单作为我的决策树,然后用时间验证”我不会生气。你明白我的意思吗?它与积分无关。它关于公平和你的决策树。你为什么要删除那些措辞激烈的评论? @Threadid,而不是意识到你的错误并纠正你在对我说大话。哇 @Threadid,顺便说一句,我知道 *** 团队最终会将接受分配给应得的答案(即使您不希望这样做)。我经历过。 *** 团队很棒。【参考方案2】:

您需要为此创建 UDF,您可以尝试以下操作。我正在使用你定义的函数。

def AtoNewCol = udf((A: String) => 
  A match 
    case a if a.contains("Men") => "Male"
    case a if a.contains("Women") => "Female"
    case a if a.contains("Total") && a.contains("years") => a.indexOf("to").toString
    case other => "Both"
  
)

etldf.withColumn("NewCol", AtoNewCol($"A")).show(false)

//    output
//    +---------------------------------+------+
//    |A                                |NewCol|
//    +---------------------------------+------+
//    |Total, 20 to 24 years            |10    | 
//    |Men, 20 to 24 years              |Male  |
//    |Women, 20 to 24 years            |Female|
//    +---------------------------------+------+

【讨论】:

以上是关于Scala Spark DataFrame SQL withColumn - 如何使用函数(x:字符串)进行转换的主要内容,如果未能解决你的问题,请参考以下文章

scala.collection.immutable.Iterable[org.apache.spark.sql.Row] 到 DataFrame ?错误:使用替代方法重载了方法值 createDat

Scala Spark DataFrame SQL withColumn - 如何使用函数(x:字符串)进行转换

Spark将DataFrame数据sftp到指定机器(scala)

使用Scala在Spark中创建DataFrame时出错

spark sql 操作

Spark: scala.MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema