对两列应用 Window.partitionBy 以在 pyspark 中获取 n-core 数据集

Posted

技术标签:

【中文标题】对两列应用 Window.partitionBy 以在 pyspark 中获取 n-core 数据集【英文标题】:apply Window.partitionBy for two columns to get n-core dataset in pyspark 【发布时间】:2021-12-26 09:09:26 【问题描述】:

我有一个包含 2M 个条目的数据集,其中包含用户、项目、评级信息。我想过滤掉数据,使其包含至少有 2 个用户评分的项目和至少有 2 个项目评分的用户。我可以使用窗口函数完成一个约束,但不知道如何完成。

输入:

user product rating
J p1 3
J p2 4
M p1 4
M p3 3
B p2 3
B p4 3
B p3 3
N p3 2
N p5 4

这里是示例数据。

from pyspark import SparkContext
from pyspark.sql import SparkSession
# Create Spark Context
sc = SparkSession.builder.master("local[*]")\
     .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.2")\
     .getOrCreate()

sampleData = (("J", "p1", 3), \
    ("J", "p2", 4),  \
    ("M", "p1", 4),   \
    ("M", "p3", 3),  \
    ("B", "p2", 3),  \
    ("B", "p4", 3),  \
    ("B", "p3", 3),  \
    ("N", "p3", 2),\
    ("N", "p5", 4) \
  )
 
columns= ["user", "product", "rating"]

df = sc.createDataFrame(data = sampleData, schema = columns)

想要的输出是,

user product rating
J p1 3
J p2 4
M p1 4
M p3 3
B p2 3
B p3 3

我用来满足“至少评价 2 个项目的用户”的窗口函数是

from pyspark.sql import functions as F
from pyspark.sql.functions import  count, col
from pyspark.sql.window import Window

window = Window.partitionBy("user")

df.withColumn("count", F.count("rating").over(window))\
    .filter(F.col("count") >= 2).drop("count")

【问题讨论】:

【参考方案1】:

下面呢?

df = spark.createDataFrame(data = sampleData, schema = columns)
df_p = df.groupBy('product').count().filter('count >= 2').select('product')
df = df.join(df_p, ['product'], 'inner')
df_u = df.select('user').groupBy('user').count().filter('count >= 
2').select('user')
df = df.join(df_u, ['user'], 'inner')

给出以下输出:

user product rating
B p2 3
B p3 3
M p1 4
M p3 3
J p2 4
J p1 3

【讨论】:

感谢这项工作! @krishthw 我看到你收回了答案。这不能解决您想要的其他情况吗? 抱歉。有用!如果我可以使用窗口功能,我会很高兴。无论如何,我可以接受你的!谢谢! 很高兴它成功了!【参考方案2】:

您可以使用两个窗口函数来做到这一点。我对df语法不太熟悉,这里是sql:

df.createOrReplaceTempView("ratings")

spark.sql("""
SELECT USER,
       product,
       rating,
       Count(*)OVER (partition BY USER )    num_ratings_for_user,
       Count(*)OVER (partition BY product ) num_raters_for_product
FROM   ratings 
""")

您可以对此进行过滤。

【讨论】:

【参考方案3】:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window1 = Window.partitionBy("user")
window2 = Window.partitionBy("product")

df.withColumn("count_users", F.count("rating").over(window1))\
  .filter(F.col("count_users") >= 2)\
  .withColumn("count_prod", F.count("rating").over(window2))\
  .filter(F.col("count_prod") >= 2)\
  .drop("count", "count_users", "count_prod")\
  .show()

用户 N 评价了超过 1 个产品,因此输出应为:

+----+-------+------+
|user|product|rating|
+----+-------+------+
|   J|     p1|     3|
|   M|     p1|     4|
|   B|     p2|     3|
|   J|     p2|     4|
|   B|     p3|     3|
|   M|     p3|     3|
|   N|     p3|     2|
+----+-------+------+

【讨论】:

N 不应出现在输出中。如果有 N,则两个约束都不会满足。 “我想过滤掉数据,使其包含至少有 2 个用户评价的项目和至少评价 2 个项目的用户” N 评价了 2 个项目,P3 被超过 2 个用户评价 @Luiz Viola,是的,过滤后的数据集应该满足这两个约束。不是输入。这就是为什么我包括“......所以它包括” 如果我的措辞让你感到困惑,我很抱歉。谢谢你的努力! @Luiz Viola,如果您熟悉它,那么使用窗口函数获得所需的输出会很棒。谢谢 我不懂逻辑。阅读您的要求,考虑 N - P3 是有意义的。我不知道你希望它如何被窗口函数处理

以上是关于对两列应用 Window.partitionBy 以在 pyspark 中获取 n-core 数据集的主要内容,如果未能解决你的问题,请参考以下文章

CreateCriteria 对两列组合进行排序

对两列数据进行排序并保留不重复的值

在 Postgresql 中,对两列的组合强制唯一

在 Postgresql 中,对两列的组合强制唯一

对两列字符串数据执行一次热编码

选择对两列求和并希望将结果用作选择条件的查询 [重复]