根据时间范围和第二个参数减少数据帧

Posted

技术标签:

【中文标题】根据时间范围和第二个参数减少数据帧【英文标题】:Reducing Dataframe based on time range and a second parameter 【发布时间】:2018-07-11 13:17:34 【问题描述】:

我有一组类似于以下的数据,我正试图找到一种方法来减少在 python 中使用 spark 数据帧。

uuid  if_id start_time           end_time             ip_addr
1     03    2018/07/01 13:00:00  2018/07/01 13:00:01  1.1.1.1
1     03    2018/07/01 13:01:05  2018/07/01 13:02:00  1.1.1.1
1     03    2018/07/01 15:00:00  2018/07/01 15:00:30  1.1.1.1
1     03    2018/07/02 01:00:00  2018/07/02 01:00:07  1.2.3.4
1     03    2018/07/02 08:30:00  2018/07/02 08:32:04  1.2.3.4
1     03    2018/07/02 12:00:00  2018/07/02 12:01:00  1.1.1.1
1     05    2018/07/01 15:00:02  2018/07/01 15:00:35  2.2.2.2
1     05    2018/07/01 13:45:23  2018/07/01 13:45:40  2.2.2.2

我需要将上述数据减少到以下:

uuid  if_id start_time           end_time             ip_addr
1     03    2018/07/01 13:00:00  2018/07/01 15:00:30  1.1.1.1
1     03    2018/07/02 01:00:00  2018/07/02 08:32:04  1.2.3.4
1     03    2018/07/02 12:00:00  2018/07/02 12:01:00  1.1.1.1
1     05    2018/07/01 13:45:23  2018/07/01 15:00:35  2.2.2.2

最终数据集应表示一个表格,显示在给定时间段(start_time 到 end_time)内,哪个 IP 地址被分配给由 uuid 标识的特定主机的接口 (if_id)。

如果给定接口不可能随时间改变 IP 地址,例如 uuid=1 和 if_id=3,则可以使用 groupBy 和窗口规范来处理,以提取最小 start_time 和最大时间结束。但是,鉴于地址可以更改,我不确定如何在不多次传递数据的情况下解决此问题。

任何建议的方法将不胜感激。

【问题讨论】:

How to aggregate over rolling time window with groups in Spark 【参考方案1】:

使用 user8371915 建议的链接,我想出了以下解决方案。

import pyspark.sql.functions as func
from pyspark.sql.window import Window

df = spark.createDataFrame([Row(uuid=1, int_id='03', event_start=701130000, event_end=701130001, ip='1.1.1.1'),
                            Row(uuid=1, int_id='03', event_start=701130105, event_end=701130200, ip='1.1.1.1'),
                            Row(uuid=1, int_id='05', event_start=701134523, event_end=701134540, ip='2.2.2.2'),
                            Row(uuid=1, int_id='03', event_start=701150000, event_end=701150030, ip='1.1.1.1'),
                            Row(uuid=1, int_id='05', event_start=701150002, event_end=701150035, ip='2.2.2.2'),
                            Row(uuid=1, int_id='03', event_start=702010000, event_end=702010007, ip='1.2.3.4'),
                            Row(uuid=1, int_id='03', event_start=702083000, event_end=702083204, ip='1.2.3.4'),
                            Row(uuid=1, int_id='03', event_start=702120000, event_end=702120100, ip='1.1.1.1')])

window1 = Window.partitionBy('uuid', 'int_id').orderBy('event_start', 'event_end')

window2 = Window.partitionBy('uuid', 'int_id', 'time_group') \
                .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# get previous row's ip address
prev_ip = func.lag('ip', 1).over(window1)

#indicate if an IP address change has occurred between current and previous rows
indicator = func.coalesce((col('ip') != prev_ip).cast('integer'), lit(1))

# Cumulative sum of indicators over the window
time_group = func.sum(indicator).over(window1).alias('time_group')

#Add time_group expression to the table:
df = df.select('*', time_group)

# Add begin and end time period for each interface ip address
df = df.select('uuid', 'int_id', 'ip', 
               func.min('event_start').over(window2).alias('period_begin'),
               func.max('event_end').over(window2).alias('period_end')) \
       .dropDuplicates() \
       .orderBy('uuid', 'int_id', 'period_begin', 'ip')

df.show(truncate=False)

以上产生以下结果: +----+------+-------+------------+----------+ |uuid|int_id|ip |period_begin|period_end| +----+------+-------+------------+----------+ |1 |03 |1.1.1.1|701130000 |701150030 | |1 |03 |1.2.3.4|702010000 |702083204 | |1 |03 |1.1.1.1|702120000 |702120100 | |1 |05 |2.2.2.2|701134523 |701150035 | +----+------+-------+------------+----------+

【讨论】:

以上是关于根据时间范围和第二个参数减少数据帧的主要内容,如果未能解决你的问题,请参考以下文章

1061 Dating

根据第一个数据帧从第二个数据帧获取数据

如何根据第二个数据帧映射第一个数据帧中的值?

根据第二个数据帧的匹配列更新熊猫数据帧

如何为数据系列中的数据范围命名?

如何根据来自其他 pyspark 数据帧的日期值过滤第二个 pyspark 数据帧?