用阻塞队列实现生产者消费者模式三(终止任务)
Posted Neo Yang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用阻塞队列实现生产者消费者模式三(终止任务)相关的知识,希望对你有一定的参考价值。
除了正常任务处理完后结束生产者、消费者线程;用户还 可以中途强制结束任务。针对用户终止任务的情况,如何结束程序处理? 还是使用毒丸任务。
一、定义一个任务管理器类保存任务被终止的标记
/**
* 任务管理类. 管理任务是否被终止的标识信息.
*
* @author elon
* @since 1.0
*/
public class TaskManager
/**
* 标识任务是否被终止
*/
private volatile boolean isTaskTerminated = false;
public static TaskManager instance()
return TaskManagerBuilder.instance;
public boolean isTaskTerminated()
return isTaskTerminated;
public void terminateTask()
isTaskTerminated = true;
public void clearTerminateFlag()
isTaskTerminated = false;
private static class TaskManagerBuilder
private static TaskManager instance = new TaskManager();
TaskManager 是一个单例,定义了一个布尔变量isTaskTerminated 用于标记任务是否被终止了。这里定义一个单例只是为了简单说明问题,实际项目开发中这个标记信息一般是保存在数据库或者redis中,方便微服务能正常获取。每个任务也会有一个唯一的taskId标识。
二、添加终止任务接口
import com.elon.datamanager.TaskManager;
import com.elon.service.ProConsService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/v1/produce-consumer")
@Api(tags = "生产者消费者服务")
public class ProduceConsumerController
/**
* 终止任务
*/
@PostMapping("/terminate-task")
@ApiOperation(value = "终止任务")
public void terminateTask()
TaskManager.instance().terminateTask();
三、生产者处理任务被终止的场景
public void run()
EnumTaskEndType taskEndType = EnumTaskEndType.COMPLETE;
for (int i = 1; i <= taskAmount; ++i)
try
// 处理任务被用户终止的场景
if (TaskManager.instance().isTaskTerminated())
taskEndType = EnumTaskEndType.TERMINATE;
break;
Task task = new Task();
task.setTaskName("任务:" + i);
blockingDeque.offer(task, 2, TimeUnit.MILLISECONDS);
LOGGER.info("生产任务:", task);
catch (InterruptedException e)
LOGGER.error("创建任务异常.", e);
// 放入毒丸任务已结束消费者线程
try
Task task = new Task();
task.setTaskName("任务处理结束毒丸");
task.setTaskEndType(taskEndType);
blockingDeque.offer(task, 2, TimeUnit.MILLISECONDS);
catch (InterruptedException e)
LOGGER.error("放入毒丸任务异常.", e);
生产者在检测到任务被终止后停止生产,并往阻塞队列插入一个毒丸,以快速终止消费者线程。
四、在消费者线程处理任务被终止的场景
@Override
public void run()
while (true)
try
Task task = blockingDeque.take();
if (task.getTaskEndType() != EnumTaskEndType.NA)
LOGGER.info("收到毒丸任务.", task);
blockingDeque.offerFirst(task, 2, TimeUnit.MILLISECONDS);
final int consumerAmount = blockingDeque.decrementConsumer();
if (consumerAmount <= 0)
LOGGER.info("所有消费者停止任务处理.");
return;
// 处理任务被用户终止的场景
if (TaskManager.instance().isTaskTerminated())
Task terminateTask = new Task();
terminateTask.setTaskName("任务终止毒丸");
terminateTask.setTaskEndType(EnumTaskEndType.TERMINATE);
blockingDeque.offerFirst(terminateTask);
continue;
// 消费任务(等待10秒,模拟实际业务处理时间)
LOGGER.info("消费任务.", task);
Thread.sleep(10*1000);
catch (InterruptedException e)
LOGGER.error("消费任务异常.", e);
消费者线程检测到任务被终止,创建一个毒丸任务放到队列。
五、调用接口终止执行中的任务
源码地址:https://github.com/ylforever/elon-producerconsumer
以上是关于用阻塞队列实现生产者消费者模式三(终止任务)的主要内容,如果未能解决你的问题,请参考以下文章