散列火花数据框的多列

Posted

技术标签:

【中文标题】散列火花数据框的多列【英文标题】:Hashing multiple columns of spark dataframe 【发布时间】:2019-12-16 08:43:19 【问题描述】:

我需要散列 spark 数据帧的特定列。有些列具有特定的数据类型,它们基本上是标准 spark 的 DataType 类的扩展。问题是由于某种原因,在 when 情况下,某些条件无法按预期工作。

作为哈希表,我有一张地图。我们称之为 tableConfig:

val tableConfig = Map("a" -> "KEEP", "b" -> "HASH", "c" -> "KEEP", "d" -> "HASH", "e" -> "KEEP")

盐变量用于与列连接:

val salt = "abc"

散列函数如下所示:

def hashColumns(tableConfig: Map[String, String], salt: String, df: DataFrame): DataFrame = 

    val removedColumns = tableConfig.filter(_._2 == "REMOVE").keys.toList
    val hashedColumns = tableConfig.filter(_._2 == "HASH").keys.toList

    val cleanedDF = df.drop(removedColumns: _ *)

    val colTypes = cleanedDF.dtypes.toMap

    def typeFromString(s: String): DataType = s match 
      case "StringType" => StringType
      case "BooleanType" => BooleanType
      case "IntegerType" => IntegerType
      case "DateType" => DateType
      case "ShortType" => ShortType
      case "DecimalType(15,7)" => DecimalType(15,7)
      case "DecimalType(18,2)" => DecimalType(18,2)
      case "DecimalType(11,7)" => DecimalType(11,7)
      case "DecimalType(17,2)" => DecimalType(17,2)
      case "DecimalType(38,2)" => DecimalType(38,2)
      case _ => throw new TypeNotPresentException(
        "Please check types in the dataframe. The following column type is missing: ".concat(s), null
      )
    

    val getType = colTypes.mapcase (k, _) => (k, typeFromString(colTypes(k)))

    val hashedDF = cleanedDF.columns.foldLeft(cleanedDF) 
      (memoDF, colName) =>
        memoDF.withColumn(
          colName,
          when(col(colName).isin(hashedColumns: _*) && col(colName).isNull, null).
            when(col(colName).isin(hashedColumns: _*) && col(colName).isNotNull,
              sha2(concat(col(colName), lit(salt)), 256)).otherwise(col(colName)
            )
        )
    

    hashedDF
  

我收到有关特定列的错误。即错误如下:

org.apache.spark.sql.AnalysisException: 由于数据类型不匹配,无法解析 '(c IN ('a', 'b', 'd', 'e'))':参数必须相同类型但为:布尔值!=字符串;;

列名已更改。

我的搜索没有给出任何明确的解释为什么 isin 或 isNull 函数不能按预期工作。此外,我遵循特定的实现方式,并希望避免以下方法:

1) 没有 UDF。它们对我来说很痛苦。

2) 在 spark 数据框列上没有 for 循环。数据可能包含超过十亿个样本,这在性能方面会令人头疼。

【问题讨论】:

正如错误所说,columns 和 hashedColumns 之间似乎不匹配。但是,在此之前您必须修复您的条件,因为col(colName) 不能为空并且同时具有以下值之一:['a', 'b', 'c', etc],所以这个col(colName).isin(hashedColumns: _*) && col(colName).isNull 永远不会为真。而不是isin,您可能需要使用array_contains 除了@AlexandrosBiratsis 的评论之外,您还应该根据它们的 DataType 将一些列转换为它们的字符串表示形式(尝试使用您的 getType 函数) @AlexandrosBiratsis 谢谢!我会检查。但我不确定是否可以将 array_contains 表达式与布尔条件一起使用。 @baitmbarek 不幸的是,强制转换没有帮助,只是在函数中留下了帮助代码。 你不应该简单地转换你的列,而是将它们转换为一些字符串 representation 【参考方案1】:

如 cmets 中所述,第一个修复应该是删除条件 col(colName).isin(hashedColumns: _*) && col(colName).isNull,因为此检查将始终为 false

关于这个错误,是因为col(colName)hashedColumns的值类型不匹配。 hashedColumns 的值始终是一个字符串,因此 col(colName) 也应该是一个字符串,但在您的情况下,它似乎是一个 Boolean

我在这里看到的最后一个问题与foldLeft 的逻辑有关。如果我理解正确,您想要实现的是浏览这些列并将sha2 应用于hashedColumns 中存在的那些。为此,您必须将代码更改为:

// 1st change: Convert each element of hashedColumns from String to Spark col
val hashArray = hashedColumns.map(lit(_))

val hashedDF = cleanedDF.columns.foldLeft(cleanedDF) 
  (memoDF, colName) =>
    memoDF.withColumn(
      colName,

      // 2nd.change: check if colName is in "a", "b", "c", "d" etc, if so apply sha2 otherwise leave the value as it is
      when(col(colName).isNotNull && array_contains(array(hashArray:_*), lit(colName)) ,
        sha2(concat(col(colName), lit(salt)), 256)
      )
    )

更新:

通过foldLeft 遍历所有列效率不高,并且会增加额外开销,当您有大量列时甚至更多(请参阅下面与@baitmbarek 的讨论)我添加了另一种方法而不是foldLeft 使用单选。在下一个代码中,when 仅适用于 hashedColumns。我们将列分为 nonHashedCols 和 transformCols 然后我们连接列表并将其传递给select

val transformedCols = hashedColumns.map c =>
  when(col(c).isNotNull , sha2(concat(col(c), lit(salt)), 256)).as(c)


val nonHashedCols = (cleanedDF.columns.toSet -- hashedColumns.toSet).map(col(_)).toList

cleanedDF.select((nonHashedCols ++ transformedCols):_*)

【讨论】:

这应该足以解决问题,但在列上折叠时要小心。处理大量它们时性能不佳。 你是对的@baitmbarek,仅基于hashedColumns 而不是所有列生成最终df 可能是一个更好的解决方案。谢谢你指出 @AlexandrosBiratsis 太棒了!非常感谢!我完全忘记了(经常在 Scala 和 Python 之间跳转)可以在代码中将变量定义为 transformCols。已经接受了你的回答。我会尽快更新我的帖子,提及您的解决方案。 是的@memu Column 类提供的功能非常丰富和灵活,因此您可以通过多种方式使用它。如上图的动态选择是常用的建议案例【参考方案2】:

(代表问题作者发布解决方案,将其从问题移至答案部分)

@AlexandrosBiratsis 在性能和优雅的实现方面给出了非常好的解决方案。所以 hashColumns 函数将如下所示:

def hashColumns(tableConfig: Map[String, String], salt: String, df: DataFrame): DataFrame = 

    val removedCols = tableConfig.filter(_._2 == "REMOVE").keys.toList
    val hashedCols = tableConfig.filter(_._2 == "HASH").keys.toList

    val cleanedDF = df.drop(removedCols: _ *)

    val transformedCols = hashedCols.map c =>
      when(col(c).isNotNull , sha2(concat(col(c), lit(salt)), 256)).as(c)
    
    val nonHashedCols = (cleanedDF.columns.toSet -- hashedCols.toSet).map(col(_)).toList
    val hashedDF = cleanedDF.select((nonHashedCols ++ transformedCols):_*)

    hashedDF
  

【讨论】:

以上是关于散列火花数据框的多列的主要内容,如果未能解决你的问题,请参考以下文章

聚合火花数据框中的多列(所有组合)

来自数据框的火花过滤器列以及来自集合的单词

验证输入火花数据帧中的时间戳以生成正确的输出火花数据帧

如何替换火花数据框所有列中的多个字符?

计算火花数据框中的字数

如何在火花数据框中将列连接到一个