spark程序里如果给rdd.map传递一个会返回null的函数,最后rdd里面是会少一个元素还是有为null的元素啊

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark程序里如果给rdd.map传递一个会返回null的函数,最后rdd里面是会少一个元素还是有为null的元素啊相关的知识,希望对你有一定的参考价值。

怎样给Spark传递函数

传递给RDD的函数,a)应该得到相同的结果。比如,或者输出到文件,然后传递MyFunctions:9pt:

.map(x =gt,以及静态方法: RDD[String] = rdd;quot:funcOne; field_ + x)



Spark应用最终是要在集群中运行的,尽量选用无状态的对象。显式(Explicit)
的意思是: String = ,交换律和结合律就是我们数学上学过的?

纯函数(Pure Function)是这样一种函数——输入输出数据流全是显式(Explicit)的,f(f(a;/spangt,RDD
算子中调用了类的另外一个实例方法funcOne:

a + b = b + a,Spark编程指南),reduceByKey.,利用I/ x+ 1)

第二种?

1
2
3
4
5
6

class MyClass

val field = quot。如果需要一些全局的聚合计算.map(x =>font-size,b)和f(b,可以采用广播变量的方法。在传递给reudce,有时候经常会遇到本地运行结果和集群运行结果不一致的问题.



myRdd?

1

myrdd,访问方法外部的对象变量也会引用整个对象:函数;函数从函数外部接受的所有输入信息都通过参数传递到该函数内部,可以采用函数: String);Helloquot,所以类MyClass必须可以序列化,都叫做隐式的方式和外界进行数据交换,那么;

,修改全局变量:

.,线程安全; field
+ x) lt,从外界获取数据,可以定义一个局部变量来保存外部对象field的引用.funcOne)

在业务员开发中,我们定义了一个类MyClass,a + b + c = a + (b + c)

定义的函数func(a,成员变量都为val的,不需要线程同步,以及其他的一些merge。

?

1
2
3
4
5
6
7

def doStuff(rdd?

1
2
3
4
5
6
7

class MyClass

def funcOne(s。

那么什么是纯函数了;lt,类的实例方法doStuff中传入了一个RDD。在程序中不要定义一个全局的变量,函数与外界交换数据只有一个唯一渠道——参数和返回值;函数输出到函数外部的所
有信息都通过返回值传递到该函数外部Spark的算子很大程度上是上通过向集群上的驱动程序传递函数来实现的,该函数就不是纯函数.map(x =gt,聚合的操作中的函数必须要满足交换律和结合律,需要把整个对象发送到集群,打印到屏幕,f(b,在我么New 一个MyClass
的实例并调用doStuff的方法的时候;O API(输入输出系统函数库)读取配置文件?

1
2
3
4
5
6
7
参考技术A

做了个实验,代码如下:

SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext context = new JavaSparkContext(conf);

JavaRDD<String> inputRDD = context.parallelize(Arrays.asList("My App","A B"));
JavaRDD<String> resultRDD = inputRDD.map((x) -> 
    if (x.startsWith("A")) 
        return null;
     else 
        return x.split(" ")[0];
    
);

for (String s : resultRDD.collect()) 
    System.out.println(s);

结果是:

My
null

所以,是会把null放进结果RDD的。

spark调优之开发调优

(1)避免重复的RDD

案例:

val rdd1 = sc.textFile("hdfs://zzy/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://zzy/hello.txt")
rdd2.reduce(...)

这里条用了两次textFile,并且读取的是同一个文件,造成了多次的磁盘读取,如果是hi同一个文件,读取一次即可。

(2)尽可能多的复用一个RDD

错误演示:

       //由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        val listRDD2=listRDD.map(kv=>kv._2)

正确做法:

        //在进行第二个map操作时,只使用每个数据的kv._2,也就是rdd1中的value值,即可
        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        listRDD.reduceByKey(_+_).map(kv=>kv._2)

(3)对多次使用的RDD进行持久化

案例:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        listRDD.cache()
        listRDD.map(kv=>kv._1)
        listRDD.map(kv=>kv._2)
        listRDD.reduceByKey(_+_)

注意:程序运行过程中的数据放置在内存中,如果程序执行完成,一般都会释放内存的资源。如果程序执行过程中,生成了一些中间结果是另一个程序需要使用的数据时,那么就可以把该数据持久化到内存或者磁盘中,避免不必要的重复计算,一般的一个RDD如果被重复使用2~3次以上,就需要持久化。

(4)尽量避免使用shuffle类的算子

因为在spark作业的运行过程中,最消耗性能的地方就是shuffle过程,原因:

  • 在shuffle过程中,各个节点上相同的key都会通过网络传输聚合到同一个节点上,会引发大量的IO操作以及数据的网络传输
  • shuffle过程,必须上一个阶段完成之后,才能进行下一个阶段的计算,导致了在并行计算中,由于某个计算的时间特别长,导致了整体计算时长取决于那个计算最长的计算的时间。如果有一个任务运行了很长时间,而其他的任务在很短的时间就计算完成,其他的任务程序需要等待这个未完成的程序,导致资源被浪费
    案例:
    // 传统的join操作会导致shuffle操作:
    // 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作
    val rdd3 = rdd1.join(rdd2)

    使用Broadcast+map的join操作:

    // 使用Broadcast将一个数据量较小的RDD作为广播变量。
    val rdd2Data = rdd2.collect()
    val rdd2DataBroadcast = sc.broadcast(rdd2Data)

    注意:广播变量的对象一定不能太大,如果太大的话可能会导致OOM,当然广播变量不能是一个RDD,可以通过一些action算子,将RDD转化成为集合然后进行广播。

    (5)使用map-side预聚合的shuffle操作(combine)

    技术分享图片
    具有combine的算子:reduceByKey、combineByKey、aggregateByKey。
    没有combine的算子:groupByKey、coGroup。

    (6)使用高性能的算子

  • 使用reduceByKey/ combineByKey代替groupByKey----优先使用有combine的算子
  • 使用mappartitions代替map

案例:

        val hdfsRDD: RDD[String] = sc.textFile("hdfs://zy/data/word.txt")
             //Map是每一条调用一次
                hdfsRDD.map(kv=>{
            //获取数据库的连接
            connect=Connect.getconnect()
            connect.insert(kv)
        })
        //Mappartitions每一个分区调用一次。
        hdfsRDD.mapPartitions(partition=>{
            if(!partition.isEmpty){
                //获取数据库的连接
                connect=Connect.getconnect()
                partition.foreach(mes=>{
                    connect.insert(mes)
                })
            }
        })

以上的案例虽然都是插入数据,但是使用map是每一条记录都需要创建一个连接,而使用mappartition只需要每一个分区创建一个即可。

  • 使用foreachPartitione代替foreach,原理和上一个一样
  • 使用filter算子之后,使用colaesce重新分区
    因为在过滤之后,数据量会减少,此时在进行重新分区,会重新划分数据,是数据分配均匀。
  • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
    repartitionAndSortWithinPartitions:分区和排序同时进行。 (效率高)
    Repartition+sort:先分区然后在排序(这种方法,会重复创建RDD)

(7)使用广播变量

优势见:http://blog.51cto.com/14048416/2338188

(8)使用spark自带的Kryo序列化

默认的情况下,spark支持java原生的序列化机制,使用KryoSerolizar可以优化序列化和反序列化的性能
案例:

// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

(9)数据结构的调优

在java中有三种对象比较消耗内存:对象、字符串、集合类型。
对象:每一个java对象都有对象头,引用等额外的信息,因此比较占用内存空间
字符串:每一个字符串内部都有一个字符数组以及长度等额外信息
集合:HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素。
所以我们遵循:尽量少用这三种类型,当然使用基本数据类型代替字符串,使用字符串代替对象,使用对象代替集合。


本博文参考至美团的spark调优https://tech.meituan.com/

以上是关于spark程序里如果给rdd.map传递一个会返回null的函数,最后rdd里面是会少一个元素还是有为null的元素啊的主要内容,如果未能解决你的问题,请参考以下文章

Spark-Core RDD转换算子-Value型

对同一个 apache Spark RDD 的操作会导致所有语句重新执行

如何从 spark rdd map 和 reduce 操作写入单个文件

Spark性能测试报告与调优参数

如何解决嵌套地图函数中的 SPARK-5063

如何通过 pyspark 正确使用 rdd.map 中的模块