使用 Dataframes 的 Spark Overlap 算法

Posted

技术标签:

【中文标题】使用 Dataframes 的 Spark Overlap 算法【英文标题】:Spark Overlap algorithm using Dataframes 【发布时间】:2019-02-04 15:53:10 【问题描述】:

给定一个包含字段的数据源:product_id - product - start_time - end_time

我正在尝试使用 Dataframe 函数为相同的product(基于start_timeend_time)构建捕获重叠记录的逻辑。

------------------------------------------------
| product_id | product | start_time | end_time |
------------------------------------------------
|      1     | bottle  |     2      |    4     |
|      2     | bottle  |     3      |    5     |
|      3     | bottle  |     2      |    3     |
|      4     | bottle  |     6      |    7     |
|      1     |   can   |     2      |    4     |
|      2     |   can   |     5      |    6     |
|      3     |   can   |     2      |    4     |

我想在输出中接收

-------------------------------------------------------------------------------------------------
| product_id_a | product_id_b | product | start_time_a | end_time_a | start_time_b | end_time_b |
-------------------------------------------------------------------------------------------------
|       1      |       2      | bottle  |      2       |     4      |      3       |     5      |
|       1      |       3      | bottle  |      2       |     4      |      2       |     3      |

因为bottle_1bottle_2bottle_3有重叠时间,如果满足以下条件,则有两条记录重叠:

max(a.start_time, b.start_time) < min(a.end_time, b.end_time) !(a.start_time == b.start_time && a.end_time == b.end_time) a.start_time != b.start_time || a.end_time != b.end_time

最后两个条件只是指定我对start_timeend_time 相等的情况不感兴趣(例如can_1can_3 不在预期结果中,即使它们具有相同的@987654341 @ 和 end_time)。

问题的结构很容易想到使用 RDD 的 MapReduce 解决方案,但我对 Dataframes 的解决方案感兴趣。

提示:有没有可能用groupBy().agg() 指定一个有趣的条件来达到所描述的逻辑?

如有任何进一步的解释,请随时询问

不重复,属于How to aggregate over rolling time window with groups in Spark

不幸的是,在报告的答案中使用了F.lag,在我的情况下,这不是一个足够好的条件:F.lag 仅使用与以前记录的比较,但在报告的示例中无法按预期工作,因为那bottle_1 不会被报告为与 bottle_3 重叠,因为它们不是连续的记录

【问题讨论】:

【参考方案1】:

每个条件都可以直接翻译成SQL

from pyspark.sql.functions import col, least, greatest

cond1 = (
    greatest(col("a.start_time"), col("b.start_time")) < 
    least(col("a.end_time"), col("b.end_time"))
)

cond2 = ~(
    (col("a.start_time") == col("b.start_time")) & 
    (col("a.end_time") == col("b.end_time"))
)

cond3 = (
    (col("a.start_time") != col("b.start_time")) | 
    (col("a.end_time") != col("b.end_time"))
)

所以你可以加入和过滤。

(df.alias("a").join(df.alias("b"), ["product"]).filter(cond1 & cond2 & cond3))

【讨论】:

【参考方案2】:

基于@Andronicus solution,我在纯Python 中提出了这种方法。

有必要自己加入DataFrame 以检查行是否重叠。当然,您需要使用条件df.product_id &lt; duplicate_df.product_id 省略self(两个相同的Row 和相反的product_ids 重叠)。

整个代码:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, "bottle", 2, 4),
     (2, "bottle", 3, 5),
     (3, "bottle", 2, 3),
     (4, "bottle", 6, 7),
     (1, "can", 2, 4),
     (2, "can", 5, 6),
     (3, "can", 2, 4)], 
     ['product_id', 'product', 'start_time', 'end_time'])

duplicate_df = df

conditions = [df.product == duplicate_df.product,
              df.product_id < duplicate_df.product_id,
              df.start_time != duplicate_df.start_time, 
              df.end_time != duplicate_df.end_time,
              F.least(df.end_time, duplicate_df.end_time) >
              F.greatest(df.start_time, duplicate_df.start_time)]

df.join(duplicate_df, conditions)

【讨论】:

【参考方案3】:

试试这个:

df.join(cloneDf, $"label").where($"label" !== $"label1").where($"min" < $"max1").where($"min1" < $"max").show()

您需要制作DataFrame 的笛卡尔积来检查,如果行重叠,您可以随意映射它们。当然,您需要省略 self - 两个相同的 Rows 重叠。

整个代码:

val df = SparkEmbedded.ss.createDataFrame(Seq(
  (1, 2, 5),
  (2, 4, 7),
  (3, 6, 9)
)).toDF("product_id", "min", "max")
import SparkEmbedded.ss.implicits._
val cloneDf = df.select(df.columns.map(col):_*)
    .withColumnRenamed("product_id", "product_id1")
    .withColumnRenamed("min", "min1")
    .withColumnRenamed("max", "max1")
df.crossJoin(cloneDf)
  .where($"product_id" < $"product_id1")
  .where($"min" < $"max1")
  .where($"min1" < $"max").show()

为了清楚起见,我拆分了where 子句。

结果是:

+-----+---+---+------+----+----+
|label|min|max|label1|min1|max1|
+-----+---+---+------+----+----+
|    1|  2|  5|     2|   4|   7|
|    2|  4|  7|     3|   6|   9|
+-----+---+---+------+----+----+

这个例子是用 Scala 编写的,但是 Python 有类似的 API。

【讨论】:

非常感谢!两种解决方案(您的和@user11013893)都帮助我解决了我的问题!你的解决方案有where($"label" &lt; $"label1"),它帮助我避免了重复的结果,而另一个提供了更有趣的功能。可能crossJoin 不是必需的:基于product 的普通内部连接(参见示例)就足够了。我不明白为什么我的问题被否决了(我投了赞成票的答案也是如此)。但我认为您的答案在where($"label" &lt; $"label1") 中使用&lt; condition 更有用。我会接受你的 为了提高答案的可读性,您能否编辑您的帖子并使其更接近我报告的示例?所以把where($"label" &lt; $"label1")改成where($"product_id" &lt; $"product_id1"),基于product的join...等等 我建议将 python 代码放在单独的答案中,因为它完全改变了编程风格。 我开始使用 python 使用 ML,但它对我来说太动态了。 Scala 是完美的(我专业地用 Java 开发);>

以上是关于使用 Dataframes 的 Spark Overlap 算法的主要内容,如果未能解决你的问题,请参考以下文章

使用 Dataframes 从 Informix 到 Spark 的 JDBC

如何使用 Spark DataFrames 查询 JSON 数据列?

如何在 spark dataframes/spark sql 中使用模式读取 json

是否可以将 Spark 中的 data.table 与 Spark Dataframes 一起使用?

大数据(spark sql 和 spark dataframes 连接)

使用 Spark DataFrames 在特定时间范围内获取唯一计数