Day625.线程池常见错误 -Java业务开发常见错误

Posted 阿昌喜欢吃黄桃

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day625.线程池常见错误 -Java业务开发常见错误相关的知识,希望对你有一定的参考价值。

线程池常见错误

Hi,今天阿昌又来了!

在程序中,我们会用各种池化技术来缓存创建昂贵的对象,比如线程池连接池内存池

一般是预先创建一些对象放入池中,使用的时候直接取出使用,用完归还以便复用,还会通过一定的策略调整池中缓存对象的数量,实现池的动态伸缩。

由于线程的创建比较昂贵,随意、没有控制地创建大量线程会造成性能问题,因此短平快的任务一般考虑使用线程池来处理,而不是直接创建线程。以下就是今天学习到的内容的笔记,感谢阅读


一、正确的创建线程池

Java 中的 Executors 类定义了一些快捷的工具方法,来帮助我们快速创建线程池。

《阿里巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor 来创建线程池。

这一条规则的背后,是大量血淋淋的生产事故,最典型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因为资源耗尽导致 OOM 问题。

首先,我们来看一下 newFixedThreadPool 为什么可能会出现 OOM 的问题。我们写一段测试代码,来初始化一个单线程的 FixedThreadPool,循环 1 亿次向线程池提交任务,每个任务都会创建一个比较大的字符串然后休眠一小时:

@GetMapping("oom1")
public void oom1() throws InterruptedException 
    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    //打印线程池的信息,稍后我会解释这段代码
    printStats(threadPool); 
    for (int i = 0; i < 100000000; i++) 
        threadPool.execute(() -> 
            String payload = IntStream.rangeClosed(1, 1000000)
                    .mapToObj(__ -> "a")
                    .collect(Collectors.joining("")) + UUID.randomUUID().toString();
            try 
                TimeUnit.HOURS.sleep(1);
             catch (InterruptedException e) 
            
            log.info(payload);
        );
    

    threadPool.shutdown();
    threadPool.awaitTermination(1, TimeUnit.HOURS);

执行程序后不久,日志中就出现了如下 OOM:

Exception in thread "http-nio-45678-ClientPoller" java.lang.OutOfMemoryError: GC overhead limit exceeded

翻看 newFixedThreadPool 方法的源码不难发现,线程池的工作队列直接 new 了一个 LinkedBlockingQueue,而默认构造方法的 LinkedBlockingQueue 是一个 Integer.MAX_VALUE 长度的队列,可以认为是无界的:

public static ExecutorService newFixedThreadPool(int nThreads) 
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());


public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable 
    ...


    /**
     * Creates a @code LinkedBlockingQueue with a capacity of
     * @link Integer#MAX_VALUE.
     */
    public LinkedBlockingQueue() 
        this(Integer.MAX_VALUE);
    
...

虽然使用 newFixedThreadPool 可以把工作线程控制在固定的数量上,但任务队列是无界的。

如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM。我们再把刚才的例子稍微改一下,改为使用 newCachedThreadPool 方法来获得线程池。

程序运行不久后,同样看到了如下 OOM 异常:

[11:30:30.487] [http-nio-45678-exec-1] [ERROR] [.a.c.c.C.[.[.[/].[dispatcherServlet]:175 ] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: unable to create new native thread] with root cause
java.lang.OutOfMemoryError: unable to create new native thread 

从日志中可以看到,这次 OOM 的原因是无法创建线程,翻看 newCachedThreadPool 的源码可以看到,这种线程池的最大线程数是 Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。

这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。

由于我们的任务需要 1 小时才能执行完成,大量的任务进来后会创建大量的线程。我们知道线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM:

public static ExecutorService newCachedThreadPool() 
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

其实,大部分 Java 开发同学知道这两种线程池的特性,只是抱有侥幸心理,觉得只是使用线程池做一些轻量级的任务,不可能造成队列积压或开启大量线程。

但,现实往往是残酷的。一个案例:
用户注册后,我们调用一个外部服务去发送短信,发送短信接口正常时可以在 100 毫秒内响应,TPS 100 的注册量,CachedThreadPool 能稳定在占用 10 个左右线程的情况下满足需求。在某个时间点,外部短信服务不可用了,我们调用这个服务的超时又特别长,比如 1 分钟,1 分钟可能就进来了 6000 用户,产生 6000 个发送短信的任务,需要 6000 个线程,没多久就因为无法创建线程导致了 OOM,整个应用程序崩溃。因此,我同样不建议使用 Executors 提供的两种快捷的线程池,原因如下:

  • 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数。
  • 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题。

除了建议手动声明线程池以外,我还建议用一些监控手段来观察线程池的状态

线程池这个组件往往会表现得任劳任怨、默默无闻,除非是出现了拒绝策略,否则压力再大都不会抛出一个异常。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题。


二、线程池是否需要复用

某项目生产环境时不时有报警提示线程数过多,超过 2000 个,收到报警后查看监控发现,瞬时线程数比较多但过一会儿又会降下来,线程数抖动很厉害,而应用的访问量变化不大。

为了定位问题,我们在线程数比较高的时候进行线程栈抓取,抓取后发现内存中有 1000 多个自定义线程池。一般而言,线程池肯定是复用的,有 5 个以内的线程池都可以认为正常,而 1000 多个线程池肯定不正常。

在项目代码里,我们没有搜到声明线程池的地方,搜索 execute 关键字后定位到,原来是业务代码调用了一个类库来获得线程池,类似如下的业务代码:调用 ThreadPoolHelper 的 getThreadPool 方法来获得线程池,然后提交数个任务到线程池处理,看不出什么异常。

@GetMapping("wrong")
public String wrong() throws InterruptedException 
    ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool();
    IntStream.rangeClosed(1, 10).forEach(i -> 
        threadPool.execute(() -> 
            ...
            try 
                TimeUnit.SECONDS.sleep(1);
             catch (InterruptedException e) 
            
        );
    );
    return "OK";

但是,来到 ThreadPoolHelper 的实现让人大跌眼镜,getThreadPool 方法居然是每次都使用 Executors.newCachedThreadPool 来创建一个线程池。

class ThreadPoolHelper 
    public static ThreadPoolExecutor getThreadPool() 
        //线程池没有复用
        return (ThreadPoolExecutor) Executors.newCachedThreadPool();
    

newCachedThreadPool 会在需要时创建必要多的线程,业务代码的一次业务操作会向线程池提交多个慢任务,这样执行一次业务操作就会开启多个线程。

如果业务操作并发量较大的话,的确有可能一下子开启几千个线程。

那,为什么我们能在监控中看到线程数量会下降,而不会撑爆内存呢?回到 newCachedThreadPool 的定义就会发现,它的核心线程数是 0,而 keepAliveTime 是 60 秒,也就是在 60 秒之后所有的线程都是可以回收的。

好吧,就因为这个特性,我们的业务程序死得没太难看。

要修复这个 Bug 也很简单,使用一个静态字段来存放线程池的引用,返回线程池的代码直接返回这个静态字段即可。

这里一定要记得我们的最佳实践,手动创建线程池。修复后的 ThreadPoolHelper 类如下:

class ThreadPoolHelper 
  private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    10, 50,
    2, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());
  public static ThreadPoolExecutor getRightThreadPool() 
    return threadPoolExecutor;
  


三、是否正确的选择线程池的拒绝策略

线程池的意义在于复用,那这是不是意味着程序应该始终使用一个线程池呢?

当然不是。要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:

  • 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队列。
  • 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做缓冲。

业务代码使用了线程池异步处理一些内存中的数据,但通过监控发现处理得非常慢,整个处理过程都是内存中的计算不涉及 IO 操作,也需要数秒的处理时间,应用程序 CPU 占用也不是特别高,有点不可思议。经排查发现,业务代码使用的线程池,还被一个后台的文件批处理任务用到了。

或许是够用就好的原则,这个线程池只有 2 个核心线程,最大线程也是 2,使用了容量为 100 的 ArrayBlockingQueue 作为工作队列,使用了 CallerRunsPolicy 拒绝策略:

private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        2, 2,
        1, TimeUnit.HOURS,
        new ArrayBlockingQueue<>(100),
        new ThreadFactoryBuilder().setNameFormat("batchfileprocess-threadpool-%d").get(),
        new ThreadPoolExecutor.CallerRunsPolicy());

这里,我们模拟一下文件批处理的代码,在程序启动后通过一个线程开启死循环逻辑,不断向线程池提交任务,任务的逻辑是向一个文件中写入大量的数据:

@PostConstruct
public void init() 
    printStats(threadPool);

    new Thread(() -> 
        //模拟需要写入的大量数据
        String payload = IntStream.rangeClosed(1, 1_000_000)
                .mapToObj(__ -> "a")
                .collect(Collectors.joining(""));
        while (true) 
            threadPool.execute(() -> 
                try 
                    //每次都是创建并写入相同的数据到相同的文件
                    Files.write(Paths.get("demo.txt"), Collections.singletonList(LocalTime.now().toString() + ":" + payload), UTF_8, CREATE, TRUNCATE_EXISTING);
                 catch (IOException e) 
                    e.printStackTrace();
                
                log.info("batch file processing done");
            );
        
    ).start();

可以想象到,这个线程池中的 2 个线程任务是相当重的。

通过 printStats 方法打印出的日志,我们观察下线程池的负担:

可以看到,线程池的 2 个线程始终处于活跃状态,队列也基本处于打满状态

因为开启了 CallerRunsPolicy 拒绝处理策略,所以当线程满载队列也满的情况下,任务会在提交任务的线程,或者说调用 execute 方法的线程执行,也就是说不能认为提交到线程池的任务就一定是异步处理的。如果使用了 CallerRunsPolicy 策略,那么有可能异步任务变为同步执行。

从日志的第四行也可以看到这点。这也是这个拒绝策略比较特别的原因。不知道写代码的同学为什么设置这个策略,或许是测试时发现线程池因为任务处理不过来出现了异常,而又不希望线程池丢弃任务,所以最终选择了这样的拒绝策略。

不管怎样,这些日志足以说明线程池是饱和状态。可以想象到,业务代码复用这样的线程池来做内存计算,命运一定是悲惨的。我们写一段代码测试下,向线程池提交一个简单的任务,这个任务只是休眠 10 毫秒没有其他逻辑:

private Callable<Integer> calcTask() 
    return () -> 
        TimeUnit.MILLISECONDS.sleep(10);
        return 1;
    ;


@GetMapping("wrong")
public int wrong() throws ExecutionException, InterruptedException 
    return threadPool.submit(calcTask()).get();

我们使用 wrk 工具对这个接口进行一个简单的压测,可以看到 TPS 为 75,性能的确非常差。


细想一下,问题其实没有这么简单。

因为原来执行 IO 任务的线程池使用的是 CallerRunsPolicy 策略,所以直接使用这个线程池进行异步计算的话,当线程池饱和的时候,计算任务会在执行 Web 请求的 Tomcat 线程执行,这时就会进一步影响到其他同步处理的线程,甚至造成整个应用程序崩溃。

解决方案很简单,使用独立的线程池来做这样的“计算任务”即可。

计算任务打了双引号,是因为我们的模拟代码执行的是休眠操作,并不属于 CPU 绑定的操作,更类似 IO 绑定的操作,如果线程池线程数设置太小会限制吞吐能力:

private static ThreadPoolExecutor asyncCalcThreadPool = new ThreadPoolExecutor(
  200, 200,
  1, TimeUnit.HOURS,
  new ArrayBlockingQueue<>(1000),
  new ThreadFactoryBuilder().setNameFormat("asynccalc-threadpool-%d").get());


@GetMapping("right")
public int right() throws ExecutionException, InterruptedException 
  return asyncCalcThreadPool.submit(calcTask()).get();

使用单独的线程池改造代码后再来测试一下性能,TPS 提高到了 1727

可以看到,盲目复用线程池混用线程的问题在于,别人定义的线程池属性不一定适合你的任务,而且混用会相互干扰


四、总结

线程池管理着线程,线程又属于宝贵的资源,有许多应用程序的性能问题都来自线程池的配置和使用不当。

  • 第一,Executors 类提供的一些快捷声明线程池的方法虽然简单,但隐藏了线程池的参数细节。因此,使用线程池时,我们一定要根据场景和需求配置合理的线程数、任务队列、拒绝策略、线程回收策略,并对线程进行明确的命名方便排查问题。
  • 第二,既然使用了线程池就需要确保线程池是在复用的,每次 new 一个线程池出来可能比不用线程池还糟糕。如果你没有直接声明线程池而是使用其他同学提供的类库来获得一个线程池,请务必查看源码,以确认线程池的实例化方式和配置是符合预期的。
  • 第三,复用线程池不代表应用程序始终使用同一个线程池,我们应该根据任务的性质来选用不同的线程池。特别注意 IO 绑定的任务和 CPU 绑定的任务对于线程池属性的偏好,如果希望减少任务间的相互干扰,考虑按需使用隔离的线程池
  • 线程池作为应用程序内部的核心组件往往缺乏监控(如果你使用类似 RabbitMQ 这样的 MQ 中间件,运维同学一般会帮我们做好中间件监控),往往到程序崩溃后才发现线程池的问题,很被动。

以上是关于Day625.线程池常见错误 -Java业务开发常见错误的主要内容,如果未能解决你的问题,请参考以下文章

day19-线程之间的通信&线程池&设计模式

day19-线程之间的通信&线程池&设计模式

Python入门学习-DAY37-进程池与线程池协程gevent模块

重修课程day34(网络编程八之线程二)

Day12 线程池RabbitMQ和SQLAlchemy

Day440.Sentinel -谷粒商城