Spark Dataframe 映射函数

Posted

技术标签:

【中文标题】Spark Dataframe 映射函数【英文标题】:Spark Dataframe map function 【发布时间】:2018-04-13 11:30:10 【问题描述】:
val df1 = Seq(("Brian", 29, "0-A-1234")).toDF("name", "age", "client-ID")
val df2 = Seq(("1234", 555-5555, "1234 anystreet")).toDF("office-ID", "BusinessNumber", "Address")

我正在尝试在数据帧的每一行上运行一个函数(在流中)。此函数将包含 scala 代码和 Spark 数据帧 api 代码的组合。例如,我想从 df 中获取 3 个特征,并使用它们来过滤名为 df2 的第二个数据帧。我的理解是 UDF 无法做到这一点。现在我的所有过滤代码都可以正常工作,但无法将其应用于 df 的每一行。

我的目标是能够做类似的事情

df.select("ID","preferences").map(row => ( //filter df2 using row(0), row(1) and row(3) ))

数据框无法连接,它们之间没有可连接的关系。

虽然我使用的是 Scala,但 Java 或 Python 的答案可能会很好。

我也可以通过其他方式来实现这一点。如果我可以将行中的数据提取到单独的变量中(请记住这是流式传输),那也很好。

【问题讨论】:

我们还不知道是什么案子?流媒体还是什么? 两者都是来自 kafka 主题的流式数据帧。 【参考方案1】:

我的理解是 UDF 无法做到这一点。

没错,但map 也不行(localDatasets 似乎是个例外Why does this Spark code make NullPointerException?)。像这样的嵌套逻辑只能用joins来表达:

如果Datasets 都是streaming,则它必须是等值的。这意味着即使:

数据框无法连接,它们之间没有可连接的关系。

你必须以某种方式推导出一个非常接近filter 条件的方法。

如果一个Dataset 不是streaming,你可以用crossJoin 后跟filter 暴力破解,但当然不推荐。

【讨论】:

我什至不知道如何在没有派生键集的情况下在另一个数据帧上定义谓词。感觉像是一种 crossJoin 的解决方案…… 我什至没有,因为 OP 省略了filter 逻辑。但是,如果您可以过滤,您总是可以导出一个,即使是一个虚拟的 (1 = 1)。将这里的 LSH 视为这种模式的表示...... :) 好的,非常感谢,我会考虑这个,当我有我的解决方案时,我会把它作为更新发布。 @user9613318 我一直这样做:) 所以我遵循了加入的建议。我使用了交叉加入。对于我的用例,第一个数据框“应该”只有 1 行。如果它有更多,那么这可能是一条非常糟糕的道路。我想到的另一个解决方案是将 df1 的每一行作为数组嵌入到 df2 中列的元素中。我相信这会更好地扩展。

以上是关于Spark Dataframe 映射函数的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL仅映射一列DataFrame

客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu

Spark DataFrame 映射错误

如何在 Spark 中对嵌套的 Dataframe 进行平面映射

如何使用 JSON 映射文件在 Spark 中使用 Scala 生成新的 DataFrame

PySpark 将 Dataframe 作为额外参数传递给映射