在 StructType 数组上应用 UDF
Posted
技术标签:
【中文标题】在 StructType 数组上应用 UDF【英文标题】:Apply UDF on an Array of StructType 【发布时间】:2019-05-02 15:47:53 【问题描述】:我有一个具有以下架构的数据框:
root
|-- urlA: string (nullable = true)
|-- urlB: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- distCol: double (nullable = true)
| | |-- url: string (nullable = true)
我想使用 UDF 访问结构中的元素,以便我可以对 distCol 值进行排序并获取 distCol 最小的 url(在 urlB 中)(实际上是前 N 个)
输入:
+--------------------+---------------------------------+
| urlA| urlB|
+--------------------+---------------------------------+
| some_url|[[0.02, url_0], [0.03, url_1],...|
+--------------------+---------------------------------+
输出(理想情况下):
+--------------------+------------------------------------+
| urlA| urlB|
+--------------------+------------------------------------+
| some_url|[[url_best_score_0, url_best_0],...]|
+--------------------+------------------------------------+
我的 udf:
def rank_url(row_url):
ranked_url = sorted(row_url[0], key=lambda x: x[0], reverse=False)[0:5]
return row_url
url_udf = udf(rank_url, ArrayType(StringType())
df = model.approxSimilarityJoin(pca_df, pca_df, 1.0).groupBy("datasetA.url").agg(collect_list(struct("distCol", "datasetB.url")).alias("urlB")).withColumn("urlB", url_udf("urlB"))
我想做类似的事情,但 row_url 并不能真正以这种方式访问。你有什么想法吗?
【问题讨论】:
【参考方案1】:您的主要问题来自您的 UDF 输出类型以及您如何访问列元素。以下是解决方法,struct1
很关键。
from pyspark.sql.types import ArrayType, StructField, StructType, DoubleType, StringType
from pyspark.sql import functions as F
# Define structures
struct1 = StructType([StructField("distCol", DoubleType(), True), StructField("url", StringType(), True)])
struct2 = StructType([StructField("urlA", StringType(), True), StructField("urlB", ArrayType(struct1), True)])
# Create DataFrame
df = spark.createDataFrame([
['url_a1', [[0.03, 'url1'], [0.02, 'url2'], [0.01, 'url3']]],
['url_a2', [[0.05, 'url4'], [0.03, 'url5']]]
], struct2)
输入:
+------+------------------------------------------+
|urlA |urlB |
+------+------------------------------------------+
|url_a1|[[0.03, url1], [0.02, url2], [0.01, url3]]|
|url_a2|[[0.05, url4], [0.03, url5]] |
+------+------------------------------------------+
UDF:
# Define udf
top_N = 5
def rank_url(array):
ranked_url = sorted(array, key=lambda x: x['distCol'])[0:top_N]
return ranked_url
url_udf = F.udf(rank_url, ArrayType(struct1))
# Apply udf
df2 = df.select('urlA', url_udf('urlB'))
df2.show(truncate=False)
输出:
+------+------------------------------------------+
|urlA |rank_url(urlB) |
+------+------------------------------------------+
|url_a1|[[0.01, url3], [0.02, url2], [0.03, url1]]|
|url_a2|[[0.03, url5], [0.05, url4]] |
+------+------------------------------------------+
【讨论】:
谢谢,这很快,而 struct1 的技巧正是我所需要的。谢谢你,阿里乌斯! @Arius 在定义结构时我得到NameError: name 's1' is not defined
@PIG 我的错,我在没有采取足够预防措施的情况下重命名了我的变量。 s1
引用了 struct1
。我编辑了答案。
@Arius 你能解释一下,在排序中使用[0:5]
@PIG 他只想保留前 N 个 url,其中 N = 5。我正在编辑以使其更清晰。以上是关于在 StructType 数组上应用 UDF的主要内容,如果未能解决你的问题,请参考以下文章
从 UDF 返回 StructType 的 ArrayType 时出错(并在多个 UDF 中使用单个函数)
如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果
Apache Spark SQL StructType 和 UDF