用阻塞队列实现生产者消费者模式二(多线程消费)

Posted Neo Yang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用阻塞队列实现生产者消费者模式二(多线程消费)相关的知识,希望对你有一定的参考价值。

在性能优化时使用生产者消费者模式,很多时候是为了提升数据的消费处理能力。采用多个消费者线程并发处理数据,能起到性能提升的效果。

下文在《用阻塞队列实现生产者消费者模式一(单线程消费)》的基础上描述。

一、在接口层增加测试接口

package com.elon.rest;

import com.elon.service.ProConsService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/v1/produce-consumer")
@Api(tags = "生产者消费者服务")
public class ProduceConsumerController 
    /**
     * 启动多线程任务.
     *
     * @param taskAmount 任务数量
     * @param consumerThreadAmount 线程数量
     */
    @PostMapping("/start-muti-task/taskAmount/consumerThreadAmount")
    @ApiOperation(value="启动多线程任务")
    public void startMutiTask(@PathVariable("taskAmount") int taskAmount,
                              @PathVariable("consumerThreadAmount") int consumerThreadAmount) 
        ProConsService proConsService = new ProConsService();
        proConsService.startMutiTask(taskAmount, consumerThreadAmount);
    

二、增加多线程消费处理类

package com.elon.service;

import com.elon.constant.EnumTaskEndType;
import com.elon.model.MutiLinkedBlockingDeque;
import com.elon.model.Task;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.TimeUnit;

/**
 * 多线程消费者类.
 *
 * @author elon
 * @version 1.0
 */
public class MutiConsumer implements Runnable 
    private static final Logger LOGGER = LogManager.getLogger(MutiConsumer.class);

    private MutiLinkedBlockingDeque blockingDeque = null;

    public MutiConsumer(MutiLinkedBlockingDeque blockingDeque) 
        this.blockingDeque = blockingDeque;
    

    @Override
    public void run() 
        while (true) 
            try 
                Task task = blockingDeque.take();
                if (task.getTaskEndType() != EnumTaskEndType.NA) 
                    LOGGER.info("收到毒丸任务.", task);
                    blockingDeque.offerFirst(task, 2, TimeUnit.MILLISECONDS);

                    final int consumerAmount = blockingDeque.decrementConsumer();
                    if (consumerAmount <= 0) 
                        LOGGER.info("所有消费者停止任务处理.");
                    
                    return;
                

                // 消费任务(等待10秒,模拟实际业务处理时间)
                LOGGER.info("消费任务.", task);
                Thread.sleep(10*1000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
    

这个类与Consumer相比的差异是增加了判断是否所有消费线程都停止了的处理。在实际开发中,这个地方一般需要做一些修改数据库中任务的状态,设置任务结束时间等收尾的工作。

三、定义多线程消费者阻塞队列

package com.elon.model;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多线程消费者阻塞队列.
 *
 * @author elon
 * @version 1.0
 */
public class MutiLinkedBlockingDeque extends LinkedBlockingDeque<Task>  
    /**0
     * 消费者数量
     */
    private AtomicInteger consumerAmount = new AtomicInteger();

    public MutiLinkedBlockingDeque(int amount) 
        consumerAmount.set(amount);
    

    public int decrementConsumer() 
        return consumerAmount.decrementAndGet();
    

MutiLinkedBlockingDeque从基类LinkedBlockingDeque派生,增加了一个消费者线程数量计数器。在线程退出时刷新该计数器,用于判断是否所有的消费者线程都已退出.

四、在服务层初始化队列和多个消费者线程

package com.elon.service;

import com.elon.model.MutiLinkedBlockingDeque;
import com.elon.model.Task;

import java.util.concurrent.LinkedBlockingDeque;

/**
 * 生产者消费者服务。启动生产者消费者线程.
 *
 * @author elon
 * @version 1.0
 */
public class ProConsService 
    /**
     * 开始多线程任务.
     *
     * @param taskAmount           任务数量
     * @param consumerThreadAmount 消费者线程数量
     * @author elon
     */
    public void startMutiTask(int taskAmount, int consumerThreadAmount) 
        MutiLinkedBlockingDeque blockingDeque = new MutiLinkedBlockingDeque(consumerThreadAmount);

        Producer producer = new Producer(blockingDeque, taskAmount);
        new Thread(producer).start();

        for (int i = 1; i <= consumerThreadAmount; ++i) 
            MutiConsumer consumer = new MutiConsumer(blockingDeque);
            new Thread(consumer).start();
        
    

五、调用接口测试


测试时指定参数:生产10个任务,启动3个消费者线程同时消费。任务处理完后所有消费者线程收到毒丸任务退出,整个任务结束。

源码地址:https://github.com/ylforever/elon-producerconsumer

以上是关于用阻塞队列实现生产者消费者模式二(多线程消费)的主要内容,如果未能解决你的问题,请参考以下文章

用阻塞队列实现生产者消费者模式一(单线程消费)

多线程-并发编程-生产者消费者模式及非阻塞队列与阻塞队列实现

Java多线程-----实现生产者消费者模式的几种方式

通过阻塞队列实现生产者和消费者异步解耦

多线程之消费者生产者模式加入阻塞队列

并发8借助redis 实现多线程生产消费阻塞队列