记一次调优过程—Spark读取OBS文件入ES

Posted 追求无悔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了记一次调优过程—Spark读取OBS文件入ES相关的知识,希望对你有一定的参考价值。

需求描述:

读取OBS中的文件(文件格式为snappy压缩格式),解析数据,并将数据插入ES的过程。

数据规模:

单个目录下的文件个数最大上限为500个,每个文件压缩后为115MB,压缩比为5.5,每条数据大小为600B-1KB。
目录格式:/data/xxx/userId/yyyyMMdd
存入ES对应索引:xxx-userId-yyyyMMdd

实现过程

1. 初始化

3. 存入ES

具体插入操作:

public void batchPutJson(String index, List<String> jsonList) throws EsOperateException 
    try 
        RestHighLevelClient highClient = getHighLevelClient();
        BulkRequest bulkRequest = new BulkRequest();
        if (jsonList == null) 
            return;
        
        int num = 0;
        for (String data: jsonList) 
            num++;
            if (num % BATCH_SIZE <= 0) 
               highClient.bulk(bulkRequest, RequestOptions.DEFAULT);
               bulkRequest = new BulkRequest();
             else 
               bulkRequest.add(new IndexRequest(index.toLowerCase(Locale.ROOT)).source(data, XContentType.JSON));
            
        
        // 如果刚好bulkRequest没有请求,则不需要执行插入操作
        if (bulkRequest.numberOfActions() > 0) 
           // 避免异常:ActionRequestValidationException: Validation Failed: 1: no requests added
           highClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        
    catch (IOException | EsConnectException ex) 
        log.error("batch put json failed: ", ex);
        throw new EsOperateException("batch put exception: " + ex.getMessage());
   

测试过程

1. ES模板事先已创建好


副本数为1,分片数为5,刷新间隔为10s

2. 索引为插入数据时自动创建

3. OBS文件

文件个数6个,单个大小115MB,数据量大约为666万条

4. 测试结果

耗时为整个任务的耗时,任务包含其他初始化过程和任务停止过程

  1. 不设置Repartition,1个Executor,ExecutorCPU为1U,Executor内存为2G,ES单次插入1w条,总耗时14min;
  2. 不设置Repartition,1个Executor,ExecutorCPU为1U,Executor内存为2G,ES单次插入4w条,总耗时17min;
  3. 不设置Repartition,2个Executor,ExecutorCPU为1U,Executor内存为2G,ES单次插入1w条,总耗时10min;
  4. 不设置Repartition,5个Executor,ExecutorCPU为1U,Executor内存为1G,ES单次插入2w条,总耗时6.5min;

    优化过程

    看着这个耗时时间,不禁想到,如果按照最大规格,那不是得等死。。。
    开始了优化之路

    1. 任务耗时分析


    发现任务的最大耗时之处在于这一句:

    dataset.toJSON().toJavaRDD()
    .mapPartitions(new ColdToHotFunction(this.owner, date))
    .foreach((VoidFunction<String>) str -> log.info("data to es result is: ", str));

    都知道,文件读取结果dataset,真正有结果是在这一句,所以这一句包含两个步骤,1是读取文件,2是存入ES,那到底是哪一步耗时呢?

    通过进入任务详情查看,还是不太确定耗时大的真正原因。

    2. 尝试

    认为是读文件和数据转化过程慢:
    将Repartition设置为6,3个Executor,ExecutorCPU为2U,Executor内存为1G,ES单次插入2w条,最后总耗时还是需要6min左右,并没有实质性提高。
    Executor数量 * Executor CPU >= Repartition时才能保证单次任务可以充分执行。

既然不是文件读取,那就是插入ES慢?(枉我一开始这么信任ES)
增加插入耗时打印:

emmm... 原来就是你啊,每次插入2w条,这么慢的嘛!!!

3. 优化

ES服务端已经没法改变了,那只能从业务侧进行优化了
我已经采用了批量插入,还有什么优化策略呢???
最终通过以下两点:

  1. 一开始刷新间隔时间为10s,改为刷新间隔60s,因为没有必要刷新那么频繁;
  2. 原来是在插入数据的时候自动创建的索引,改为插入数据前,先手动创建索引,且设置副本数为0,在整个过程结束之后,再将副本数更新为1。因为副本数会导致每次插入数据会插多遍,很影响速率。

    优化结果检查

    经过上面的优化后,再测试一下性能:

  3. Repartition设为10,3个Executor,ExecutorCPU为2U,Executor内存为1G,ES单次插入2w条,总耗时4min;
  4. Repartition设为6,3个Executor,ExecutorCPU为2U,Executor内存为1G,ES单次插入2w条,总耗时3.6min;


    此种方式,在数据量越大的时候,效果应该就越明显了。
    出于公司资源有限的情况考虑,Executor数量 * Executor CPU >= Repartition时能最大发挥多线程优势,所以采用了第二种方式。

心得

此次调优过程看了很多优化方面的博客,因为任务所做的事情简单,没有涉及到太多的可优化点,但是感觉还有很大的优化空间。
作为一个小菜鸡,能通过自己慢慢摸索,测试完成效率提升4倍,也是很开心的事情!!!
如果大佬们有好的建议,希望留言,我好继续进行优化,感谢!!!

以上是关于记一次调优过程—Spark读取OBS文件入ES的主要内容,如果未能解决你的问题,请参考以下文章

17 记一次 spark 读取大数据表 OOM OutOfMemoryError: GC overhead limit exceeded

17 记一次 spark 读取大数据表 OOM OutOfMemoryError: GC overhead limit exceeded

记一次canal delay 调优过程

记一次 Laravel 应用性能调优经历

spark调优之开发调优

记一次Grpc接口压力测试&性能调优