如何将 Spark Dataframe 列的每个值作为字符串传递给 python UDF?

Posted

技术标签:

【中文标题】如何将 Spark Dataframe 列的每个值作为字符串传递给 python UDF?【英文标题】:How to pass each value of Spark Dataframe column as string to python UDF? 【发布时间】:2018-11-18 09:30:08 【问题描述】:

我正在尝试 GPG 加密 spark 数据帧列 FName

df = spark.createDataFrame([('Andy', 'NY'), ('Bob', 'PA'), ('Cindy', 'DC')], ("FName", "City"))

我创建了一个 udf,它将字符串值作为输入并将加密的字符串作为输出。

gpg = gnupg.GPG(gnupghome='/home/testgpguser/gpghome')
encrypt_str = udf(lambda string_value: gpg.encrypt(string_value, 'myrecepeintemailid', passphrase='mypassphrase'))

我正在应用我的 udf 如下:

df = df.withColumn('Encrypted_FName', encrypt_str(col('FName')))

但是,我想整个列都通过了,并且它没有正确加密值。

如何遍历数据帧的每个值并将其作为string_value 传递给udf

【问题讨论】:

你的输出是什么样的? 如果您怀疑存在问题(我认为整个列都已通过并且它没有正确加密值)然后请edit 该问题并提供足够的信息以诊断那个。有什么例外吗?输出格式是否错误?您是否只是假设或确实确认了这一点,如果是,如何确认? 【参考方案1】:

您可以这样做创建一个新的数据框。

我对必须散列的列有类似的问题。 python函数定义如下:

def make_hash(txt):
    import hashlib
    m = hashlib.sha256()
    m.update(txt.encode())
    print ("hashed ", m)
    return m.hexdigest()  

定义了一个udf:

from pyspark.sql.functions import udf
u_make_hash = udf(make_hash)    

并创建了一个新的 DataFrame,除了散列列之外的所有列:

streamingOutputDF = streamingInputDF.select(u_make_hash(streamingInputDF['connectionDeviceId']).alias("Id"), streamingInputDF['*']) \
                                    .drop("connectionDeviceId")   

我没有检查你的udf,假设没问题,下面的语句应该这样做:

dfnew = df.select((encrypt_str['FName']).alias("Encrypted_FName"))

【讨论】:

【参考方案2】:

用循环尝试DataFrame.columns

for col_name in df.columns:
    df = df.withColumn('Encrypted_'.format(col_name), encrypt_str(col(col_name)))

【讨论】:

以上是关于如何将 Spark Dataframe 列的每个值作为字符串传递给 python UDF?的主要内容,如果未能解决你的问题,请参考以下文章

Scala Spark Dataframe 创建一个新列,其中包含另一列的最大先前值和当前值

将 Dataframe 列的值与列表值进行比较

spark 统计Dataframe 列的中空值比例

如何提高具有数组列的 DataFrame 的 Spark SQL 查询性能?

Spark DataFrame - 区分缺少列的记录与坏值

Spark Hive:通过另一个 DataFrame 的列的值过滤一个 DataFrame 的行