根据时间范围和第二个参数减少数据帧
Posted
技术标签:
【中文标题】根据时间范围和第二个参数减少数据帧【英文标题】:Reducing Dataframe based on time range and a second parameter 【发布时间】:2018-07-11 13:17:34 【问题描述】:我有一组类似于以下的数据,我正试图找到一种方法来减少在 python 中使用 spark 数据帧。
uuid if_id start_time end_time ip_addr
1 03 2018/07/01 13:00:00 2018/07/01 13:00:01 1.1.1.1
1 03 2018/07/01 13:01:05 2018/07/01 13:02:00 1.1.1.1
1 03 2018/07/01 15:00:00 2018/07/01 15:00:30 1.1.1.1
1 03 2018/07/02 01:00:00 2018/07/02 01:00:07 1.2.3.4
1 03 2018/07/02 08:30:00 2018/07/02 08:32:04 1.2.3.4
1 03 2018/07/02 12:00:00 2018/07/02 12:01:00 1.1.1.1
1 05 2018/07/01 15:00:02 2018/07/01 15:00:35 2.2.2.2
1 05 2018/07/01 13:45:23 2018/07/01 13:45:40 2.2.2.2
我需要将上述数据减少到以下:
uuid if_id start_time end_time ip_addr
1 03 2018/07/01 13:00:00 2018/07/01 15:00:30 1.1.1.1
1 03 2018/07/02 01:00:00 2018/07/02 08:32:04 1.2.3.4
1 03 2018/07/02 12:00:00 2018/07/02 12:01:00 1.1.1.1
1 05 2018/07/01 13:45:23 2018/07/01 15:00:35 2.2.2.2
最终数据集应表示一个表格,显示在给定时间段(start_time 到 end_time)内,哪个 IP 地址被分配给由 uuid 标识的特定主机的接口 (if_id)。
如果给定接口不可能随时间改变 IP 地址,例如 uuid=1 和 if_id=3,则可以使用 groupBy 和窗口规范来处理,以提取最小 start_time 和最大时间结束。但是,鉴于地址可以更改,我不确定如何在不多次传递数据的情况下解决此问题。
任何建议的方法将不胜感激。
【问题讨论】:
How to aggregate over rolling time window with groups in Spark 【参考方案1】:使用 user8371915 建议的链接,我想出了以下解决方案。
import pyspark.sql.functions as func
from pyspark.sql.window import Window
df = spark.createDataFrame([Row(uuid=1, int_id='03', event_start=701130000, event_end=701130001, ip='1.1.1.1'),
Row(uuid=1, int_id='03', event_start=701130105, event_end=701130200, ip='1.1.1.1'),
Row(uuid=1, int_id='05', event_start=701134523, event_end=701134540, ip='2.2.2.2'),
Row(uuid=1, int_id='03', event_start=701150000, event_end=701150030, ip='1.1.1.1'),
Row(uuid=1, int_id='05', event_start=701150002, event_end=701150035, ip='2.2.2.2'),
Row(uuid=1, int_id='03', event_start=702010000, event_end=702010007, ip='1.2.3.4'),
Row(uuid=1, int_id='03', event_start=702083000, event_end=702083204, ip='1.2.3.4'),
Row(uuid=1, int_id='03', event_start=702120000, event_end=702120100, ip='1.1.1.1')])
window1 = Window.partitionBy('uuid', 'int_id').orderBy('event_start', 'event_end')
window2 = Window.partitionBy('uuid', 'int_id', 'time_group') \
.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# get previous row's ip address
prev_ip = func.lag('ip', 1).over(window1)
#indicate if an IP address change has occurred between current and previous rows
indicator = func.coalesce((col('ip') != prev_ip).cast('integer'), lit(1))
# Cumulative sum of indicators over the window
time_group = func.sum(indicator).over(window1).alias('time_group')
#Add time_group expression to the table:
df = df.select('*', time_group)
# Add begin and end time period for each interface ip address
df = df.select('uuid', 'int_id', 'ip',
func.min('event_start').over(window2).alias('period_begin'),
func.max('event_end').over(window2).alias('period_end')) \
.dropDuplicates() \
.orderBy('uuid', 'int_id', 'period_begin', 'ip')
df.show(truncate=False)
以上产生以下结果:
+----+------+-------+------------+----------+
|uuid|int_id|ip |period_begin|period_end|
+----+------+-------+------------+----------+
|1 |03 |1.1.1.1|701130000 |701150030 |
|1 |03 |1.2.3.4|702010000 |702083204 |
|1 |03 |1.1.1.1|702120000 |702120100 |
|1 |05 |2.2.2.2|701134523 |701150035 |
+----+------+-------+------------+----------+
【讨论】:
以上是关于根据时间范围和第二个参数减少数据帧的主要内容,如果未能解决你的问题,请参考以下文章