Spark / Scala - RDD填充最后一个非空值

Posted

技术标签:

【中文标题】Spark / Scala - RDD填充最后一个非空值【英文标题】:Spark/Scala - RDD fill with last non null value 【发布时间】:2021-10-25 02:13:26 【问题描述】:

我有一个如下所示的 rdd:

timestamp,user_id,search_id        
[2021-08-14 14:38:31,user_a,null]
[2021-08-14 14:42:01,user_a,ABC]
[2021-08-14 14:55:12,user_a,null]
[2021-08-14 14:56:19,user_a,null] 
[2021-08-14 15:01:36,user_a,null]
[2021-08-14 15:02:22,user_a,null]
[2021-08-15 07:38:07,user_b,XYZ] 
[2021-08-15 07:39:59,user_b,null]    

我想通过用 user_id 分组的最新非空值(如果有)填充“search_id”中的空值,将没有 search_id 的事件与以前的 search_ids 关联。

因此,我的输出将如下所示:

timestamp,user_id,search_id        
[2021-08-14 14:38:31,user_a,null]
[2021-08-14 14:42:01,user_a,ABC]
[2021-08-14 14:55:12,user_a,ABC]
[2021-08-14 14:56:19,user_a,ABC] 
[2021-08-14 15:01:36,user_a,ABC]
[2021-08-14 15:02:22,user_a,ABC]
[2021-08-15 07:38:07,user_b,XYZ] 
[2021-08-15 07:39:59,user_b,XYZ]    

我找到了使用 org.apache.spark.sql.functions.last 和此处的窗口的 spark 数据帧的解决方案 --> Spark Window function last not null value 但我的上下文目前不允许我将 rdd 转换为数据帧,所以我想知道是否有任何你知道如何做到这一点。

【问题讨论】:

“我的上下文不允许我将 rdd 转换为数据帧”是什么意思? 抱歉,应该指定:由于遗留问题,我需要在仍在使用 spark 【参考方案1】:

我猜 groupBy 用户 (https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/rdd/RDD.html#groupBy(scala.Function1,%20scala.reflect.ClassTag) ) 然后 flatMapWith (不要忘记对分组项目进行排序,因为 groupBy 不保留顺序)这将修复您的搜索 ID。所有这一切都假设您每个用户没有太多项目

【讨论】:

【参考方案2】:

完成此操作的一种方法是知道我们需要调用 lag() 函数的最大次数。 试试这个。

输入:

val df1=spark.sql("""
select timestamp'2021-08-14 14:38:31' timestamp, 'user_a' user_id, 'null' search_id union all 
select '2021-08-14 14:42:01' , 'user_a', 'ABC'  union all 
select '2021-08-14 14:55:12' , 'user_a', 'null'  union all 
select '2021-08-14 14:56:19' , 'user_a', 'null'   union all 
select '2021-08-14 15:01:36' , 'user_a', 'null'  union all 
select '2021-08-14 15:02:22' , 'user_a', 'null'  union all 
select '2021-08-15 07:38:07' , 'user_b', 'XYZ'   union all 
select '2021-08-15 07:39:59' , 'user_b', 'null'  
""")
df1.orderBy("timestamp").show(false)
df1.printSchema
df1.createOrReplaceTempView("df1")

+-------------------+-------+---------+
|timestamp          |user_id|search_id|
+-------------------+-------+---------+
|2021-08-14 14:38:31|user_a |null     |
|2021-08-14 14:42:01|user_a |ABC      |
|2021-08-14 14:55:12|user_a |null     |
|2021-08-14 14:56:19|user_a |null     |
|2021-08-14 15:01:36|user_a |null     |
|2021-08-14 15:02:22|user_a |null     |
|2021-08-15 07:38:07|user_b |XYZ      |
|2021-08-15 07:39:59|user_b |null     |
+-------------------+-------+---------+

现在计算最大次数

val max_count = spark.sql(" select max(c) from (select count(*) c from df1 group by user_id)").as[Long].first
max_count: Long = 6

创建一个可变数据框,以便我们可以循环并将其分配给相同的 df。

var df2=df1

for( i <- 1 to max_count.toInt )

    df2=df2.withColumn("search_id",expr(""" case when search_id <> 'null' then search_id 
                         else lag(search_id) over(partition by user_id order by timestamp) end """))

df2.orderBy("timestamp").show(false)

+-------------------+-------+---------+
|timestamp          |user_id|search_id|
+-------------------+-------+---------+
|2021-08-14 14:38:31|user_a |null     |
|2021-08-14 14:42:01|user_a |ABC      |
|2021-08-14 14:55:12|user_a |ABC      |
|2021-08-14 14:56:19|user_a |ABC      |
|2021-08-14 15:01:36|user_a |ABC      |
|2021-08-14 15:02:22|user_a |ABC      |
|2021-08-15 07:38:07|user_b |XYZ      |
|2021-08-15 07:39:59|user_b |XYZ      |
+-------------------+-------+---------+

【讨论】:

以上是关于Spark / Scala - RDD填充最后一个非空值的主要内容,如果未能解决你的问题,请参考以下文章

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD

Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

Spark RDD API(scala)

从 Scala 上的 Spark RDD 对象构建 RDD LabeledPoint

来自 RDD 映射的 Spark Scala 序列化错误

rdd.mapPartitions 从 Spark Scala 中的 udf 返回布尔值