在pyspark中旋转一行的值

Posted

技术标签:

【中文标题】在pyspark中旋转一行的值【英文标题】:Rotating the values of a row in pyspark 【发布时间】:2020-02-06 20:32:42 【问题描述】:

我目前正在清理数据集,并且一直在尝试使用 pyspark。数据从 csv 读入数据帧,我需要的值在它们各自的行中,但对于某些行,值是混合的。我需要旋转这些行的值,以便这些值位于正确的列中。例如,假设我有以下数据集:

+-------+-------+-------+
|   A   |   B   |   C   |
+-------+-------+-------+
|   2   |   3   |   1   |
+-------+-------+-------+

但第一行的值应该是

+-------+-------+-------+
|   A   |   B   |   C   |
+-------+-------+-------+
|   1   |   2   |   3   |
+-------+-------+-------+

我当前的解决方案是添加一个临时列,并为每一列重新分配值,并在删除旧列的同时重命名临时列:

// Add temporary column C
+-------+-------+-------+-------+
|   A   |   B   |   C   | tmp_C |
+-------+-------+-------+-------+
|   2   |   3   |   1   |   1   |
+-------+-------+-------+-------+
// Shift values
+-------+-------+-------+-------+
|   A   |   B   |   C   | tmp_C |
+-------+-------+-------+-------+
|   2   |   2   |   3   |   1   |
+-------+-------+-------+-------+
// Drop old column
+-------+-------+-------+
|   B   |   C   | tmp_C |
+-------+-------+-------+
|   2   |   3   |   1   |
+-------+-------+-------+
// Rename new column
+-------+-------+-------+
|   B   |   C   |   A   |
+-------+-------+-------+
|   2   |   3   |   1   |
+-------+-------+-------+

我在 pyspark 中实现的方式如下:

from pyspark.sql import SparkSession
from pyspark.sql.function import when, col

def clean_data(spark_session, file_path):
    df = (
        spark_session
        .read
        .csv(file_path, header='true')
    )

    df = (
        df
        .withColumn(
            "tmp_C",
            when(
                col("C") == 1,
                col("C")
            ).otherwise("A")
        )
        .withColumn(
            "C",
            when(
                col("C") == 1,
                col("B")
            ).otherwise("C")
        )
        .withColumn(
            "B",
            when(
                col("C") == 1,
                col("A")
            ).otherwise("B")
        )
    )

    df = df.drop("A")
    df = df.withColumnRenamed("tmp_C", "A")

    return df

对我来说,这看起来不太好,我不确定这是解决这个问题的最佳方法。我对 Spark 很陌生,想知道解决这种情况的最佳方法,尽管这确实有效。另外,我还想知道这是否是 Spark 的一个很好的用例(请注意,我使用的数据集很大,而且还有比这更多的字段。上面的例子大大简化了)。

【问题讨论】:

【参考方案1】:

好吧,如果您按旋转顺序将每一列映射到其对应的列中,这可能会更快。

// generate columns map
maps = dict(zip(['C', 'A', 'B'], ['A', 'B', 'C']))

// regular approach:
// select columns with alias maps
df.select([col(c).alias(maps.get(c, c)) for c in df.columns])

// row scan approach:
// select columns with alias maps that satisfied specific condition
df.select([when(<map-condition>, col(c).alias(maps.get(c, c))).otherwise(col(c)) for c in df.columns])

希望这会有所帮助。

【讨论】:

是否有办法实现这一点,这样只有在行显示格式不正确时才会发生旋转(错误在存在的所有行上都相同)?由于某些行的格式正确,因此别名会导致数据反向不正确。在考虑了更多问题之后,听起来我除了扫描所有行并使用临时列执行轮换之外别无选择。这听起来对吗? 没错,这种方法可以完美扫描满足与不正确显示格式相关的地图条件的行。您可以查看我的附加更新,并将 更改为您自己的自定义地图条件。 太棒了,逻辑相似,但这是一个更好的解决方案。谢谢! 不客气。充满信心地继续编码和部署!【参考方案2】:

是的,如果数据集很大,您应该使用 Spark。

您最好只重命名列而不是移动实际数据?假设这个数据问题是系统的,就像你的例子一样。由于基于列名而不是位置重命名的问题,这有点令人费解,因此您必须先更改为临时名称。

from functools import reduce

old_cols = df.columns
new_cols = old_cols[1:] + [old_cols[0]]
temp_cols = [col + "_" for col in new_cols]
# Rename columns with temporary names
df_temp = reduce(lambda df, idx: df.withColumnRenamed(old_cols[idx], temp_cols[idx]), range(len(old_cols)), df)
# Rename columns to align with correct data
df = reduce(lambda df_temp, idx: df_temp.withColumnRenamed(temp_cols[idx], new_cols[idx]), range(len(temp_cols)), df_temp)
# Then revert back to original column order
df = df.select(old_cols)

【讨论】:

我忘记在代码中添加的一件事是,在每次调用 withColumn 时,我都有一个参数检查该行是否有错误(是的,错误是系统性的)。因此,如果所有行都显示相同的错误,但某些行是正确的,则此解决方案将起作用。我将进行编辑以反映这一点。

以上是关于在pyspark中旋转一行的值的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 数据框为每一行获得第二低的值

如何在pyspark中使用具有相同客户ID的新数据框的值更新一行旧数据框

如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?

将每一行的值汇总为布尔值(PySpark)

如何过滤 PySpark 中数组列中的值?

在 Pyspark 中旋转时无法解析列名