Spark:在 UDF 或映射函数中加入

Posted

技术标签:

【中文标题】Spark:在 UDF 或映射函数中加入【英文标题】:Spark: Join within UDF or map function 【发布时间】:2016-12-18 11:29:20 【问题描述】:

我必须编写一个复杂的 UDF,在其中我必须与不同的表进行连接,并返回匹配的数量。实际用例要复杂得多,但我已将这里的用例简化为最少的可重现代码。这是 UDF 代码。

def predict_id(date,zip):
    filtered_ids = contest_savm.where((F.col('postal_code')==zip)  & (F.col('start_date')>=date))
    return filtered_ids.count()

当我使用以下代码定义 UDF 时,我得到一长串控制台错误:

predict_id_udf = F.udf(predict_id,types.IntegerType())

错误的最后一行是:

py4j.Py4JException: Method __getnewargs__([]) does not exist

我想知道最好的方法是什么。我也试过map这样:

result_rdd = df.select("party_id").rdd\
  .map(lambda x: predict_id(x[0],x[1]))\
  .distinct()

这也导致了类似的最终错误。我想知道,如果有的话,我可以在 UDF 或 map 函数中对原始数据帧的每一行进行连接。

【问题讨论】:

【参考方案1】:

我必须编写一个复杂的 UDF,在其中我必须与不同的表进行连接,并返回匹配的数量。

这在设计上是不可能的。我想达到这样的效果,你必须使用高级 DF / RDD 运算符:

df.join(ontest_savm,
    (F.col('postal_code')==df["zip"])  & (F.col('start_date') >= df["date"])
).groupBy(*df.columns).count()

【讨论】:

以上是关于Spark:在 UDF 或映射函数中加入的主要内容,如果未能解决你的问题,请参考以下文章

使用用户定义的函数在 spark 中加入数据集时需要填充其他信息

如何将复杂的外部变量(例如映射值)从 Spark 与 Java 中的驱动程序传递给 UDF?

Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射

在不使用 UDF 的情况下基于映射转换 Spark DataFrame 中的列

Spark 根据现有列的映射值创建新列

如何在 Spark Udf 中传递地图?