将 spark DataFrame 中的行收集到 JSON 对象中,然后将对象放入另一个 DF
Posted
技术标签:
【中文标题】将 spark DataFrame 中的行收集到 JSON 对象中,然后将对象放入另一个 DF【英文标题】:Collect rows from spark DataFrame into JSON object, then put the object to another DF 【发布时间】:2018-03-20 09:05:06 【问题描述】:我有一个包含一些应用程序使用数据的 Spark DataFrame。 我的目标是从这个 DataFrame 中收集某些指标,然后将它们累积在一起。
例如,我可能想在此 DataFrame 中获取我的产品的用户总数:
df.select($"user").count.distinct
100500
然后我想跨不同的应用程序版本构建统计数据
df.groupBy("version").count.toJSON.show(false)
+-----------------------------------------+
|value |
+-----------------------------------------+
|"version":"1.2.3.4","count":4051 |
|"version":"1.2.3.5","count":1 |
|"version":"1.2.4.6","count":1 |
|"version":"2.0.0.1","count":30433 |
|"version":"3.1.2.3","count":112195|
|"version":"3.1.0.4","count":11457 |
+-----------------------------------------+
然后我想压缩第二个DF中的记录,所以最后我需要一个这样的对象:
"totalUsers":100500, "versions":[
"version":"1.2.3.4","count":4051,
"version":"1.2.3.5","count":1,
"version":"1.2.4.6","count":1,
"version":"2.0.0.1","count":30433,
"version":"3.1.2.3","count":112195,
"version":"3.1.0.4","count":11457]
那么这个对象应该被写入另一个spark DF。
实现这一点的正确方法是什么?
免责声明:我对火花很陌生,所以如果我的问题太幼稚,我很抱歉。 我读过很多类似的问题,包括看似相似的问题,如this 和this。后者很接近,但仍然没有给出如何将多行累积到一个对象中的线索。我也无法从Apache Spark docs 中理解它。
【问题讨论】:
所以,您希望以对象 "totalUsers":100500, "versions":[. ...
的单个实例结束???
@nabongs 是的,没错。
@VasiliyGalkin 请看这里***.com/questions/46482058/…
@statut 感谢您的链接。但是,它解释了如何在多列之间连接数据,而我正在寻找行连接。
@VasiliyGalkin 你能发布原始数据框(df)吗?一些示例行将帮助我们测试并为您提供正确的解决方案
【参考方案1】:
尝试使用collect_list函数,例如:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
totalUsers = 100500
agg = df.groupBy().agg(F.collect_list("value").alias('versions')).withColumn("totalUsers", lit(totalUsers)).show()
其中df
是具有聚合版本的数据框。我得到以下结果:
+--------------------+----------+
| versions|totalUsers|
+--------------------+----------+
|["version":"1.2....| 100500|
+--------------------+----------+
我的示例是用 Python 编写的,但我相信您可以将相同的方法用于您的语言。
【讨论】:
以上是关于将 spark DataFrame 中的行收集到 JSON 对象中,然后将对象放入另一个 DF的主要内容,如果未能解决你的问题,请参考以下文章
在 groupby 之后将 Spark DataFrame 的行聚合到 String
pyspark dataframe foreach 填充列表
按最近的时间戳对 Spark DataFrame 中的行进行重复数据删除