How do I pass multiple columns to a udf from a when clause on a Spark DataFrame?
I want to join two DataFrames with a full_outer join and add a new column to the joined result set that tells me which records matched, which are unmatched records from the left DataFrame alone, and which are unmatched records from the right DataFrame alone.
Here is my Spark code:
import com.databricks.spark.avro._  // spark-avro package, provides read.avro(...)
import org.apache.spark.sql.functions.{col, udf, when}

val creditLoc = "/data/accounts/credits/year=2016/month=06/day=02"
val debitLoc = "/data/accounts/debits/year=2016/month=06/day=02"
val creditDF = sqlContext.read.avro(creditLoc)
val debitDF = sqlContext.read.avro(debitLoc)
val credit = creditDF.withColumnRenamed("account_id", "credit_account_id").as("credit")
val debit = debitDF.withColumnRenamed("account_id", "debit_account_id").as("debit")
val fullOuterDF = credit.join(debit, credit("credit_account_id") === debit("debit_account_id"), "full_outer")
val CREDIT_DEBIT_CONSOLIDATE_SCHEMA = List(
  ("credit.credit_account_id", "string"),
  ("credit.channel_name", "string"),
  ("credit.service_key", "string"),
  ("credit.trans_id", "string"),
  ("credit.trans_dt", "string"),
  ("credit.trans_amount", "string"),
  ("debit.debit_account_id", "string"),
  ("debit.icf_number", "string"),
  ("debit.debt_amount", "string")
)
val columnNamesList = CREDIT_DEBIT_CONSOLIDATE_SCHEMA.map(elem => col(elem._1)).seq
val df = fullOuterDF.select(columnNamesList:_*)
val caseDF = df.withColumn("matching_type",
  when(df("credit_account_id") === df("debit_account_id"), "MATCHING_CREDIT_DEBIT").otherwise(
    when(df("debit_account_id").isNull, "UNMATCHED_CREDIT").otherwise(
      when(df("credit_account_id").isNull, "UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
    )
  )
)
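As an aside, the nested otherwise(when(...)) calls can equivalently be written as a flat when chain (a sketch; Spark evaluates the conditions top to bottom):

// Equivalent flat formulation of the nested when/otherwise above
val caseDF = df.withColumn("matching_type",
  when(df("credit_account_id") === df("debit_account_id"), "MATCHING_CREDIT_DEBIT")
    .when(df("debit_account_id").isNull, "UNMATCHED_CREDIT")
    .when(df("credit_account_id").isNull, "UNMATCHED_DEBIT")
    .otherwise("INVALID_MATCHING_TYPE"))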
So far I have implemented the "matching_type" logic inside when clauses, and the code works when written as above. Now, however, I want to move the "matching_type" logic into a UDF. The UDFs below each take a single column as an argument; how do I create a udf that accepts multiple columns and returns a boolean based on a condition inside the udf?
val isUnMatchedCREDIT = udf[Boolean, String](credit_account_id => {
  credit_account_id == null
})
val isUnMatchedDEBIT = udf[Boolean, String](debit_account_id => {
  debit_account_id == null
})
val caseDF = df.withColumn("matching_type",
  when(df("credit_account_id") === df("debit_account_id"), "MATCHING_CREDIT_DEBIT").otherwise(
    when(isUnMatchedCREDIT(df("credit_account_id")), "UNMATCHED_CREDIT").otherwise(
      when(isUnMatchedDEBIT(df("debit_account_id")), "UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
    )
  )
)
Basically, I want to create another UDF, isMatchedCREDITDEBIT(), that accepts the two columns credit_account_id and debit_account_id and returns true if both values are equal. In short, I want a UDF for the following logic:
when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT")
I tried this, but it throws a compile error:
val isMatchedCREDITDEBIT()= udf[Boolean, String,String](credit_account_id => {
credit_account_id == debit_account_id
})
Can someone help me with this?
Answer
You can create a udf that takes two columns and performs your logic:
val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) => {
  credit_account_id == debit_account_id
})
This can then be called inside the when clause:
when(isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")), "MATCHING_CREDIT_DEBIT")
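Plugged into the full column expression, that looks like the sketch below (here using the built-in isNull check in place of the single-column udfs from the question):

val caseDF = df.withColumn("matching_type",
  when(isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")), "MATCHING_CREDIT_DEBIT").otherwise(
    when(df("debit_account_id").isNull, "UNMATCHED_CREDIT").otherwise(
      when(df("credit_account_id").isNull, "UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
    )
  )
)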
However, it would be easier to create a single udf that performs all of the logic on the two columns. The udf below takes both columns as input and returns the desired string directly, rather than a boolean.
val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) => {
  if (credit_account_id == null) {
    "UNMATCHED_CREDIT"
  } else if (debit_account_id == null) {
    "UNMATCHED_DEBIT"
  } else if (credit_account_id == debit_account_id) {
    "MATCHING_CREDIT_DEBIT"
  } else {
    "INVALID_MATCHING_TYPE"
  }
})
val caseDF = df.withColumn("matching_type",
  isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")))
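A quick way to sanity-check the udf is to run it over a few hand-written rows; the sample data below is hypothetical and simply exercises each of the four branches:

// Hypothetical rows covering all four branches of the udf
val sample = sqlContext.createDataFrame(Seq(
  (Some("A1"), Some("A1")),  // MATCHING_CREDIT_DEBIT
  (None, Some("B2")),        // UNMATCHED_CREDIT
  (Some("C3"), None),        // UNMATCHED_DEBIT
  (Some("D4"), Some("E5"))   // INVALID_MATCHING_TYPE
)).toDF("credit_account_id", "debit_account_id")

sample.withColumn("matching_type",
  isMatchedCREDITDEBIT(sample("credit_account_id"), sample("debit_account_id"))).show()

Note that a null column value reaches the udf as a Scala null String, which is why the explicit null checks inside it work; by contrast, === in the original when clause evaluates to null when either side is null and falls through to the otherwise branch.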