有没有办法在分区的 spark 数据集上并行运行操作?
Posted
技术标签:
【中文标题】有没有办法在分区的 spark 数据集上并行运行操作?【英文标题】:Is there a way to run manipulations on partitioned spark datasets in parallel? 【发布时间】:2019-07-01 22:14:26 【问题描述】:我有一个数据集列表,我想按我所有数据集共有的特定键进行分区,然后运行一些对所有分区数据集都相同的连接/分组。
我正在尝试以这样一种方式设计算法,即我使用 Spark 的 partitionBy 通过特定键创建分区。
现在,一种方法是循环运行每个分区上的操作,但这并不高效。
我想看看我是否有手动分区数据,我可以在这些数据集上并行运行操作。
我刚刚开始学习 Spark,如果这是一个幼稚的问题,请原谅我。
考虑不同数据集中的客户 ID 及其行为数据(例如浏览/点击等)数据集。说一个用于浏览,另一个用于点击。首先,我正在考虑按客户 ID 对我的数据进行分区,然后对于每个分区(客户),加入某些属性,例如浏览器或设备,以查看每个客户的行为方式。所以基本上,它就像一个嵌套的并行化。
这在 Spark 中是否可行?我有什么明显的遗漏吗?我可以参考一些文档?
【问题讨论】:
【参考方案1】:试试这个 -
1. Create test dataset (Totol Record = 70000+) to perform parallel operation on each
scala> ds.count
res137: Long = 70008
scala> ds.columns
res124: Array[String] = Array(awards, country)
2. Assume partition column as "country".
scala> ds.select("country").distinct.show(false)
+-------+
|country|
+-------+
|CANADA |
|CHINA |
|USA |
|EUROPE |
|UK |
|RUSSIA |
|INDIA |
+-------+
3. Get sum of records for each country [ **Without parallel process for each partition**]
scala> val countries = ds.select("country").distinct.collect
countries: Array[org.apache.spark.sql.Row] = Array([CANADA], [CHINA], [USA], [EUROPE], [UK], [RUSSIA], [INDIA])
scala> val startTime = System.currentTimeMillis()
startTime: Long = 1562047887130
scala> countries.foreach(country => ds.filter(ds("country") === country(0)).groupBy("country").count.show(false))
+-------+-----+
|country|count|
+-------+-----+
|CANADA |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|CHINA |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|USA |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|EUROPE |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|UK |10002|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|RUSSIA |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|INDIA |10001|
+-------+-----+
scala> val endTime = System.currentTimeMillis()
endTime: Long = 1562047896088
scala> println(s"Total Execution Time : $(endTime - startTime) / 1000 Seconds")
Total Execution Time : **8 Seconds**
4. Get sum of records for each country [ **With parallel process for each partition**]
scala> val startTime = System.currentTimeMillis()
startTime: Long = 1562048057431
scala> countries.par.foreach(country => ds.filter(ds("country") === country(0)).groupBy("country").count.show(false))
+-------+-----+
|country|count|
+-------+-----+
|INDIA |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|CANADA |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|RUSSIA |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|USA |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|UK |10002|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|CHINA |10001|
+-------+-----+
+-------+-----+
|country|count|
+-------+-----+
|EUROPE |10001|
+-------+-----+
scala> val endTime = System.currentTimeMillis()
endTime: Long = 1562048060273
scala> println(s"Total Execution Time : $(endTime - startTime) / 1000 Seconds")
Total Execution Time : **2 Seconds**
结果:-
With parallel process on each partition, it took ~ **2 Seconds**
Without parallel process on each partition, it took ~ **8 Seconds**
我测试了每个国家/地区的记录数量,您可以执行任何流程,例如写入 hive 表或 hdfs 文件等。
希望对您有所帮助。
【讨论】:
以上是关于有没有办法在分区的 spark 数据集上并行运行操作?的主要内容,如果未能解决你的问题,请参考以下文章