PySpark 更新某些列的值
Posted
技术标签:
【中文标题】PySpark 更新某些列的值【英文标题】:PySpark update values for certain columns 【发布时间】:2017-05-10 15:57:19 【问题描述】:我正在努力确定更新多列中的值但返回整个数据集的最佳方法 - spark 变化很快,许多答案似乎已经过时。
我在一个小型集群上运行 spark 2.1,创建如下数据框:
df = spark.read.options(header="true",sep = '|').csv(path = 'file:///usr//local//raw_data//somefile.txt')
print df.columns
['ID','field1','field2','field3','value'] #there are actually many more columns, this is just an example
我需要将下面的映射函数应用于field1、field2和field3,但保留整个数据集
def mappingFunction(val,dict):
if val in dict:
return dict(val)
else:
return val
非常简单,我可以在 pandas 中这样做:
df['field1'] = df['field1'].map(mapDict)
df['field2'] = df['field2'].map(mapDict)
df['field3'] = df['field3'].map(mapDict)
在 pyspark 中,我看到有一个 df.rdd.map() 功能,但这似乎是一种“过时”的方法来解决这个问题——而且我已经将基础数据集按列拆分,所以我没有我想我应该回到 RDD。
我还看到了 pyspark.sql.functions.udf(f, returnType=StringType),这似乎是我想要使用的。
我的问题是:
有人可以确认在这种情况下定义 UDF 是正确的方法吗?
如果是这样,我如何一次将 UDF 应用到多个列? 因为我要遍历行,所以最好的查询设计似乎是应用我的映射函数一次到所有三列,但我不确定如何在我正在做的所有其他事情的背景下做到这一点。
我该如何做才能返回完整的数据集,并更新这些值? 我将要进行的所有聚合/操作都需要使用更新后的列价值观。
感谢任何见解!
【问题讨论】:
【参考方案1】:您最好将字典转换为 broadcast
变量,然后定义查找 udf
并使用生成器表达式将其应用于所有相关列:
让我们先创建一个虚拟数据集和字典:
df = sc.parallelize([
("a",1,1,2,2),
("b",2,2,3,3),
("c",3,4,3,3)]).toDF(['ID','field1','field2','field3','value'])
myDict = 1: "y", 2: "x", 3: "z"
现在我们将字典转换为broadcast
变量并定义查找udf
:
broadcastVar = sc.broadcast(myDict)
def lookup(x):
if broadcastVar.value.get(x) is None:
return x
else:
return broadcastVar.value.get(x)
lookup_udf = udf(lookup)
现在剩下的就是生成一个列名list
,我们将把我们的函数应用到(包含"field"
的所有内容),并将其放入带有udf
的生成器表达式中:
from pyspark.sql.functions import col
cols = [s for s in df.columns if "field" in s]
df.select(*(lookup_udf(col(c)).alias(c) if c in cols else c for c in df.columns)).show()
+---+------+------+------+-----+
| ID|field1|field2|field3|value|
+---+------+------+------+-----+
| a| y| y| x| 2|
| b| x| x| z| 3|
| c| z| 4| z| 3|
+---+------+------+------+-----+
【讨论】:
谢谢!很有帮助!最后一个问题 - 我希望这些更新是“永久性的” - 然后我可以运行其他聚合/计算。现在你的最后一个输出只是一个 .show() 函数。我是否会将最后一行替换为: df = df.select(*(lookup_udf(col(c)).alias(c) if c in cols else c for c in df.columns)).collect() 您可以只分配没有.show()
的输出,例如df1 = df.select(..
,我以.show()
结尾以打印转换后数据的外观。最后不要使用collect()
,这会将数据带入驱动节点。
呃——我知道那个。谢谢!以上是关于PySpark 更新某些列的值的主要内容,如果未能解决你的问题,请参考以下文章