如何使用 Spark DataFrame 将最后一个非空值结转到后续行

Posted

技术标签:

【中文标题】如何使用 Spark DataFrame 将最后一个非空值结转到后续行【英文标题】:how to carry over last non empty value to subsequent rows using Spark DataFrame 【发布时间】:2015-10-28 00:04:52 【问题描述】:

我有一个像这样的稀疏数据集:

  ip,ts,session
  "123","1","s1"
  "123","2",""
  "123","3",""
  "123","4",""
  "123","10","s2"
  "123","11",""
  "123","12",""
  "222","5","s6"
  "222","6",""
  "222","7",""

我需要像这样使它密集:

  ip,ts,session
  "123","1","s1"
  "123","2","s1"
  "123","3","s1"
  "123","4","s1"
  "123","10","s2"
  "123","11","s2"
  "123","12","s2"
  "222","5","s6"
  "222","6","s6"
  "222","7","s6"

我知道如何使用 RDD - 通过 ip 和在 partitionMap groupBy(ip).sortBy(ts).scan()() 内重新分区:扫描函数会将先前计算的值结转到下一次迭代并决定使用先前值或保持当前值并将新选择传递给下一次“扫描”迭代

现在我尝试仅使用 DataFrame,而不返回 RDD。 我正在查看 Window 函数,但我能想到的只是组内的第一个值,这是不一样的。或者我只是不明白如何创建正确的范围。

【问题讨论】:

你能把ts列为序列号吗? 【参考方案1】:

您可以使用多个自联接来实现。基本上,您想创建一个包含所有“开始会话”记录的数据集(filter($"session" !== "")),然后将其与原始数据集相结合,过滤掉“会话开始”晚于当前会话的记录(@ 987654323@)。然后你想找出每个ipmax($"r_ts")。最后一次连接只是为了从原始数据集中检索session 值。

data.join(
  data.filter($"session" !== "").select(
    $"ip" as "r_ip", $"session" as "r_session", $"ts" as "r_ts"
  ), 
  $"ip" === $"r_ip"
)
.filter($"ts" >= $"r_ts")
.groupBy($"ip",$"ts")
.agg(max($"r_ts") as "r_ts")
.join(
  data.select($"session",$"ts" as "l_ts"), 
  $"r_ts" === $"l_ts"
)
.select($"ip",$"ts",$"session")

顺便说一句,我的解决方案假定列 ts 类似于事务序列——它是一个递增的 Int 值。如果不是,您可以使用我的DataFrame-ified zipWithIndex solution 创建一个用于相同目的的列。

【讨论】:

ts 在我的例子中是一个时间戳,所以是的,它正在增加 我认为你有一个错误: .join( data.select($"session",$"ts" as "l_ts"), $"r_ts" === $"l_ts" :这也应该使用 ip 作为连接的一部分,它应该加入到你有会话和 r_ts 和 ip 的第一部分。但我明白了!谢谢一堆!【参考方案2】:

我的最终代码重用了 David Griffin 的想法:dataWithSessionSparse 是我的问题中描述的起始数据集

val denseSessRecordsOnly = dataWithSessionSparse
  .filter(col("sessionId") !== "")
  .select(col("ip").alias("r_ip"), col("sessionId").alias("r_sessionId"), col("ts").alias("r_ts")) // isolates first records for all sessions

val dataWithSessionDense = dataWithSessionSparse
  .join(denseSessRecordsOnly, col("ip") === col("r_ip")) // explodes each event to relate to all sessions within ip
  .filter(col("ts") >= col("r_ts")) //flters out exploded dataset to have each event to be related to sessions prior or at the time of event
  .groupBy(col("ip"),col("ts")).agg(max(col("r_ts")).alias("r_ts")) //takes sessionId with max ts.
  .join(
    denseSessRecordsOnly.select(col("r_ip").alias("l_ip"),col("r_sessionId").alias("sessionId"),col("r_ts").alias("l_ts")),
    col("r_ts") === col("l_ts") && col("ip")===col("l_ip"))
  .select(col("ip"),col("ts"),col("sessionId"))

【讨论】:

以上是关于如何使用 Spark DataFrame 将最后一个非空值结转到后续行的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL - 如何将 DataFrame 写入文本文件?

Spark:如何将 DataFrame 更改为 LibSVM 并执行逻辑回归

如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区

如何将多维数组添加到现有的 Spark DataFrame

Spark - 如何将约 20TB 的数据从 DataFrame 写入配置单元表或 hdfs?

如何从 Spark 2.0 中的 DataFrame 列创建数据集?