PySpark:转换DataFrame中给定列的值
Posted
技术标签:
【中文标题】PySpark:转换DataFrame中给定列的值【英文标题】:PySpark: Transform values of given column in the DataFrame 【发布时间】:2021-09-28 01:48:59 【问题描述】:总的来说,我是 PySpark 和 Spark 的新手。 我想对 DataFrame 中的给定列应用转换,本质上为该特定列上的每个值调用一个函数。
我的 DataFrame df
看起来像这样:
df.show()
+------------+--------------------+
|version | body |
+------------+--------------------+
| 1|9gIAAAASAQAEAAAAA...|
| 2|2gIAAAASAQAEAAAAA...|
| 3|3gIAAAASAQAEAAAAA...|
| 1|7gIAKAASAQAEAAAAA...|
+------------+--------------------+
我需要为version
为1
的每一行读取body
列的值,然后对其进行解密(我有自己的逻辑/函数,它接受一个字符串并返回一个解密的字符串)。最后,将解密后的值以 csv 格式写入 S3 存储桶。
def decrypt(encrypted_string: str):
# code that returns decrypted string
所以,当我执行以下操作时,我会得到相应的过滤值,我需要应用我的解密函数。
df.where(col('version') =='1')\
.select(col('body')).show()
+--------------------+
| body|
+--------------------+
|9gIAAAASAQAEAAAAA...|
|7gIAKAASAQAEAAAAA...|
+--------------------+
但是,我不清楚如何做到这一点。我尝试使用collect()
,但它违背了使用 Spark 的目的。
我也尝试如下使用.rdd.map
,但没有奏效。
df.where(col('version') =='1')\
.select(col('body'))\
.rdd.map(lambda x: decrypt).toDF().show()
OR
.rdd.map(decrypt).toDF().show()
有人可以帮忙吗。
【问题讨论】:
【参考方案1】:请尝试:
from pyspark.sql.functions import udf
decrypt_udf = udf(decrypt, StringType())
df.where(col('version') =='1').withColumn('body', decrypt_udf('body'))
【讨论】:
感谢您的回答,这正是我根据我提到的帖子所尝试的。 正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center。【参考方案2】:从这篇文章中得到了一些线索:Pyspark DataFrame UDF on Text Column。
看起来我可以简单地通过以下方式获得它。我之前没有使用udf
就这样做了,所以它不起作用。
dummy_function_udf = udf(decrypt, StringType())
df.where(col('version') == '1')\
.select(col('body')) \
.withColumn('decryptedBody', dummy_function_udf('body')) \
.show()
【讨论】:
以上是关于PySpark:转换DataFrame中给定列的值的主要内容,如果未能解决你的问题,请参考以下文章
PySpark Dataframe:将一个单词附加到列的每个值
如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?