Increment value count of key-value pairs in Spark Scala dataframe

Posted: 2020-10-19 06:25:43

I'm new to Spark and Scala, and I'm trying to increment the value of key-value pairs in one column using the value from another column.

Below is the input DataFrame:

val inputDF = Seq(
(1, "Visa", 1, None), 
(2, "MC", 2, Some("Visa -> 1")), 
(3, "Amex", 1, None), 
(4, "Amex", 3, Some("Visa -> 1, MC -> 1")), 
(5, "Amex", 4, Some("Visa -> 2, MC -> 1")),
(6, "MC", 1, None), 
(7, "Visa", 5, Some("Visa -> 2, MC -> 1, Amex -> 1")), 
(8, "Visa", 6, Some("Visa -> 2, MC -> 2, Amex -> 1")), 
(9, "MC", 1, None),
(10, "MC", 2, Some("Amex -> 1"))).toDF("person_id", "card_type", "number_of_cards", "card_type_details")

+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details            |
+---------+---------+---------------+-----------------------------+
|1        |Visa     |1              |null                         |
|2        |MC       |2              |Visa -> 1                    |
|3        |Amex     |1              |null                         |
|4        |Amex     |3              |Visa -> 1, MC -> 1           |
|5        |Amex     |4              |Visa -> 2, MC -> 1           |
|6        |MC       |1              |null                         |
|7        |Visa     |5              |Visa -> 2, MC -> 1, Amex -> 1|
|8        |Visa     |6              |Visa -> 2, MC -> 2, Amex -> 1|
|9        |MC       |1              |null                         |
|10       |MC       |2              |Amex -> 1                    |
+---------+---------+---------------+-----------------------------+

Now, from the above input: if the value of card_type_details is null, take the value from card_type and add it as card_type -> 1 (as in the first row).

If card_type_details is not null, check whether card_type already exists as a key in card_type_details. If it does, increment the value of that key by 1; otherwise, add a new key-value pair (as in the second and seventh rows).
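
In plain Scala terms, the rule boils down to a single map update. A minimal sketch of the per-row logic (the updateDetails helper is hypothetical, and assumes the details have already been parsed into a Map[String, Int], with null treated as an empty map):

// Hypothetical helper illustrating the per-row update rule on a plain Scala Map.
def updateDetails(details: Map[String, Int], cardType: String): Map[String, Int] =
  details + (cardType -> (details.getOrElse(cardType, 0) + 1))

updateDetails(Map("Visa" -> 2, "MC" -> 1), "Visa") // Map(Visa -> 3, MC -> 1)
updateDetails(Map.empty[String, Int], "Amex")      // Map(Amex -> 1)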

Below is the expected output:

val expectedOutputDF = Seq(
(1, "Visa", 1, Some("Visa -> 1")), 
(2, "MC", 2, Some("Visa -> 1, MC -> 1")), 
(3, "Amex", 1, Some("Amex -> 1")), 
(4, "Amex", 3, Some("Visa -> 1, MC -> 1, Amex -> 1")), 
(5, "Amex", 4, Some("Visa -> 2, MC -> 1, Amex -> 1")),
(6, "MC", 1, Some("MC -> 1")), 
(7, "Visa", 5, Some("Visa -> 3, MC -> 1, Amex -> 1")), 
(8, "Visa", 6, Some("Visa -> 3, MC -> 2, Amex -> 1")), 
(9, "MC", 1, Some("MC -> 1")),
(10, "MC", 2, Some("Amex -> 1, MC -> 1"))).toDF("person_id", "card_type", "number_of_cards", "card_type_details")

+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details            |
+---------+---------+---------------+-----------------------------+
|1        |Visa     |1              |Visa -> 1                    |
|2        |MC       |2              |Visa -> 1, MC -> 1           |
|3        |Amex     |1              |Amex -> 1                    |
|4        |Amex     |3              |Visa -> 1, MC -> 1, Amex -> 1|
|5        |Amex     |4              |Visa -> 2, MC -> 1, Amex -> 1|
|6        |MC       |1              |MC -> 1                      |
|7        |Visa     |5              |Visa -> 3, MC -> 1, Amex -> 1|
|8        |Visa     |6              |Visa -> 3, MC -> 2, Amex -> 1|
|9        |MC       |1              |MC -> 1                      |
|10       |MC       |2              |Amex -> 1, MC -> 1           |
+---------+---------+---------------+-----------------------------+

Any suggestions on how to achieve this?

Comments:

What is the data type of the card_type_details column?

@Srinivas I have treated card_type_details as a string. Assume that the final output will be written to a Cassandra table.

Answer 1:

Assuming card_type_details is of map type, check the code below.

scala> df.show(false)
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details            |
+---------+---------+---------------+-----------------------------+
|1        |Visa     |1              |null                         |
|2        |MC       |2              |Visa -> 1                    |
|3        |Amex     |1              |null                         |
|4        |Amex     |3              |Visa -> 1, MC -> 1           |
|5        |Amex     |4              |Visa -> 2, MC -> 1           |
|6        |MC       |1              |null                         |
|7        |Visa     |5              |Visa -> 2, MC -> 1, Amex -> 1|
|8        |Visa     |6              |Visa -> 2, MC -> 2, Amex -> 1|
|9        |MC       |1              |null                         |
|10       |MC       |2              |Amex -> 1                    |
+---------+---------+---------------+-----------------------------+
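
In the question, card_type_details is actually a string such as "Visa -> 1, MC -> 1". One way to reach the map type assumed here is Spark's built-in str_to_map, as in the sketch below. It assumes Spark 3.0+ (for transform_values), entries consistently delimited by ", " and " -> ", and nulls coalesced to an empty map; parsed is a name chosen for illustration:

import org.apache.spark.sql.functions._

// Parse "K -> V, K -> V" into map<string,string>, then cast the counts to int
// so that card_type_details[card_type] + 1 does integer arithmetic.
val parsed = inputDF.withColumn(
  "card_type_details",
  expr("""
    coalesce(
      transform_values(str_to_map(card_type_details, ', ', ' -> '), (k, v) -> cast(v as int)),
      cast(map() as map<string,int>)
    )
  """)
)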

Create the expression:

scala> :paste
// Entering paste mode (ctrl-D to finish)

// Empty details: start the map with card_type -> 1.
val colExpr = when(size($"card_type_details") === 0, map($"card_type",lit(1)))
.otherwise(
    when(
        // card_type is already a key: rebuild its entry with an incremented count...
        expr("card_type_details[card_type]").isNotNull,
        map_concat(
            expr("map(card_type,card_type_details[card_type] + 1)"),
            // ...and keep every other entry unchanged (map_filter requires Spark 3.0+).
            expr("map_filter(card_type_details,(k,v) -> k != card_type)")
        )
    )
    // card_type is not a key yet: append card_type -> 1.
    .otherwise(map_concat($"card_type_details",map($"card_type",lit(1))))
)

// Exiting paste mode, now interpreting.

colExpr: org.apache.spark.sql.Column = CASE WHEN (size(card_type_details) = 0) THEN map(card_type, 1) ELSE CASE WHEN (card_type_details[card_type] IS NOT NULL) THEN map_concat(map(card_type, (card_type_details[card_type] + 1)), map_filter(card_type_details, lambdafunction((NOT (k = card_type)), k, v))) ELSE map_concat(card_type_details, map(card_type, 1)) END END
scala> df.withColumn("new_card_type_details",colExpr).show(false)
+---------+---------+---------------+-------------------------------+-------------------------------+
|person_id|card_type|number_of_cards|card_type_details              |new_card_type_details          |
+---------+---------+---------------+-------------------------------+-------------------------------+
|1        |Visa     |1              |[]                             |[Visa -> 1]                    |
|2        |MC       |2              |[Visa -> 1]                    |[Visa -> 1, MC -> 1]           |
|3        |Amex     |1              |[]                             |[Amex -> 1]                    |
|4        |Amex     |3              |[Visa -> 1, MC -> 1]           |[Visa -> 1, MC -> 1, Amex -> 1]|
|5        |Amex     |4              |[Visa -> 2, MC -> 1]           |[Visa -> 2, MC -> 1, Amex -> 1]|
|6        |MC       |1              |[]                             |[MC -> 1]                      |
|7        |Visa     |5              |[Visa -> 2, MC -> 1, Amex -> 1]|[Visa -> 3, MC -> 1, Amex -> 1]|
|8        |Visa     |6              |[Visa -> 2, MC -> 2, Amex -> 1]|[Visa -> 3, MC -> 2, Amex -> 1]|
|9        |MC       |1              |[]                             |[MC -> 1]                      |
|10       |MC       |2              |[Amex -> 1]                    |[Amex -> 1, MC -> 1]           |
+---------+---------+---------------+-------------------------------+-------------------------------+
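
And since, per the comments, the final output is written to a Cassandra table as a string, the map can be rendered back into the original "K -> V, ..." form. A sketch (map_entries and the SQL transform function require Spark 2.4+; asString is a name chosen here):

// Render map<string,int> back into the "K -> V, K -> V" string representation.
val asString = expr("""
  concat_ws(', ',
    transform(map_entries(new_card_type_details),
      e -> concat(e.key, ' -> ', cast(e.value as string))))
""")

df.withColumn("card_type_details", asString)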

Comments:

Looks like the output values for ids 8 and 9 in the new_card_type_details column are incorrect.

When passing card_type_details as map data type in the input, I get a different output. Attaching the input and output below. val inputDF2 = Seq((1, "Visa", 1, Map[String, Int]()), (2, "MC", 2, Map("Visa" -> 1)), (3, "Amex", 1, Map[String, Int]()), (4, "Amex", 3, Map("Visa" -> 1, "MC" -> 1)), (5, "Amex", 4, Map("Visa" -> 2, "MC" -> 1)), (6, "MC", 1, Map[String, Int]()), (7, "Visa", 5, Map("Visa" -> 2, "MC" -> 1, "Amex" -> 1)), (8, "Visa", 6, Map("Visa" -> 2, "MC" -> 2, "Amex" -> 1)), (9, "MC", 1, Map[String, Int]()), (10, "MC", 2, Map("Amex" -> 1))).toDF("person_id", "card_type", "number_of_cards", "card_type_details") inputDF2.withColumn("new_card_type_details",colExpr).show(false)

No. When card_type_details is not null, I get null in the new_card_type_details column.

I am getting the correct values; are you using the updated solution?
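
One likely source of the nulls mentioned above: the expression's first branch tests size($"card_type_details") === 0, which assumes an empty map. If the column is genuinely null, as in the question's data, size returns null (or -1 with spark.sql.legacy.sizeOfNull=true), so the first branch never fires and map_concat(null, ...) propagates the null. A defensive variant, sketched here rather than taken from the answerer's updated solution:

import org.apache.spark.sql.functions._

// Same logic as colExpr, but treating a null map like an empty one.
val safeColExpr = when($"card_type_details".isNull || size($"card_type_details") === 0,
    map($"card_type", lit(1)))
  .otherwise(
    when(expr("card_type_details[card_type]").isNotNull,
      map_concat(
        expr("map(card_type, card_type_details[card_type] + 1)"),
        expr("map_filter(card_type_details, (k, v) -> k != card_type)")))
    .otherwise(map_concat($"card_type_details", map($"card_type", lit(1)))))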
