Increment value count of key-value pairs in a Spark Scala DataFrame
Posted: 2020-10-19 06:25:43

Question: I am new to Spark and Scala, and I am trying to increment the value of the key-value pairs in one column using the value from another column.
Below is the input DataFrame.
val inputDF = Seq(
(1, "Visa", 1, None),
(2, "MC", 2, Some("Visa -> 1")),
(3, "Amex", 1, None),
(4, "Amex", 3, Some("Visa -> 1, MC -> 1")),
(5, "Amex", 4, Some("Visa -> 2, MC -> 1")),
(6, "MC", 1, None),
(7, "Visa", 5, Some("Visa -> 2, MC -> 1, Amex -> 1")),
(8, "Visa", 6, Some("Visa -> 2, MC -> 2, Amex -> 1")),
(9, "MC", 1, None),
(10, "MC", 2, Some("Amex -> 1"))).toDF("person_id", "card_type", "number_of_cards", "card_type_details")
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details |
+---------+---------+---------------+-----------------------------+
|1 |Visa |1 |null |
|2 |MC |2 |Visa -> 1 |
|3 |Amex |1 |null |
|4 |Amex |3 |Visa -> 1, MC -> 1 |
|5 |Amex |4 |Visa -> 2, MC -> 1 |
|6 |MC |1 |null |
|7 |Visa |5 |Visa -> 2, MC -> 1, Amex -> 1|
|8 |Visa |6 |Visa -> 2, MC -> 2, Amex -> 1|
|9 |MC |1 |null |
|10 |MC |2 |Amex -> 1 |
+---------+---------+---------------+-----------------------------+
Now, from the input above: if card_type_details is null, take the value from card_type and add it with a count of 1 (as in the first row).
If card_type_details is not null, check whether the card_type already exists as a key in card_type_details. If it does, increment the value for that key by 1; otherwise, add a new key-value pair (as in the second and seventh rows).
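To make the rule concrete, here is a small plain-Scala sketch of the per-row update (an illustration only, not a Spark solution; representing the details as a Map[String, Int] is just an assumption for readability):

// Per-row rule: add the card type with count 1 if absent, otherwise increment its count.
def updateCounts(cardType: String, details: Map[String, Int]): Map[String, Int] =
  details + (cardType -> (details.getOrElse(cardType, 0) + 1))

// Examples mirroring rows 1, 2 and 7 of the input:
updateCounts("Visa", Map.empty)                                  // Map(Visa -> 1)
updateCounts("MC", Map("Visa" -> 1))                             // Map(Visa -> 1, MC -> 1)
updateCounts("Visa", Map("Visa" -> 2, "MC" -> 1, "Amex" -> 1))   // Map(Visa -> 3, MC -> 1, Amex -> 1)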
Below is the expected output:
val expectedOutputDF = Seq(
(1, "Visa", 1, Some("Visa -> 1")),
(2, "MC", 2, Some("Visa -> 1, MC -> 1")),
(3, "Amex", 1, Some("Amex -> 1")),
(4, "Amex", 3, Some("Visa -> 1, MC -> 1, Amex -> 1")),
(5, "Amex", 4, Some("Visa -> 2, MC -> 1, Amex -> 1")),
(6, "MC", 1, Some("MC -> 1")),
(7, "Visa", 5, Some("Visa -> 3, MC -> 1, Amex -> 1")),
(8, "Visa", 6, Some("Visa -> 3, MC -> 2, Amex -> 1")),
(9, "MC", 1, Some("MC -> 1")),
(10, "MC", 2, Some("Amex -> 1, MC -> 1"))).toDF("person_id", "card_type", "number_of_cards", "card_type_details")
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details |
+---------+---------+---------------+-----------------------------+
|1 |Visa |1 |Visa -> 1 |
|2 |MC |2 |Visa -> 1, MC -> 1 |
|3 |Amex |1 |Amex -> 1 |
|4 |Amex |3 |Visa -> 1, MC -> 1, Amex -> 1|
|5 |Amex |4 |Visa -> 2, MC -> 1, Amex -> 1|
|6 |MC |1 |MC -> 1 |
|7 |Visa |5 |Visa -> 3, MC -> 1, Amex -> 1|
|8 |Visa |6 |Visa -> 3, MC -> 2, Amex -> 1|
|9 |MC |1 |MC -> 1 |
|10 |MC |2 |Amex -> 1, MC -> 1 |
+---------+---------+---------------+-----------------------------+
Any suggestions on how to achieve this?
Question comments:
What is the data type of the card_type_details column?
@Srinivas I have been treating card_type_details as a string. Assume the final output will be written to a Cassandra table.
Answer 1:
Assuming card_type_details is of map type, check the code below.
scala> df.show(false)
+---------+---------+---------------+-----------------------------+
|person_id|card_type|number_of_cards|card_type_details |
+---------+---------+---------------+-----------------------------+
|1 |Visa |1 |null |
|2 |MC |2 |Visa -> 1 |
|3 |Amex |1 |null |
|4 |Amex |3 |Visa -> 1, MC -> 1 |
|5 |Amex |4 |Visa -> 2, MC -> 1 |
|6 |MC |1 |null |
|7 |Visa |5 |Visa -> 2, MC -> 1, Amex -> 1|
|8 |Visa |6 |Visa -> 2, MC -> 2, Amex -> 1|
|9 |MC |1 |null |
|10 |MC |2 |Amex -> 1 |
+---------+---------+---------------+-----------------------------+
Create the expression:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val colExpr = when(size($"card_type_details") === 0, map($"card_type",lit(1)))
.otherwise(
when(
expr("card_type_details[card_type]").isNotNull,
map_concat(
expr("map(card_type,card_type_details[card_type] + 1)"),
expr("map_filter(card_type_details,(k,v) -> k != card_type)")
)
)
.otherwise(map_concat($"card_type_details",map($"card_type",lit(1))))
)
// Exiting paste mode, now interpreting.
colExpr: org.apache.spark.sql.Column = CASE WHEN (size(card_type_details) = 0) THEN map(card_type, 1) ELSE CASE WHEN (card_type_details[card_type] IS NOT NULL) THEN map_concat(map(card_type, (card_type_details[card_type] + 1)), map_filter(card_type_details, lambdafunction((NOT (k = card_type)), k, v))) ELSE map_concat(card_type_details, map(card_type, 1)) END END
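One caveat worth noting: size(card_type_details) === 0 only matches an empty map, not a null map (size returns -1 or null for a null input, depending on configuration), so a row where the map column itself is null would fall through to map_concat with a null argument and produce null. If null maps are possible, a null-safe variant of the same expression could look like the sketch below (my addition, not part of the original answer; it assumes import org.apache.spark.sql.functions._ and spark.implicits._ are in scope for the $ syntax, as in the REPL session above):

import org.apache.spark.sql.functions._

// Same logic as colExpr, but also treats a null map as "no details yet".
val colExprNullSafe =
  when($"card_type_details".isNull || size($"card_type_details") === 0, map($"card_type", lit(1)))
    .otherwise(
      when(
        expr("card_type_details[card_type]").isNotNull,
        map_concat(
          expr("map(card_type, card_type_details[card_type] + 1)"),
          expr("map_filter(card_type_details, (k, v) -> k != card_type)")
        )
      ).otherwise(map_concat($"card_type_details", map($"card_type", lit(1))))
    )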
scala> indf.withColumn("new_card_type_details",colExpr).show(false)
+---------+---------+---------------+-------------------------------+-------------------------------+
|person_id|card_type|number_of_cards|card_type_details |new_card_type_details |
+---------+---------+---------------+-------------------------------+-------------------------------+
|1 |Visa |1 |[] |[Visa -> 1] |
|2 |MC |2 |[Visa -> 1] |[Visa -> 1, MC -> 1] |
|3 |Amex |1 |[] |[Amex -> 1] |
|4 |Amex |3 |[Visa -> 1, MC -> 1] |[Visa -> 1, MC -> 1, Amex -> 1]|
|5 |Amex |4 |[Visa -> 2, MC -> 1] |[Visa -> 2, MC -> 1, Amex -> 1]|
|6 |MC |1 |[] |[MC -> 1] |
|7 |Visa |5 |[Visa -> 2, MC -> 1, Amex -> 1]|[Visa -> 3, MC -> 1, Amex -> 1]|
|8 |Visa |6 |[Visa -> 2, MC -> 2, Amex -> 1]|[Visa -> 3, MC -> 2, Amex -> 1]|
|9 |MC |1 |[] |[MC -> 1] |
|10 |MC |2 |[Amex -> 1] |[Amex -> 1, MC -> 1] |
+---------+---------+---------------+-------------------------------+-------------------------------+
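Since the question's comments state that card_type_details is actually stored as a string, one possible alternative is a Scala UDF that parses the string, applies the same rule, and rebuilds the string. This is only a sketch: it assumes the exact comma-separated "Key -> count" format shown in the question, reuses inputDF from the question, and incrementDetails is just an illustrative name.

import org.apache.spark.sql.functions.{col, udf}

// Parse "Visa -> 1, MC -> 2" into ordered (key, count) pairs, increment or append
// the current card_type, and render the result back in the same string format.
val incrementDetails = udf { (cardType: String, details: String) =>
  val existing: Seq[(String, Int)] =
    if (details == null || details.trim.isEmpty) Seq.empty
    else details.split(",").toSeq.map { pair =>
      val Array(k, v) = pair.split("->").map(_.trim)
      (k, v.toInt)
    }
  val updated =
    if (existing.exists(_._1 == cardType))
      existing.map { case (k, v) => if (k == cardType) (k, v + 1) else (k, v) }
    else
      existing :+ (cardType -> 1)
  updated.map { case (k, v) => s"$k -> $v" }.mkString(", ")
}

val resultDF = inputDF.withColumn(
  "card_type_details",
  incrementDetails(col("card_type"), col("card_type_details"))
)
resultDF.show(false)

A UDF like this keeps the insertion order of the pairs, so the resulting strings match the expected output shown in the question, at the cost of being opaque to Catalyst compared to the built-in map functions.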
Comments:
It looks like the output values for ids 8 and 9 in the new_card_type_details column are incorrect. When I pass card_type_details as a map data type in the input, I get a different output. Input and output attached in the comments below: val inputDF2 = Seq((1, "Visa", 1, Map[String, Int]()), (2, "MC", 2, Map("Visa" -> 1)), (3, "Amex", 1, Map[String, Int]()), (4, "Amex", 3, Map("Visa" -> 1, "MC" -> 1)), (5, "Amex", 4, Map("Visa" -> 2, "MC" -> 1)),(6, "MC", 1, Map[String, Int]()), (7, "Visa", 5, Map("Visa" -> 2, "MC" -> 1, "Amex" -> 1)), (8, "Visa", 6, Map("Visa" -> 2, "MC" -> 2, "Amex" -> 1)), (9, "MC", 1, Map[String, Int]()),(10, "MC", 2, Map("Amex" -> 1))).toDF("person_id", "card_type", "number_of_cards", "card_type_details")
inputDF2.withColumn("new_card_type_details",colExpr).show(false)
No. I am getting null in the new_card_type_details column when card_type_details is not null.
I am getting the correct values. Are you using the updated solution?