PySpark DataFrame:标记某些列值更改的行

Posted

技术标签:

【中文标题】PySpark DataFrame:标记某些列值更改的行【英文标题】:PySpark DataFrame: Mark rows where some column value changes 【发布时间】:2018-08-29 19:06:10 【问题描述】:

我有一个 PySpark 数据框,其中包含“人员”和“时间戳”列(以及与问题无关的其他列)。解释是用户当时做了某事。

我想对一个“人”的所有行进行分组,其中“时间戳”的差异不超过“阈值”值(例如 5 分钟)。

有什么想法可以在 PySpark 中实现这一点吗?最好使用 DataFrame 作为结果?

欣赏你的想法!

【问题讨论】:

【参考方案1】:

假设您有['people','timestamp','activity'] 的列

SData = Row("people","session_start", "session_end")

def getSessions(dt):
    info = dt[1]
    data = []
    session_start = info[0][0]
    session_end = info[0][0]
    for x in info[1:]:
        if ((x[1] - session_end) > 5*60*1000):
            data.append(SData(dt[0], session_start, session_end)
            session_start = x[1]
        session_end = x[1]
    data.append(SData(dt[0],session_start, session_end))
    return data


rdd  = df.rdd.map(lambda x: (x[0],(x[1],x[2])))

df = rdd.groupByKey().mapValues(lambda x: sorted(x, key=lambda z:z)).flatMap(getSessions).toDF()

基本上是把它映射到rdd,然后再映射到df。

另一种没有 rdd 的方法是创建一个 udf 返回会话数组。最后我们可以使用explode来获取数据行。

【讨论】:

以上是关于PySpark DataFrame:标记某些列值更改的行的主要内容,如果未能解决你的问题,请参考以下文章

PySpark Dataframe 将两列转换为基于第三列值的元组新列

PySpark DataFrame 根据另一列中时间戳值的最小/最大条件更新列值

KeyError:在熊猫中尝试使用.loc方法将布尔列值更改为字符串时

识别 pyspark 中第一次出现的列值,然后根据它增加另一列

在 PySpark 数据框中拆分和计算列值

PySpark:数据框:Numeric + Null 列值导致 NULL 而不是数值