PySpark 更新某些列的值

Posted

技术标签:

【中文标题】PySpark 更新某些列的值【英文标题】:PySpark update values for certain columns 【发布时间】:2017-05-10 15:57:19 【问题描述】:

我正在努力确定更新多列中的值但返回整个数据集的最佳方法 - spark 变化很快,许多答案似乎已经过时。

我在一个小型集群上运行 spark 2.1,创建如下数据框:

df = spark.read.options(header="true",sep = '|').csv(path = 'file:///usr//local//raw_data//somefile.txt')

print df.columns 
['ID','field1','field2','field3','value'] #there are actually many more columns, this is just an example

我需要将下面的映射函数应用于field1、field2和field3,但保留整个数据集

def mappingFunction(val,dict):
    if val in dict:
        return dict(val)
    else:
        return val

非常简单,我可以在 pandas 中这样做:

df['field1'] = df['field1'].map(mapDict)
df['field2'] = df['field2'].map(mapDict)
df['field3'] = df['field3'].map(mapDict)

在 pyspark 中,我看到有一个 df.rdd.map() 功能,但这似乎是一种“过时”的方法来解决这个问题——而且我已经将基础数据集按列拆分,所以我没有我想我应该回到 RDD。

我还看到了 pyspark.sql.functions.udf(f, returnType=StringType),这似乎是我想要使用的。

我的问题是:

有人可以确认在这种情况下定义 UDF 是正确的方法吗?

如果是这样,我如何一次将 UDF 应用到多个列? 因为我要遍历行,所以最好的查询设计似乎是应用我的映射函数一次到所有三列,但我不确定如何在我正在做的所有其他事情的背景下做到这一点。

我该如何做才能返回完整的数据集,并更新这些值? 我将要进行的所有聚合/操作都需要使用更新后的列价值观。

感谢任何见解!

【问题讨论】:

【参考方案1】:

您最好将字典转换为 broadcast 变量,然后定义查找 udf 并使用生成器表达式将其应用于所有相关列:

让我们先创建一个虚拟数据集和字典:

df = sc.parallelize([
    ("a",1,1,2,2),
    ("b",2,2,3,3),
    ("c",3,4,3,3)]).toDF(['ID','field1','field2','field3','value'])

myDict = 1: "y", 2: "x", 3: "z"

现在我们将字典转换为broadcast 变量并定义查找udf

broadcastVar = sc.broadcast(myDict) 

def lookup(x):

  if broadcastVar.value.get(x) is None:
    return x
  else:
    return broadcastVar.value.get(x)

lookup_udf = udf(lookup)

现在剩下的就是生成一个列名list,我们将把我们的函数应用到(包含"field"的所有内容),并将其放入带有udf的生成器表达式中:

from pyspark.sql.functions import col

cols = [s for s in df.columns if "field" in s]
df.select(*(lookup_udf(col(c)).alias(c) if c in cols else c for c in df.columns)).show()
+---+------+------+------+-----+
| ID|field1|field2|field3|value|
+---+------+------+------+-----+
|  a|     y|     y|     x|    2|
|  b|     x|     x|     z|    3|
|  c|     z|     4|     z|    3|
+---+------+------+------+-----+

【讨论】:

谢谢!很有帮助!最后一个问题 - 我希望这些更新是“永久性的” - 然后我可以运行其他聚合/计算。现在你的最后一个输出只是一个 .show() 函数。我是否会将最后一行替换为: df = df.select(*(lookup_udf(col(c)).alias(c) if c in cols else c for c in df.columns)).collect() 您可以只分配没有.show() 的输出,例如df1 = df.select(..,我以.show() 结尾以打印转换后数据的外观。最后不要使用collect(),这会将数据带入驱动节点。 呃——我知道那个。谢谢!

以上是关于PySpark 更新某些列的值的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:转换DataFrame中给定列的值

Pyspark:用字典中的值替换列的值

PySpark:如何根据其他行值的值更改行+列的值

Pyspark:如何根据另一列的值填充空值

用字典键值(pyspark)替换火花df中一列的值

pyspark:比较给定列的值时从数据框中获取公共数据