处理线程池中的优先级

Posted

技术标签:

【中文标题】处理线程池中的优先级【英文标题】:Handling priority in ThreadPools 【发布时间】:2016-10-27 19:37:23 【问题描述】:

目前,我们的一个应用中有 2 个线程池:

    第一个是用来处理定时任务的 第二个处理正在运行的每个计划任务的并行处理

需要设置两个不同的池来自以下想法:如果多个计划任务在主(第一个)池中排队并且它在同一个池中触发它的子任务(并行处理),这将导致竞争条件 as 也会在其他计划任务“后面”排队,因此实际上什么都不会结束并且会发生死锁。

如果子任务的优先级高于计划任务怎么办?他们会“跳过”队列并暂停计划任务以完成吗?或者这不会发生?有没有办法强迫这种行为?或者当 ThreadPoolExecutor 已经在运行任务时不能暂停它们?

池 1 在 Spring 的应用程序上下文 XML 配置文件中定义为:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"  xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/aop
 http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
 http://www.springframework.org/schema/context
 http://www.springframework.org/schema/context/spring-context-3.0.xsd
 http://www.springframework.org/schema/task
 http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <context:annotation-config />
    <context:component-scan base-package="cl.waypoint.mailer.reportes" />
    <task:annotation-driven scheduler="myScheduler" />
    <task:scheduler id="myScheduler" pool-size="2" />
    <aop:aspectj-autoproxy />


</beans>

Pool 2 在代码中定义如下:

public static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 30, TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(), new ThreadFactory() 
            final AtomicLong count = new AtomicLong(0);
            private String namePreffix = "TempAndDoor";

            @Override
            public Thread newThread(Runnable r) 
                Thread t = new Thread(r);
                t.setDaemon(false);
                t.setPriority(Thread.NORM_PRIORITY);
                t.setName(MessageFormat.format("0-1", namePreffix, count.getAndIncrement()));
                return t;
            
        );

【问题讨论】:

“处理计划任务”和“运行计划任务”有什么区别? @Kayaman 你问这两个池之间的区别?实际上两个池都运行任务,但第二个池为第一个池中的任务运行子任务,这样解释更好吗? 添加代码以便我们了解您是如何使用线程池的(顺便问一下,它们是如何配置的?固定线程池?fork join pool?) @GonzaloVasquez 是的,这是一个更好的解释。您可能希望显示一些代码以更清楚地了解您拥有哪些类型的队列等。 @LuisRamirez-Monterosa 我刚刚添加了池定义 【参考方案1】:

如果我理解正确 myScheduler (pool1) 用于安排任务,一旦时间开始,它会将任务提交给 executor (池 2)。

给出问题

如果子任务的优先级高于计划任务怎么办?

不清楚您如何区分它们或您的真正意思,但我理解的是子任务是计划任务。

我认为你应该有一些类似的代码:

@Scheduled(cron="*/5 * * * * MON-FRI")
public void doSomething() 
    executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);

其中 executor 是您要在其中创建的静态对象。(不应该是全部大写和最终的;))

根据它的长名称,您提交给执行者的是“子任务”或执行者醒来提交的任务。在这种情况下,您的 myScheduler 将始终及时唤醒,因为执行的是非阻塞的。

另一方面,由于您有一个 LinkedBlockingDeque,这意味着按顺序执行,您的 executor 可能会得到备份。

另一个问题:

他们会“跳过”队列并暂停计划任务以完成吗?

它们不会跳转,调度程序会启动,提交新任务并再次进入休眠状态。队列开始被填满

下一个问题:

有没有办法强制这种行为?或者当 ThreadPoolExecutor 已经在运行任务时不能暂停它们?

您可以一起取消任务,但您将跟踪所有提交的任务

你可以抓住未来

Future aFuture = executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);

不知何故你需要知道你真的想取消任务,你需要调用

aFuture.cancel();

看起来你需要的东西更多,我建议看一下非常成熟的 JMS 或可能更容易掌握的 AKKA。

【讨论】:

【参考方案2】:

以下代码可能对您有用。

com.job.Job

package com.job;

import java.util.concurrent.CountDownLatch;

public class Job implements Runnable 

    public enum JobPriority 
        HIGH, MEDIUM, LOW
    

    private String jobName;
    private JobPriority jobPriority;
    private CountDownLatch cdl;

    public Job(String jobName, JobPriority jobPriority) 
        super();
        this.jobName = jobName;
        this.jobPriority = jobPriority;
    

    @Override
    public void run() 
        try 
            System.out.println("Job:" + jobName + " Priority:" + jobPriority);
            Thread.sleep(3000);

         catch (InterruptedException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
         finally 
            cdl.countDown();
        
    

    public String getJobName() 
        return jobName;
    

    public void setJobName(String jobName) 
        this.jobName = jobName;
    

    public JobPriority getJobPriority() 
        return jobPriority;
    

    public void setJobPriority(JobPriority jobPriority) 
        this.jobPriority = jobPriority;
    

    public void initCdl(CountDownLatch countDownLatch) 
        this.cdl = countDownLatch;
    

    // standard setters and getters


com.scheduler.PriorityJobScheduler

package com.scheduler;

import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;

import com.job.Job;

public class PriorityJobScheduler 

    private ExecutorService priorityJobPoolExecutor;
    private ExecutorService priorityJobScheduler = Executors.newSingleThreadExecutor();
    private PriorityBlockingQueue<Job> priorityQueue;
    private CountDownLatch countDownLatch;
    private int jobCount = 0;

    public PriorityJobScheduler(Integer poolSize, Integer queueSize) 
        priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
        countDownLatch = new CountDownLatch(poolSize);
        priorityQueue = new PriorityBlockingQueue<Job>(queueSize, Comparator.comparing(Job::getJobPriority));
        priorityJobScheduler.execute(() -> 
            while (true) 
                try 

                    Job j = priorityQueue.take();

                    j.initCdl(countDownLatch);

                    priorityJobPoolExecutor.execute(j);

                    jobCount++;

                    if (jobCount >= poolSize) 
                      countDownLatch.await();
                      jobCount = 0 ;
                      countDownLatch = new CountDownLatch(poolSize);
                

                 catch (InterruptedException e) 
                    // exception needs special handling
                    break;
                
            
        );
    

    public void scheduleJob(Job job) 
        priorityQueue.add(job);
    

    public void cleanUp() 

        priorityJobScheduler.shutdown();
    


测试

import java.util.ArrayList;
import java.util.List;

import com.job.Job;
import com.job.Job.JobPriority;
import com.scheduler.PriorityJobScheduler;

public class Test 

    private static int POOL_SIZE = 3;
    private static int QUEUE_SIZE = 10000;

    public static void main(String[] args) 

        
        PriorityJobScheduler pjs = new PriorityJobScheduler(POOL_SIZE, QUEUE_SIZE);

        for (int i = 0; i < 100; i++) 
            Job job1 = new Job("Job" + i + "low", JobPriority.LOW);
            Job job2 = new Job("Job" + i + "medium", JobPriority.MEDIUM);
            Job job3 = new Job("Job" + i + "high", JobPriority.HIGH);

            if (i < 30)
                pjs.scheduleJob(job1);
            else if (i < 60)
                pjs.scheduleJob(job2);
            else
                pjs.scheduleJob(job3);

            try 
                Thread.sleep(5);
             catch (InterruptedException e)  // TODO
                e.printStackTrace();
            

        

        pjs.cleanUp();

    


来源:https://www.baeldung.com/java-priority-job-schedule

Baeldung 的原始示例在此代码段中没有使用 CountDownLatch。您也可以在 Baeldung 网站上浏览教程。如果您对在这种情况下使用 CountDownLatch 有任何疑问,请随时在评论部分提问。

【讨论】:

以上是关于处理线程池中的优先级的主要内容,如果未能解决你的问题,请参考以下文章

并发编程004 --- 线程池

20210601 线程池中的异常处理

20210601 线程池中的异常处理

Java线程池中的四种拒绝策略

线程池

协程,yield,i多路复用,复习