Java:在特定队列大小后阻止提交的 ExecutorService [重复]

Posted

技术标签:

【中文标题】Java:在特定队列大小后阻止提交的 ExecutorService [重复]【英文标题】:Java: ExecutorService that blocks on submission after a certain queue size [duplicate] 【发布时间】:2011-05-30 03:35:21 【问题描述】:

我正在尝试编写一个解决方案,其中单个线程产生可以并行执行的 I/O 密集型任务。每个任务都有重要的内存数据。所以我希望能够限制当前待处理的任务数量。

如果我这样创建 ThreadPoolExecutor:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

然后当队列填满并且所有线程都已经忙时,executor.submit(callable) 会抛出 RejectedExecutionException

当队列已满且所有线程都忙时,我该怎么做才能使executor.submit(callable) 阻塞?

编辑: 我试过this:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

而且它在某种程度上达到了我想要达到的效果,但是以一种不优雅的方式(基本上被拒绝的线程是在调用线程中运行的,所以这会阻止调用线程提交更多)。

编辑:(提出问题 5 年后)

对于阅读此问题及其答案的任何人,请不要将已接受的答案视为一种正确的解决方案。请通读所有答案和 cmets。

【问题讨论】:

我以前使用过信号量来做到这一点,就像@axtavt 链接到的非常相似的问题的答案一样。 上面提到的问题也有基于RejectedExecutionHandler的答案 @TomWolk 一方面,当调用者线程也在执行一项任务时,您将获得比numWorkerThreads 多一个并行执行的任务。但是,更重要的问题是,如果调用者线程得到一个长时间运行的任务,其他线程可能会闲置等待下一个任务。 @TahirAkhtar,真的;队列应该足够长,这样当调用者必须自己执行任务时它不会干涸。但我认为如果多一个线程,即调用者线程,可以用来执行任务,那将是一个优势。如果调用者只是阻塞,则调用者的线程将处于空闲状态。我使用 CallerRunsPolicy 的队列容量是线程池容量的三倍,它运行良好且流畅。与此解决方案相比,我会考虑通过框架过度工程来进行锻炼。 @TomWalk +1 好点。似乎另一个区别是,如果任务从队列中被拒绝并由调用者线程运行,那么调用者线程将开始无序处理请求,因为它没有在队列中等待轮到它。当然,如果您已经选择使用线程,那么您必须正确处理任何依赖项,但请记住一些事情。 【参考方案1】:

我知道这是一个老问题,但有一个类似的问题,即创建新任务非常快,如果由于现有任务完成速度不够快而发生过多的 OutOfMemoryError。

在我的情况下,Callables 已提交,我需要结果,因此我需要存储 executor.submit() 返回的所有 Futures。我的解决方案是将Futures 放入最大尺寸的BlockingQueue。一旦该队列已满,就不会再生成任务,直到某些任务完成(从队列中删除元素)。在伪代码中:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try    
    Thread taskGenerator = new Thread() 
        @Override
        public void run() 
            while (reader.hasNext) 
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try 
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(future);
                 catch (InterruptedException ex) 
                    Thread.currentThread().interrupt();         
                
            
        executor.shutdown();
        
    
    taskGenerator.start();
    
    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) 
        Future future = futures.take();
        // do something
    
 catch (InterruptedException ex) 
    Thread.currentThread().interrupt();     
 catch (ExecutionException ex) 
    throw new MyException(ex);
 finally 
    executor.shutdownNow();

【讨论】:

compoundFuture 是干什么用的? 这是变量的原始名称,我在本示例中并未始终“重命名”。【参考方案2】:

如果您使用的是 spring-integration,那么使用 CallerBlocksPolicy 类怎么样?

这个类实现了RejectedExecutionHandler接口,它是一个处理不能被ThreadPoolExecutor执行的任务的处理程序。

您可以像这样使用此政策。

executor.setRejectedExecutionHandler(new CallerBlocksPolicy());

CallerBlocksPolicyCallerRunsPolicy 的主要区别在于它是在调用者线程中阻塞还是运行任务。

请参考this code。

【讨论】:

看起来是个不错的选择。如果它在单独的实用程序库中,用户会更容易【参考方案3】:

我已经按照装饰器模式实现了一个解决方案,并使用信号量来控制执行任务的数量。您可以将它与任何 Executor 一起使用,并且:

指定正在进行的任务的最大值 指定等待任务执行许可的最大超时时间(如果超时没有获得许可,则抛出RejectedExecutionException
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import javax.annotation.Nonnull;

public class BlockingOnFullQueueExecutorDecorator implements Executor 

    private static final class PermitReleasingDecorator implements Runnable 

        @Nonnull
        private final Runnable delegate;

        @Nonnull
        private final Semaphore semaphore;

        private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) 
            this.delegate = task;
            this.semaphore = semaphoreToRelease;
        

        @Override
        public void run() 
            try 
                this.delegate.run();
            
            finally 
                // however execution goes, release permit for next task
                this.semaphore.release();
            
        

        @Override
        public final String toString() 
            return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
        
    

    @Nonnull
    private final Semaphore taskLimit;

    @Nonnull
    private final Duration timeout;

    @Nonnull
    private final Executor delegate;

    public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) 
        this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
        if (maximumTaskNumber < 1) 
            throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
        
        this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
        if (this.timeout.isNegative()) 
            throw new IllegalArgumentException("'maximumTimeout' must not be negative");
        
        this.taskLimit = new Semaphore(maximumTaskNumber);
    

    @Override
    public final void execute(final Runnable command) 
        Objects.requireNonNull(command, "'command' must not be null");
        try 
            // attempt to acquire permit for task execution
            if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) 
                throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
            
        
        catch (final InterruptedException e) 
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        

        this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
    

    @Override
    public final String toString() 
        return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
                this.timeout, this.delegate);
    

【讨论】:

【参考方案4】:

我也做过同样的事情。诀窍是创建一个 BlockingQueue,其中 offer() 方法实际上是 put()。 (你可以使用任何你想要的基本 BlockingQueue 实现)。

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 

    public LimitedQueue(int maxSize)
    
        super(maxSize);
    

    @Override
    public boolean offer(E e)
    
        // turn offer() and add() into a blocking calls (unless interrupted)
        try 
            put(e);
            return true;
         catch(InterruptedException ie) 
            Thread.currentThread().interrupt();
        
        return false;
    


请注意,这仅适用于 corePoolSize==maxPoolSize 的线程池,因此在此处要小心(请参阅 cmets)。

【讨论】:

或者,您可以扩展 SynchronousQueue 以防止缓冲,只允许直接切换。 优雅直接解决问题。 offer() 变成了 put(),而 put() 的意思是“……在必要时等待空间可用” 我认为这不是一个好主意,因为它改变了 offer 方法的协议。 Offer 方法应该是一个非阻塞调用。 我不同意 - 这会改变 ThreadPoolExecutor.execute 的行为,如果你有一个 corePoolSize 澄清一下——你的解决方案只有在你保持corePoolSize==maxPoolSize的约束条件下才有效。没有它,它不再让 ThreadPoolExecutor 具有设计的行为。我一直在寻找一个解决这个问题的方法,它没有那个限制;有关我们最终采用的方法,请参阅下面的替代答案。【参考方案5】:

当前接受的答案有一个潜在的重大问题 - 它会改变 ThreadPoolExecutor.execute 的行为,因此如果您有 corePoolSize &lt; maxPoolSize,ThreadPoolExecutor 逻辑将永远不会在核心之外添加额外的工作人员。

来自ThreadPoolExecutor.execute(Runnable):

    if (isRunning(c) && workQueue.offer(command)) 
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    
    else if (!addWorker(command, false))
        reject(command);

具体来说,永远不会命中最后一个“else”块。

更好的选择是做一些类似于 OP 已经在做的事情 - 使用 RejectedExecutionHandler 做同样的 put 逻辑:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
    try 
        if (!executor.isShutdown()) 
            executor.getQueue().put(r);
        
     catch (InterruptedException e) 
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    

正如 cmets 中指出的(参考this answer),使用这种方法需要注意一些事项:

    如果corePoolSize==0,则存在竞争条件,池中的所有线程都可能在任务可见之前死亡 使用包装队列任务的实现(不适用于ThreadPoolExecutor)将导致问题,除非处理程序也以相同的方式包装它。

牢记这些问题,此解决方案适用于大多数典型的 ThreadPoolExecutor,并且可以正确处理 corePoolSize &lt; maxPoolSize 的情况。

【讨论】:

致那些投反对票的人——你能提供一些见解吗?这个答案有什么不正确/误导/危险的吗?我想有机会解决您的疑虑。 我没有投反对票,但似乎是a very bad idea @vanOekel - 感谢您的链接 - 如果使用这种方法,该答案提出了一些应该知道的有效案例,但 IMO 并没有使它成为一个“非常糟糕的主意” - 它仍然解决了一个问题出现在当前接受的答案中。我已经用这些警告更新了我的答案。 如果core pool size为0,如果任务提交给executor,如果队列满了executor就会开始创建thread/s来处理任务。那为什么容易出现死锁。没明白你的意思。你能详细说明一下吗? @ShirgillFarhanAnsari - 这是上一条评论中提出的情况。它可能会发生,因为直接添加到队列不会触发创建线程/启动工作人员。这是一种边缘情况/竞争条件,可以通过非零核心池大小来缓解【参考方案6】:

这是我最终解决这个问题的方法:

(注意:这个解决方案确实阻塞了提交 Callable 的线程,所以它可以防止 RejectedExecutionException 被抛出)

public class BoundedExecutor extends ThreadPoolExecutor

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) 
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException

        semaphore.acquire();            
        return submit(task);                    
    


    @Override
    protected void afterExecute(Runnable r, Throwable t) 
        super.afterExecute(r, t);

        semaphore.release();
    

【讨论】:

我认为这不适用于 corePoolSize &lt; maxPoolSize ... :| 它适用于corePoolSize &lt; maxPoolSize的情况。在这些情况下,信号量将可用,但不会有线程,SynchronousQueue 将返回 false。 ThreadPoolExecutor 然后将旋转一个新线程。这个解决方案的问题是它有一个竞态条件。在semaphore.release() 之后,但在线程完成execute 之前,submit() 将获得信号量许可。 如果 super.submit() 在execute() 完成之前运行,作业将被拒绝。 @LuísGuilherme 但是 semaphore.release() 永远不会被调用之前线程完成执行。因为这个调用是在 afterExecute(...) 方法中完成的。我在您描述的场景中遗漏了什么吗? afterExecute 由运行任务的同一线程调用,因此尚未完成。自己做测试。实施该解决方案,并向执行者投入大量工作,如果工作被拒绝,则投入。您会注意到,是的,这有一个竞争条件,并且不难重现它。 转到 ThreadPoolExecutor 并检查 runWorker(Worker w) 方法。您会看到 afterExecute 完成后发生的事情,包括解锁工作人员和增加已完成任务的数量。因此,您允许任务进入(通过释放信号量)而无需处理它们(通过调用 processWorkerExit)。【参考方案7】:

我遇到了类似的问题,我通过使用来自 ThreadPoolExecutorbeforeExecute/afterExecute 钩子实现了它:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor 

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) 
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    

    /**
     * Executes task if there is enough system resources for it. Otherwise
     * waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) 
        super.beforeExecute(t, r);
        taskLock.lock();
        try 
            // Spin while we will not have enough capacity for this job
            while (maxTaskCount < currentTaskCount) 
                try 
                    unpaused.await();
                 catch (InterruptedException e) 
                    t.interrupt();
                
            
            currentTaskCount++;
         finally 
            taskLock.unlock();
        
    

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) 
        super.afterExecute(r, t);
        taskLock.lock();
        try 
            currentTaskCount--;
            unpaused.signalAll();
         finally 
            taskLock.unlock();
        
    

这对你来说应该足够好了。顺便说一句,最初的实现是基于任务大小的,因为一个任务可能比另一个任务大 100 倍,并且提交两个巨大的任务正在杀死盒子,但是运行一个大任务和大量小任务是可以的。如果您的 I/O 密集型任务的大小大致相同,您可以使用此类,否则请告诉我,我将发布基于大小的实现。

附:你会想检查ThreadPoolExecutor javadoc。 Doug Lea 提供的关于如何轻松定制的用户指南非常好。

【讨论】:

我想知道当线程在 beforeExecute() 中持有锁并看到maxTaskCount &lt; currentTaskCount 并开始等待unpaused 条件时会发生什么。同时,另一个线程尝试在 afterExecute() 中获取锁以表示任务完成。不会是死锁吗? 我还注意到,当队列满时,这个解决方案不会阻塞提交任务的线程。所以RejectedExecutionException 还是可以的。 ReentrantLock/Condition 类的语义类似于 synchronised&wait/notify 提供的语义。当条件等待方法被调用时,锁被释放,所以不会出现死锁。 对,这个 ExecutorService 会在提交时阻塞任务而不阻塞调用者线程。作业刚被提交,当系统资源足够时将被异步处理。【参考方案8】:

我认为这就像使用 ArrayBlockingQueue 而不是 LinkedBlockingQueue 一样简单。

忽略我……那是完全错误的。 ThreadPoolExecutor 调用 Queue#offer 而不是 put 会产生您需要的效果。

您可以扩展ThreadPoolExecutor 并提供execute(Runnable) 的实现,它调用put 代替offer

恐怕这不是一个完全令人满意的答案。

【讨论】:

以上是关于Java:在特定队列大小后阻止提交的 ExecutorService [重复]的主要内容,如果未能解决你的问题,请参考以下文章

JAVA线程池shutdown和shutdownNow的区别

html表单在特定输入上按Enter键时阻止提交[重复]

Java API 调用将根据队列大小进行合并或在最旧项目插入队列后 4 秒内执行

Java线程池中的四种拒绝策略

java在超时值后杀死线程[重复]

表单提交后如何阻止变量重置?