Pyspark 使用 udf 处理数组列并返回另一个数组

Posted

技术标签:

【中文标题】Pyspark 使用 udf 处理数组列并返回另一个数组【英文标题】:Pyspark process array column using udf and return another array 【发布时间】:2019-02-06 17:37:56 【问题描述】:

使用 udf 处理数组列并返回另一个数组

以下是我的输入:

docID 带状疱疹 D1 [23, 25, 39,59] D2 [34, 45, 65]

我想通过处理 shingles 数组列来生成一个名为 hashes 的新列: 例如,我想提取最小值和最大值(这只是一个示例,以表明我想要一个固定长度的数组列,我实际上并不想找到最小值或最大值)

docID 带状疱疹哈希 D1 [23, 25, 39,59] [23,59] D2 [34, 45, 65] [34,65]

我创建了一个如下的udf:

def generate_minhash_signatures(shingles, coeffA, coeffB):
    signature = []
    minHashCode = nextPrime + 1
    maxHashCode = 0
    for shingleID in shingles:
        if shingleID < minHashCode:
            minHashCode = shingleID
        if shingleID > maxHashCode:
            maxHashCode = shingleID
    return [minHashCode, maxHashCode]

minhash_udf = udf(generate_minhash_signatures, ArrayType(IntegerType()))
df_with_minhash = df.withColumn('min_max_hash', minhash_udf("shingles", coeffA, coeffB))
df_with_minhash.show()

但它给出了以下错误:

TypeError: Invalid argument, not a string or column: [2856022824, 2966132496, 947839218, 1658426276, 1862779421, 3729685802, 1710806966, 2696513050, 3630333076, 2555745391] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

实际udf:

def generate_minhash_signatures(shingles, coeffA, coeffB, numHashes):
    signature = []
    for i in range(0, numHashes):
        minHashCode = nextPrime + 1
        for shingleID in shingles:
            hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

            if hashCode < minHashCode:
                minHashCode = hashCode

        signature.append(minHashCode)
    return signature

【问题讨论】:

什么是shingle_udf?? 更新了,应该是用minhash_udf How to pass a constant value to Python UDF?的可能重复 还有Passing Array to Python Spark Lit Function 【参考方案1】:

您的udf 期望所有三个参数都是列。 coeffAcoeffB 可能不仅仅是您需要使用 lit 转换为列对象的数值:

import pyspark.sql.functions as f
df.withColumn('min_max_hash', minhash_udf(f.col("shingles"), f.lit(coeffA), f.lit(coeffB)))

如果coeffAcoeffB 是列表,请使用f.array 创建如下文字:

df.withColumn('min_max_hash', 
  minhash_udf(f.col("shingles"), 
  f.array(*map(f.lit, coeffA)),
  f.array(*map(f.lit, coeffB))
)

或者将列参数和非列参数分开如下:

def generate_minhash_signatures(coeffA, coeffB, numHashes)
    def generate_minhash_signatures_inner(shingles):
        signature = []
        for i in range(0, numHashes):
            minHashCode = nextPrime + 1
            for shingleID in shingles:
                hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

                if hashCode < minHashCode:
                    minHashCode = hashCode

            signature.append(minHashCode)
        return signature
    return f.udf(generate_minhash_signatures_inner, ArrayType(IntegerType()))

然后你可以调用函数为:

df.withColumn('min_max_hash', generate_minhash_signatures(coeffA, coeffB, numHashes)("shingles"))

【讨论】:

coeffA 和 coeffB 是固定长度的列表。我更新了问题以包含实际的 udf。【参考方案2】:

我的问题不完全一样。但是一个类似的—— 我必须发送三个数组类型列作为输入 并获得一个数组类型(字符串类型)作为输出

我正在返回一个列表并尝试了许多其他方法,但都没有成功。

def func_req(oldlist , newlist , pve):
    deleted_stores = list(set(oldlist) - set(newlist))
    new_stores = list(set(newlist) - set(oldlist))
    old_map = dict(zip(list(oldlist), list(pvector)))
    for key in deleted_stores:
        old_map.pop(key)
    for key in newlist:
        if key not in old_map.keys():
            old_map[key] = 'PTest'
    pvec=list(old_map.values())
    return pvec

我在这个声明中这样称呼它:

df_diff = df3.withColumn(
    'updatedp',
    func_req(f.col('oldlist'), f.col('presentlist'), f.col('pvec'))
)

它给了我一个错误:

AssertionError: col 应该是 Column

解决方案

然后,我偶然发现了这篇文章并介绍了一个包装函数 -

func_req_wrapper = f.udf(func_req, ArrayType(StringType()))

并调用它:

df_diff = df3.withColumn(
    'updatedp', 
    func_req_wrapper('oldlist',  'presentlist', 'pvec')
)

【讨论】:

以上是关于Pyspark 使用 udf 处理数组列并返回另一个数组的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:使用一列索引另一列(两列的udf?)

Pyspark 中数组元素上的 UDF 还添加了静态元素

pyspark 数据框 UDF 异常处理

如何使用逗号分隔值拆分列并存储在 PySpark Dataframe 中的数组中?如下所示

在 pyspark 中使用 UDF 和简单数据帧

pyspark:在同一列(使用数组)上使用多个 UDF 函数时出现意外行为