Fitter Spark RDD 基于过滤不同 RDD 的结果

Posted

技术标签:

【中文标题】Fitter Spark RDD 基于过滤不同 RDD 的结果【英文标题】:Fitter Spark RDD based on result from filtering of different RDD 【发布时间】:2015-11-18 20:44:03 【问题描述】:
conf = SparkConf().setAppName("my_app")
with SparkContext(conf=conf) as sc:
    sqlContext = SQLContext(sc)
    df = sqlContext.read.parquet(*s3keys)

    # this gives me distinct values as list
    rdd = df.filter(
            (1442170800000 <= df.timestamp) & (
                df.timestamp <= 1442185200000) & (
                    df.lat > 40.7480) & (df.lat < 40.7513) & (
                        df.lon > -73.8492) & (
                            df.lon < -73.8438)).map(lambda p: p.userid).distinct()

    # how do I apply the above list to filter another rdd? 
    df2 = sqlContext.read.parquet(*s3keys_part2)
    # example:
    rdd = df2.filter(df2.col1 in (rdd values from above))

【问题讨论】:

这似乎是将两个数据框连接在一起的工作。 谢谢。你能再扩展一下吗?我主要来自熊猫,不确定这个概念如何适用于 RDD。 【参考方案1】:

正如Matthew Graves 所述,您需要的是加入。这或多或少的意思是这样的:

pred = ((1442170800000 <= df.timestamp) & 
        (df.timestamp <= 1442185200000) &
        (df.lat > 40.7480) &
        (df.lat < 40.7513) &
        (df.lon > -73.8492) &
        (df.lon < -73.8438))

users = df.filter(pred).select("userid").distinct()

users.join(df2, users.userid == df2.col1)

【讨论】:

【参考方案2】:

这是 Scala 代码,而不是 Python,但希望它仍然可以作为示例。

val x = 1 to 9
val df2 = sc.parallelize(x.map(a => (a,a*a))).toDF()
val df3 = sc.parallelize(x.map(a => (a,a*a*a))).toDF()

这为我们提供了两个数据帧,每个数据帧都有名为 _1 和 _2 的列,它们是前九个自然数及其平方/立方体。

val fil = df2.filter("_1 < 5") // Nine is too many, let's go to four.
val filJoin = fil.join(df3,fil("_1") === df3("_1")
filJoin.collect

这让我们明白了:

Array[org.apache.spark.sql.Row] = Array([1,1,1,1], [2,4,2,8], [3,9,3,27], [4,16,4,64])

要将此应用于您的问题,我将从以下内容开始:

rdd2 = rdd.join(df2, rdd.userid == df2.userid, 'inner')

但请注意,我们需要告诉它要加入哪些列,对于df2,这可能不是userid。我还建议您使用 .select('userid').distinct() 而不是 map(lambda p: p.userid),这样它仍然是一个数据框。

您可以了解更多关于加入here。

【讨论】:

以上是关于Fitter Spark RDD 基于过滤不同 RDD 的结果的主要内容,如果未能解决你的问题,请参考以下文章

Spark RDD数据过滤

过滤计数等于输入文件 rdd Spark 的列

在 spark rdd 中过滤索引

Spark任务流程笔记

07 Spark RDD编程 综合实例 英文词频统计

Spark 开发调优