创建事件调度线程安全信号量

Posted

技术标签:

【中文标题】创建事件调度线程安全信号量【英文标题】:Creating an Event Dispatch Thread safe semaphore 【发布时间】:2020-02-07 01:01:44 【问题描述】:

我一直在尝试制作一个二进制信号量,它能够安全地阻止在事件调度线程 (EDT) 上运行的方法的执行,而不会实际阻止线程处理更多事件。最初这似乎是不可能的,但 Java 有一些与此相关的内置功能,但我无法让它工作。

用例

目前,如果您从 EDT 显示模态摆动对话框,它似乎会阻塞 EDT(因为您显示模态对话框的方法在对话框关闭之前不会继续到下一行),但确实有一些使 EDT 进入一个新的事件循环的幕后魔法,该循环将继续调度事件,直到模态对话框关闭。

我的团队目前的应用程序从 swing 迁移到 JavaFX 的速度非常缓慢(这是一个有点棘手的过渡),我希望能够从 AWT 事件调度线程显示模态 JavaFX 对话框,其方式与 swing 模态对话框的显示方式相同显示。似乎拥有某种 EDT 安全的信号量可以满足这个用例,并且可能在未来的其他用途中派上用场。

方法

java.awt.EventQueue.createSecondaryLoop() 是一种创建SecondaryLoop 对象的方法,然后您可以使用它来启动新的事件处理循环。当您调用SecondaryLoop.enter() 时,该调用将在处理新的事件循环时阻塞(注意 call 阻塞,但 thread 并未阻塞,因为它正在继续一个事件处理循环)。新的事件循环将一直持续到您调用SecondaryLoop.exit()(这并不完全正确,请参阅我的related SO question)。

所以我创建了一个信号量,其中阻塞调用获取导致等待普通线程的闩锁,或进入 EDT 的辅助循环。每次对获取的阻塞调用还添加了一个解除阻塞操作,以便在释放信号量时调用(对于普通线程,它只是减少锁存器,对于 EDT,它退出辅助循环)。

这是我的代码:


import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

@SuppressWarnings("serial")
public class EventDispatchThreadSafeBinarySemaphore extends Semaphore

    /** Operations used to unblock threads when a semaphore is released.
     * Must be a stack because secondary loops have to be exited in the
     * reverse of the order in which they were entered in order to unblock
     * the execution of the method that entered the loop.
     */
    private Stack<Runnable> releaseOperations = new Stack<>();

    private boolean semaphoreAlreadyAcquired = false;


    public EventDispatchThreadSafeBinarySemaphore() 
        super(0);
    

    @Override
    public boolean isFair() 
        return false;
    

    @Override
    public void acquire() throws InterruptedException 

        Runnable blockingOperation = () -> ;

        synchronized(this) 
            if(semaphoreAlreadyAcquired) 

                //We didn't acquire the semaphore, need to set up an operation to execute
                //while we're waiting on the semaphore and an operation for another thread
                //to execute in order to unblock us when the semaphore becomes available

                if(EventQueue.isDispatchThread()) 

                    //For the EDT, we don't want to actually block, rather we'll enter a new loop that will continue
                    //processing AWT events.
                    SecondaryLoop temporaryAwtLoop = Toolkit.getDefaultToolkit().getSystemEventQueue().createSecondaryLoop();

                    releaseOperations.add(() -> temporaryAwtLoop.exit());

                    blockingOperation = () -> 

                        if(!temporaryAwtLoop.enter()) 
                            //I don't think we'll run into this, but I'm leaving this here for now for debug purposes
                            System.err.println("Failed to enter event loop");
                        
                    ;
                
                else 

                    //Non-dispatch thread is a little simpler, we'll just wait on a latch
                    CountDownLatch blockedLatch = new CountDownLatch(1);
                    releaseOperations.add(() -> blockedLatch.countDown());
                    blockingOperation = () -> 
                        try 
                            blockedLatch.await();
                         catch (InterruptedException e) 
                            //I'll worry about handling this better once I have the basics figured out
                            e.printStackTrace();
                        
                    ;
                
            
            else 
                semaphoreAlreadyAcquired = true;
            
        

        //This part must be executed outside of the synchronized block so that we don't block
        //the EDT if it tries to acquire the semaphore while this statement is blocked
        blockingOperation.run();

    

    @Override
    public void release() 
        synchronized(this) 
            if(releaseOperations.size() > 0) 
                //Release the last blocked thread
                releaseOperations.pop().run();
            
            else 
                semaphoreAlreadyAcquired = false;
            
        
    



这是我的相关 JUnit 测试代码(我为大尺寸道歉,这是迄今为止我能想到的最小的最小可验证示例):

public class TestEventDispatchThreadSafeBinarySemaphore 

    private static EventDispatchThreadSafeBinarySemaphore semaphore;
        //See https://***.com/questions/58192008/secondaryloop-enter-not-blocking-until-exit-is-called-on-the-edt
        //for why we need this timer
        private static Timer timer = new Timer(500, null);
        @BeforeClass
    public static void setupClass() 
        timer.start();
    

    @Before
    public void setup() 
        semaphore = new EventDispatchThreadSafeBinarySemaphore();
    
        @AfterClass
    public static void cleanupClass() 
        timer.stop();
    

        //This test passes just fine
        @Test(timeout = 1000)
    public void testBlockingAcquireReleaseOnEDT() throws InterruptedException 

        semaphore.acquire();

        CountDownLatch edtCodeStarted = new CountDownLatch(1);
        CountDownLatch edtCodeFinished = new CountDownLatch(1);

        SwingUtilities.invokeLater(() -> 
            //One countdown to indicate that this has begun running
            edtCodeStarted.countDown();
            try 
                semaphore.acquire();
             catch (InterruptedException e) 
                throw new RuntimeException(e);
            

            //This countdown indicates that it has finished running
            edtCodeFinished.countDown();

        );

        //Ensure that the code on the EDT has started
        edtCodeStarted.await();

        assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());

        //Ensure that things can still run on the EDT
        CountDownLatch edtActiveCheckingLatch = new CountDownLatch(1);
        SwingUtilities.invokeLater(() -> edtActiveCheckingLatch.countDown());

        //If we get past this line, then we know that the EDT is live even though the 
        //code in the invokeLater call is blocked
        edtActiveCheckingLatch.await();

        assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());

        semaphore.release();

        //If we get past this line, then the code on the EDT got past the semaphore
        edtCodeFinished.await();
    

        //This test fails intermittently, but so far only after the previous test was run first
    @Test(timeout = 10000)
    public void testConcurrentAcquiresOnEDT() throws InterruptedException 

        int numThreads =100;

        CountDownLatch doneLatch = new CountDownLatch(numThreads);

        try 
            semaphore.acquire();

            //Queue up a bunch of threads to acquire and release the semaphore
            //as soon as it becomes available
            IntStream.range(0, numThreads)
                    .parallel()
                    .forEach((threadNumber) -> 
                        SwingUtilities.invokeLater(() -> 
                            try 
                                semaphore.acquire();
                             catch (InterruptedException e) 
                                e.printStackTrace();
                            
                            finally 
                                semaphore.release();
                                //Count down the latch to indicate that the thread terminated
                                doneLatch.countDown();
                            
                        )
                    );

            semaphore.release();

            doneLatch.await();
         catch (InterruptedException e) 
            throw new RuntimeException(e);
        
    

问题

testConcurrentAcquiresOnEDT 有时会通过,有时会失败。我相信我知道为什么。我深入研究了 Java 源代码和WaitDispatchSupportSecondaryLoop 的具体实现),循环基本上继续调度事件,直到一个名为keepBlockingEDT 的标志被清除。它将在事件之间检查这一点。当我调用exit 时,它会清除该标志并发送一个事件来唤醒事件队列,以防它正在等待更多事件。但是,它不会导致 enter() 方法立即退出(而且我认为它不可能)。

下面是死锁的结果:

主线程获取信号量 EDT 线程尝试获取信号量,但它已经被获取,所以它: 创建一个新的二级循环 创建一个 Runnable,它将退出新的辅助循环并将其推送到 releaseOperations 堆栈 进入二级循环,导致执行阻塞(注意最后一步必须在synchronized 块之外 主线程释放信号量,导致以下情况发生: releaseOperations 堆栈被弹出并在辅助循环中调用 exit exit 调用将第二循环的 keepBlockingEDT 标志设置为 false 回到 EDT,它刚刚检查完 keepBlockingEDT 标志(就在它被设置为 false 之前),它正在获取下一个事件。 事实证明,下一个事件是另一个阻塞信号量的可运行对象,因此它会尝试获取它 这会在原始SecondaryLoop 之上创建另一个SecondaryLoop 并输入它 此时,原来的SecondaryLoop 已经清除了keepBlockingEDT 标志,它将能够停止阻塞,除了它当前被阻塞运行第二个SecondaryLoop。第二个SecondaryLoop 永远不会调用它,因为现在没有人真正获得信号量,因此我们永远阻塞。

我已经为此工作了几天,我想出的每一个想法都是死路一条。

我相信我有一个可能的部分解决方案,即不允许一次在信号量上阻塞多个线程(如果另一个线程试图获取它,我将抛出一个 IllegalStateException)。如果它们每个都使用自己的信号量,我仍然可以有多个辅助循环,但每个信号量最多会创建 1 个辅助循环。我认为这会起作用,并且可以很好地满足我最可能的用例(因为大多数情况下我只想从事件线程中显示一个 JavaFX 模态对话框)。我只是想知道是否有其他人有其他想法,因为我觉得我已经接近制作一些很酷的东西了,但它就是不太奏效。

如果您有任何想法,请告诉我。 “我很确定这是不可能的,这就是为什么......”也是一个可以接受的答案。

【问题讨论】:

除了贴出的实际答案/建议外,请注意,您自己推理的结论自然是:您的信号量必须是“递归的”——也就是说,就像synchronized 本身不会阻塞如果同一个线程已经持有锁,如果调用线程已经获取并且尚未释放,您的信号量必须始终授予该信号量。 【参考方案1】:

使用Semaphore 很可能不是正确的方法。您想要的是进入嵌套事件循环,而不是使用阻塞机制。从阅读 API 来看,你似乎也把事情复杂化了。同样,您只需要在一个 UI 线程上进入一个嵌套事件循环,然后在另一个 UI 线程完成其工作后退出该循环。我认为以下内容符合您的要求:

import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javafx.application.Platform;
import javax.swing.SwingUtilities;

public class Foo 

    public static <T> T getOnFxAndWaitOnEdt(Supplier<? extends T> supplier) 
        Objects.requireNonNull(supplier, "supplier");
        if (!EventQueue.isDispatchThread()) 
            throw new IllegalStateException("current thread != EDT");
        

        final SecondaryLoop loop = Toolkit.getDefaultToolkit()
                .getSystemEventQueue()
                .createSecondaryLoop();
        final AtomicReference<T> valueRef = new AtomicReference<>();

        Platform.runLater(() -> 
            valueRef.set(supplier.get());
            SwingUtilities.invokeLater(loop::exit);
        );
        loop.enter();

        return valueRef.get();
    

    public static <T> T getOnEdtAndWaitOnFx(Supplier<? extends T> supplier) 
        Objects.requireNonNull(supplier, "supplier");
        if (!Platform.isFxApplicationThread()) 
            throw new IllegalStateException(
                    "current thread != JavaFX Application Thread");
        

        final Object key = new Object();
        final AtomicReference<T> valueRef = new AtomicReference<>();

        SwingUtilities.invokeLater(() -> 
            valueRef.set(supplier.get());
            Platform.runLater(() -> Platform.exitNestedEventLoop(key, null));
        );
        Platform.enterNestedEventLoop(key);

        return valueRef.get();
    


Platform#enterNestedEventLoopPlatform#exitNestedEventLoop 方法是在 JavaFX 9 中添加的,尽管在 JavaFX 8 中有等效的内部方法。使用 AtomicReference 的原因是因为在 lambda 表达式中使用时局部变量必须是 final 或有效 final .但是,由于通知单独线程的方式,我不相信AtomicReference#get()#set(T) 方法提供的波动语义是严格需要的,但我已经使用这些方法以防万一。

这是一个使用上述内容显示来自 Event Dispatch Thread 的模态 JavaFX 对话框的示例:

Optional<T> optional = Foo.getOnFxAndWaitOnEdt(() -> 
    Dialog<T> dialog = new Dialog<>();
    // configure dialog...
    return dialog.showAndWait();
);

上述实用方法用于从 Event Dispatch 线程JavaFX 应用程序线程 通信,反之亦然。这就是为什么需要进入嵌套事件循环的原因,否则其中一个 UI 线程将不得不阻塞,这将冻结关联的 UI。如果您在非 UI 线程上并且需要在等待结果的同时在 UI 线程上运行操作,则解决方案要简单得多:

// Run on EDT
T result = CompletableFuture.supplyAysnc(/*Supplier*/, SwingUtilities::invokeLater).join();

// Run on FX thread
T result = CompletableFuture.supplyAsync(/*Supplier*/, Platform::runLater).join();

join() 的调用将阻塞调用线程,因此请确保不要从任何一个 UI 线程调用该方法。

【讨论】:

是的。你说得对。过于复杂的事情是我的面包和黄油:)。这将满足我当前的用例。我只是认为通用的通用 EDT 安全信号量也可以满足我的用例,并且也可以用于其他用例,所以我有点得意忘形。不过,这些其他潜在用例很有可能永远不会发生。我将再给这个问题几天,看看是否有人提出了通用解决方案。如果没有,我会接受这个作为答案。谢谢! @NateW 当您提到通用的通用解决方案时,我不确定您的想法是什么,但我已经编辑了我的答案以使其更通用。我还添加了一种在 EDT 上运行某些东西的方法,同时在 FX 线程上“等待”(即与您的用例相反)。

以上是关于创建事件调度线程安全信号量的主要内容,如果未能解决你的问题,请参考以下文章

Qt的信号槽机制可以保障线程安全吗

如何创建线程?如何保证线程安全?

linux线程详解:线程概念线程调度线程安全线程模型

在事件分派之前检查 null ......线程安全吗?

java中多线程并发并行线程与进程线程调度创建线程的方式

java中多线程并发并行线程与进程线程调度创建线程的方式