Scala UDF 函数对数组列进行操作并返回自定义值

Posted

技术标签:

【中文标题】Scala UDF 函数对数组列进行操作并返回自定义值【英文标题】:Scala UDF function to operate on Array Column and return custom value 【发布时间】:2020-06-27 18:17:32 【问题描述】:

我有 2 个这样的数据集

val jsonStr ="""
    
        "TransactionId": 1,
        "TransactionName": "Name",
        "Order": 12, 
        "ReplaceStrings": [
            "UNDEFINED","INVALID"
        ],
        "Country" : "China"           
     
"""

val configurations = spark.read.json(Seq(jsonStr).toDS) 

这有我所有的配置和过滤器

My Data
val data =  Seq((1,"Mindy","Devaney","mdevaney0@cnbc.com","Female","United States","UTF-8"),(2,"Charmain","Clear","candriolli1@miitbeian.gov.cn","Female","**China**","UTF-8"),(3,"Dilan","**UNDEFINED**","dphilipeaux2@jalbum.net","Male","**China**","Windows-1252")).toDF("id","Fname","LName","mailid","Gender","Country","Codepage" )

现在,我的要求是加入具有过滤器的配置数据,并使用上述数据检索相应的结果,因为过滤器应用于中国,所有具有 UNDEFINED 作为值的 LName 都将替换为空字符串。

我尝试放置一些 UDF 将其定义为函数,但坚持如何发送 json 值,它是一个包装数组或尝试使用 Seq 数据类型

如果有人看过类似的案例或想法,请与我分享。

【问题讨论】:

【参考方案1】:

检查下面的代码。

scala> data.show(false)
+---+--------+-------------+----------------------------+------+-------------+------------+
|id |Fname   |LName        |mailid                      |Gender|Country      |Codepage    |
+---+--------+-------------+----------------------------+------+-------------+------------+
|1  |Mindy   |Devaney      |mdevaney0@cnbc.com          |Female|United States|UTF-8       |
|2  |Charmain|Clear        |candriolli1@miitbeian.gov.cn|Female|**China**    |UTF-8       |
|3  |Dilan   |**UNDEFINED**|dphilipeaux2@jalbum.net     |Male  |**China**    |Windows-1252|
+---+--------+-------------+----------------------------+------+-------------+------------+
scala> configurations.show(false)
+-------+-----+--------------------+-------------+---------------+
|Country|Order|ReplaceStrings      |TransactionId|TransactionName|
+-------+-----+--------------------+-------------+---------------+
|China  |12   |[UNDEFINED, INVALID]|1            |Name           |
+-------+-----+--------------------+-------------+---------------+

scala> val check = udf((lname:String,replaceStrings:Seq[String]) => if(replaceStrings.map(d => s"**$d**").contains(lname)) "" else lname )
scala> data.join(configurations,data("Country").contains(configurations("Country")),"inner").withColumn("LName",check($"LName",$"ReplaceStrings")).drop(configurations("Country")).show(false)
+---+--------+-----+----------------------------+------+---------+------------+-----+--------------------+-------------+---------------+
|id |Fname   |LName|mailid                      |Gender|Country  |Codepage    |Order|ReplaceStrings      |TransactionId|TransactionName|
+---+--------+-----+----------------------------+------+---------+------------+-----+--------------------+-------------+---------------+
|2  |Charmain|Clear|candriolli1@miitbeian.gov.cn|Female|**China**|UTF-8       |12   |[UNDEFINED, INVALID]|1            |Name           |
|3  |Dilan   |     |dphilipeaux2@jalbum.net     |Male  |**China**|Windows-1252|12   |[UNDEFINED, INVALID]|1            |Name           |
+---+--------+-----+----------------------------+------+---------+------------+-----+--------------------+-------------+---------------+

【讨论】:

以上是关于Scala UDF 函数对数组列进行操作并返回自定义值的主要内容,如果未能解决你的问题,请参考以下文章

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]

Pyspark 使用 udf 处理数组列并返回另一个数组

Scala:可变参数 UDF

在 scala 中编写 udf 函数并在 pyspark 作业中使用它们

spark read 在 Scala UDF 函数中不起作用

Scala UDF 返回“不支持单元类型的架构”