concat_ws 从 spark 数据帧的输出中删除空字符串

Posted

技术标签:

【中文标题】concat_ws 从 spark 数据帧的输出中删除空字符串【英文标题】:concat_ws removes null string from output in spark data frame 【发布时间】:2018-02-11 17:41:17 【问题描述】:

这是我的数据框的输出

val finaldf.show(false)

+------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|DataPartition     |TimeStamp                |Source_organizationId|Source_sourceId|FilingDateTime           |SourceTypeCode|DocumentId|Dcn       |DocFormat|StatementDate            |IsFilingDateTimeEstimated|ContainsPreliminaryData|CapitalChangeAdjustmentDate|CumulativeAdjustmentFactor|ContainsRestatement|FilingDateTimeUTCOffset|ThirdPartySourceCode|ThirdPartySourcePriority|SourceTypeId|ThirdPartySourceCodeId|FFAction|!||
+------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|SelfSourcedPrivate|2017-11-02T10:23:59+00:00|4298009288           |80             |2017-09-28T23:00:00+00:00|10K           |null      |171105584 |ASFILED  |2017-07-31T00:00:00+00:00|false                    |false                  |2017-07-31T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
|SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170           |364            |2017-08-08T17:00:00+00:00|10Q           |null      |null      |null     |2017-07-30T00:00:00+00:00|false                    |false                  |2017-07-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011836     |1000716240            |I|!|       |
|SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170           |365            |2017-10-10T17:00:00+00:00|10K           |null      |null      |null     |2017-09-30T00:00:00+00:00|false                    |false                  |2017-09-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
|SelfSourcedPublic |2017-11-21T12:17:49+00:00|4295904170           |365            |2017-10-10T17:00:00+00:00|10K           |null      |null      |null     |2017-09-30T00:00:00+00:00|false                    |false                  |2017-09-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |

concat_ws null 何时从行中删除。

val finaldf = diff.foldLeft(tempReorder)(temp2df, colName) => temp2df.withColumn(colName, lit("null"))
//finaldf.show(false)

val headerColumn = data.columns.toSeq
val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

val finaldfWithDelimiter=finaldf.select(concat_ws("|^|",finaldf.schema.fieldNames.map(col): _*).as("concatenated")).withColumnRenamed("concatenated", header)
finaldfWithDelimiter.show(false)

我得到低于输出

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|DataPartition|^|TimeStamp|^|Source_organizationId|^|Source_sourceId|^|FilingDateTime|^|SourceTypeCode|^|DocumentId|^|Dcn|^|DocFormat|^|StatementDate|^|IsFilingDateTimeEstimated|^|ContainsPreliminaryData|^|CapitalChangeAdjustmentDate|^|CumulativeAdjustmentFactor|^|ContainsRestatement|^|FilingDateTimeUTCOffset|^|ThirdPartySourceCode|^|ThirdPartySourcePriority|^|SourceTypeId|^|ThirdPartySourceCodeId|^|FFAction|!||
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|SelfSourcedPrivate|^|2017-11-02T10:23:59+00:00|^|4298009288|^|80|^|2017-09-28T23:00:00+00:00|^|10K|^|171105584|^|ASFILED|^|2017-07-31T00:00:00+00:00|^|false|^|false|^|2017-07-31T00:00:00+00:00|^|1.0|^|false|^|-300|^|SS|^|1|^|3011835|^|1000716240|^|I|!|                                                                                                                                                                 |
|SelfSourcedPublic|^|2017-11-21T12:09:23+00:00|^|4295904170|^|364|^|2017-08-08T17:00:00+00:00|^|10Q|^|2017-07-30T00:00:00+00:00|^|false|^|false|^|2017-07-30T00:00:00+00:00|^|1.0|^|false|^|-300|^|SS|^|1|^|3011836|^|1000716240|^|I|!|                                                                                                                                                                                       |
|SelfSourcedPublic|^|2017-11-21T12:09:23+00:00|^|4295904170|^|365|^|2017-10-10T17:00:00+00:00|^|10K|^|2017-09-30T00:00:00+00:00|^|false|^|false|^|2017-09-30T00:00:00+00:00|^|1.0|^|false|^|-300|^|SS|^|1|^|3011835|^|1000716240|^|I|!|   

在输出 DocumentId 中为 null 被替换。

无法弄清楚我错过了什么?

【问题讨论】:

【参考方案1】:

concat_ws 确实在连接过程中删除了 null 列。如果要为串联结果中的每个null 保留一个占位符,一种方法是为na.fill() 创建一个与类型相关的colName -> nullValueMap,以在串联之前转换数据帧,如下所示:

val df = Seq(
  (new Integer(1), "a"),
  (new Integer(2), null),
  (null, "c")
).toDF("col1", "col2")

df.withColumn("concat", concat_ws("|", df.columns.map(col): _*)).
  show
// +----+----+------+
// |col1|col2|concat|
// +----+----+------+
// |   1|   a|   1|a|
// |   2|null|     2|
// |null|   c|     c|
// +----+----+------+

val naMap = df.dtypes.map( t => t._2 match 
  case "StringType" => (t._1, "(n/a)")
  case "IntegerType" => (t._1, 0)
  case "LongType" => (t._1, 0L)
  // cases for other types ...
 ).toMap
// naMap: scala.collection.immutable.Map[String,Any] = 
//   Map(col1 -> 0, col2 -> (n/a))

df.na.fill(naMap).
  withColumn("concat", concat_ws("|", df.columns.map(col): _*)).
  show
// +----+-----+-------+
// |col1| col2| concat|
// +----+-----+-------+
// |   1|    a|    1|a|
// |   2|(n/a)|2|(n/a)|
// |   0|    c|    0|c|
// +----+-----+-------+

【讨论】:

所以我必须为我的所有列编写 naMap 吗?...我怎样才能在我的解决方案中合并...基本上三个两个是为了避免 null 首先是 withColumns 然后填充?可以你请在我的代码中编辑.. 并且 null 作为字符串也被删除? 您只需要在naMap 中设置可能包含null 的列的类型,以指定相应的null 应该替换为什么。例如,我的示例中的 naMap 涵盖了可能为 null 的 String、Int 或 Long 类型的列。在您的情况下,只需在select 之前应用na.fill(naMap),例如val finaldfWithDelimiter = finaldf.na.fill(naMap).select(...)... @LeoC 很抱歉,但仍然无法为我工作naMap 我必须创建 naMap 吗? 是的,您创建naMap 并将na.fill(naMap) 应用于finaldf,然后再执行连接。请注意,我建议的只是一种不提供特定列的空填充方式。方法na.fill() 有一些变体,因此您可以选择最适合您的用例的方法。例如,您可以选择一个让您提供相同类型的特定列列表的选项。 此解决方案修改了原始列,请记住这一点,这可能是某些用例的问题。【参考方案2】:

由于concat_ws 忽略包含null 的列,您将不得不处理它们。

一种解决方案是按照建议的here 为na.fill() 创建一个与类型相关的colName -> nullValueMap,但是您必须指定所有情况。

另一种方法,既然你想获得一个String,是使用format_string函数:

// Proof of concept in Scala (I don't have the compiler to test it).
df
.withColumn(
  "concat",
  format_string(
    (for (c <- df.columns) yield "%s").mkString("|"),
    df.columns.map(col): _*
  ),
)

/*
  Same solution tested in PySpark.

  format_string(
    '|'.join(['%s' for c in df.columns]),
    *df.columns
  )
*/

通过这种方式,您将避免 Map 定义,并将为数据框列中的任何 null 值放置一个空字符串。

【讨论】:

这种方法的优点是原始列没有被修改,如果你看@Leo C的解决方案, col1 和 col2 的值正在被修改,并形成一些可能是问题。 缺点是null值会被'null'字符串替换。 @ruloweb 不,它不会放置“null”,而只会放置“”。因此,如果只有 col2 为空,您将获得类似:“col1||col3”。 嗯,它实际上在我的测试中做到了,至少在 spark 2.4.5 中: val df = List(("a", null, "c"), ("a", "b ", "c")).toDF("val1", "val2", "val3") df.withColumn("concat", format_string((for (c 【参考方案3】:

您也可以使用udf,例如:

val concatUDF: UserDefinedFunction = udf((columns: Seq[String]) =>
  columns.map(c => if (c == null) "" else c).reduceLeft((a, b) => s"$a:$b"))

df.withColumn("concatenated", concatUDF(array(columns.map(col): _*)))

其中arrayorg.apache.spark.sql.functions.array。这不会替换原始列,并将返回空值的空字符串,或者您希望替换的任何内容 (if (c == null) "")。

此外,您可以扩展 UDF 以支持多种类型。

【讨论】:

以上是关于concat_ws 从 spark 数据帧的输出中删除空字符串的主要内容,如果未能解决你的问题,请参考以下文章

使用定义的 StructType 转换 Spark 数据帧的值

将数据帧的每一行转换为字符串

concat_ws 使用在hive spark-sql上的区别

pyspark:如何获取 spark 数据帧的 Spark SQLContext?

spark数据帧的分区数?

在过滤损坏的记录字段时,Spark 的 .count() 函数与数据帧的内容不同