Spark - 如何在 Seq[Map<String,String>] 中的单个字段上应用 udf

Posted

技术标签:

【中文标题】Spark - 如何在 Seq[Map<String,String>] 中的单个字段上应用 udf【英文标题】:Spark - How to apply a udf over single field in a Seq[Map<String,String>] 【发布时间】:2018-04-03 06:12:26 【问题描述】:

我有一个数据框,其中包含两列类型为 String 和 Seq[Map[String, String]]。比如:

Name    Contact
Alan    [(Map(number -> 12345   , type -> home)),   (Map(number -> 87878787 , type -> mobile))]
Ben     [(Map(number -> 94837593    , type -> job)),(Map(number -> 346      , type -> home))]

所以我需要在每个 Map[String,String] o 数组中的每个元素的字段 number 上应用 udf。这个udf 基本上将转换为 0000 任何长度小于 6 的number。像这样:

def valid_num_udf = 
udf((numb:String) =>
 
if(numb.length < 6)
   "0000"
else 
    numb 
)

预期的结果是这样的:

NAME    CONTACT
Alan    [(Map(number -> 0000    , type -> home)),   (Map(number -> 87878787 , type -> mobile))]
Ben     [(Map(number -> 94837593    , type -> job)),(Map(number -> 0000     , type -> home))]

我想要的是使用另一个 udf 访问每个 number 字段,然后应用 valid_num_udf()

我正在尝试这样的事情,但我不知道在 Scala 中执行此操作的正确语法是什么。

val newDf = Df.withColumn("VALID_CONTACT", myUdf($"CONTACT"))

//This part is really really wrong, but don't know better
def myUdf = udf[Seq[Map[String, String]], Seq[Map[String, String]]]  
    inputSeq => inputSeq.map(_.get("number") => valid_num_udf(_.get("number")))

谁能告诉我如何只访问地图中的一个字段,而保持地图的其他字段不变?

更新:DataFrame 的架构将是

root
 |-- NAME: string (nullable = true)
 |-- CONTACT: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

org.apache.spark.sql.types.StructType = StructType(StructField(NAME,StringType,true), StructField(CONTACT,ArrayType(MapType(StringType,StringType,true),true),true))

【问题讨论】:

【参考方案1】:

您的 UDF 的签名略有错误。您将通过 Seq[Map[String, String]] 作为输入:

val validNumber = udf (xs: Seq[Map[String, String]]) => 
                        xs.map x => 
                          if (x("number").length < 6) 
                             Map("number" -> "0000" , "type" -> x("type")) 
                          else x 
                     

 df.show(false)
+----+-----------------------------------------------------------------------------+
|name|contact                                                                      |
+----+-----------------------------------------------------------------------------+
|Alan|[Map(number -> 6789, type -> home), Map(number -> 987654321, type -> mobile)]|
+----+-----------------------------------------------------------------------------+


df.select(validNumber($"contact") ).show(false)
+-----------------------------------------------------------------------------+
|UDF(contact)                                                                 |
+-----------------------------------------------------------------------------+
|[Map(number -> 0000, type -> home), Map(number -> 987654321, type -> mobile)]|
+-----------------------------------------------------------------------------+

【讨论】:

感谢@philantrovert 的回答,如果我不将validNumber 的逻辑应用于x,我会调用另一个udf 函数,你能告诉我会怎样。这样做的原因是这只是一个我检查长度的虚拟示例,但我希望有一个 udf 函数分开,它可以成长并使逻辑更复杂 xs.map x =&gt; ... 这里x 是一个Map[String, String] 条目。您可以在 UDF 中应用任何允许在 Scala Maps 上执行的操作。你想达到什么目的? 到目前为止,我只有一条规则。长度 检查@Shaido 的编辑。定义一个包含所有规则的 Scala 函数,然后在 UDF 中调用该 Scala 函数。【参考方案2】:

udf 函数需要将列作为参数传递,这些参数经过序列化和反序列化以转换为原始数据类型。所以当列值到达 udf 函数时,它们已经是原始数据类型了。所以除非将原始类型转换为列类型,否则不能从 udf 函数调用另一个 udf 函数。

您可以做的而不是定义和调用另一个 udf 函数只是定义一个简单的函数并从 udf 函数调用该函数

import org.apache.spark.sql.functions._
def valid_num_udf(number: String) = number.length < 6 match
  case true => "0000"
  case false => number

def myUdf = udf((inputSeq: Seq[Map[String, String]]) => 
  inputSeq.map(x => Map("number" -> valid_num_udf(x("number")), "type"-> x("type")))
)

然后只需从withColumn api 调用udf 函数

val newDf = Df.withColumn("VALID_CONTACT", myUdf($"Contact"))

【讨论】:

【参考方案3】:

您可以使用一个单独的Seq[Map[String, String]] 作为输入并转换它,而不是创建两个单独的UDFs。这应该比将它作为两个单独的UDF 更快更好。

val valid_num_udf = udf((seq: Seq[Map[String, String]]) => 
  seq.map m => 
    m.get("number") match 
      case Some(number) if number.length < 6 => m + ("number" -> "0000")
      case _ => m 
    
   
)

使用提供的数据框:

df.withColumn("Contact", valid_num_udf($"Contact"))

会给

+----+----------------------------------------------------------------------------+
|Name|Contact                                                                     |
+----+----------------------------------------------------------------------------+
|Alan|[Map(number -> 0000, type -> home), Map(number -> 87878787, type -> mobile)]|
|Ben |[Map(number -> 94837593, type -> job), Map(number -> 0000, type -> home)]   |
+----+----------------------------------------------------------------------------+

要将逻辑与其他逻辑分开,您无需调用单独的UDF,只需将逻辑添加到方法并调用它即可。例如,

def valid_num(number: String) = 
  if (number.length < 6)
    "0000"
  else
    number

val myUdf = udf((seq: Seq[Map[String, String]]) => 
  seq.map m => 
    m.get("number") match 
      case Some(number) => m + ("number" -> valid_num(number))
      case _ => m 
    
   
)

【讨论】:

谢谢@Shaido 我喜欢这种匹配案例方法,但正如我在另一个答案中提到的那样,我希望有第二个 udf 函数,我可以继续添加逻辑。但这是一个不错的方法,我会考虑 @IgnacioAlorre 在关于划分 udf 的答案中添加了更多信息

以上是关于Spark - 如何在 Seq[Map<String,String>] 中的单个字段上应用 udf的主要内容,如果未能解决你的问题,请参考以下文章

8. spark源码分析(基于yarn cluster模式)- Task执行,Map端写入实现

如何在单元测试中抑制Spark记录?

如何在 Scala 中将 Spark DataFrames 一一添加到 Seq()

使用 Map 替换 Spark 中的列值

如何按 Seq[org.apache.spark.sql.Column] 降序排序 spark DataFrame?

如何取消嵌套具有以下类型的 spark rdd ((String, scala.collection.immutable.Map[String,scala.collection.immutable.M