使用 Dataframes 的 Spark Overlap 算法
Posted
技术标签:
【中文标题】使用 Dataframes 的 Spark Overlap 算法【英文标题】:Spark Overlap algorithm using Dataframes 【发布时间】:2019-02-04 15:53:10 【问题描述】:给定一个包含字段的数据源:product_id
- product
- start_time
- end_time
我正在尝试使用 Dataframe 函数为相同的product
(基于start_time
和end_time
)构建捕获重叠记录的逻辑。
------------------------------------------------
| product_id | product | start_time | end_time |
------------------------------------------------
| 1 | bottle | 2 | 4 |
| 2 | bottle | 3 | 5 |
| 3 | bottle | 2 | 3 |
| 4 | bottle | 6 | 7 |
| 1 | can | 2 | 4 |
| 2 | can | 5 | 6 |
| 3 | can | 2 | 4 |
我想在输出中接收
-------------------------------------------------------------------------------------------------
| product_id_a | product_id_b | product | start_time_a | end_time_a | start_time_b | end_time_b |
-------------------------------------------------------------------------------------------------
| 1 | 2 | bottle | 2 | 4 | 3 | 5 |
| 1 | 3 | bottle | 2 | 4 | 2 | 3 |
因为bottle_1
与bottle_2
和bottle_3
有重叠时间,如果满足以下条件,则有两条记录重叠:
max(a.start_time, b.start_time) < min(a.end_time, b.end_time)
!(a.start_time == b.start_time && a.end_time == b.end_time)
a.start_time != b.start_time || a.end_time != b.end_time
最后两个条件只是指定我对start_time
和end_time
相等的情况不感兴趣(例如can_1
和can_3
不在预期结果中,即使它们具有相同的@987654341 @ 和 end_time
)。
问题的结构很容易想到使用 RDD 的 MapReduce 解决方案,但我对 Dataframes 的解决方案感兴趣。
提示:有没有可能用groupBy().agg()
指定一个有趣的条件来达到所描述的逻辑?
如有任何进一步的解释,请随时询问
不重复,属于How to aggregate over rolling time window with groups in Spark
不幸的是,在报告的答案中使用了F.lag
,在我的情况下,这不是一个足够好的条件:F.lag 仅使用与以前记录的比较,但在报告的示例中无法按预期工作,因为那bottle_1
不会被报告为与 bottle_3
重叠,因为它们不是连续的记录
【问题讨论】:
【参考方案1】:每个条件都可以直接翻译成SQL
from pyspark.sql.functions import col, least, greatest
cond1 = (
greatest(col("a.start_time"), col("b.start_time")) <
least(col("a.end_time"), col("b.end_time"))
)
cond2 = ~(
(col("a.start_time") == col("b.start_time")) &
(col("a.end_time") == col("b.end_time"))
)
cond3 = (
(col("a.start_time") != col("b.start_time")) |
(col("a.end_time") != col("b.end_time"))
)
所以你可以加入和过滤。
(df.alias("a").join(df.alias("b"), ["product"]).filter(cond1 & cond2 & cond3))
【讨论】:
【参考方案2】:基于@Andronicus solution,我在纯Python 中提出了这种方法。
有必要自己加入DataFrame
以检查行是否重叠。当然,您需要使用条件df.product_id < duplicate_df.product_id
省略self(两个相同的Row
和相反的product_id
s 重叠)。
整个代码:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(1, "bottle", 2, 4),
(2, "bottle", 3, 5),
(3, "bottle", 2, 3),
(4, "bottle", 6, 7),
(1, "can", 2, 4),
(2, "can", 5, 6),
(3, "can", 2, 4)],
['product_id', 'product', 'start_time', 'end_time'])
duplicate_df = df
conditions = [df.product == duplicate_df.product,
df.product_id < duplicate_df.product_id,
df.start_time != duplicate_df.start_time,
df.end_time != duplicate_df.end_time,
F.least(df.end_time, duplicate_df.end_time) >
F.greatest(df.start_time, duplicate_df.start_time)]
df.join(duplicate_df, conditions)
【讨论】:
【参考方案3】:试试这个:
df.join(cloneDf, $"label").where($"label" !== $"label1").where($"min" < $"max1").where($"min1" < $"max").show()
您需要制作DataFrame
的笛卡尔积来检查,如果行重叠,您可以随意映射它们。当然,您需要省略 self - 两个相同的 Row
s 重叠。
整个代码:
val df = SparkEmbedded.ss.createDataFrame(Seq(
(1, 2, 5),
(2, 4, 7),
(3, 6, 9)
)).toDF("product_id", "min", "max")
import SparkEmbedded.ss.implicits._
val cloneDf = df.select(df.columns.map(col):_*)
.withColumnRenamed("product_id", "product_id1")
.withColumnRenamed("min", "min1")
.withColumnRenamed("max", "max1")
df.crossJoin(cloneDf)
.where($"product_id" < $"product_id1")
.where($"min" < $"max1")
.where($"min1" < $"max").show()
为了清楚起见,我拆分了where
子句。
结果是:
+-----+---+---+------+----+----+
|label|min|max|label1|min1|max1|
+-----+---+---+------+----+----+
| 1| 2| 5| 2| 4| 7|
| 2| 4| 7| 3| 6| 9|
+-----+---+---+------+----+----+
这个例子是用 Scala 编写的,但是 Python 有类似的 API。
【讨论】:
非常感谢!两种解决方案(您的和@user11013893)都帮助我解决了我的问题!你的解决方案有where($"label" < $"label1")
,它帮助我避免了重复的结果,而另一个提供了更有趣的功能。可能crossJoin
不是必需的:基于product
的普通内部连接(参见示例)就足够了。我不明白为什么我的问题被否决了(我投了赞成票的答案也是如此)。但我认为您的答案在where($"label" < $"label1")
中使用< condition
更有用。我会接受你的
为了提高答案的可读性,您能否编辑您的帖子并使其更接近我报告的示例?所以把where($"label" < $"label1")
改成where($"product_id" < $"product_id1")
,基于product
的join...等等
我建议将 python 代码放在单独的答案中,因为它完全改变了编程风格。
我开始使用 python 使用 ML,但它对我来说太动态了。 Scala 是完美的(我专业地用 Java 开发);>以上是关于使用 Dataframes 的 Spark Overlap 算法的主要内容,如果未能解决你的问题,请参考以下文章
使用 Dataframes 从 Informix 到 Spark 的 JDBC
如何使用 Spark DataFrames 查询 JSON 数据列?
如何在 spark dataframes/spark sql 中使用模式读取 json
是否可以将 Spark 中的 data.table 与 Spark Dataframes 一起使用?