在线程池中休眠的线程

Posted

技术标签:

【中文标题】在线程池中休眠的线程【英文标题】:Thread sleeping in a Thread-Pool 【发布时间】:2019-08-12 14:22:54 【问题描述】:

假设我们有一个线程数有限的线程池。

Executor executor = Executors.newFixedThreadPool(3);

现在假设其中一个活动任务必须休眠 3 秒(无论出于何种原因)。

executor.execute(() -> 
    try 
        Thread.sleep(3000L);
     catch (InterruptedException ignore) 
);

我们如何实现这样的线程池,当一个任务休眠时(或waits在监视器上/condition),线程1 可以有效地用于运行另一个任务吗?

1线程我并不是指“物理”Java 线程,因为在线程处于休眠状态时这是不可能的。我的意思是,线程池有一个抽象的实现,实际上似乎允许线程在睡眠期间运行另一个任务。关键是总是有 N 个同时运行(非睡眠)的任务。

有点类似于监视器处理对关键区域的访问的方式:

如果线程等待资源,则该资源可以被另一个线程使用。 如果线程收到通知,则将其放入等待集中以(重新)获得对该资源的访问权。

【问题讨论】:

让它做一些工作而不是睡觉,我猜。 @PavelSmirnov 你能在上面构建一个工作抽象吗?一种解释等待资源的情况?如果执行的作业也休眠(执行另一个作业),然后是另一个,然后是另一个,等等......那么原始任务将永远没有机会继续。 好吧,如果你有一个需要锁的工作,一个线程可以尝试获取锁,并且只有在它成功的情况下才能完成这项工作。否则,它可以切换到另一个在队列中等待的作业并尝试这样做,或者返回到原始线程的任务。我想表明的一点是,如果可能的话,你最好做一些工作,而不是仅仅睡觉。 这让我想起了 lua 中的协程——查一下。这可能会给你一些启发。 有趣的问题。我不相信您会得到令人满意的答案,因为您基本上是在尝试在操作系统之上实现线程调度程序。您可能会通过使用锁和 P > K 后台线程以及自定义“睡眠”方法来获得。 【参考方案1】:

您所要求的实质上是在 JVM/OS 线程之上实现协程/光纤。 Sanhong Li 很好地讲述了阿里巴巴的工程师是如何实现这种结构的——这个想法是,你需要依赖自己的 Selector,而不是依赖 OS 线程调度程序。

另请参阅 Loom project 了解光纤(用户级绿色线程)。

【讨论】:

【参考方案2】:

我实现了一个最小的工作示例,它基本上可以满足我的想法。

一个Task接口(很像runnable接口,只是带有一个传递的Context来执行等待)

package io.medev.***;

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public interface Task 

    /**
     * Wraps the given runnable into a Task with a not guessable execution time (meaning guessExecutionTime always returns Long.MAX_VALUE)
     * @param runnable The runnable to wrap
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable) 
        return wrap(runnable, Long.MAX_VALUE);
    

    /**
     * Wraps the given runnable using the given guessedExecutionTimeMillis
     * @param runnable The runnable to wrap
     * @param guessedExecutionTimeMillis The guessed execution time in millis for this runnable
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable, long guessedExecutionTimeMillis) 
        return new Task() 
            @Override
            public long guessExecutionTimeMillis() 
                return guessedExecutionTimeMillis;
            

            @Override
            public void run(Context context) 
                runnable.run();
            
        ;
    

    /**
     * Should more or less guess how long this task will run
     * @return The execution time of this Task in milliseconds
     */
    long guessExecutionTimeMillis();

    void run(Context context);

    interface Context 

        /**
         * Block until the condition is met, giving other Tasks time to execute
         * @param condition the condition to check
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition) throws InterruptedException;

        /**
         * Blocks at least for the given duration, giving other Tasks time to execute
         * @param timeout
         * @param timeUnit
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(long timeout, TimeUnit timeUnit) throws InterruptedException;

        /**
         * Blocks until the condition is met or the timeout expires, giving other Tasks time to execute
         * @param condition the condition to check
         * @param timeout
         * @param timeUnit
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException;
    

还有一个基本的固定线程池执行器——但你必须依赖这里的具体实现:

package io.medev.***;

import java.util.Comparator;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class TimeEfficientExecutor implements Executor 

    private final BlockingQueue<Task> taskQueue;
    private final CountDownLatch latch;
    private volatile boolean alive;

    public TimeEfficientExecutor(int threads) 
        this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingLong(Task::guessExecutionTimeMillis));
        this.latch = new CountDownLatch(threads);
        this.alive = true;

        for (int i = 0; i < threads; i++) 
            Thread thread = new Thread(new TimeEfficientExecutorRunnable());
            thread.start();
        
    

    @Override
    public void execute(Runnable runnable) 
        execute(Task.wrap(runnable));
    

    public void execute(Runnable runnable, long guessedExecutionTimeMillis) 
        execute(Task.wrap(runnable, guessedExecutionTimeMillis));
    

    public void execute(Task task) 
        this.taskQueue.offer(task);
    

    public void shutdown() 
        this.alive = false;
    

    public void awaitShutdown() throws InterruptedException 
        this.latch.await();
    

    public void awaitShutdown(long timeout, TimeUnit timeUnit) throws InterruptedException 
        this.latch.await(timeout, timeUnit);
    

    private class TimeEfficientExecutorRunnable implements Runnable 

        @Override
        public void run() 
            try 
                while (TimeEfficientExecutor.this.alive) 
                    Task task = TimeEfficientExecutor.this.taskQueue.poll();

                    if (task != null) 
                        try 
                            task.run(new IdleTaskContext());
                         catch (Exception e) 
                            // TODO: logging
                        
                    
                
             finally 
                TimeEfficientExecutor.this.latch.countDown();
            
        
    

    private class IdleTaskContext implements Task.Context 

        @Override
        public void idle(BooleanSupplier condition) throws InterruptedException 
            idle(condition, Long.MAX_VALUE);
        

        @Override
        public void idle(long timeout, TimeUnit timeUnit) throws InterruptedException 
            idle(() -> false, timeout, timeUnit);
        

        @Override
        public void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException 
            idle(condition, System.currentTimeMillis() + timeUnit.toMillis(timeout));
        

        private void idle(BooleanSupplier condition, long idleUntilTs) throws InterruptedException 
            long leftMillis = idleUntilTs - System.currentTimeMillis();

            while (TimeEfficientExecutor.this.alive && !condition.getAsBoolean() && leftMillis >= 1L) 
                Task task = TimeEfficientExecutor.this.taskQueue.poll(leftMillis, TimeUnit.MILLISECONDS);
                leftMillis = idleUntilTs - System.currentTimeMillis();

                if (task != null) 
                    if (leftMillis >= 1L && task.guessExecutionTimeMillis() < leftMillis) 
                        task.run(new IdleTaskContext());
                     else 
                        TimeEfficientExecutor.this.taskQueue.offer(task);
                    
                
            
        
    

请注意,您不能只是逐步降低堆栈 - 堆栈绑定到正在执行的线程。这意味着如果某些“子”任务开始空闲,则不可能跳回底层空闲任务。您必须“信任”每个任务在 guessExecutionTimeMillis-Method 中返回的内容。

由于Executor中使用了PriorityQueue,队列总是会返回执行时间最短的任务。

【讨论】:

以上是关于在线程池中休眠的线程的主要内容,如果未能解决你的问题,请参考以下文章

java 如何获得线程池中正在执行的线程数?

什么是线程池

c# 怎么等待线程池中所有线程都运行结束在运行主线程

线程池参数配置

自己实现一个简单的线程池

什么叫线程池?线程池如何使用?