基于需要外部 API 调用的现有列创建新的 Spark 数据框列的最佳方法是啥?

Posted

技术标签:

【中文标题】基于需要外部 API 调用的现有列创建新的 Spark 数据框列的最佳方法是啥?【英文标题】:What is the best way to create a new Spark dataframe column based on an existing column that requires an external API call?基于需要外部 API 调用的现有列创建新的 Spark 数据框列的最佳方法是什么? 【发布时间】:2018-04-05 15:00:14 【问题描述】:

我在基于 Python 的 Jupyter 笔记本中使用了一个数据框。我想根据现有列的内容添加一个附加列,其中新列的内容来自对原始列运行外部 API 调用。

我尝试的解决方案是使用基于 Python 的 UDF。第一个单元格包含如下内容:

def analysis(old_column):

    new_column = myapi.analyze(text=old_column)
    return(new_column)

analysis_udf = udf(analysis)

第二个单元格:

df2 = df1.withColumn("col2",analysis_udf('col1'))
df2.select('col2').show(n=5)

我的数据框比较大,大约有 70000 行,其中 col1 可以有 100 到 10000+ 个字符的文本。当我在单元格 2 中运行上面的代码时,它实际上似乎运行得相当快(几分钟),并转储了 df2 数据帧的 5 行。所以我以为我是在做生意。但是,我的下一个单元格有以下代码:

df2.cache()
df2.filter(col('col2').isNull()).count()

此代码的目的是缓存新数据帧的内容以缩短对 DF 的访问时间,然后计算数据帧中有多少条目具有由 UDF 生成的空值。这令人惊讶(对我来说)运行了很多小时,最终提供了 6 的输出。我不清楚为什么第二个单元运行得很快,而第三个单元运行得很慢。我会认为 df2.select('col2').show(n=5) 调用会导致 UDF 在所有行上运行,并且那个会很慢,然后随后调用访问数据框的新列会很快。但事实并非如此,所以我认为缓存调用实际上是导致 UDF 在所有行上运行的调用,因此现在任何后续调用都应该很快。所以添加了另一个单元格:

df2.show(n=5)

假设它会运行得很快,但同样,它所用的时间比我预期的要长得多,而且似乎 UDF 又开始运行了。 (?)

我的问题是

    哪个 Spark api 调用实际上会导致 udf 运行(或重新运行),以及如何构造调用以仅运行一次 UDF,以便使用 UDF 的 python 函数输出的文本创建新列。 我已经读到应该避免使用 Python UDF,因为它们很慢(似乎是正确的),那么当我需要使用 API 调用来生成新列时,我有什么替代方案?

【问题讨论】:

有些相关:Pyspark: Best practice to add more columns to a DataFrame. 【参考方案1】:

我会认为 df2.select('col2').show(n=5) 调用会导致 UDF 在

这不是一个正确的假设。鉴于 API 的限制,Spark 将评估尽可能少的数据。因为您使用 Python udf,它将评估收集 5 行所需的最小分区数。

哪个 spark api 调用实际上会导致 udf 运行(或重新运行),以及如何构造调用以仅运行一次 UDF,以便使用 UDF 的 python 函数输出的文本创建新列。

任何评估,如果数据不再缓存(从内存中驱逐)。 结果列可能有任何用途,除非udf 被标记为非确定性。

我已经读到应该避免使用 Python UDF,因为它们很慢(似乎是正确的),那么当我需要使用 API 调用来生成新列时,我有什么替代方案?

除非您想切换到 Scala 或 RDD API,否则唯一的选择是pandas_udf,它的效率更高,但仅支持有限的类型子集。

【讨论】:

谢谢。当您提到“任何评估,如果不再缓存数据”时,您能否澄清一下。这就是我明确包含 df2.cache() 调用的原因,但它似乎并没有阻止对该列的后续访问再次重新执行 UDF。 ***.com/q/42660385/8371915,但在您的情况下,它从未被完全评估过。 我明白了。所以当我运行缓存调用时,实际上只有 5 行被评估,所以这就是缓存的全部内容。相反,如果我运行df2.filter(col('col2').isNull()).count(),我会假设这将强制对每一行的该列进行评估,如果我随后使用df2.cache(),这是否会导致随后使用 df2 列来跳过 UDF 调用? 没有。请再读一遍第一段。它将根据需要评估尽可能多的 partitions 以收集 5 行。所以确切的数字取决于数据分布。 抱歉,我应该更仔细地阅读。但关键是,只有一部分数据(在包含这 5 行的分区上)会被评估以导致 UDF 执行,而其余分区上的数据仍然处于挂起状态,并且尝试访问这些数据会触发了UDF。但是我之前提到的空值检查会导致访问所有行(以及所有分区),因此在那时,使用缓存应该确保不需要再次调用 UDF 以进行后续访问该列数据的尝试.是吗?

以上是关于基于需要外部 API 调用的现有列创建新的 Spark 数据框列的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

在 AngularJS SPA 中使用刷新令牌

在包含记录的现有表中,如何创建一个新的 datetime2(2) 列并使用基于另一列的值填充它?

基于现有值创建列并将逻辑应用于某些值 - SQL Server

pyspark:从现有列创建 MapType 列

42-KVM虚拟化-基于现有虚拟机磁盘为模版创建新的虚拟机

Oauth + SPA + API 后端