使用 RDD 列表作为数据帧过滤操作的参数
Posted
技术标签:
【中文标题】使用 RDD 列表作为数据帧过滤操作的参数【英文标题】:use RDD list as parameter for dataframe filter operation 【发布时间】:2017-09-15 09:09:55 【问题描述】:我有以下代码sn-p。
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
sc = SparkContext()
spark = SparkSession.builder.appName("test").getOrCreate()
schema = StructType([
StructField("name", StringType(), True),
StructField("a", StringType(), True),
StructField("b", StringType(), True),
StructField("c", StringType(), True),
StructField("d", StringType(), True),
StructField("e", StringType(), True),
StructField("f", StringType(), True)])
arr = [("Alice", "1", "2", None, "red", None, None), \
("Bob", "1", None, None, None, None, "apple"), \
("Charlie", "2", "3", None, None, None, "orange")]
df = spark.createDataFrame(arr, schema)
df.show()
#+-------+---+----+----+----+----+------+
#| name| a| b| c| d| e| f|
#+-------+---+----+----+----+----+------+
#| Alice| 1| 2|null| red|null| null|
#| Bob| 1|null|null|null|null| apple|
#|Charlie| 2| 3|null|null|null|orange|
#+-------+---+----+----+----+----+------+
现在,我有一个类似的 RDD:
lrdd = sc.parallelize([['a', 'b'], ['c', 'd', 'e'], ['f']])
我的目标是找到具有空属性子集的名称,即在上面的示例中:
'c,d,e': ['Bob', 'Charlie'], 'f': ['Alice']
现在,我想到了一个相当幼稚的解决方案,即收集列表,然后循环遍历查询数据框的子集。
def build_filter_condition(l):
return ' AND '.join(["( is NULL)".format(x) for x in l])
res =
for alist in lrdd.collect():
cond = build_filter_condition(alist)
p = df.select("name").where(cond)
if p and p.count() > 0:
res[','.join(alist)] = p.rdd.map(lambda x: x[0]).collect()
print(res)
这很有效,但效率极低。 还要考虑目标属性架构类似于 10000 个属性,导致 lrdd 中有超过 600 个不相交的列表。
所以,我的问题是: 如何有效地使用分布式集合的内容作为查询 sql 数据框的参数? 任何提示表示赞赏。
非常感谢。
【问题讨论】:
【参考方案1】:您应该重新考虑数据的格式。而不是有这么多列,你应该explode
它来获得更多行以允许分布式计算:
import pyspark.sql.functions as psf
df = df.select(
"name",
psf.explode(
psf.array(
*[psf.struct(
psf.lit(c).alias("feature_name"),
df[c].alias("feature_value")
) for c in df.columns if c != "name"]
)
).alias("feature")
).select("name", "feature.*")
+-------+------------+-------------+
| name|feature_name|feature_value|
+-------+------------+-------------+
| Alice| a| 1|
| Alice| b| 2|
| Alice| c| null|
| Alice| d| red|
| Alice| e| null|
| Alice| f| null|
| Bob| a| 1|
| Bob| b| null|
| Bob| c| null|
| Bob| d| null|
| Bob| e| null|
| Bob| f| apple|
|Charlie| a| 2|
|Charlie| b| 3|
|Charlie| c| null|
|Charlie| d| null|
|Charlie| e| null|
|Charlie| f| orange|
+-------+------------+-------------+
我们将对lrdd
做同样的事情,但我们会先对其进行一些更改:
subsets = spark\
.createDataFrame(lrdd.map(lambda l: [l]), ["feature_set"])\
.withColumn("feature_name", psf.explode("feature_set"))
+-----------+------------+
|feature_set|feature_name|
+-----------+------------+
| [a, b]| a|
| [a, b]| b|
| [c, d, e]| c|
| [c, d, e]| d|
| [c, d, e]| e|
| [f]| f|
+-----------+------------+
现在我们可以在feature_name
上加入这些,并过滤feature_set
和name
,其feature_value
完全为空。如果 lrdd 表不是太大,你应该broadcast
它
df_join = df.join(psf.broadcast(subsets), "feature_name")
res = df_join.groupBy("feature_set", "name").agg(
psf.count("*").alias("count"),
psf.sum(psf.isnull("feature_value").cast("int")).alias("nb_null")
).filter("nb_null = count")
+-----------+-------+-----+-------+
|feature_set| name|count|nb_null|
+-----------+-------+-----+-------+
| [c, d, e]|Charlie| 3| 3|
| [f]| Alice| 1| 1|
| [c, d, e]| Bob| 3| 3|
+-----------+-------+-----+-------+
之后你可以随时groupBy
feature_set
【讨论】:
你成就了我的一天!我将此标记为“已接受的答案”。当然,由于输入数据的大小,我应该避免使用groupBy
,这会导致驱动程序过载。无论如何,这绝对是一个明智的入手方法。谢谢。
很高兴能帮上忙 :)【参考方案2】:
你可以试试这个方法。
首先交叉连接两个数据框
from pyspark.sql.types import *
lrdd = sc.parallelize([['a', 'b'], ['c', 'd', 'e'], ['f']]).
map(lambda x: ("key", x))
schema = StructType([StructField("K", StringType()),
StructField("X", ArrayType(StringType()))])
df2 = spark.createDataFrame(lrdd, schema).select("X")
df3 = df.crossJoin(df2)
交叉连接的结果
+-------+---+----+----+----+----+------+---------+
| name| a| b| c| d| e| f| X|
+-------+---+----+----+----+----+------+---------+
| Alice| 1| 2|null| red|null| null| [a, b]|
| Alice| 1| 2|null| red|null| null|[c, d, e]|
| Alice| 1| 2|null| red|null| null| [f]|
| Bob| 1|null|null|null|null| apple| [a, b]|
|Charlie| 2| 3|null|null|null|orange| [a, b]|
| Bob| 1|null|null|null|null| apple|[c, d, e]|
| Bob| 1|null|null|null|null| apple| [f]|
|Charlie| 2| 3|null|null|null|orange|[c, d, e]|
|Charlie| 2| 3|null|null|null|orange| [f]|
+-------+---+----+----+----+----+------+---------+
现在使用 udf 过滤掉行
from pyspark.sql.functions import udf, struct, collect_list
def foo(data):
d = list(filter(lambda x: data[x], data['X']))
print(d)
if len(d)>0:
return(False)
else:
return(True)
udf_foo = udf(foo, BooleanType())
df4 = df3.filter(udf_foo(struct([df3[x] for x in df3.columns]))).select("name", 'X')
df4.show()
+-------+---------+
| name| X|
+-------+---------+
| Alice| [f]|
| Bob|[c, d, e]|
|Charlie|[c, d, e]|
+-------+---------+
然后使用 groupby 和 collect_list 得到想要的输出
df4.groupby("X").agg(collect_list("name").alias("name")).show()
+--------------+---------+
| name | X|
+--------------+---------+
| [ Alice] | [f]|
|[Bob, Charlie]|[c, d, e]|
+--------------+---------+
【讨论】:
哇。这看起来很神奇。我没有想到crossJoin。谢谢。不幸的是,在最大的情况下它无法org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeRowJoiner grows beyond 64 KB
。 More details。我会进一步调查你的方法。再次感谢。以上是关于使用 RDD 列表作为数据帧过滤操作的参数的主要内容,如果未能解决你的问题,请参考以下文章