散列火花数据框的多列
Posted
技术标签:
【中文标题】散列火花数据框的多列【英文标题】:Hashing multiple columns of spark dataframe 【发布时间】:2019-12-16 08:43:19 【问题描述】:我需要散列 spark 数据帧的特定列。有些列具有特定的数据类型,它们基本上是标准 spark 的 DataType 类的扩展。问题是由于某种原因,在 when 情况下,某些条件无法按预期工作。
作为哈希表,我有一张地图。我们称之为 tableConfig:
val tableConfig = Map("a" -> "KEEP", "b" -> "HASH", "c" -> "KEEP", "d" -> "HASH", "e" -> "KEEP")
盐变量用于与列连接:
val salt = "abc"
散列函数如下所示:
def hashColumns(tableConfig: Map[String, String], salt: String, df: DataFrame): DataFrame =
val removedColumns = tableConfig.filter(_._2 == "REMOVE").keys.toList
val hashedColumns = tableConfig.filter(_._2 == "HASH").keys.toList
val cleanedDF = df.drop(removedColumns: _ *)
val colTypes = cleanedDF.dtypes.toMap
def typeFromString(s: String): DataType = s match
case "StringType" => StringType
case "BooleanType" => BooleanType
case "IntegerType" => IntegerType
case "DateType" => DateType
case "ShortType" => ShortType
case "DecimalType(15,7)" => DecimalType(15,7)
case "DecimalType(18,2)" => DecimalType(18,2)
case "DecimalType(11,7)" => DecimalType(11,7)
case "DecimalType(17,2)" => DecimalType(17,2)
case "DecimalType(38,2)" => DecimalType(38,2)
case _ => throw new TypeNotPresentException(
"Please check types in the dataframe. The following column type is missing: ".concat(s), null
)
val getType = colTypes.mapcase (k, _) => (k, typeFromString(colTypes(k)))
val hashedDF = cleanedDF.columns.foldLeft(cleanedDF)
(memoDF, colName) =>
memoDF.withColumn(
colName,
when(col(colName).isin(hashedColumns: _*) && col(colName).isNull, null).
when(col(colName).isin(hashedColumns: _*) && col(colName).isNotNull,
sha2(concat(col(colName), lit(salt)), 256)).otherwise(col(colName)
)
)
hashedDF
我收到有关特定列的错误。即错误如下:
org.apache.spark.sql.AnalysisException: 由于数据类型不匹配,无法解析 '(
c
IN ('a', 'b', 'd', 'e'))':参数必须相同类型但为:布尔值!=字符串;;
列名已更改。
我的搜索没有给出任何明确的解释为什么 isin 或 isNull 函数不能按预期工作。此外,我遵循特定的实现方式,并希望避免以下方法:
1) 没有 UDF。它们对我来说很痛苦。
2) 在 spark 数据框列上没有 for 循环。数据可能包含超过十亿个样本,这在性能方面会令人头疼。
【问题讨论】:
正如错误所说,columns
和 hashedColumns 之间似乎不匹配。但是,在此之前您必须修复您的条件,因为col(colName)
不能为空并且同时具有以下值之一:['a', 'b', 'c', etc]
,所以这个col(colName).isin(hashedColumns: _*) && col(colName).isNull
永远不会为真。而不是isin
,您可能需要使用array_contains
除了@AlexandrosBiratsis 的评论之外,您还应该根据它们的 DataType 将一些列转换为它们的字符串表示形式(尝试使用您的 getType
函数)
@AlexandrosBiratsis 谢谢!我会检查。但我不确定是否可以将 array_contains 表达式与布尔条件一起使用。
@baitmbarek 不幸的是,强制转换没有帮助,只是在函数中留下了帮助代码。
你不应该简单地转换你的列,而是将它们转换为一些字符串 representation
【参考方案1】:
如 cmets 中所述,第一个修复应该是删除条件 col(colName).isin(hashedColumns: _*) && col(colName).isNull
,因为此检查将始终为 false。
关于这个错误,是因为col(colName)
和hashedColumns
的值类型不匹配。 hashedColumns
的值始终是一个字符串,因此 col(colName)
也应该是一个字符串,但在您的情况下,它似乎是一个 Boolean
。
我在这里看到的最后一个问题与foldLeft
的逻辑有关。如果我理解正确,您想要实现的是浏览这些列并将sha2
应用于hashedColumns
中存在的那些。为此,您必须将代码更改为:
// 1st change: Convert each element of hashedColumns from String to Spark col
val hashArray = hashedColumns.map(lit(_))
val hashedDF = cleanedDF.columns.foldLeft(cleanedDF)
(memoDF, colName) =>
memoDF.withColumn(
colName,
// 2nd.change: check if colName is in "a", "b", "c", "d" etc, if so apply sha2 otherwise leave the value as it is
when(col(colName).isNotNull && array_contains(array(hashArray:_*), lit(colName)) ,
sha2(concat(col(colName), lit(salt)), 256)
)
)
更新:
通过foldLeft
遍历所有列效率不高,并且会增加额外开销,当您有大量列时甚至更多(请参阅下面与@baitmbarek 的讨论)我添加了另一种方法而不是foldLeft
使用单选。在下一个代码中,when
仅适用于 hashedColumns
。我们将列分为 nonHashedCols 和 transformCols 然后我们连接列表并将其传递给select
:
val transformedCols = hashedColumns.map c =>
when(col(c).isNotNull , sha2(concat(col(c), lit(salt)), 256)).as(c)
val nonHashedCols = (cleanedDF.columns.toSet -- hashedColumns.toSet).map(col(_)).toList
cleanedDF.select((nonHashedCols ++ transformedCols):_*)
【讨论】:
这应该足以解决问题,但在列上折叠时要小心。处理大量它们时性能不佳。 你是对的@baitmbarek,仅基于hashedColumns
而不是所有列生成最终df 可能是一个更好的解决方案。谢谢你指出
@AlexandrosBiratsis 太棒了!非常感谢!我完全忘记了(经常在 Scala 和 Python 之间跳转)可以在代码中将变量定义为 transformCols。已经接受了你的回答。我会尽快更新我的帖子,提及您的解决方案。
是的@memu Column 类提供的功能非常丰富和灵活,因此您可以通过多种方式使用它。如上图的动态选择是常用的建议案例【参考方案2】:
(代表问题作者发布解决方案,将其从问题移至答案部分)。
@AlexandrosBiratsis 在性能和优雅的实现方面给出了非常好的解决方案。所以 hashColumns 函数将如下所示:
def hashColumns(tableConfig: Map[String, String], salt: String, df: DataFrame): DataFrame =
val removedCols = tableConfig.filter(_._2 == "REMOVE").keys.toList
val hashedCols = tableConfig.filter(_._2 == "HASH").keys.toList
val cleanedDF = df.drop(removedCols: _ *)
val transformedCols = hashedCols.map c =>
when(col(c).isNotNull , sha2(concat(col(c), lit(salt)), 256)).as(c)
val nonHashedCols = (cleanedDF.columns.toSet -- hashedCols.toSet).map(col(_)).toList
val hashedDF = cleanedDF.select((nonHashedCols ++ transformedCols):_*)
hashedDF
【讨论】:
以上是关于散列火花数据框的多列的主要内容,如果未能解决你的问题,请参考以下文章