How do I send multiple columns to a udf from a When Clause in Spark dataframe?

Posted: 2017-09-27 22:47:57

Question:

I want to join two dataframes with a full_outer_join and add a new column to the joined result set that tells me whether a record matched, is an unmatched record from the left dataframe only, or is an unmatched record from the right dataframe only.

Here is my Spark code:

import com.databricks.spark.avro._      // enables sqlContext.read.avro
import org.apache.spark.sql.functions._ // col, when, udf

val creditLoc = "/data/accounts/credits/year=2016/month=06/day=02"
val debitLoc  = "/data/accounts/debits/year=2016/month=06/day=02"
val creditDF  = sqlContext.read.avro(creditLoc)
val debitDF   = sqlContext.read.avro(debitLoc)
val credit    = creditDF.withColumnRenamed("account_id", "credit_account_id").as("credit")
val debit     = debitDF.withColumnRenamed("account_id", "debit_account_id").as("debit")
val fullOuterDF = credit.join(debit, credit("credit_account_id") === debit("debit_account_id"), "full_outer")
val CREDIT_DEBIT_CONSOLIDATE_SCHEMA=List(
  ("credit.credit_account_id","string"),
  ("credit.channel_name",  "string"),
  ("credit.service_key",  "string"),
  ("credit.trans_id", "string"),
  ("credit.trans_dt",  "string"),
  ("credit.trans_amount",  "string"),
  ("debit.debit_account_id","string"),
  ("debit.icf_number","string"),
  ("debit.debt_amount","string")
)

val columnNamesList = CREDIT_DEBIT_CONSOLIDATE_SCHEMA.map(elem => col(elem._1)).seq 
val df  = fullOuterDF.select(columnNamesList:_*)

val caseDF = df.withColumn("matching_type",
  when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT").otherwise(
    when(df("debit_account_id").isNull,"UNMATCHED_CREDIT").otherwise(
      when(df("credit_account_id").isNull,"UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
    )
  )
)

So far I have applied the "matching_type" logic inside the when clause itself, as in the code above, but now I want to move the "matching_type" logic into a UDF.

The UDFs below take a single column as an argument. How do I create a udf that accepts multiple columns and returns a boolean based on a condition inside that udf?

val isUnMatchedCREDIT = udf[Boolean, String](credit_account_id => 
  credit_account_id == null
)

val isUnMatchedDEBIT = udf[Boolean, String](debit_account_id => 
  debit_account_id == null
)


val caseDF = df.withColumn("matching_type",
  when(df("credit_account_id") === df("debit_account_id"), "MATCHING_CREDIT_DEBIT").otherwise(
    when(isUnMatchedCREDIT(df("credit_account_id")), "UNMATCHED_CREDIT").otherwise(
      when(isUnMatchedDEBIT(df("debit_account_id")), "UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
    )
  )
)

Basically I want to create another UDF, isMatchedCREDITDEBIT(), which takes the two columns credit_account_id and debit_account_id and returns true if the two values are equal, and false otherwise. In simple terms, I want to create a UDF for the following logic:

when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT")

I tried this, but it throws a compile-time error:

val isMatchedCREDITDEBIT()= udf[Boolean, String,String](credit_account_id => 
  credit_account_id == debit_account_id 
)

Can someone help me with this?


Answer 1:

You can create a udf that takes two columns and implements the logic like this:

val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) => 
  credit_account_id == debit_account_id
)

which can then be called in the when clause:

when(isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")), "MATCHING_CREDIT_DEBIT")
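For completeness, here is a sketch of how this could slot into the full expression from the question, reusing the single-column udfs isUnMatchedCREDIT and isUnMatchedDEBIT defined there (assuming they are in scope):

// A sketch only: combines the two-column udf with the question's single-column udfs
val caseDF = df.withColumn("matching_type",
  when(isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")), "MATCHING_CREDIT_DEBIT")
    .otherwise(when(isUnMatchedCREDIT(df("credit_account_id")), "UNMATCHED_CREDIT")
      .otherwise(when(isUnMatchedDEBIT(df("debit_account_id")), "UNMATCHED_DEBIT")
        .otherwise("INVALID_MATCHING_TYPE"))))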

However, it would be easier to create a single udf for all the logic you perform on the two columns. The udf below takes both columns as input and returns the string you want, rather than a boolean:

val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) =>
  if (credit_account_id == null)
    "UNMATCHED_CREDIT"
  else if (debit_account_id == null)
    "UNMATCHED_DEBIT"
  else if (credit_account_id == debit_account_id)
    "MATCHING_CREDIT_DEBIT"
  else
    "INVALID_MATCHING_TYPE"
)

val caseDF = df.withColumn("matching_type", 
  isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")))
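A quick way to sanity-check the udf is to run it over a few hand-made rows covering all four cases. This is a sketch only, assuming a SparkSession named spark with its implicits in scope; the sample values are hypothetical:

import spark.implicits._

// Hypothetical sample rows; None becomes a SQL NULL in the DataFrame
val sample = Seq(
  (Option("A1"), Option("A1")),  // expected: MATCHING_CREDIT_DEBIT
  (None,         Option("A2")),  // expected: UNMATCHED_CREDIT
  (Option("A3"), None),          // expected: UNMATCHED_DEBIT
  (Option("A4"), Option("B4"))   // expected: INVALID_MATCHING_TYPE
).toDF("credit_account_id", "debit_account_id")

sample.withColumn("matching_type",
  isMatchedCREDITDEBIT($"credit_account_id", $"debit_account_id")).show()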

Comments:

Great, your code is better than my approach. Thanks a lot!
