使用 RDD 列表作为数据帧过滤操作的参数

Posted

技术标签:

【中文标题】使用 RDD 列表作为数据帧过滤操作的参数【英文标题】:use RDD list as parameter for dataframe filter operation 【发布时间】:2017-09-15 09:09:55 【问题描述】:

我有以下代码sn-p。

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import * 

sc = SparkContext()
spark = SparkSession.builder.appName("test").getOrCreate()

schema = StructType([                                                                           
         StructField("name", StringType(), True),
         StructField("a", StringType(), True),
         StructField("b", StringType(), True),
         StructField("c", StringType(), True),
         StructField("d", StringType(), True),
         StructField("e", StringType(), True),
         StructField("f", StringType(), True)])

arr = [("Alice", "1", "2", None, "red", None, None), \
       ("Bob", "1", None, None, None, None, "apple"), \
       ("Charlie", "2", "3", None, None, None, "orange")]

df = spark.createDataFrame(arr, schema)
df.show()

#+-------+---+----+----+----+----+------+
#|   name|  a|   b|   c|   d|   e|     f|
#+-------+---+----+----+----+----+------+
#|  Alice|  1|   2|null| red|null|  null|
#|    Bob|  1|null|null|null|null| apple|  
#|Charlie|  2|   3|null|null|null|orange|
#+-------+---+----+----+----+----+------+

现在,我有一个类似的 RDD:

lrdd = sc.parallelize([['a', 'b'], ['c', 'd', 'e'], ['f']])

我的目标是找到具有空属性子集的名称,即在上面的示例中:

'c,d,e': ['Bob', 'Charlie'], 'f': ['Alice']

现在,我想到了一个相当幼稚的解决方案,即收集列表,然后循环遍历查询数据框的子集。

def build_filter_condition(l):
    return ' AND '.join(["( is NULL)".format(x) for x in l])

res = 
for alist in lrdd.collect():
    cond = build_filter_condition(alist)
    p = df.select("name").where(cond)
    if p and p.count() > 0:
        res[','.join(alist)] = p.rdd.map(lambda x: x[0]).collect()

print(res)

这很有效,但效率极低。 还要考虑目标属性架构类似于 10000 个属性,导致 lrdd 中有超过 600 个不相交的列表。

所以,我的问题是: 如何有效地使用分布式集合的内容作为查询 sql 数据框的参数? 任何提示表示赞赏。

非常感谢。

【问题讨论】:

【参考方案1】:

您应该重新考虑数据的格式。而不是有这么多列,你应该explode 它来获得更多行以允许分布式计算:

import pyspark.sql.functions as psf
df = df.select(
    "name", 
    psf.explode(
        psf.array(
            *[psf.struct(
                psf.lit(c).alias("feature_name"), 
                df[c].alias("feature_value")
            ) for c in df.columns if c != "name"]
        )
    ).alias("feature")
).select("name", "feature.*")

    +-------+------------+-------------+
    |   name|feature_name|feature_value|
    +-------+------------+-------------+
    |  Alice|           a|            1|
    |  Alice|           b|            2|
    |  Alice|           c|         null|
    |  Alice|           d|          red|
    |  Alice|           e|         null|
    |  Alice|           f|         null|
    |    Bob|           a|            1|
    |    Bob|           b|         null|
    |    Bob|           c|         null|
    |    Bob|           d|         null|
    |    Bob|           e|         null|
    |    Bob|           f|        apple|
    |Charlie|           a|            2|
    |Charlie|           b|            3|
    |Charlie|           c|         null|
    |Charlie|           d|         null|
    |Charlie|           e|         null|
    |Charlie|           f|       orange|
    +-------+------------+-------------+

我们将对lrdd 做同样的事情,但我们会先对其进行一些更改:

subsets = spark\
    .createDataFrame(lrdd.map(lambda l: [l]), ["feature_set"])\
    .withColumn("feature_name", psf.explode("feature_set"))

    +-----------+------------+
    |feature_set|feature_name|
    +-----------+------------+
    |     [a, b]|           a|
    |     [a, b]|           b|
    |  [c, d, e]|           c|
    |  [c, d, e]|           d|
    |  [c, d, e]|           e|
    |        [f]|           f|
    +-----------+------------+

现在我们可以在feature_name 上加入这些,并过滤feature_setname,其feature_value 完全为空。如果 lrdd 表不是太大,你应该broadcast

df_join = df.join(psf.broadcast(subsets), "feature_name")
res = df_join.groupBy("feature_set", "name").agg(
    psf.count("*").alias("count"), 
    psf.sum(psf.isnull("feature_value").cast("int")).alias("nb_null")
).filter("nb_null = count")

    +-----------+-------+-----+-------+
    |feature_set|   name|count|nb_null|
    +-----------+-------+-----+-------+
    |  [c, d, e]|Charlie|    3|      3|
    |        [f]|  Alice|    1|      1|
    |  [c, d, e]|    Bob|    3|      3|
    +-----------+-------+-----+-------+

之后你可以随时groupByfeature_set

【讨论】:

你成就了我的一天!我将此标记为“已接受的答案”。当然,由于输入数据的大小,我应该避免使用groupBy,这会导致驱动程序过载。无论如何,这绝对是一个明智的入手方法。谢谢。 很高兴能帮上忙 :)【参考方案2】:

你可以试试这个方法。

首先交叉连接两个数据框

    from pyspark.sql.types import *
    lrdd = sc.parallelize([['a', 'b'], ['c', 'd', 'e'], ['f']]).
                         map(lambda x: ("key", x))

    schema = StructType([StructField("K", StringType()),
                         StructField("X", ArrayType(StringType()))])

    df2 = spark.createDataFrame(lrdd, schema).select("X")
    df3 = df.crossJoin(df2)

交叉连接的结果

    +-------+---+----+----+----+----+------+---------+
|   name|  a|   b|   c|   d|   e|     f|        X|
+-------+---+----+----+----+----+------+---------+
|  Alice|  1|   2|null| red|null|  null|   [a, b]|
|  Alice|  1|   2|null| red|null|  null|[c, d, e]|
|  Alice|  1|   2|null| red|null|  null|      [f]|
|    Bob|  1|null|null|null|null| apple|   [a, b]|
|Charlie|  2|   3|null|null|null|orange|   [a, b]|
|    Bob|  1|null|null|null|null| apple|[c, d, e]|
|    Bob|  1|null|null|null|null| apple|      [f]|
|Charlie|  2|   3|null|null|null|orange|[c, d, e]|
|Charlie|  2|   3|null|null|null|orange|      [f]|
+-------+---+----+----+----+----+------+---------+

现在使用 udf 过滤掉行

from pyspark.sql.functions import udf, struct, collect_list 

def foo(data):

    d = list(filter(lambda x: data[x], data['X']))
    print(d)
    if len(d)>0:
        return(False)
    else:
        return(True)

udf_foo = udf(foo, BooleanType())

df4 = df3.filter(udf_foo(struct([df3[x] for x in df3.columns]))).select("name", 'X')



df4.show()
+-------+---------+
|   name|        X|
+-------+---------+
|  Alice|      [f]|
|    Bob|[c, d, e]|
|Charlie|[c, d, e]|
+-------+---------+

然后使用 groupby 和 collect_list 得到想要的输出

df4.groupby("X").agg(collect_list("name").alias("name")).show()
 +--------------+---------+
 |   name       |        X|
 +--------------+---------+
 | [ Alice]     |      [f]|
 |[Bob, Charlie]|[c, d, e]|
 +--------------+---------+

【讨论】:

哇。这看起来很神奇。我没有想到crossJoin。谢谢。不幸的是,在最大的情况下它无法org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeRowJoiner grows beyond 64 KB。 More details。我会进一步调查你的方法。再次感谢。

以上是关于使用 RDD 列表作为数据帧过滤操作的参数的主要内容,如果未能解决你的问题,请参考以下文章

如何使用列表包含值来过滤列的数据帧

PySpark:从数据框列表创建 RDD

如果包含字符串列表,则过滤 pyspark 数据帧

pyspark 行列表的 RDD 到 DataFrame

熊猫数据帧的 PySpark rdd

导致具有多个参数的过滤列表的自定义操作?