如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

Posted

技术标签:

【中文标题】如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]【英文标题】:how to implement uniqueConcatenate, uniqueCount in spark scala [closed] 【发布时间】:2022-01-10 17:01:58 【问题描述】:

我正在尝试转换数据,旧代码在 Tibco 中并使用 uniqueConcatenate、uniqueCount 函数。

我不确定我们如何在 spark scala 中实现相同的输出。

uniqueConcatenate 示例:

uniqueCount 示例:

我尝试使用 collect_set,但因为我需要通过其他列进行分区,这似乎不适合我。

请帮帮我!

【问题讨论】:

【参考方案1】:

对于 uniqueConcatenate,您可以使用 collect_set() 函数将列聚合到集合中。 例如:

import org.apache.spark.sql.functions.collect_set, concat_ws
import spark.implicits._

case class Record(col1: Option[Int] = None, col2: Option[Int] = None, col3: Option[Int] = None)

val df: DataFrame = Seq(Record(Some(1), Some(1), Some(1)), Record(Some(1), None, Some(3)), Record(Some(1), Some(3), Some(3))).toDF("col1", "col2", "col3")

df.show()

/*
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   1|   1|
|   1|null|   3|
|   1|   3|   3|
+----+----+----+
*/

df.agg(
  concat_ws(". ", collect_set("col1")).as("col1"),
  concat_ws(". ", collect_set("col2")).as("col2"),
  concat_ws(". ", collect_set("col3")).as("col3")
).show()

/*
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|1. 3|1. 3|
+----+----+----+
*/

对于 uniqueCount,您可以以类似的方式使用countDistinct

import org.apache.spark.sql.functions.countDistinct

df.agg(
  countDistinct("col1").as("col1"),
  countDistinct("col2").as("col2"),
  countDistinct("col3").as("col3")
).show()

/*
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   2|
+----+----+----+
*/

【讨论】:

添加更多细节:我的情况类似于 - uniqueConcatenate(col_1) over (col_2) uniqueCount(col_3) over (col_4) over 似乎我需要使用 WIndow 函数 partitionBy 列,以上两个我尝试但没有得到预期的输出 上面的 sn-ps 完全解决了你的问题。如果您期望 *** 中的人会解决您的任务 - 这不是正确的地方。 抱歉,我正在尝试从这里寻求帮助,以解决我在经过多次不同试验后面临的问题。感谢您的帮助和反馈。 谢谢!我之前尝试过 countDistinct,对于具有整数或 bigint 值的 col,它工作正常,但我有 varchar col 即示例

以上是关于如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]的主要内容,如果未能解决你的问题,请参考以下文章

在Scala spark中实现动态字符串插值?

在 spark 中实现 informatica 逻辑

如何在 Scala 中实现 DAO?

如何在 Scala 中实现 Kafka Consumer

如何在 Scala 中实现真正的 Singleton

您将如何在 Scala 中实现缓存方面