ThreadPoolExecutor带Queue缓冲队列的线程池 + JMeter模拟并发下单请求

Posted hahajava

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor带Queue缓冲队列的线程池 + JMeter模拟并发下单请求相关的知识,希望对你有一定的参考价值。

 

.原文:https://blog.csdn.net/u011677147/article/details/80271174

拓展:  

https://github.com/jwpttcg66/GameThreadPool/blob/85bb392151324e68addec355d85d9ce22b4ab1e2/src/test/java/com/snowcattle/game/thread/ThreadPoolTest.java
游戏中常用的线程池,顺序队列和非顺序队列

 

 

 

@RestController
public class TestController {

    @Autowired
    TestThreadPoolManager testThreadPoolManager;

    /**
     * 测试模拟下单请求 入口
     * @param id
     * @return
     */
    @GetMapping("/start/{idhaha}")
    public String start(@PathVariable Long idhaha) {
        //模拟的随机数
        String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString();

        testThreadPoolManager.addOrders(orderNo);

        return "Test ThreadPoolExecutor start";
    }

    /**
     * 停止服务
     * @param id
     * @return
     */
    @GetMapping("/end/{id}")
    public String end(@PathVariable Long id) {

        testThreadPoolManager.shutdown();

        Queue q = testThreadPoolManager.getMsgQueue();
        System.out.println("关闭了线程服务,还有未处理的信息条数:" + q.size());
        return "Test ThreadPoolExecutor start";
    }
}

 

 

@Component
public class TestThreadPoolManager implements BeanFactoryAware {

    //用于从IOC里取对象
    private BeanFactory factory; //如果实现Runnable的类是通过spring的application.xml文件进行注入,可通过 factory.getBean()获取,这里只是提一下


    private final static int CORE_POOL_SIZE = 2;

    private final static int MAX_POOL_SIZE = 10;

    private final static int KEEP_ALIVE_TIME = 0;
    // 线程池所使用的缓冲队列大小
    private final static int WORK_QUEUE_SIZE = 50;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        factory = beanFactory;
    }

    /**
     * 线程队列:用于储存在队列中的订单,防止重复提交,在真实场景中,可用redis代替 验证重复
     */
    Map<String, Object> cacheMap = new ConcurrentHashMap<>();


    /**
     * 线程缓冲队列:订单的缓冲队列,当线程池满了,则将订单存入到此缓冲队列
     */
    Queue<Object> msgQueue = new LinkedBlockingQueue<Object>();


    /**
     * 在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会有问题,针对这些问题java线程池提供了以下几种策略:
     * AbortPolicy
     * DiscardPolicy
     * DiscardOldestPolicy
     * CallerRunsPolicy
     * 自定义
     *
     * 当前采用的就是自定义:线程池的容量满了,执行下面代码,将订单存入到缓冲队列
     */
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //订单加入到缓冲队列
            msgQueue.offer(((BusinessThread) r).getAcceptStr());
            System.out.println("系统任务太忙了(有界队列已经满了),把此订单交给(调度线程池)逐一处理,订单号:" + ((BusinessThread) r).getAcceptStr());
        }
    };


    /**创建线程池
     *
     * 概念解释及原理(无实例):
     * https://uule.iteye.com/blog/1123185
     * https://www.cnblogs.com/trust-freedom/p/6594270.html
     * https://www.cnblogs.com/zedosu/p/6665306.html
     *
     *含实例的:
     * https://blog.csdn.net/x631617479/article/details/83001198 :自定义连接池ThreadPoolExecutor执行顺序
     * https://blog.csdn.net/changyuan101/article/details/50755157 :ThreadPoolExecutor自定义RejectedExecutionHandler当队列满时改为调用BlockingQueue. put来实现生产者的阻塞
     *
     * 处理任务的优先级为:
     * corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用(调度线程池)handler处理被拒绝的任务。
     *
     *
     * */
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POOL_SIZE,                             // 线程池维护线程的最少数量
            MAX_POOL_SIZE,                              // 线程池维护线程的最大数量
            KEEP_ALIVE_TIME,                            // 线程池维护线程所允许的空闲时间
            TimeUnit.SECONDS,                           // 线程池维护线程所允许的空闲时间的单位
            new ArrayBlockingQueue(WORK_QUEUE_SIZE),    // 线程池所使用的缓冲队列
            this.handler                                // 线程池对拒绝任务的处理策略
    );


    /**将任务加入订单线程池*/
    public void addOrders(String orderId){
        System.out.println("此订单准备添加到线程池,订单号:" + orderId);
        //验证当前进入的订单是否已经存在
        if (cacheMap.get(orderId) == null) {
            cacheMap.put(orderId, new Object());
            BusinessThread businessThread = new BusinessThread(orderId);
            threadPool.execute(businessThread);
        }
    }

    /**
     * 线程池的定时任务----> 称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。
     */
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);


    /**
     * 检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池
     */
    final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            //判断缓冲队列是否存在记录
            if(!msgQueue.isEmpty()){
                //当线程池的队列容量少于 WORK_QUEUE_SIZE(缓冲队列max),则开始把缓冲队列的订单 加入到 线程池
                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                    String orderId = (String) msgQueue.poll();
                    BusinessThread businessThread = new BusinessThread(orderId);
                    threadPool.execute(businessThread);
                    System.out.println("(调度线程池)缓冲队列出现订单业务,重新添加到线程池,订单号:"+orderId);
                }
            }
        }
    }, 0, 1, TimeUnit.SECONDS);


    /**获取消息缓冲队列*/
    public Queue<Object> getMsgQueue() {
        return msgQueue;
    }

    /**终止订单线程池+调度线程池*/
    public void shutdown() {
        //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止
        System.out.println("终止订单线程池+调度线程池:"+scheduledFuture.cancel(false));
        scheduler.shutdown();
        threadPool.shutdown();

    }
}

 

@Component
@Scope("prototype")//spring 多例
public class BusinessThread implements Runnable{

    private String acceptStr;

    public BusinessThread() {
        super();
    }

    public BusinessThread(String acceptStr) {
        this.acceptStr = acceptStr;
    }

    public String getAcceptStr() {
        return acceptStr;
    }

    public void setAcceptStr(String acceptStr) {
        this.acceptStr = acceptStr;
    }

    @Override
    public void run() {
        //业务操作
        System.out.println("多线程已经处理订单插入系统,订单号:"+acceptStr);

        //线程阻塞
        /*try {
            Thread.sleep(1000);
            System.out.println("多线程已经处理订单插入系统,订单号:"+acceptStr);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
    }
}

 

以上是关于ThreadPoolExecutor带Queue缓冲队列的线程池 + JMeter模拟并发下单请求的主要内容,如果未能解决你的问题,请参考以下文章

ThreadPoolExecutor中的核心线程数、最大线程数区别详解

亲 , ThreadPoolExecutor : KeepAliveTime了解一下~

带标准IO带缓存区和非标准IO 遇到fork是的情况分析

一文带你读懂 C/C++ 语言输入输出流与缓存区

线程池系列三:ThreadPoolExecutor讲解

如何在 python 3 中将队列与并发未来的 ThreadPoolExecutor 一起使用?