Pyspark 使用 udf 处理数组列并返回另一个数组
Posted
技术标签:
【中文标题】Pyspark 使用 udf 处理数组列并返回另一个数组【英文标题】:Pyspark process array column using udf and return another array 【发布时间】:2019-02-06 17:37:56 【问题描述】:使用 udf 处理数组列并返回另一个数组
以下是我的输入:
docID 带状疱疹 D1 [23, 25, 39,59] D2 [34, 45, 65]
我想通过处理 shingles 数组列来生成一个名为 hashes 的新列: 例如,我想提取最小值和最大值(这只是一个示例,以表明我想要一个固定长度的数组列,我实际上并不想找到最小值或最大值)
docID 带状疱疹哈希 D1 [23, 25, 39,59] [23,59] D2 [34, 45, 65] [34,65]
我创建了一个如下的udf:
def generate_minhash_signatures(shingles, coeffA, coeffB):
signature = []
minHashCode = nextPrime + 1
maxHashCode = 0
for shingleID in shingles:
if shingleID < minHashCode:
minHashCode = shingleID
if shingleID > maxHashCode:
maxHashCode = shingleID
return [minHashCode, maxHashCode]
minhash_udf = udf(generate_minhash_signatures, ArrayType(IntegerType()))
df_with_minhash = df.withColumn('min_max_hash', minhash_udf("shingles", coeffA, coeffB))
df_with_minhash.show()
但它给出了以下错误:
TypeError: Invalid argument, not a string or column: [2856022824, 2966132496, 947839218, 1658426276, 1862779421, 3729685802, 1710806966, 2696513050, 3630333076, 2555745391] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
实际udf:
def generate_minhash_signatures(shingles, coeffA, coeffB, numHashes):
signature = []
for i in range(0, numHashes):
minHashCode = nextPrime + 1
for shingleID in shingles:
hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime
if hashCode < minHashCode:
minHashCode = hashCode
signature.append(minHashCode)
return signature
【问题讨论】:
什么是shingle_udf
??
更新了,应该是用minhash_udf
How to pass a constant value to Python UDF?的可能重复
还有Passing Array to Python Spark Lit Function
【参考方案1】:
您的udf
期望所有三个参数都是列。 coeffA
和 coeffB
可能不仅仅是您需要使用 lit
转换为列对象的数值:
import pyspark.sql.functions as f
df.withColumn('min_max_hash', minhash_udf(f.col("shingles"), f.lit(coeffA), f.lit(coeffB)))
如果coeffA
和coeffB
是列表,请使用f.array
创建如下文字:
df.withColumn('min_max_hash',
minhash_udf(f.col("shingles"),
f.array(*map(f.lit, coeffA)),
f.array(*map(f.lit, coeffB))
)
或者将列参数和非列参数分开如下:
def generate_minhash_signatures(coeffA, coeffB, numHashes)
def generate_minhash_signatures_inner(shingles):
signature = []
for i in range(0, numHashes):
minHashCode = nextPrime + 1
for shingleID in shingles:
hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime
if hashCode < minHashCode:
minHashCode = hashCode
signature.append(minHashCode)
return signature
return f.udf(generate_minhash_signatures_inner, ArrayType(IntegerType()))
然后你可以调用函数为:
df.withColumn('min_max_hash', generate_minhash_signatures(coeffA, coeffB, numHashes)("shingles"))
【讨论】:
coeffA 和 coeffB 是固定长度的列表。我更新了问题以包含实际的 udf。【参考方案2】:我的问题不完全一样。但是一个类似的—— 我必须发送三个数组类型列作为输入 并获得一个数组类型(字符串类型)作为输出
我正在返回一个列表并尝试了许多其他方法,但都没有成功。
def func_req(oldlist , newlist , pve):
deleted_stores = list(set(oldlist) - set(newlist))
new_stores = list(set(newlist) - set(oldlist))
old_map = dict(zip(list(oldlist), list(pvector)))
for key in deleted_stores:
old_map.pop(key)
for key in newlist:
if key not in old_map.keys():
old_map[key] = 'PTest'
pvec=list(old_map.values())
return pvec
我在这个声明中这样称呼它:
df_diff = df3.withColumn(
'updatedp',
func_req(f.col('oldlist'), f.col('presentlist'), f.col('pvec'))
)
它给了我一个错误:
AssertionError: col 应该是 Column
解决方案
然后,我偶然发现了这篇文章并介绍了一个包装函数 -
func_req_wrapper = f.udf(func_req, ArrayType(StringType()))
并调用它:
df_diff = df3.withColumn(
'updatedp',
func_req_wrapper('oldlist', 'presentlist', 'pvec')
)
【讨论】:
以上是关于Pyspark 使用 udf 处理数组列并返回另一个数组的主要内容,如果未能解决你的问题,请参考以下文章