PySpark - RDD 中对象的时间重叠

Posted

技术标签:

【中文标题】PySpark - RDD 中对象的时间重叠【英文标题】:PySpark - Time Overlap for Object in RDD 【发布时间】:2015-07-01 02:49:29 【问题描述】:

我的目标是根据时间重叠对对象进行分组。

我的rdd 中的每个对象都包含一个start_timeend_time

我可能效率低下,但我计划做的是根据每个对象是否与任何其他对象有任何时间重叠,为每个对象分配一个重叠 ID。我有时间重叠的逻辑。然后,我希望按overlap_id分组。

首先,

mapped_rdd = rdd.map(assign_overlap_id)
final_rdd = mapped_rdd.reduceByKey(combine_objects)

现在这是我的问题。如何编写 assign_overlap_id 函数?

def assign_overlap_id(x):
  ...
  ...
  return (overlap_id, x)

【问题讨论】:

如何定义重叠?任何种类,开始,结束,内部,相等? 是的,我正在寻找任何类型的时间重叠 如果一个对象与多个其他对象重叠会发生什么?所有三个都会获得相同的重叠 ID 吗?此重叠 ID 是否会与其他具有共享重叠对象的重叠 ID 合并?这将很快变得复杂。还是我误会了? 没错,任何具有任何类型重叠的对象都应该具有相同的重叠id。 overlay_id 可以是唯一的,或者 RDD 中的每个对象都可以具有相同的 overlay_id 【参考方案1】:

使用 Spark SQL 和数据框的简单解决方案:

斯卡拉:

import org.apache.spark.sql.functions.udf

case class Interval(start_time: Long, end_time: Long)

val rdd = sc.parallelize(
    Interval(0, 3) :: Interval(1, 4) ::
    Interval(2, 5) :: Interval(3, 4) ::
    Interval(5, 8) :: Interval(7, 10) :: Nil
)

val df = sqlContext.createDataFrame(rdd)

// Simple check if a given intervals overlap
def overlaps(start_first: Long, end_first: Long,
        start_second: Long, end_second: Long):Boolean = 
    (start_second > start_first & start_second < end_first) |
    (end_second > start_first & end_second < end_first) 


// Register udf and data frame aliases
// It look like Spark SQL doesn't support
// aliases in FROM clause [1] so we have to
// register df twice
sqlContext.udf.register("overlaps", overlaps)
df.registerTempTable("df1")
df.registerTempTable("df2")

// Join and filter
sqlContext.sql("""
     SELECT * FROM df1 JOIN df2
     WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show

使用 PySpark 也是如此

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

rdd = sc.parallelize([
    (0, 3), (1, 4), 
    (2, 5), (3, 4),
    (5, 8), (7, 10)
])

df = sqlContext.createDataFrame(rdd, ('start_time', 'end_time'))

def overlaps(start_first, end_first, start_second, end_second):
    return ((start_first < start_second < end_first) or
        (start_first < end_second < end_first))

sqlContext.registerFunction('overlaps', overlaps, BooleanType())
df.registerTempTable("df1")
df.registerTempTable("df2")

sqlContext.sql("""
     SELECT * FROM df1 JOIN df2
     WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show()

按窗口分组的低级转换

更聪明一点的方法是使用某个指定宽度的窗口生成候选对。这是一个相当简化的解决方案:

斯卡拉:

// Generates list of "buckets" for a given interval
def genRange(interval: Interval) = interval match 
    case Interval(start_time, end_time) => 
      (start_time / 10L * 10L) to (((end_time / 10) + 1) * 10) by 1
    



// For each interval generate pairs (bucket, interval)
val pairs = rdd.flatMap( (i: Interval) => genRange(i).map((r) => (r, i)))

// Join (in the worst case scenario it is still O(n^2)
// But in practice should be better than a naive
// Cartesian product
val candidates = pairs.
    join(pairs).
    map(
        case (k, (Interval(s1, e1), Interval(s2, e2))) => (s1, e1, s2, e2)
   ).distinct


// For each candidate pair check if there is overlap
candidates.filter  case (s1, e1, s2, e2) => overlaps(s1, e1, s2, e2) 

Python:

def genRange(start_time, end_time):
    return xrange(start_time / 10L * 10L, ((end_time / 10) + 1) * 10)

pairs = rdd.flatMap(lambda (s, e): ((r, (s, e)) for r in genRange(s, e)))
candidates = (pairs
    .join(pairs)
    .map(lambda (k, ((s1, e1), (s2, e2))): (s1, e1, s2, e2))
    .distinct())

candidates.filter(lambda (s1, e1, s2, e2): overlaps(s1, e1, s2, e2))

虽然在某些数据集上对于生产就绪的解决方案就足够了,但您应该考虑实施一些最先进的算法,例如 NCList。

    http://docs.datastax.com/en/datastax_enterprise/4.6/datastax_enterprise/spark/sparkSqlSupportedSyntax.html

【讨论】:

这太棒了 - 非常感谢!我尝试了第二个,现在将尝试使用数据框和 SparkSQL 的第一个建议

以上是关于PySpark - RDD 中对象的时间重叠的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:在“NoneType”对象上过滤掉 RDD 元素失败是不可迭代的

Pyspark:从列表的 RDD 创建一个火花数据框,其中列表的某些元素是对象

如何使用 pyspark.resultiterable.ResultIterable 对象

PySpark 使用函数创建多索引配对 RDD

Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame

AttributeError:“元组”对象在 pyspark 中没有属性“startswith”