如何在 pyspark Hive SQL 中获取等效的 postgres 命令“nth_value”?

Posted

技术标签:

【中文标题】如何在 pyspark Hive SQL 中获取等效的 postgres 命令“nth_value”?【英文标题】:How to get postgres command 'nth_value' equivalent in pyspark Hive SQL? 【发布时间】:2020-07-21 21:28:48 【问题描述】:

我正在解决这个例子: https://www.windowfunctions.com/questions/grouping/5 在这里,他们使用 Oracle 或 postgres 命令 nth_value 来获得答案,但这在 pyspark 使用的 Hive SQL 中没有实现,我想知道如何在 pyspark 中获得相同的结果。

postgres sql 代码

所有大于第 4 的权重都被分配到第 4 小的权重 前三个最轻的权重被赋值为 99.9

select name, weight, 
coalesce(nth_value(weight, 4) over (order by weight), 99.9) as imagined_weight
from cats 
order by weight

问题:如何使用 pyspark 得到以下结果?

name    weight  imagined_weight
Tigger  3.8 99.9
Molly   4.2 99.9
Ashes   4.5 99.9
Charlie 4.8 4.8
Smudge  4.9 4.8
Felix   5.0 4.8
Puss    5.1 4.8
Millie  5.4 4.8
Alfie   5.5 4.8
Misty   5.7 4.8
Oscar   6.1 4.8
Smokey  6.1 4.8

数据

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark import SparkConf, SparkContext, SQLContext
spark = pyspark.sql.SparkSession.builder.appName('app').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
sqc = sqlContext
# spark_df = sqlContext.createDataFrame(pandas_df)

df = pd.DataFrame(
    'name': [
        'Molly', 'Ashes', 'Felix', 'Smudge', 'Tigger', 'Alfie', 'Oscar',
        'Millie', 'Misty', 'Puss', 'Smokey', 'Charlie'
    ],
    'breed': [
        'Persian', 'Persian', 'Persian', 'British Shorthair',
        'British Shorthair', 'Siamese', 'Siamese', 'Maine Coon', 'Maine Coon',
        'Maine Coon', 'Maine Coon', 'British Shorthair'
    ],
    'weight': [4.2, 4.5, 5.0, 4.9, 3.8, 5.5, 6.1, 5.4, 5.7, 5.1, 6.1, 4.8],
    'color': [
        'Black', 'Black', 'Tortoiseshell', 'Black', 'Tortoiseshell', 'Brown',
        'Black', 'Tortoiseshell', 'Brown', 'Tortoiseshell', 'Brown', 'Black'
    ],
    'age': [1, 5, 2, 4, 2, 5, 1, 5, 2, 2, 4, 4]
)

schema = StructType([
    StructField('name', StringType(), True),
    StructField('breed', StringType(), True),
    StructField('weight', DoubleType(), True),
    StructField('color', StringType(), True),
    StructField('age', IntegerType(), True),
])

sdf = sqlContext.createDataFrame(df, schema)
sdf.createOrReplaceTempView("cats")

spark.sql('select * from cats limit 2').show()

到目前为止我的尝试

# My attempt
q = """
select weight from (
  select name,weight, 
         ROW_NUMBER() over (ORDER BY weight) as row_no
  from cats group by weight,name
  ) res 
where res.row_no = 4
"""
spark.sql(q).show()

【问题讨论】:

【参考方案1】:

另一种选择是row_number() 和条件窗口函数:

select
    name,
    weight,
    coalesce(
        max(case when rn = 4 then weight end) over(order by rn),
        99.9
    ) imagined_weight
from (select c.*, row_number() over(order by weight) rn from cats c) c

【讨论】:

以上是关于如何在 pyspark Hive SQL 中获取等效的 postgres 命令“nth_value”?的主要内容,如果未能解决你的问题,请参考以下文章

如何获取 HIVE/PySpark 表中每一列的唯一值?

如何在 pyspark 中使用“不存在”的 SQL 条件?

如何使用 jupyter notebook 在 pyspark 中的 Hive 上使用 %sql Magic 字符串启用 spark SQL

如何使用 Hive 上下文中的 Pyspark 调用用 Java 编写的 Hive UDF

在 Pyspark/Hive 中处理不断变化的数据类型

无法使用 pyspark 从 hive 表中查询复杂的 SQL 语句