用阻塞队列实现生产者消费者模式三(终止任务)

Posted Neo Yang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用阻塞队列实现生产者消费者模式三(终止任务)相关的知识,希望对你有一定的参考价值。

除了正常任务处理完后结束生产者、消费者线程;用户还 可以中途强制结束任务。针对用户终止任务的情况,如何结束程序处理? 还是使用毒丸任务。

一、定义一个任务管理器类保存任务被终止的标记

/**
 * 任务管理类. 管理任务是否被终止的标识信息.
 *
 * @author elon
 * @since 1.0
 */
public class TaskManager 
    /**
     * 标识任务是否被终止
     */
    private volatile boolean isTaskTerminated = false;

    public static TaskManager instance() 
        return TaskManagerBuilder.instance;
    

    public boolean isTaskTerminated() 
        return isTaskTerminated;
    

    public void terminateTask() 
        isTaskTerminated = true;
    

    public void clearTerminateFlag() 
        isTaskTerminated = false;
    

    private static class TaskManagerBuilder 
        private static TaskManager instance = new TaskManager();
    

TaskManager 是一个单例,定义了一个布尔变量isTaskTerminated 用于标记任务是否被终止了。这里定义一个单例只是为了简单说明问题,实际项目开发中这个标记信息一般是保存在数据库或者redis中,方便微服务能正常获取。每个任务也会有一个唯一的taskId标识。

二、添加终止任务接口

import com.elon.datamanager.TaskManager;
import com.elon.service.ProConsService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/v1/produce-consumer")
@Api(tags = "生产者消费者服务")
public class ProduceConsumerController 
    /**
     * 终止任务
     */
    @PostMapping("/terminate-task")
    @ApiOperation(value = "终止任务")
    public void terminateTask() 
        TaskManager.instance().terminateTask();
    

三、生产者处理任务被终止的场景

public void run() 
    EnumTaskEndType taskEndType = EnumTaskEndType.COMPLETE;
    for (int i = 1; i <= taskAmount; ++i) 
        try 
            // 处理任务被用户终止的场景
            if (TaskManager.instance().isTaskTerminated()) 
                taskEndType = EnumTaskEndType.TERMINATE;
                break;
            

            Task task = new Task();
            task.setTaskName("任务:" + i);
            blockingDeque.offer(task, 2, TimeUnit.MILLISECONDS);
            LOGGER.info("生产任务:", task);
         catch (InterruptedException e) 
            LOGGER.error("创建任务异常.", e);
        
    

    // 放入毒丸任务已结束消费者线程
    try 
        Task task = new Task();
        task.setTaskName("任务处理结束毒丸");
        task.setTaskEndType(taskEndType);
        blockingDeque.offer(task, 2, TimeUnit.MILLISECONDS);
     catch (InterruptedException e) 
        LOGGER.error("放入毒丸任务异常.", e);
    

生产者在检测到任务被终止后停止生产,并往阻塞队列插入一个毒丸,以快速终止消费者线程。

四、在消费者线程处理任务被终止的场景

@Override
public void run() 
    while (true) 
        try 
            Task task = blockingDeque.take();
            if (task.getTaskEndType() != EnumTaskEndType.NA) 
                LOGGER.info("收到毒丸任务.", task);
                blockingDeque.offerFirst(task, 2, TimeUnit.MILLISECONDS);

                final int consumerAmount = blockingDeque.decrementConsumer();
                if (consumerAmount <= 0) 
                    LOGGER.info("所有消费者停止任务处理.");
                
                return;
            

            // 处理任务被用户终止的场景
            if (TaskManager.instance().isTaskTerminated()) 
                Task terminateTask = new Task();
                terminateTask.setTaskName("任务终止毒丸");
                terminateTask.setTaskEndType(EnumTaskEndType.TERMINATE);
                blockingDeque.offerFirst(terminateTask);
                continue;
            

            // 消费任务(等待10秒,模拟实际业务处理时间)
            LOGGER.info("消费任务.", task);
            Thread.sleep(10*1000);
         catch (InterruptedException e) 
            LOGGER.error("消费任务异常.", e);
        
    

消费者线程检测到任务被终止,创建一个毒丸任务放到队列。

五、调用接口终止执行中的任务

源码地址:https://github.com/ylforever/elon-producerconsumer

以上是关于用阻塞队列实现生产者消费者模式三(终止任务)的主要内容,如果未能解决你的问题,请参考以下文章

阻塞队列和生产者-消费者模式

用阻塞队列实现生产者消费者模式二(多线程消费)

用阻塞队列和线程池简单实现生产者和消费者场景

用阻塞队列实现生产者消费者模式一(单线程消费)

阻塞队列实现生产者消费者模式

多线程-并发编程-生产者消费者模式及非阻塞队列与阻塞队列实现