Apache Spark:数据框中行值列表的百分比

Posted

技术标签:

【中文标题】Apache Spark:数据框中行值列表的百分比【英文标题】:Apache Spark: Percentile of list of row values in dataframe 【发布时间】:2017-10-03 00:15:28 【问题描述】:

我有一个带有一组计算列的 Apache Spark 数据框。对于数据框中的每一行(大约 2000 行),我希望获取 10 列的行值并找到第 11 列相对于其他 10 列的最接近的值。

我想我会采用这些行值并将其转换为列表,然后使用 abs 值计算来确定最接近的值。

但我被困在如何将行值转换为列表的问题上。我已经取出一列并使用 collect_list 将这些值转换为列表,但不确定当列表来自单行和多列时如何处理。

【问题讨论】:

【参考方案1】:

你应该explode你的列,这样你就可以线性化你的计算。

让我们创建一个示例数据框:

import numpy as np
np.random.seed(0)
df = sc.parallelize([np.random.randint(0, 10, 11).tolist() for _ in range(20)])\
    .toDF(["col" + str(i) for i in range(1, 12)])
df.show()
    +----+----+----+----+----+----+----+----+----+-----+-----+
    |col1|col2|col3|col4|col5|col6|col7|col8|col9|col10|col11|
    +----+----+----+----+----+----+----+----+----+-----+-----+
    |   5|   0|   3|   3|   7|   9|   3|   5|   2|    4|    7|
    |   6|   8|   8|   1|   6|   7|   7|   8|   1|    5|    9|
    |   8|   9|   4|   3|   0|   3|   5|   0|   2|    3|    8|
    |   1|   3|   3|   3|   7|   0|   1|   9|   9|    0|    4|
    |   7|   3|   2|   7|   2|   0|   0|   4|   5|    5|    6|
    |   8|   4|   1|   4|   9|   8|   1|   1|   7|    9|    9|
    |   3|   6|   7|   2|   0|   3|   5|   9|   4|    4|    6|
    |   4|   4|   3|   4|   4|   8|   4|   3|   7|    5|    5|
    |   0|   1|   5|   9|   3|   0|   5|   0|   1|    2|    4|
    |   2|   0|   3|   2|   0|   7|   5|   9|   0|    2|    7|
    |   2|   9|   2|   3|   3|   2|   3|   4|   1|    2|    9|
    |   1|   4|   6|   8|   2|   3|   0|   0|   6|    0|    6|
    |   3|   3|   8|   8|   8|   2|   3|   2|   0|    8|    8|
    |   3|   8|   2|   8|   4|   3|   0|   4|   3|    6|    9|
    |   8|   0|   8|   5|   9|   0|   9|   6|   5|    3|    1|
    |   8|   0|   4|   9|   6|   5|   7|   8|   8|    9|    2|
    |   8|   6|   6|   9|   1|   6|   8|   8|   3|    2|    3|
    |   6|   3|   6|   5|   7|   0|   8|   4|   6|    5|    8|
    |   2|   3|   9|   7|   5|   3|   4|   5|   3|    3|    7|
    |   9|   9|   9|   7|   3|   2|   3|   9|   7|    7|    5|
    +----+----+----+----+----+----+----+----+----+-----+-----+

有几种方法可以将行值转换为列表:

创建一个map,其键等于列名,值等于相应的行值。

import pyspark.sql.functions as psf
from itertools import chain
df = df\
    .withColumn("id", psf.monotonically_increasing_id())\
    .select(
        "id", 
        psf.posexplode(
            psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns if c != "col11"])))
        ).alias("pos", "col_name", "value"), "col11")
df.show()
    +---+---+--------+-----+-----+
    | id|pos|col_name|value|col11|
    +---+---+--------+-----+-----+
    |  0|  0|    col1|    5|    7|
    |  0|  1|    col2|    0|    7|
    |  0|  2|    col3|    3|    7|
    |  0|  3|    col4|    3|    7|
    |  0|  4|    col5|    7|    7|
    |  0|  5|    col6|    9|    7|
    |  0|  6|    col7|    3|    7|
    |  0|  7|    col8|    5|    7|
    |  0|  8|    col9|    2|    7|
    |  0|  9|   col10|    4|    7|
    |  1|  0|    col1|    6|    9|
    |  1|  1|    col2|    8|    9|
    |  1|  2|    col3|    8|    9|
    |  1|  3|    col4|    1|    9|
    |  1|  4|    col5|    6|    9|
    |  1|  5|    col6|    7|    9|
    |  1|  6|    col7|    7|    9|
    |  1|  7|    col8|    8|    9|
    |  1|  8|    col9|    1|    9|
    |  1|  9|   col10|    5|    9|
    +---+---+--------+-----+-----+

ArrayType 中使用StructType

df = df\
    .withColumn("id", psf.monotonically_increasing_id())\
    .select(
        "id", 
        psf.explode(
            psf.array([psf.struct(psf.lit(c).alias("col_name"), psf.col(c).alias("value")) 
                       for c in df.columns if c != "col11"])).alias("cols"), 
        "col11").select("cols.*", "col11", "id")
df.show()
    +--------+-----+-----+---+
    |col_name|value|col11| id|
    +--------+-----+-----+---+
    |    col1|    5|    7|  0|
    |    col2|    0|    7|  0|
    |    col3|    3|    7|  0|
    |    col4|    3|    7|  0|
    |    col5|    7|    7|  0|
    |    col6|    9|    7|  0|
    |    col7|    3|    7|  0|
    |    col8|    5|    7|  0|
    |    col9|    2|    7|  0|
    |   col10|    4|    7|  0|
    |    col1|    6|    9|  1|
    |    col2|    8|    9|  1|
    |    col3|    8|    9|  1|
    |    col4|    1|    9|  1|
    |    col5|    6|    9|  1|
    |    col6|    7|    9|  1|
    |    col7|    7|    9|  1|
    |    col8|    8|    9|  1|
    |    col9|    1|    9|  1|
    |   col10|    5|    9|  1|
    +--------+-----+-----+---+

使用ArrayType...

一旦你有一个分解列表,你可以寻找|col11 - value|的最小值:

from pyspark.sql import Window
w = Window.partitionBy("id").orderBy(psf.abs(psf.col("col11") - psf.col("value")))
res = df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1")
res.sort("id").show()
    +--------+-----+-----+----------+---+
    |col_name|value|col11|        id| rn|
    +--------+-----+-----+----------+---+
    |    col5|    7|    7|         0|  1|
    |    col2|    8|    9|         1|  1|
    |    col1|    8|    8|         2|  1|
    |    col2|    3|    4|         3|  1|
    |    col1|    7|    6|         4|  1|
    |    col5|    9|    9|         5|  1|
    |    col2|    6|    6|         6|  1|
    |   col10|    5|    5|         7|  1|
    |    col3|    5|    4|         8|  1|
    |    col6|    7|    7|         9|  1|
    |    col2|    9|    9|8589934592|  1|
    |    col3|    6|    6|8589934593|  1|
    |    col3|    8|    8|8589934594|  1|
    |    col2|    8|    9|8589934595|  1|
    |    col2|    0|    1|8589934596|  1|
    |    col2|    0|    2|8589934597|  1|
    |    col9|    3|    3|8589934598|  1|
    |    col7|    8|    8|8589934599|  1|
    |    col4|    7|    7|8589934600|  1|
    |    col4|    7|    5|8589934601|  1|
    +--------+-----+-----+----------+---+

【讨论】:

谢谢。因此,一旦我有一列包含这些值的列表,是否可以计算该列表的百分位数? 可以直接用窗函数求差值绝对值的最小值。

以上是关于Apache Spark:数据框中行值列表的百分比的主要内容,如果未能解决你的问题,请参考以下文章

spark数据框中orderBy的列列表

Apache Spark - 将 UDF 的结果分配给多个数据框列

Apache Spark将多行连接成单行列表[重复]

Spark DataFrame 数据框空值判断和处理

如何获取集群的最小值和最大值

Apache POI 中行的自动大小高度