在线程池中休眠的线程
Posted
技术标签:
【中文标题】在线程池中休眠的线程【英文标题】:Thread sleeping in a Thread-Pool 【发布时间】:2019-08-12 14:22:54 【问题描述】:假设我们有一个线程数有限的线程池。
Executor executor = Executors.newFixedThreadPool(3);
现在假设其中一个活动任务必须休眠 3 秒(无论出于何种原因)。
executor.execute(() ->
try
Thread.sleep(3000L);
catch (InterruptedException ignore)
);
我们如何实现这样的线程池,当一个任务休眠时(或waits在监视器上/condition),线程1 可以有效地用于运行另一个任务吗?
1线程我并不是指“物理”Java 线程,因为在线程处于休眠状态时这是不可能的。我的意思是,线程池有一个抽象的实现,实际上似乎允许线程在睡眠期间运行另一个任务。关键是总是有 N 个同时运行(非睡眠)的任务。
有点类似于监视器处理对关键区域的访问的方式:
如果线程等待资源,则该资源可以被另一个线程使用。 如果线程收到通知,则将其放入等待集中以(重新)获得对该资源的访问权。【问题讨论】:
让它做一些工作而不是睡觉,我猜。 @PavelSmirnov 你能在上面构建一个工作抽象吗?一种解释等待资源的情况?如果执行的作业也休眠(执行另一个作业),然后是另一个,然后是另一个,等等......那么原始任务将永远没有机会继续。 好吧,如果你有一个需要锁的工作,一个线程可以尝试获取锁,并且只有在它成功的情况下才能完成这项工作。否则,它可以切换到另一个在队列中等待的作业并尝试这样做,或者返回到原始线程的任务。我想表明的一点是,如果可能的话,你最好做一些工作,而不是仅仅睡觉。 这让我想起了 lua 中的协程——查一下。这可能会给你一些启发。 有趣的问题。我不相信您会得到令人满意的答案,因为您基本上是在尝试在操作系统之上实现线程调度程序。您可能会通过使用锁和 P > K 后台线程以及自定义“睡眠”方法来获得。 【参考方案1】:您所要求的实质上是在 JVM/OS 线程之上实现协程/光纤。 Sanhong Li 很好地讲述了阿里巴巴的工程师是如何实现这种结构的——这个想法是,你需要依赖自己的 Selector,而不是依赖 OS 线程调度程序。
另请参阅 Loom project 了解光纤(用户级绿色线程)。
【讨论】:
【参考方案2】:我实现了一个最小的工作示例,它基本上可以满足我的想法。
一个Task接口(很像runnable接口,只是带有一个传递的Context来执行等待)
package io.medev.***;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
public interface Task
/**
* Wraps the given runnable into a Task with a not guessable execution time (meaning guessExecutionTime always returns Long.MAX_VALUE)
* @param runnable The runnable to wrap
* @return a Task wrapping this runnable
*/
static Task wrap(Runnable runnable)
return wrap(runnable, Long.MAX_VALUE);
/**
* Wraps the given runnable using the given guessedExecutionTimeMillis
* @param runnable The runnable to wrap
* @param guessedExecutionTimeMillis The guessed execution time in millis for this runnable
* @return a Task wrapping this runnable
*/
static Task wrap(Runnable runnable, long guessedExecutionTimeMillis)
return new Task()
@Override
public long guessExecutionTimeMillis()
return guessedExecutionTimeMillis;
@Override
public void run(Context context)
runnable.run();
;
/**
* Should more or less guess how long this task will run
* @return The execution time of this Task in milliseconds
*/
long guessExecutionTimeMillis();
void run(Context context);
interface Context
/**
* Block until the condition is met, giving other Tasks time to execute
* @param condition the condition to check
* @throws InterruptedException if the current thread is interrupted
*/
void idle(BooleanSupplier condition) throws InterruptedException;
/**
* Blocks at least for the given duration, giving other Tasks time to execute
* @param timeout
* @param timeUnit
* @throws InterruptedException if the current thread is interrupted
*/
void idle(long timeout, TimeUnit timeUnit) throws InterruptedException;
/**
* Blocks until the condition is met or the timeout expires, giving other Tasks time to execute
* @param condition the condition to check
* @param timeout
* @param timeUnit
* @throws InterruptedException if the current thread is interrupted
*/
void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException;
还有一个基本的固定线程池执行器——但你必须依赖这里的具体实现:
package io.medev.***;
import java.util.Comparator;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;
public class TimeEfficientExecutor implements Executor
private final BlockingQueue<Task> taskQueue;
private final CountDownLatch latch;
private volatile boolean alive;
public TimeEfficientExecutor(int threads)
this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingLong(Task::guessExecutionTimeMillis));
this.latch = new CountDownLatch(threads);
this.alive = true;
for (int i = 0; i < threads; i++)
Thread thread = new Thread(new TimeEfficientExecutorRunnable());
thread.start();
@Override
public void execute(Runnable runnable)
execute(Task.wrap(runnable));
public void execute(Runnable runnable, long guessedExecutionTimeMillis)
execute(Task.wrap(runnable, guessedExecutionTimeMillis));
public void execute(Task task)
this.taskQueue.offer(task);
public void shutdown()
this.alive = false;
public void awaitShutdown() throws InterruptedException
this.latch.await();
public void awaitShutdown(long timeout, TimeUnit timeUnit) throws InterruptedException
this.latch.await(timeout, timeUnit);
private class TimeEfficientExecutorRunnable implements Runnable
@Override
public void run()
try
while (TimeEfficientExecutor.this.alive)
Task task = TimeEfficientExecutor.this.taskQueue.poll();
if (task != null)
try
task.run(new IdleTaskContext());
catch (Exception e)
// TODO: logging
finally
TimeEfficientExecutor.this.latch.countDown();
private class IdleTaskContext implements Task.Context
@Override
public void idle(BooleanSupplier condition) throws InterruptedException
idle(condition, Long.MAX_VALUE);
@Override
public void idle(long timeout, TimeUnit timeUnit) throws InterruptedException
idle(() -> false, timeout, timeUnit);
@Override
public void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException
idle(condition, System.currentTimeMillis() + timeUnit.toMillis(timeout));
private void idle(BooleanSupplier condition, long idleUntilTs) throws InterruptedException
long leftMillis = idleUntilTs - System.currentTimeMillis();
while (TimeEfficientExecutor.this.alive && !condition.getAsBoolean() && leftMillis >= 1L)
Task task = TimeEfficientExecutor.this.taskQueue.poll(leftMillis, TimeUnit.MILLISECONDS);
leftMillis = idleUntilTs - System.currentTimeMillis();
if (task != null)
if (leftMillis >= 1L && task.guessExecutionTimeMillis() < leftMillis)
task.run(new IdleTaskContext());
else
TimeEfficientExecutor.this.taskQueue.offer(task);
请注意,您不能只是逐步降低堆栈 - 堆栈绑定到正在执行的线程。这意味着如果某些“子”任务开始空闲,则不可能跳回底层空闲任务。您必须“信任”每个任务在 guessExecutionTimeMillis
-Method 中返回的内容。
由于Executor中使用了PriorityQueue,队列总是会返回执行时间最短的任务。
【讨论】:
以上是关于在线程池中休眠的线程的主要内容,如果未能解决你的问题,请参考以下文章