在pyspark中旋转一行的值
Posted
技术标签:
【中文标题】在pyspark中旋转一行的值【英文标题】:Rotating the values of a row in pyspark 【发布时间】:2020-02-06 20:32:42 【问题描述】:我目前正在清理数据集,并且一直在尝试使用 pyspark。数据从 csv 读入数据帧,我需要的值在它们各自的行中,但对于某些行,值是混合的。我需要旋转这些行的值,以便这些值位于正确的列中。例如,假设我有以下数据集:
+-------+-------+-------+
| A | B | C |
+-------+-------+-------+
| 2 | 3 | 1 |
+-------+-------+-------+
但第一行的值应该是
+-------+-------+-------+
| A | B | C |
+-------+-------+-------+
| 1 | 2 | 3 |
+-------+-------+-------+
我当前的解决方案是添加一个临时列,并为每一列重新分配值,并在删除旧列的同时重命名临时列:
// Add temporary column C
+-------+-------+-------+-------+
| A | B | C | tmp_C |
+-------+-------+-------+-------+
| 2 | 3 | 1 | 1 |
+-------+-------+-------+-------+
// Shift values
+-------+-------+-------+-------+
| A | B | C | tmp_C |
+-------+-------+-------+-------+
| 2 | 2 | 3 | 1 |
+-------+-------+-------+-------+
// Drop old column
+-------+-------+-------+
| B | C | tmp_C |
+-------+-------+-------+
| 2 | 3 | 1 |
+-------+-------+-------+
// Rename new column
+-------+-------+-------+
| B | C | A |
+-------+-------+-------+
| 2 | 3 | 1 |
+-------+-------+-------+
我在 pyspark 中实现的方式如下:
from pyspark.sql import SparkSession
from pyspark.sql.function import when, col
def clean_data(spark_session, file_path):
df = (
spark_session
.read
.csv(file_path, header='true')
)
df = (
df
.withColumn(
"tmp_C",
when(
col("C") == 1,
col("C")
).otherwise("A")
)
.withColumn(
"C",
when(
col("C") == 1,
col("B")
).otherwise("C")
)
.withColumn(
"B",
when(
col("C") == 1,
col("A")
).otherwise("B")
)
)
df = df.drop("A")
df = df.withColumnRenamed("tmp_C", "A")
return df
对我来说,这看起来不太好,我不确定这是解决这个问题的最佳方法。我对 Spark 很陌生,想知道解决这种情况的最佳方法,尽管这确实有效。另外,我还想知道这是否是 Spark 的一个很好的用例(请注意,我使用的数据集很大,而且还有比这更多的字段。上面的例子大大简化了)。
【问题讨论】:
【参考方案1】:好吧,如果您按旋转顺序将每一列映射到其对应的列中,这可能会更快。
// generate columns map
maps = dict(zip(['C', 'A', 'B'], ['A', 'B', 'C']))
// regular approach:
// select columns with alias maps
df.select([col(c).alias(maps.get(c, c)) for c in df.columns])
// row scan approach:
// select columns with alias maps that satisfied specific condition
df.select([when(<map-condition>, col(c).alias(maps.get(c, c))).otherwise(col(c)) for c in df.columns])
希望这会有所帮助。
【讨论】:
是否有办法实现这一点,这样只有在行显示格式不正确时才会发生旋转(错误在存在的所有行上都相同)?由于某些行的格式正确,因此别名会导致数据反向不正确。在考虑了更多问题之后,听起来我除了扫描所有行并使用临时列执行轮换之外别无选择。这听起来对吗? 没错,这种方法可以完美扫描满足与不正确显示格式相关的地图条件的行。您可以查看我的附加更新,并将是的,如果数据集很大,您应该使用 Spark。
您最好只重命名列而不是移动实际数据?假设这个数据问题是系统的,就像你的例子一样。由于基于列名而不是位置重命名的问题,这有点令人费解,因此您必须先更改为临时名称。
from functools import reduce
old_cols = df.columns
new_cols = old_cols[1:] + [old_cols[0]]
temp_cols = [col + "_" for col in new_cols]
# Rename columns with temporary names
df_temp = reduce(lambda df, idx: df.withColumnRenamed(old_cols[idx], temp_cols[idx]), range(len(old_cols)), df)
# Rename columns to align with correct data
df = reduce(lambda df_temp, idx: df_temp.withColumnRenamed(temp_cols[idx], new_cols[idx]), range(len(temp_cols)), df_temp)
# Then revert back to original column order
df = df.select(old_cols)
【讨论】:
我忘记在代码中添加的一件事是,在每次调用withColumn
时,我都有一个参数检查该行是否有错误(是的,错误是系统性的)。因此,如果所有行都显示相同的错误,但某些行是正确的,则此解决方案将起作用。我将进行编辑以反映这一点。以上是关于在pyspark中旋转一行的值的主要内容,如果未能解决你的问题,请参考以下文章
如何在pyspark中使用具有相同客户ID的新数据框的值更新一行旧数据框