有spark大佬知道下面这个代码哪里可以优化吗?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了有spark大佬知道下面这个代码哪里可以优化吗?相关的知识,希望对你有一定的参考价值。
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('appName').setMaster('Local[4]')
sc = SparkContext.getOrCreate(conf)
rdd = sc.textFile('books.txt',4)
first= rdd.first()
rdd1 = rdd.filter(lambda x: x!=first)
rdd2 = rdd1.map(lambda line: line.split('\t'))
rdd3 = rdd2.map(lambda x:(x[1],1))
rdd4 = rdd3.reduceByKey(lambda x,y: x+y)
rdd5 = rdd4.filter(lambda x: x[1] != 1)
res = rdd5.collect()
print(res)
在每个 RDD 上的操作都被映射到了 RDD 上的所有分区中的每个元素,所以如果能将几个操作合并到一起,就可以减少分区数量。例如,可以将 rdd2 和 rdd3 合并成一个操作。
在 rdd1 上的 filter 操作中,如果第一行是文件中的第一行,那么这个 filter 操作将会在所有分区中进行,因此可以将其改为在读取文件时直接跳过第一行。
在 rdd4 上的 reduceByKey 操作之后,可以考虑使用 sortByKey 操作来将结果按照键排序,而不是在最后使用 collect 操作来将所有数据加载到内存中并排序。
最后优化后的代码如下:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('appName').setMaster('Local[4]')
sc = SparkContext.getOrCreate(conf)
rdd = sc.textFile('books.txt', 4)
rdd1 = rdd.map(lambda line: line.split('\\t')) \\
.filter(lambda x: x[0] != 'title') \\
.map(lambda x: (x[1], 1)) \\
.reduceByKey(lambda x, y: x + y) \\
.sortByKey()
res = rdd1.collect()
print(res)
参考技术A 之所以没有被作为默认的序列化类库的原因,主要是因为Kryo要求,如果要达到它的最佳性能的话,那么就一定要注册你自定义的类 参考技术B 本文将介绍 CBO,它充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。 参考技术C 第二个图上看到下面所有的Locality Level 都为: NODE_LOCAL第一个图上的调度很多的task也是NODE_LOCAL
Spark Streaming 在哪里运行?
【中文标题】Spark Streaming 在哪里运行?【英文标题】:Where does Spark Streaming run? 【发布时间】:2017-01-24 17:18:05 【问题描述】:据我了解,Spark 可以使用 Spark Streaming 分析流。 Kafka 可以接收来自多个来源的数据。 我不明白的是,如果我有一个 Kafka 集群从多个源接收数据,数据会发送到运行 Spark Streaming 的数据库吗?还是 Spark Streaming 在应用服务器上运行?
【问题讨论】:
您可以尝试将Spark集群部署在Kafka(Zookeeper)同一个集群中。我们在生产中的环境很少,一个是 Kafka + zookeeper(在同一个集群中)和 spark 独立的独立集群的主要优点是维护(版本升级,......)但我们也有在生产中运行的 Kafka zookeeper 和 Apache Storm 在同一个集群上(物理服务器100T)。您可以尝试将 Spark(作为独立的一个 jar)添加到您的 Kafka 集群(具有所有必要的配置端口,...)。 另外忘记添加,如果你是运行在 Kubernetes 或 Mesos 上,那么无论你使用哪个服务器,资源管理都会由它们来完成。 【参考方案1】:如果您使用 Spark Streaming,则需要设置一个 Spark 集群,然后将 Spark Streaming 作业提交到集群。因此,您将需要 2 个集群:Kafka + Spark(或者实际上是 3 个,因为您还需要一个用于 Kafka 的 Zookeeper 集群)。
【讨论】:
所以如果我有一个 spark 集群,那么 spark 作业是在应用服务器上运行的吗?并以 mapreduce 样式分发到 spark 集群?以上是关于有spark大佬知道下面这个代码哪里可以优化吗?的主要内容,如果未能解决你的问题,请参考以下文章