创建事件调度线程安全信号量
Posted
技术标签:
【中文标题】创建事件调度线程安全信号量【英文标题】:Creating an Event Dispatch Thread safe semaphore 【发布时间】:2020-02-07 01:01:44 【问题描述】:我一直在尝试制作一个二进制信号量,它能够安全地阻止在事件调度线程 (EDT) 上运行的方法的执行,而不会实际阻止线程处理更多事件。最初这似乎是不可能的,但 Java 有一些与此相关的内置功能,但我无法让它工作。
用例
目前,如果您从 EDT 显示模态摆动对话框,它似乎会阻塞 EDT(因为您显示模态对话框的方法在对话框关闭之前不会继续到下一行),但确实有一些使 EDT 进入一个新的事件循环的幕后魔法,该循环将继续调度事件,直到模态对话框关闭。
我的团队目前的应用程序从 swing 迁移到 JavaFX 的速度非常缓慢(这是一个有点棘手的过渡),我希望能够从 AWT 事件调度线程显示模态 JavaFX 对话框,其方式与 swing 模态对话框的显示方式相同显示。似乎拥有某种 EDT 安全的信号量可以满足这个用例,并且可能在未来的其他用途中派上用场。
方法
java.awt.EventQueue.createSecondaryLoop()
是一种创建SecondaryLoop
对象的方法,然后您可以使用它来启动新的事件处理循环。当您调用SecondaryLoop.enter()
时,该调用将在处理新的事件循环时阻塞(注意 call 阻塞,但 thread 并未阻塞,因为它正在继续一个事件处理循环)。新的事件循环将一直持续到您调用SecondaryLoop.exit()
(这并不完全正确,请参阅我的related SO question)。
所以我创建了一个信号量,其中阻塞调用获取导致等待普通线程的闩锁,或进入 EDT 的辅助循环。每次对获取的阻塞调用还添加了一个解除阻塞操作,以便在释放信号量时调用(对于普通线程,它只是减少锁存器,对于 EDT,它退出辅助循环)。
这是我的代码:
import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@SuppressWarnings("serial")
public class EventDispatchThreadSafeBinarySemaphore extends Semaphore
/** Operations used to unblock threads when a semaphore is released.
* Must be a stack because secondary loops have to be exited in the
* reverse of the order in which they were entered in order to unblock
* the execution of the method that entered the loop.
*/
private Stack<Runnable> releaseOperations = new Stack<>();
private boolean semaphoreAlreadyAcquired = false;
public EventDispatchThreadSafeBinarySemaphore()
super(0);
@Override
public boolean isFair()
return false;
@Override
public void acquire() throws InterruptedException
Runnable blockingOperation = () -> ;
synchronized(this)
if(semaphoreAlreadyAcquired)
//We didn't acquire the semaphore, need to set up an operation to execute
//while we're waiting on the semaphore and an operation for another thread
//to execute in order to unblock us when the semaphore becomes available
if(EventQueue.isDispatchThread())
//For the EDT, we don't want to actually block, rather we'll enter a new loop that will continue
//processing AWT events.
SecondaryLoop temporaryAwtLoop = Toolkit.getDefaultToolkit().getSystemEventQueue().createSecondaryLoop();
releaseOperations.add(() -> temporaryAwtLoop.exit());
blockingOperation = () ->
if(!temporaryAwtLoop.enter())
//I don't think we'll run into this, but I'm leaving this here for now for debug purposes
System.err.println("Failed to enter event loop");
;
else
//Non-dispatch thread is a little simpler, we'll just wait on a latch
CountDownLatch blockedLatch = new CountDownLatch(1);
releaseOperations.add(() -> blockedLatch.countDown());
blockingOperation = () ->
try
blockedLatch.await();
catch (InterruptedException e)
//I'll worry about handling this better once I have the basics figured out
e.printStackTrace();
;
else
semaphoreAlreadyAcquired = true;
//This part must be executed outside of the synchronized block so that we don't block
//the EDT if it tries to acquire the semaphore while this statement is blocked
blockingOperation.run();
@Override
public void release()
synchronized(this)
if(releaseOperations.size() > 0)
//Release the last blocked thread
releaseOperations.pop().run();
else
semaphoreAlreadyAcquired = false;
这是我的相关 JUnit 测试代码(我为大尺寸道歉,这是迄今为止我能想到的最小的最小可验证示例):
public class TestEventDispatchThreadSafeBinarySemaphore
private static EventDispatchThreadSafeBinarySemaphore semaphore;
//See https://***.com/questions/58192008/secondaryloop-enter-not-blocking-until-exit-is-called-on-the-edt
//for why we need this timer
private static Timer timer = new Timer(500, null);
@BeforeClass
public static void setupClass()
timer.start();
@Before
public void setup()
semaphore = new EventDispatchThreadSafeBinarySemaphore();
@AfterClass
public static void cleanupClass()
timer.stop();
//This test passes just fine
@Test(timeout = 1000)
public void testBlockingAcquireReleaseOnEDT() throws InterruptedException
semaphore.acquire();
CountDownLatch edtCodeStarted = new CountDownLatch(1);
CountDownLatch edtCodeFinished = new CountDownLatch(1);
SwingUtilities.invokeLater(() ->
//One countdown to indicate that this has begun running
edtCodeStarted.countDown();
try
semaphore.acquire();
catch (InterruptedException e)
throw new RuntimeException(e);
//This countdown indicates that it has finished running
edtCodeFinished.countDown();
);
//Ensure that the code on the EDT has started
edtCodeStarted.await();
assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());
//Ensure that things can still run on the EDT
CountDownLatch edtActiveCheckingLatch = new CountDownLatch(1);
SwingUtilities.invokeLater(() -> edtActiveCheckingLatch.countDown());
//If we get past this line, then we know that the EDT is live even though the
//code in the invokeLater call is blocked
edtActiveCheckingLatch.await();
assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());
semaphore.release();
//If we get past this line, then the code on the EDT got past the semaphore
edtCodeFinished.await();
//This test fails intermittently, but so far only after the previous test was run first
@Test(timeout = 10000)
public void testConcurrentAcquiresOnEDT() throws InterruptedException
int numThreads =100;
CountDownLatch doneLatch = new CountDownLatch(numThreads);
try
semaphore.acquire();
//Queue up a bunch of threads to acquire and release the semaphore
//as soon as it becomes available
IntStream.range(0, numThreads)
.parallel()
.forEach((threadNumber) ->
SwingUtilities.invokeLater(() ->
try
semaphore.acquire();
catch (InterruptedException e)
e.printStackTrace();
finally
semaphore.release();
//Count down the latch to indicate that the thread terminated
doneLatch.countDown();
)
);
semaphore.release();
doneLatch.await();
catch (InterruptedException e)
throw new RuntimeException(e);
问题
testConcurrentAcquiresOnEDT
有时会通过,有时会失败。我相信我知道为什么。我深入研究了 Java 源代码和WaitDispatchSupport
(SecondaryLoop
的具体实现),循环基本上继续调度事件,直到一个名为keepBlockingEDT
的标志被清除。它将在事件之间检查这一点。当我调用exit
时,它会清除该标志并发送一个事件来唤醒事件队列,以防它正在等待更多事件。但是,它不会导致 enter()
方法立即退出(而且我认为它不可能)。
下面是死锁的结果:
主线程获取信号量 EDT 线程尝试获取信号量,但它已经被获取,所以它: 创建一个新的二级循环 创建一个 Runnable,它将退出新的辅助循环并将其推送到releaseOperations
堆栈
进入二级循环,导致执行阻塞(注意最后一步必须在synchronized
块之外
主线程释放信号量,导致以下情况发生:
releaseOperations
堆栈被弹出并在辅助循环中调用 exit
exit
调用将第二循环的 keepBlockingEDT
标志设置为 false
回到 EDT,它刚刚检查完 keepBlockingEDT
标志(就在它被设置为 false 之前),它正在获取下一个事件。
事实证明,下一个事件是另一个阻塞信号量的可运行对象,因此它会尝试获取它
这会在原始SecondaryLoop
之上创建另一个SecondaryLoop
并输入它
此时,原来的SecondaryLoop
已经清除了keepBlockingEDT
标志,它将能够停止阻塞,除了它当前被阻塞运行第二个SecondaryLoop
。第二个SecondaryLoop
永远不会调用它,因为现在没有人真正获得信号量,因此我们永远阻塞。
我已经为此工作了几天,我想出的每一个想法都是死路一条。
我相信我有一个可能的部分解决方案,即不允许一次在信号量上阻塞多个线程(如果另一个线程试图获取它,我将抛出一个 IllegalStateException)。如果它们每个都使用自己的信号量,我仍然可以有多个辅助循环,但每个信号量最多会创建 1 个辅助循环。我认为这会起作用,并且可以很好地满足我最可能的用例(因为大多数情况下我只想从事件线程中显示一个 JavaFX 模态对话框)。我只是想知道是否有其他人有其他想法,因为我觉得我已经接近制作一些很酷的东西了,但它就是不太奏效。
如果您有任何想法,请告诉我。 “我很确定这是不可能的,这就是为什么......”也是一个可以接受的答案。
【问题讨论】:
除了贴出的实际答案/建议外,请注意,您自己推理的结论自然是:您的信号量必须是“递归的”——也就是说,就像synchronized
本身不会阻塞如果同一个线程已经持有锁,如果调用线程已经获取并且尚未释放,您的信号量必须始终授予该信号量。
【参考方案1】:
使用Semaphore
很可能不是正确的方法。您想要的是进入嵌套事件循环,而不是使用阻塞机制。从阅读 API 来看,你似乎也把事情复杂化了。同样,您只需要在一个 UI 线程上进入一个嵌套事件循环,然后在另一个 UI 线程完成其工作后退出该循环。我认为以下内容符合您的要求:
import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javafx.application.Platform;
import javax.swing.SwingUtilities;
public class Foo
public static <T> T getOnFxAndWaitOnEdt(Supplier<? extends T> supplier)
Objects.requireNonNull(supplier, "supplier");
if (!EventQueue.isDispatchThread())
throw new IllegalStateException("current thread != EDT");
final SecondaryLoop loop = Toolkit.getDefaultToolkit()
.getSystemEventQueue()
.createSecondaryLoop();
final AtomicReference<T> valueRef = new AtomicReference<>();
Platform.runLater(() ->
valueRef.set(supplier.get());
SwingUtilities.invokeLater(loop::exit);
);
loop.enter();
return valueRef.get();
public static <T> T getOnEdtAndWaitOnFx(Supplier<? extends T> supplier)
Objects.requireNonNull(supplier, "supplier");
if (!Platform.isFxApplicationThread())
throw new IllegalStateException(
"current thread != JavaFX Application Thread");
final Object key = new Object();
final AtomicReference<T> valueRef = new AtomicReference<>();
SwingUtilities.invokeLater(() ->
valueRef.set(supplier.get());
Platform.runLater(() -> Platform.exitNestedEventLoop(key, null));
);
Platform.enterNestedEventLoop(key);
return valueRef.get();
Platform#enterNestedEventLoop
和 Platform#exitNestedEventLoop
方法是在 JavaFX 9 中添加的,尽管在 JavaFX 8 中有等效的内部方法。使用 AtomicReference
的原因是因为在 lambda 表达式中使用时局部变量必须是 final 或有效 final .但是,由于通知单独线程的方式,我不相信AtomicReference
的#get()
和#set(T)
方法提供的波动语义是严格需要的,但我已经使用这些方法以防万一。
这是一个使用上述内容显示来自 Event Dispatch Thread 的模态 JavaFX 对话框的示例:
Optional<T> optional = Foo.getOnFxAndWaitOnEdt(() ->
Dialog<T> dialog = new Dialog<>();
// configure dialog...
return dialog.showAndWait();
);
上述实用方法用于从 Event Dispatch 线程 到 JavaFX 应用程序线程 通信,反之亦然。这就是为什么需要进入嵌套事件循环的原因,否则其中一个 UI 线程将不得不阻塞,这将冻结关联的 UI。如果您在非 UI 线程上并且需要在等待结果的同时在 UI 线程上运行操作,则解决方案要简单得多:
// Run on EDT
T result = CompletableFuture.supplyAysnc(/*Supplier*/, SwingUtilities::invokeLater).join();
// Run on FX thread
T result = CompletableFuture.supplyAsync(/*Supplier*/, Platform::runLater).join();
对join()
的调用将阻塞调用线程,因此请确保不要从任何一个 UI 线程调用该方法。
【讨论】:
是的。你说得对。过于复杂的事情是我的面包和黄油:)。这将满足我当前的用例。我只是认为通用的通用 EDT 安全信号量也可以满足我的用例,并且也可以用于其他用例,所以我有点得意忘形。不过,这些其他潜在用例很有可能永远不会发生。我将再给这个问题几天,看看是否有人提出了通用解决方案。如果没有,我会接受这个作为答案。谢谢! @NateW 当您提到通用的通用解决方案时,我不确定您的想法是什么,但我已经编辑了我的答案以使其更通用。我还添加了一种在 EDT 上运行某些东西的方法,同时在 FX 线程上“等待”(即与您的用例相反)。以上是关于创建事件调度线程安全信号量的主要内容,如果未能解决你的问题,请参考以下文章