Pyspark - 从具有最小值和最大值范围的数组中获取值

Posted

技术标签:

【中文标题】Pyspark - 从具有最小值和最大值范围的数组中获取值【英文标题】:Pyspark - getting values from an array that has a range of min and max values 【发布时间】:2019-05-15 13:21:40 【问题描述】:

我正在尝试在 PySpark 中编写一个查询,该查询将从数组中获取正确的值。

例如,我有一个名为 df 的数据框,其中包含三列“companyId”、“companySize”和“weightingRange”。 “companySize”列只是员工人数。 “weightingRange”列是一个数组,其中包含以下内容

[ "minimum":0, "maximum":100, "weight":123,
  "minimum":101, "maximum":200, "weight":456,
  "minimum":201, "maximum":500, "weight":789
]

所以数据框看起来像这样(weightingRange 同上,为了更清晰的格式,在下面的示例中将其截断)

+-----------+-------------+------------------------+--+
| companyId | companySize |     weightingRange     |  |
+-----------+-------------+------------------------+--+
| ABC1      |         150 | ["maximum":100, etc] |  |
| ABC2      |          50 | ["maximum":100, etc] |  |
+-----------+-------------+------------------------+--+

因此,对于公司规模 = 150 的条目,我需要将权重 456 返回到名为“companyWeighting”的列中

所以它应该显示以下内容

+-----------+-------------+------------------------+------------------+
| companyId | companySize |     weightingRange     | companyWeighting |
+-----------+-------------+------------------------+------------------+
| ABC1      |         150 | ["maximum":100, etc] |              456 |
| ABC2      |          50 | ["maximum":100, etc] |              123 |
+-----------+-------------+------------------------+------------------+

我看过

df.withColumn("tmp",explode(col("weightingRange"))).select("tmp.*")

然后加入但尝试应用这将笛卡尔数据。

建议赞赏!

【问题讨论】:

你能展示一个数据样本吗?该数组是否在每一行中重复? @mayankagrawal 按要求添加样本。数组对每一行重复。 【参考方案1】:

你可以这样接近,

首先创建一个示例数据框,

import pyspark.sql.functions as F

df = spark.createDataFrame([
        ('ABC1', 150, [ "min":0, "max":100, "weight":123,
                        "min":101, "max":200, "weight":456,
                        "min":201, "max":500, "weight":789]),
        ('ABC2', 50, [  "min":0, "max":100, "weight":123,
                        "min":101, "max":200, "weight":456,
                        "min":201, "max":500, "weight":789])],  

        ['companyId' , 'companySize', 'weightingRange'])

然后,创建一个udf 函数并将其应用于每一行以获取新列,

def get_weight(wt,wt_rnge):
    for _d in wt_rnge:
        if _d['min'] <= wt <= _d['max']:
            return _d['weight']

get_weight_udf = F.udf(lambda x,y: get_weight(x,y))
df = df.withColumn('companyWeighting', get_weight_udf(F.col('companySize'), F.col('weightingRange')))
df.show()

你得到的输出是,

+---------+-----------+--------------------+----------------+
|companyId|companySize|      weightingRange|companyWeighting|
+---------+-----------+--------------------+----------------+
|     ABC1|        150|[Map(weight -> 12...|             456|
|     ABC2|         50|[Map(weight -> 12...|             123|
+---------+-----------+--------------------+----------------+

【讨论】:

以上是关于Pyspark - 从具有最小值和最大值范围的数组中获取值的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:在汇总负数分布时,describe() 函数出错——最小值和最大值翻转

如何缩小具有已知最小值和最大值的数字范围

从 pyspark 数据框中获取多个(100+)列的空计数、最小值和最大值的最佳方法

如何将 jQuery UI 所有选定范围(不仅是最小值和最大值)加载到数组中

PHP:将一个数字分隔为具有最小值和最大值的 10er 数组

在 SQL (Big Query) 中生成序列/范围/数组,其中最小值和最大值取自另一个表