PySpark:转换DataFrame中给定列的值

Posted

技术标签:

【中文标题】PySpark:转换DataFrame中给定列的值【英文标题】:PySpark: Transform values of given column in the DataFrame 【发布时间】:2021-09-28 01:48:59 【问题描述】:

总的来说,我是 PySpark 和 Spark 的新手。 我想对 DataFrame 中的给定列应用转换,本质上为该特定列上的每个值调用一个函数。

我的 DataFrame df 看起来像这样:

df.show()

+------------+--------------------+
|version     |         body       |
+------------+--------------------+
|           1|9gIAAAASAQAEAAAAA...|
|           2|2gIAAAASAQAEAAAAA...|
|           3|3gIAAAASAQAEAAAAA...|
|           1|7gIAKAASAQAEAAAAA...|
+------------+--------------------+

我需要为version1 的每一行读取body 列的值,然后对其进行解密(我有自己的逻辑/函数,它接受一个字符串并返回一个解密的字符串)。最后,将解密后的值以 csv 格式写入 S3 存储桶。

def decrypt(encrypted_string: str):
    # code that returns decrypted string

所以,当我执行以下操作时,我会得到相应的过滤值,我需要应用我的解密函数。

df.where(col('version') =='1')\
     .select(col('body')).show()

+--------------------+
|                body|
+--------------------+
|9gIAAAASAQAEAAAAA...|
|7gIAKAASAQAEAAAAA...|
+--------------------+

但是,我不清楚如何做到这一点。我尝试使用collect(),但它违背了使用 Spark 的目的。

我也尝试如下使用.rdd.map,但没有奏效。

df.where(col('version') =='1')\
     .select(col('body'))\
     .rdd.map(lambda x: decrypt).toDF().show()

OR 

     .rdd.map(decrypt).toDF().show()

有人可以帮忙吗。

【问题讨论】:

【参考方案1】:

请尝试:

from pyspark.sql.functions import udf
decrypt_udf = udf(decrypt, StringType())
df.where(col('version') =='1').withColumn('body', decrypt_udf('body'))

【讨论】:

感谢您的回答,这正是我根据我提到的帖子所尝试的。 正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center。【参考方案2】:

从这篇文章中得到了一些线索:Pyspark DataFrame UDF on Text Column。 看起来我可以简单地通过以下方式获得它。我之前没有使用udf 就这样做了,所以它不起作用。

dummy_function_udf = udf(decrypt, StringType())

df.where(col('version') == '1')\
   .select(col('body')) \
   .withColumn('decryptedBody', dummy_function_udf('body')) \
   .show() 

【讨论】:

以上是关于PySpark:转换DataFrame中给定列的值的主要内容,如果未能解决你的问题,请参考以下文章

PySpark Dataframe:将一个单词附加到列的每个值

pyspark:比较给定列的值时从数据框中获取公共数据

跨 PySpark DataFrame 列的字符串匹配

如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?

Pyspark 从具有不同列的行/数据创建 DataFrame

将每一行的值汇总为布尔值(PySpark)