基于spark中的列组合数据

Posted

技术标签:

【中文标题】基于spark中的列组合数据【英文标题】:Combining data based on column in spark 【发布时间】:2016-07-15 20:50:51 【问题描述】:

我在 hive 表中有以下格式的数据。

user |  purchase | time_of_purchase

我想获取数据

user | list of purchases ordered by time

如何在 pyspark 或 hiveQL 中执行此操作?

我尝试在 hive 中使用 collect_list,但它没有按时间戳正确保留顺序。

编辑: 按照 KartikKannapur 的要求添加样本数据。 这是一个示例数据

94438fef-c503-4326-9562-230e78796f16 | Bread | Jul 7 20:48
94438fef-c503-4326-9562-230e78796f16 | Shaving Cream | July 10 14:20
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Milk | July 7 3:48
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Bread | July 7 3:49
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Lotion | July 7 15:30

我想要的输出是

94438fef-c503-4326-9562-230e78796f16 | Bread , Shaving Cream
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Milk , Bread , Lotion

【问题讨论】:

在 Spark 是否可以通过其他库或任何形式的 RDD 转换等来做到这一点。 您始终可以转换为 RDD 和 groupByKey,但会造成性能损失。 你应该可以使用 Spark SQL 来做同样的事情。如果您需要更多帮助,请发布您正在使用的数据示例。 @KartikKannapur 我很好奇,你将如何单独使用 DataFrames 来做到这一点? collect_* 仅支持原语 ( 1.6.0) 中单独安排,因此您可以推断顺序,PySpark 中没有 Dataset 支持,并且所有其他方法都需要繁琐的技巧将数据移入和移出 Python,这相当于使用 RDD。更不用说它是另一个按键组:) 【参考方案1】:

一种方法是

首先创建一个配置单元上下文并将表读取到一个 RDD。

from pyspark import HiveContext
purchaseList = HiveContext(sc).sql('from purchaseList select *')

然后处理RDD

from datetime import datetime as dt
purchaseList = purchaseList.map(lambda x:(x[0],[x[1],dt.strptime(x[2],"%b %d %H:%M")]))
purchaseByUser = purchaseList.groupByKey()
purchaseByUser = purchaseByUser.map(lambda x:(x[0],[y[0] for y in sorted(x[1], key=lambda z:z[1])]))
print(purchaseByUser.take(2))

输出

[('94438fef-c503-4326-9562-230e78796f16', ['Bread', 'Shaving Cream']), ('a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad', ['Milk', 'Bread', 'Lotion'])]

将 RDD 保存为新的 hive 表

schema_rdd = HiveContext(sc).inferSchema(purchaseByUser)
schema_rdd.saveAsTable('purchaseByUser')

有关读取和写入 hive 表的信息,请参见 *** question 和 spark docs

【讨论】:

以上是关于基于spark中的列组合数据的主要内容,如果未能解决你的问题,请参考以下文章

组合具有不同列数的 Spark 数据帧

SQL中基于Common ID的列值组合

基于 Spark DF 中 2 列的组合过滤行

如何组合熊猫数据透视表中的列?

是否可以组合视图和存储过程中的列?

将三个数据帧中的列一一组合