用阻塞队列实现生产者消费者模式二(多线程消费)
Posted Neo Yang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用阻塞队列实现生产者消费者模式二(多线程消费)相关的知识,希望对你有一定的参考价值。
在性能优化时使用生产者消费者模式,很多时候是为了提升数据的消费处理能力。采用多个消费者线程并发处理数据,能起到性能提升的效果。
下文在《用阻塞队列实现生产者消费者模式一(单线程消费)》的基础上描述。
一、在接口层增加测试接口
package com.elon.rest;
import com.elon.service.ProConsService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/v1/produce-consumer")
@Api(tags = "生产者消费者服务")
public class ProduceConsumerController
/**
* 启动多线程任务.
*
* @param taskAmount 任务数量
* @param consumerThreadAmount 线程数量
*/
@PostMapping("/start-muti-task/taskAmount/consumerThreadAmount")
@ApiOperation(value="启动多线程任务")
public void startMutiTask(@PathVariable("taskAmount") int taskAmount,
@PathVariable("consumerThreadAmount") int consumerThreadAmount)
ProConsService proConsService = new ProConsService();
proConsService.startMutiTask(taskAmount, consumerThreadAmount);
二、增加多线程消费处理类
package com.elon.service;
import com.elon.constant.EnumTaskEndType;
import com.elon.model.MutiLinkedBlockingDeque;
import com.elon.model.Task;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.TimeUnit;
/**
* 多线程消费者类.
*
* @author elon
* @version 1.0
*/
public class MutiConsumer implements Runnable
private static final Logger LOGGER = LogManager.getLogger(MutiConsumer.class);
private MutiLinkedBlockingDeque blockingDeque = null;
public MutiConsumer(MutiLinkedBlockingDeque blockingDeque)
this.blockingDeque = blockingDeque;
@Override
public void run()
while (true)
try
Task task = blockingDeque.take();
if (task.getTaskEndType() != EnumTaskEndType.NA)
LOGGER.info("收到毒丸任务.", task);
blockingDeque.offerFirst(task, 2, TimeUnit.MILLISECONDS);
final int consumerAmount = blockingDeque.decrementConsumer();
if (consumerAmount <= 0)
LOGGER.info("所有消费者停止任务处理.");
return;
// 消费任务(等待10秒,模拟实际业务处理时间)
LOGGER.info("消费任务.", task);
Thread.sleep(10*1000);
catch (InterruptedException e)
e.printStackTrace();
这个类与Consumer相比的差异是增加了判断是否所有消费线程都停止了的处理。在实际开发中,这个地方一般需要做一些修改数据库中任务的状态,设置任务结束时间等收尾的工作。
三、定义多线程消费者阻塞队列
package com.elon.model;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 多线程消费者阻塞队列.
*
* @author elon
* @version 1.0
*/
public class MutiLinkedBlockingDeque extends LinkedBlockingDeque<Task>
/**0
* 消费者数量
*/
private AtomicInteger consumerAmount = new AtomicInteger();
public MutiLinkedBlockingDeque(int amount)
consumerAmount.set(amount);
public int decrementConsumer()
return consumerAmount.decrementAndGet();
MutiLinkedBlockingDeque从基类LinkedBlockingDeque派生,增加了一个消费者线程数量计数器。在线程退出时刷新该计数器,用于判断是否所有的消费者线程都已退出.
四、在服务层初始化队列和多个消费者线程
package com.elon.service;
import com.elon.model.MutiLinkedBlockingDeque;
import com.elon.model.Task;
import java.util.concurrent.LinkedBlockingDeque;
/**
* 生产者消费者服务。启动生产者消费者线程.
*
* @author elon
* @version 1.0
*/
public class ProConsService
/**
* 开始多线程任务.
*
* @param taskAmount 任务数量
* @param consumerThreadAmount 消费者线程数量
* @author elon
*/
public void startMutiTask(int taskAmount, int consumerThreadAmount)
MutiLinkedBlockingDeque blockingDeque = new MutiLinkedBlockingDeque(consumerThreadAmount);
Producer producer = new Producer(blockingDeque, taskAmount);
new Thread(producer).start();
for (int i = 1; i <= consumerThreadAmount; ++i)
MutiConsumer consumer = new MutiConsumer(blockingDeque);
new Thread(consumer).start();
五、调用接口测试
测试时指定参数:生产10个任务,启动3个消费者线程同时消费。任务处理完后所有消费者线程收到毒丸任务退出,整个任务结束。
源码地址:https://github.com/ylforever/elon-producerconsumer
以上是关于用阻塞队列实现生产者消费者模式二(多线程消费)的主要内容,如果未能解决你的问题,请参考以下文章