使用 spark 和 scala 进行连接计数时获得性能的最佳方法

Posted

技术标签:

【中文标题】使用 spark 和 scala 进行连接计数时获得性能的最佳方法【英文标题】:Best way to gain performance when doing a join count using spark and scala 【发布时间】:2017-01-20 10:13:58 【问题描述】:

我需要验证摄取操作,基本上,我在 HDFS 中有两个大文件,一个是 avro 格式的(摄取的文件),另一个是 parquet 格式的(合并文件)。

Avro 文件具有以下架构:

文件名、日期、计数、afield1、afield2、afield3、afield4、afield5、afield6、...afieldN

Parquet 文件具有以下架构:

fileName,anotherField1,anotherField1,anotherField2,anotherFiel3,anotherField14,...,anotherFieldN

如果我尝试在 DataFrame 中加载这两个文件,然后尝试使用简单的 join-where,我的本地计算机中的作业需要超过 24 小时!这是不可接受的。

ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count()

¿实现这一目标的最佳方法是什么? ¿ 在执行 join-where-count 之前从 DataFrame 中删除列? ¿计算每个数据帧的计数,然后加入和求和?

PD

我正在阅读有关 map-side-joint 技术的信息,但如果有一个小文件能够放入 RAM,那么这种技术似乎对我有用,但我不能保证,所以,我想知道哪个是社区实现这一目标的首选方式。

http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/

【问题讨论】:

你不能计算每个数据帧的计数然后加入和求和吗? 我想我可以,@mtoto,但是,首先,我想知道实现这一目标的最佳方法。实际上我已经运行了这句话 ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count() 以了解数字。工作完成后,我会尝试你的建议。 ¿ 您应该如何编写该代码? 不确定问题是什么:您只想知道两个数据集中常见文件名的数量吗?还是区别? 以有效的方式计算两个数据集中常见文件名的数量。 两个数据集中的文件名是否唯一? 【参考方案1】:

我会通过将数据剥离到我感兴趣的字段 (filename) 来解决这个问题,并使用它来自的源(原始数据集)制作一组唯一的文件名。 此时,两个中间数据集具有相同的模式,因此我们可以将它们合并并计算。这应该比在完整数据上使用join 快几个数量级。

// prepare some random dataset
val data1 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.8).map(i => (s"file$i", i, "rubbish"))
val data2 = (1 to 100000).filter(_ => scala.util.Random.nextDouble<0.7).map(i => (s"file$i", i, "crap"))

val df1 = sparkSession.createDataFrame(data1).toDF("filename", "index", "data")
val df2 = sparkSession.createDataFrame(data2).toDF("filename", "index", "data")

// select only the column we are interested in and tag it with the source.
// Lets make it distinct as we are only interested in the unique file count
val df1Filenames = df1.select("filename").withColumn("df", lit("df1")).distinct
val df2Filenames = df2.select("filename").withColumn("df", lit("df2")).distinct

// union both dataframes
val union = df1Filenames.union(df2Filenames).toDF("filename","source")

// let's count the occurrences of filename, by using a groupby operation
val occurrenceCount = union.groupBy("filename").count

// we're interested in the count of those files that appear in both datasets (with a count of 2)
occurrenceCount.filter($"count"===2).count

【讨论】:

我必须证明这一点。我将发布我的幼稚解决方案(尚未完成)中遇到的数字。 @aironman 也可以在笔记本中查看:gist.github.com/maasg/824e60cc522deada0986169dae733549 拜托,这不是信不信的问题,而是用我的数据进行测试的问题。感谢您的帮助,我会在工作完成后发布数据。 嗨@maasg。早上好,好吧,我的本地机器(i7 2.5 GHZ 16GB RAM osx 10,12.2)中的这个查询: ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count() 需要两个天!完成。在您的帮助下,103 秒... :) 令人惊叹且令人印象深刻。谢谢你,兄弟!我和你学到了很多。 太棒了! Me debes la cerveza :-)

以上是关于使用 spark 和 scala 进行连接计数时获得性能的最佳方法的主要内容,如果未能解决你的问题,请参考以下文章

spark streaming 一个批次取多少数据

如何进行外连接:Spark Scala SQLContext

将 DataFrame 的数据带回本地节点以在 spark/scala 中执行进一步操作(计数/显示)

spark scala数据帧中键值对的增量值计数

Spark集群搭建记录 | 云计算[CentOS7] | Scala Maven项目访问Spark实现单词计数

使用 Spark Scala 为数据中的每个组选择窗口操作后的最新时间戳记录