正在分析的 pyspark udf 打印行

Posted

技术标签:

【中文标题】正在分析的 pyspark udf 打印行【英文标题】:pyspark udf print row being analyzed 【发布时间】:2019-01-18 11:04:28 【问题描述】:

我在 pyspark udf 函数中遇到问题,我想打印产生问题的行号。

我尝试在 Python 中使用等效的“静态变量”来计算行数,这样当使用新行调用 udf 时,计数器会递增。但是,它不起作用:

import pyspark.sql.functions as F
def myF(input):
    myF.lineNumber += 1
    if (somethingBad):
        print(myF.lineNumber)
    return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())

如何计算调用 udf 的次数,以便在 pyspark 中找到产生问题的行数?

【问题讨论】:

谁投反对票? 一个 UDF 应用于每一行......你无法计算 UDF 中的行数,因为 UDF 被复制并应用于每一行...... 如果您更仔细地阅读了这个问题,您会发现我并没有试图直接计算行数,而是试图让一个计数器计算函数被调用的次数。 我的错......但无论如何,这仍然是不可能的,唯一的原因是函数的副本在工作人员级别执行并且只有结果被返回到驱动程序级别。因此,您无法访问 lineNumber 的最终值。 即使我在本地执行 pyspark(我的意思是只在我的计算机上,没有分发)? 【参考方案1】:

udfs 在 worker 中执行,因此它们内部的 print 语句不会显示在输出中(来自驱动程序)。处理 UDF 问题的最佳方法是将 UDF 的返回类型更改为结构或列表,并将错误信息与返回的输出一起传递。在下面的代码中,我只是将错误信息添加到您最初返回的字符串 res 中。

import pyspark.sql.functions as F
def myF(input):
  myF.lineNumber += 1
  if (somethingBad):
    res += 'Error in line ".format(myF.lineNumber)
  return res

myF.lineNumber = 0

myF_udf =  F.udf(myF, StringType())

【讨论】:

以上是关于正在分析的 pyspark udf 打印行的主要内容,如果未能解决你的问题,请参考以下文章

PySpark UDF 优化挑战

PySpark - 从 UDF 获取行索引

PySpark 分组并逐行应用 UDF 操作

Pyspark:从 Python 到 Pyspark 实现 lambda 函数和 udf

pyspark:删除所有行中具有相同值的列

Pyspark:访问 UDF 中行内的列