如何在火花中使用逗号分隔符将相同的列值连接到新列
Posted
技术标签:
【中文标题】如何在火花中使用逗号分隔符将相同的列值连接到新列【英文标题】:how to concat the same column value to a new column with comma delimiters in spark 【发布时间】:2016-10-13 13:44:49 【问题描述】:输入数据格式如下:
+--------------------+-------------+--------------------+
| date | user | product |
+--------------------+-------------+--------------------+
| 2016-10-01 | Tom | computer |
+--------------------+-------------+--------------------+
| 2016-10-01 | Tom | iphone |
+--------------------+-------------+--------------------+
| 2016-10-01 | Jhon | book |
+--------------------+-------------+--------------------+
| 2016-10-02 | Tom | pen |
+--------------------+-------------+--------------------+
| 2016-10-02 | Jhon | milk |
+--------------------+-------------+--------------------+
输出格式如下:
+-----------+-----------------------+
| user | products |
+-----------------------------------+
| Tom | computer,iphone,pen |
+-----------------------------------+
| Jhon | book,milk |
+-----------------------------------+
输出显示每个用户按日期购买的所有产品。
我想使用 Spark 处理这些数据,请问谁能帮帮我?谢谢。
【问题讨论】:
Spark Dataframe groupby with agg performing list appending的可能重复 Concatenate columns in apache spark dataframe的可能重复 【参考方案1】:最好使用 map-reduceBykey() 组合而不是 groupBy.. 还假设数据没有
#Read the data using val ordersRDD = sc.textFile("/file/path")
val ordersRDD = sc.parallelize( List(("2016-10-01","Tom","computer"),
("2016-10-01","Tom","iphone"),
("2016-10-01","Jhon","book"),
("2016-10-02","Tom","pen"),
("2016-10-02","Jhon","milk")))
#group by (date, user), sort by key & reduce by user & concatenate products
val dtusrGrpRDD = ordersRDD.map(rec => ((rec._2, rec._1), rec._3))
.sortByKey().map(x=>(x._1._1, x._2))
.reduceByKey((acc, v) => acc+","+v)
#if needed, make it to DF
scala> dtusrGrpRDD.toDF("user", "product").show()
+----+-------------------+
|user| product|
+----+-------------------+
| Tom|computer,iphone,pen|
|Jhon| book,milk|
+----+-------------------+
【讨论】:
请问您的 Spark 是哪个版本的?这里有一个错误:无法解析符号 sortByKey。 使用 DataFrames 而不是 RDDs 通常是首选,因为您可以让执行引擎负责决定最佳执行计划,它会自动应用各种优化(谓词下推、代码生成等) . databricks.com/blog/2016/07/14/…【参考方案2】:如果您使用的是 HiveContext(您应该是):
使用 python 的示例:
from pyspark.sql.functions import collect_set
df = ... load your df ...
new_df = df.groupBy("user").agg(collect_set("product").alias("products"))
如果您不想对产品中的结果列表进行重复数据删除,则可以使用 collect_list 代替。
【讨论】:
【参考方案3】:对于数据帧,它是两行:
import org.apache.spark.sql.functions.collect_list
//collect_set nistead of collect_list if you don't want duplicates
val output = join.groupBy("user").agg(collect_list($"product"))
GroupBy 会给你一个分组的用户集帖子,你可以在分组的数据集上迭代和 collect_list 或 collect_set。
【讨论】:
以上是关于如何在火花中使用逗号分隔符将相同的列值连接到新列的主要内容,如果未能解决你的问题,请参考以下文章