PySpark:对于每一行,根据条件计算另一个表

Posted

技术标签:

【中文标题】PySpark:对于每一行,根据条件计算另一个表【英文标题】:PySpark: for Each row, count another table based on condition 【发布时间】:2020-07-27 16:53:46 【问题描述】:

对于表 1 中的每一行,我正在尝试计算表 2 中的行数,并根据表 1 中的值满足条件。

表 1 中的年龄应介于表 2 的 StartAge 和 EndAge 之间,或者等于 StartAge 和 EndAge。

这可以使用 udf 和 withColumn 吗?我尝试了几种方法来做到这一点,例如使用 withColumn 和 withColumn 与 UDF,但两种方法都失败了。

def counter(a):
    return table2.where((table2.StartAge <= a) & (table2.EndAge >=a)).count()

counter_udf = udf(lambda age: counter(age), IntegerType())

table1 = table1.withColumn('Count', counter_udf('Age ID'))

这有意义吗? 谢谢。

输入输出示例:

【问题讨论】:

Please don't post images of code/data (or links to them) ,最好将它们粘贴为文本,以便用户复制数据 【参考方案1】:

看看这个。你可以使用 spark-sql 来实现它。

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName('SO')\
        .getOrCreate()

    sc= spark.sparkContext

    df = sc.parallelize([([3]), ([4]), ([5])]).toDF(["age"])

    df1 = spark.createDataFrame([(0, 10), (7, 15), (5, 10), (3, 20), (5, 35), (4, 5),]
                           , ['age_start', 'age_end'])

    df.createTempView("table1")

    df1.createTempView("table2")



    spark.sql('select  t1.age as age_id, count(*) as count from table1 t1 join table2  t2 on  t1.age >=t2.age_start and t1.age<=t2.age_end group by t1.age order by count').show()

    # +------+-----+
    # |age_id|count|
    # +------+-----+
    # |     3|    2|
    # |     4|    3|
    # |     5|    5|
    # +------+-----+

【讨论】:

【参考方案2】:

如果你想在你的脚本中使用 UDF,你必须先用 spark 注册它。

使用这行代码应该有助于修复您的错误:

_ = spark.udf.register("counter_udf", counter_udf)

【讨论】:

谢谢你的回答,我明天早上试试,告诉你。但我很好奇,没有UDF有没有办法做到这一点?为什么将注册分配给下划线? @aruydzi 下划线设置为 UDF 注册,以确保编译器计算操作。我相信 Loka 的评论将是非 UDF 方法的一个选项。

以上是关于PySpark:对于每一行,根据条件计算另一个表的主要内容,如果未能解决你的问题,请参考以下文章

PYSPARK:根据条件用另一个行值更新一行中的值?

根据 p:dataTable 的每一行中的另一个 p:selectOneMenu 填充 p:selectOneMenu

为 pyspark 数据帧的每一行评估多个 if elif 条件

对于表中的每一行,选择另一个表中由值连接的最近日期

Oracle:Sql根据同一表另一行的值更新同一表的行

根据另一个表中的条件计算表中的行数