将 spark DataFrame 中的行收集到 JSON 对象中,然后将对象放入另一个 DF

Posted

技术标签:

【中文标题】将 spark DataFrame 中的行收集到 JSON 对象中,然后将对象放入另一个 DF【英文标题】:Collect rows from spark DataFrame into JSON object, then put the object to another DF 【发布时间】:2018-03-20 09:05:06 【问题描述】:

我有一个包含一些应用程序使用数据的 Spark DataFrame。 我的目标是从这个 DataFrame 中收集某些指标,然后将它们累积在一起。

例如,我可能想在此 DataFrame 中获取我的产品的用户总数:

df.select($"user").count.distinct
100500

然后我想跨不同的应用程序版本构建统计数据

df.groupBy("version").count.toJSON.show(false)

+-----------------------------------------+
|value                                    |
+-----------------------------------------+
|"version":"1.2.3.4","count":4051  |
|"version":"1.2.3.5","count":1     |
|"version":"1.2.4.6","count":1     |
|"version":"2.0.0.1","count":30433 |
|"version":"3.1.2.3","count":112195|
|"version":"3.1.0.4","count":11457 |
+-----------------------------------------+

然后我想压缩第二个DF中的记录,所以最后我需要一个这样的对象

 "totalUsers":100500, "versions":[
  "version":"1.2.3.4","count":4051,
  "version":"1.2.3.5","count":1,
  "version":"1.2.4.6","count":1,
  "version":"2.0.0.1","count":30433,
  "version":"3.1.2.3","count":112195,
  "version":"3.1.0.4","count":11457] 

那么这个对象应该被写入另一个spark DF

实现这一点的正确方法是什么?

免责声明:我对火花很陌生,所以如果我的问题太幼稚,我很抱歉。 我读过很多类似的问题,包括看似相似的问题,如this 和this。后者很接近,但仍然没有给出如何将多行累积到一个对象中的线索。我也无法从Apache Spark docs 中理解它。

【问题讨论】:

所以,您希望以对象 "totalUsers":100500, "versions":[. ... 的单个实例结束??? @nabongs 是的,没错。 @VasiliyGalkin 请看这里***.com/questions/46482058/… @statut 感谢您的链接。但是,它解释了如何在多列之间连接数据,而我正在寻找行连接。 @VasiliyGalkin 你能发布原始数据框(df)吗?一些示例行将帮助我们测试并为您提供正确的解决方案 【参考方案1】:

尝试使用collect_list函数,例如:

from pyspark.sql import functions as F
from pyspark.sql.functions import lit
totalUsers = 100500
agg = df.groupBy().agg(F.collect_list("value").alias('versions')).withColumn("totalUsers", lit(totalUsers)).show()

其中df 是具有聚合版本的数据框。我得到以下结果:

+--------------------+----------+
|            versions|totalUsers|
+--------------------+----------+
|["version":"1.2....|    100500|
+--------------------+----------+

我的示例是用 Python 编写的,但我相信您可以将相同的方法用于您的语言。

【讨论】:

以上是关于将 spark DataFrame 中的行收集到 JSON 对象中,然后将对象放入另一个 DF的主要内容,如果未能解决你的问题,请参考以下文章

在 groupby 之后将 Spark DataFrame 的行聚合到 String

pyspark dataframe foreach 填充列表

合并spark scala Dataframe中的行

按最近的时间戳对 Spark DataFrame 中的行进行重复数据删除

从包含 Option[T] 的行创建 DataFrame 的问题

如何在加入(广播)和使用 Spark 收集之间进行选择