为啥在 .distinct() 之后总是有 .collect()?

Posted

技术标签:

【中文标题】为啥在 .distinct() 之后总是有 .collect()?【英文标题】:Why there is always a .collect() after a .distinct()?为什么在 .distinct() 之后总是有 .collect()? 【发布时间】:2019-09-28 17:35:46 【问题描述】:

火花初学者。经常看到 .distinct().collect() 结构。在 distinct() 函数之后使用 collect() 函数的内在原因可能是什么?

【问题讨论】:

distinct() 只会删除所有重复项。 collect() 会将所有元素带入主内存。因此,两者一起收集所有独特的元素。仅此而已,没什么特别的。为什么你的代码会这样,只有你和你的团队可能知道。 @LuisMiguelMejíaSuárez 我猜这取决于数据结构。如果您的数据结构是非严格的,例如 Stream,则 distinctcollect 的顺序很重要。 当然,在这种情况下,它肯定是 RDDDataset (无论如何,存在于其他集合不同,因此顺序总是很重要)。但这与我的评论有什么关系?我刚才说了,两个方法一起调用没有什么特别的,就像一起调用任何其他两个方法一样,就像调用任何方法一样。这就是您构建程序的方式。 distinct 后面总是有一个collect 是不正确的。其实这些方法并没有什么共同点,可能是巧合一直看到他们在一起 【参考方案1】:

distinct 是一种转换。这意味着它不会立即执行,而只会在调用某个操作时执行。

collect 是一个动作。调用 collect 方法会导致运行所有之前的转换。

collect 之后调用 distinct 在 Spark 外部可能会增加程序的内存占用,因为程序也会生成重复的元素。在 Spark 中,在 collect 之后调用 distinct 也可能导致整个程序失败。

你可以在这里找到更多解释:https://dzone.com/articles/getting-lazy-with-scala

【讨论】:

【参考方案2】:

Spark 利用了“惰性评估”的概念。惰性求值意味着 Spark 将等到最后一刻执行计算指令图,通常是为了寻找增强执行计划的方法。惰性求值包括转换和动作的概念。 Spark 将记录转换(例如distinct()sort()sum())并构建到逻辑计划中。该计划称为 DAG(有向无环图)。我们需要调用一个动作来让 Spark 执行 DAG。操作示例包括count()show()collect()。动作基本上是将我们的数据转换结果带回相应语言的本机对象的任何东西,在本例中是 Python。

在您的示例中,当您调用 distinct() 时,Spark 实际上并没有执行 DAG。当您在 distinct 之后调用操作时,它会执行 DAG,例如 distinct().collect()distinct().show()distinct().count()。此外,collect() 只是一个函数,它将 DataFrame 作为此处指定的行对象的 Python 列表返回...collect()。您可以从其他操作中选择关注distinct()collect() 只是教程中经常使用的示例,因为它显示了数据集中 Row 的结构。

【讨论】:

【参考方案3】:

这可能有几个原因。

collect 是一个动作。如上所述,Spark 会执行所有导致开始的操作(如果不使用缓存)并在您调用 collect 时返回结果。但是,您看到了在collect 之前看到distinct 的原因可能是collect 将结果返回给驱动程序并限制结果,它只选择不同的值然后返回给驱动程序。这样会减少要获取的记录数,并且可以避免OutOfMemory 错误。

您将这两种方法放在一起没有其他原因。请注意,您同时看到这两个只是巧合。我从来没有在我的任何项目中一起使用它们。

【讨论】:

以上是关于为啥在 .distinct() 之后总是有 .collect()?的主要内容,如果未能解决你的问题,请参考以下文章

为啥安装了jira之后,开启atlassian JIRA服务时,总是不行

VirtualDocumentRoot 和域总是路由到本地主机 - 为啥?

为啥 distinct 不能与 Laravel + Postgresql 一起使用?

为啥 COUNT(DISTINCT (*)) 不起作用?

为啥 DISTINCT 在 Pig 中比 GROUP BY/FOREACH 快

为啥 DISTINCT 在这种情况下不起作用? (SQL)