Spark - 如何在 Seq[Map<String,String>] 中的单个字段上应用 udf
Posted
技术标签:
【中文标题】Spark - 如何在 Seq[Map<String,String>] 中的单个字段上应用 udf【英文标题】:Spark - How to apply a udf over single field in a Seq[Map<String,String>] 【发布时间】:2018-04-03 06:12:26 【问题描述】:我有一个数据框,其中包含两列类型为 String 和 Seq[Map[String, String]]。比如:
Name Contact
Alan [(Map(number -> 12345 , type -> home)), (Map(number -> 87878787 , type -> mobile))]
Ben [(Map(number -> 94837593 , type -> job)),(Map(number -> 346 , type -> home))]
所以我需要在每个 Map[String,String] o 数组中的每个元素的字段 number
上应用 udf
。这个udf
基本上将转换为 0000 任何长度小于 6 的number
。像这样:
def valid_num_udf =
udf((numb:String) =>
if(numb.length < 6)
"0000"
else
numb
)
预期的结果是这样的:
NAME CONTACT
Alan [(Map(number -> 0000 , type -> home)), (Map(number -> 87878787 , type -> mobile))]
Ben [(Map(number -> 94837593 , type -> job)),(Map(number -> 0000 , type -> home))]
我想要的是使用另一个 udf 访问每个 number
字段,然后应用 valid_num_udf()
我正在尝试这样的事情,但我不知道在 Scala 中执行此操作的正确语法是什么。
val newDf = Df.withColumn("VALID_CONTACT", myUdf($"CONTACT"))
//This part is really really wrong, but don't know better
def myUdf = udf[Seq[Map[String, String]], Seq[Map[String, String]]]
inputSeq => inputSeq.map(_.get("number") => valid_num_udf(_.get("number")))
谁能告诉我如何只访问地图中的一个字段,而保持地图的其他字段不变?
更新:DataFrame 的架构将是
root
|-- NAME: string (nullable = true)
|-- CONTACT: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
或
org.apache.spark.sql.types.StructType = StructType(StructField(NAME,StringType,true), StructField(CONTACT,ArrayType(MapType(StringType,StringType,true),true),true))
【问题讨论】:
【参考方案1】:您的 UDF 的签名略有错误。您将通过 Seq[Map[String, String]]
作为输入:
val validNumber = udf (xs: Seq[Map[String, String]]) =>
xs.map x =>
if (x("number").length < 6)
Map("number" -> "0000" , "type" -> x("type"))
else x
df.show(false)
+----+-----------------------------------------------------------------------------+
|name|contact |
+----+-----------------------------------------------------------------------------+
|Alan|[Map(number -> 6789, type -> home), Map(number -> 987654321, type -> mobile)]|
+----+-----------------------------------------------------------------------------+
df.select(validNumber($"contact") ).show(false)
+-----------------------------------------------------------------------------+
|UDF(contact) |
+-----------------------------------------------------------------------------+
|[Map(number -> 0000, type -> home), Map(number -> 987654321, type -> mobile)]|
+-----------------------------------------------------------------------------+
【讨论】:
感谢@philantrovert 的回答,如果我不将validNumber 的逻辑应用于x,我会调用另一个udf 函数,你能告诉我会怎样。这样做的原因是这只是一个我检查长度的虚拟示例,但我希望有一个 udf 函数分开,它可以成长并使逻辑更复杂xs.map x => ...
这里x
是一个Map[String, String]
条目。您可以在 UDF 中应用任何允许在 Scala Maps 上执行的操作。你想达到什么目的?
到目前为止,我只有一条规则。长度
检查@Shaido 的编辑。定义一个包含所有规则的 Scala 函数,然后在 UDF 中调用该 Scala 函数。【参考方案2】:
udf 函数需要将列作为参数传递,这些参数经过序列化和反序列化以转换为原始数据类型。所以当列值到达 udf 函数时,它们已经是原始数据类型了。所以除非将原始类型转换为列类型,否则不能从 udf 函数调用另一个 udf 函数。
您可以做的而不是定义和调用另一个 udf 函数只是定义一个简单的函数并从 udf 函数调用该函数
import org.apache.spark.sql.functions._
def valid_num_udf(number: String) = number.length < 6 match
case true => "0000"
case false => number
def myUdf = udf((inputSeq: Seq[Map[String, String]]) =>
inputSeq.map(x => Map("number" -> valid_num_udf(x("number")), "type"-> x("type")))
)
然后只需从withColumn
api 调用udf
函数
val newDf = Df.withColumn("VALID_CONTACT", myUdf($"Contact"))
【讨论】:
【参考方案3】:您可以使用一个单独的Seq[Map[String, String]]
作为输入并转换它,而不是创建两个单独的UDF
s。这应该比将它作为两个单独的UDF
更快更好。
val valid_num_udf = udf((seq: Seq[Map[String, String]]) =>
seq.map m =>
m.get("number") match
case Some(number) if number.length < 6 => m + ("number" -> "0000")
case _ => m
)
使用提供的数据框:
df.withColumn("Contact", valid_num_udf($"Contact"))
会给
+----+----------------------------------------------------------------------------+
|Name|Contact |
+----+----------------------------------------------------------------------------+
|Alan|[Map(number -> 0000, type -> home), Map(number -> 87878787, type -> mobile)]|
|Ben |[Map(number -> 94837593, type -> job), Map(number -> 0000, type -> home)] |
+----+----------------------------------------------------------------------------+
要将逻辑与其他逻辑分开,您无需调用单独的UDF
,只需将逻辑添加到方法并调用它即可。例如,
def valid_num(number: String) =
if (number.length < 6)
"0000"
else
number
val myUdf = udf((seq: Seq[Map[String, String]]) =>
seq.map m =>
m.get("number") match
case Some(number) => m + ("number" -> valid_num(number))
case _ => m
)
【讨论】:
谢谢@Shaido 我喜欢这种匹配案例方法,但正如我在另一个答案中提到的那样,我希望有第二个 udf 函数,我可以继续添加逻辑。但这是一个不错的方法,我会考虑 @IgnacioAlorre 在关于划分 udf 的答案中添加了更多信息以上是关于Spark - 如何在 Seq[Map<String,String>] 中的单个字段上应用 udf的主要内容,如果未能解决你的问题,请参考以下文章
8. spark源码分析(基于yarn cluster模式)- Task执行,Map端写入实现
如何在 Scala 中将 Spark DataFrames 一一添加到 Seq()
如何按 Seq[org.apache.spark.sql.Column] 降序排序 spark DataFrame?
如何取消嵌套具有以下类型的 spark rdd ((String, scala.collection.immutable.Map[String,scala.collection.immutable.M