将最大值时间戳放入 PySpark 中的数组中
Posted
技术标签:
【中文标题】将最大值时间戳放入 PySpark 中的数组中【英文标题】:Put largest value timestamp in array in PySpark 【发布时间】:2019-12-07 10:08:42 【问题描述】:我有一个 PySpark 数据框(比如 df1
),其中包含以下列
1.> category
- 包含独特的类别类型
2.> start_time_array
- 按升序排列的时间戳数组
3.> end_time_array
- 按升序排列的时间戳数组
4.> lenStart
- start_time_array
中的数组长度
5.> lenEnd
- end_time_array
中的数组长度
以下是df1
的示例:
+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B| [2017-02-18 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 1|
+--------+------------------------------------------+------------------------------------------+--------+------+
还有另一个数据框df2
,其中包含两列category
和timestamp
。 df2
包含与df1
相同的category
值,df1
中数组内的时间戳值是df2
中时间戳的子集。以下是df2
的示例
+--------+-------------------+
|category| timestamp|
+--------+-------------------+
| A|2017-01-16 00:00:00|
| A|2017-01-18 00:00:00|
| A|2017-01-25 00:00:00|
| A|2017-01-27 00:00:00|
| B|2017-02-14 00:00:00|
| B|2017-02-18 00:00:00|
| B|2017-02-21 00:00:00|
| B|2017-02-22 00:00:00|
| B|2017-02-24 00:00:00|
| B|2017-02-25 00:00:00|
+--------+-------------------+
正如我们在上面的df1
示例中看到的,对于category -> B
,lenStart=2
不等于lenEnd=1
。在df1
的所有行中,lenStart = lenEnd
或lenStart = lenEnd+1
对于df1
中lenStart = lenEnd+1
的所有行,我想取timestamp
的最大值(适当的category
) 并将其放入end_time_array
的数组中。我该怎么做?
以下是使用来自df2
的信息处理df1
后的预期输出
+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B|[2017-02-18 00:00:00, 2017-02-25 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 2|
+--------+------------------------------------------+------------------------------------------+--------+------+
【问题讨论】:
【参考方案1】:这应该适用于 Spark 1.5+:
import pyspark.sql.functions as F
df3 = df1.where(F.col('lenStart') == (F.col('lenEnd') + 1)).select('category')
df4 = df2.join(df3, 'Category').groupby('Category').agg(F.max('timestamp').alias('max'))
df5 = df1.join(df4, 'Category', 'left')
df1_changed = df5.withColumn('end_time_array', F.when(F.col('max').isNull(),
F.col('end_time_array')).otherwise(F.concat(F.col('end_time_array'),
F.array(F.col('max')))))
df1_changed = df1_changed.withColumn('lenEnd', F.size(F.col('end_time_array')))
df1_changed
将有一个修改后的end_time_array
列,当您请求的条件适用时,它会添加想要的值,否则,它保持不变。
【讨论】:
以上是关于将最大值时间戳放入 PySpark 中的数组中的主要内容,如果未能解决你的问题,请参考以下文章