处理线程池中的优先级
Posted
技术标签:
【中文标题】处理线程池中的优先级【英文标题】:Handling priority in ThreadPools 【发布时间】:2016-10-27 19:37:23 【问题描述】:目前,我们的一个应用中有 2 个线程池:
-
第一个是用来处理定时任务的
第二个处理正在运行的每个计划任务的并行处理
需要设置两个不同的池来自以下想法:如果多个计划任务在主(第一个)池中排队并且它在同一个池中触发它的子任务(并行处理),这将导致竞争条件 as 也会在其他计划任务“后面”排队,因此实际上什么都不会结束并且会发生死锁。
如果子任务的优先级高于计划任务怎么办?他们会“跳过”队列并暂停计划任务以完成吗?或者这不会发生?有没有办法强迫这种行为?或者当 ThreadPoolExecutor 已经在运行任务时不能暂停它们?
池 1 在 Spring 的应用程序上下文 XML 配置文件中定义为:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-3.0.xsd">
<context:annotation-config />
<context:component-scan base-package="cl.waypoint.mailer.reportes" />
<task:annotation-driven scheduler="myScheduler" />
<task:scheduler id="myScheduler" pool-size="2" />
<aop:aspectj-autoproxy />
</beans>
Pool 2 在代码中定义如下:
public static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 30, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(), new ThreadFactory()
final AtomicLong count = new AtomicLong(0);
private String namePreffix = "TempAndDoor";
@Override
public Thread newThread(Runnable r)
Thread t = new Thread(r);
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
t.setName(MessageFormat.format("0-1", namePreffix, count.getAndIncrement()));
return t;
);
【问题讨论】:
“处理计划任务”和“运行计划任务”有什么区别? @Kayaman 你问这两个池之间的区别?实际上两个池都运行任务,但第二个池为第一个池中的任务运行子任务,这样解释更好吗? 添加代码以便我们了解您是如何使用线程池的(顺便问一下,它们是如何配置的?固定线程池?fork join pool?) @GonzaloVasquez 是的,这是一个更好的解释。您可能希望显示一些代码以更清楚地了解您拥有哪些类型的队列等。 @LuisRamirez-Monterosa 我刚刚添加了池定义 【参考方案1】:如果我理解正确 myScheduler (pool1) 用于安排任务,一旦时间开始,它会将任务提交给 executor (池 2)。
给出问题
如果子任务的优先级高于计划任务怎么办?
不清楚您如何区分它们或您的真正意思,但我理解的是子任务是计划任务。
我认为你应该有一些类似的代码:
@Scheduled(cron="*/5 * * * * MON-FRI")
public void doSomething()
executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);
其中 executor 是您要在其中创建的静态对象。(不应该是全部大写和最终的;))
根据它的长名称,您提交给执行者的是“子任务”或执行者醒来提交的任务。在这种情况下,您的 myScheduler 将始终及时唤醒,因为执行的是非阻塞的。
另一方面,由于您有一个 LinkedBlockingDeque,这意味着按顺序执行,您的 executor 可能会得到备份。
另一个问题:
他们会“跳过”队列并暂停计划任务以完成吗?
它们不会跳转,调度程序会启动,提交新任务并再次进入休眠状态。队列开始被填满
下一个问题:
有没有办法强制这种行为?或者当 ThreadPoolExecutor 已经在运行任务时不能暂停它们?
您可以一起取消任务,但您将跟踪所有提交的任务
你可以抓住未来
Future aFuture = executor.submit(aTaskThatRunsInParallelOrWhatYouCallASubTask);
不知何故你需要知道你真的想取消任务,你需要调用
aFuture.cancel();
看起来你需要的东西更多,我建议看一下非常成熟的 JMS 或可能更容易掌握的 AKKA。
【讨论】:
【参考方案2】:以下代码可能对您有用。
com.job.Job
package com.job;
import java.util.concurrent.CountDownLatch;
public class Job implements Runnable
public enum JobPriority
HIGH, MEDIUM, LOW
private String jobName;
private JobPriority jobPriority;
private CountDownLatch cdl;
public Job(String jobName, JobPriority jobPriority)
super();
this.jobName = jobName;
this.jobPriority = jobPriority;
@Override
public void run()
try
System.out.println("Job:" + jobName + " Priority:" + jobPriority);
Thread.sleep(3000);
catch (InterruptedException e)
// TODO Auto-generated catch block
e.printStackTrace();
finally
cdl.countDown();
public String getJobName()
return jobName;
public void setJobName(String jobName)
this.jobName = jobName;
public JobPriority getJobPriority()
return jobPriority;
public void setJobPriority(JobPriority jobPriority)
this.jobPriority = jobPriority;
public void initCdl(CountDownLatch countDownLatch)
this.cdl = countDownLatch;
// standard setters and getters
com.scheduler.PriorityJobScheduler
package com.scheduler;
import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import com.job.Job;
public class PriorityJobScheduler
private ExecutorService priorityJobPoolExecutor;
private ExecutorService priorityJobScheduler = Executors.newSingleThreadExecutor();
private PriorityBlockingQueue<Job> priorityQueue;
private CountDownLatch countDownLatch;
private int jobCount = 0;
public PriorityJobScheduler(Integer poolSize, Integer queueSize)
priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
countDownLatch = new CountDownLatch(poolSize);
priorityQueue = new PriorityBlockingQueue<Job>(queueSize, Comparator.comparing(Job::getJobPriority));
priorityJobScheduler.execute(() ->
while (true)
try
Job j = priorityQueue.take();
j.initCdl(countDownLatch);
priorityJobPoolExecutor.execute(j);
jobCount++;
if (jobCount >= poolSize)
countDownLatch.await();
jobCount = 0 ;
countDownLatch = new CountDownLatch(poolSize);
catch (InterruptedException e)
// exception needs special handling
break;
);
public void scheduleJob(Job job)
priorityQueue.add(job);
public void cleanUp()
priorityJobScheduler.shutdown();
测试
import java.util.ArrayList;
import java.util.List;
import com.job.Job;
import com.job.Job.JobPriority;
import com.scheduler.PriorityJobScheduler;
public class Test
private static int POOL_SIZE = 3;
private static int QUEUE_SIZE = 10000;
public static void main(String[] args)
PriorityJobScheduler pjs = new PriorityJobScheduler(POOL_SIZE, QUEUE_SIZE);
for (int i = 0; i < 100; i++)
Job job1 = new Job("Job" + i + "low", JobPriority.LOW);
Job job2 = new Job("Job" + i + "medium", JobPriority.MEDIUM);
Job job3 = new Job("Job" + i + "high", JobPriority.HIGH);
if (i < 30)
pjs.scheduleJob(job1);
else if (i < 60)
pjs.scheduleJob(job2);
else
pjs.scheduleJob(job3);
try
Thread.sleep(5);
catch (InterruptedException e) // TODO
e.printStackTrace();
pjs.cleanUp();
来源:https://www.baeldung.com/java-priority-job-schedule
Baeldung 的原始示例在此代码段中没有使用 CountDownLatch。您也可以在 Baeldung 网站上浏览教程。如果您对在这种情况下使用 CountDownLatch 有任何疑问,请随时在评论部分提问。
【讨论】:
以上是关于处理线程池中的优先级的主要内容,如果未能解决你的问题,请参考以下文章