如何使用 JUnit 测试异步进程

Posted

技术标签:

【中文标题】如何使用 JUnit 测试异步进程【英文标题】:How to use JUnit to test asynchronous processes 【发布时间】:2010-10-12 12:12:38 【问题描述】:

您如何测试使用 JUnit 触发异步进程的方法?

我不知道如何让我的测试等待进程结束(它不完全是单元测试,它更像是一个集成测试,因为它涉及多个类而不仅仅是一个)。

【问题讨论】:

你可以试试 JAT(Java 异步测试):bitbucket.org/csolar/jat JAT 有 1 位观察者,1.5 年未更新。 Awaitility 仅在 1 个月前更新,在撰写本文时版本为 1.6。我不隶属于任何一个项目,但如果我要投资于我的项目的补充,我会在这个时候更加信任 Awaitility。 JAT 仍然没有更新:“上次更新时间为 2013-01-19”。只需节省点击链接的时间。 @LesHazlewood,一位观察者对 JAT 不利,但多年来几乎没有更新......只是一个例子。如果它正常工作,您多久更新一次操作系统的低级 TCP 堆栈? JAT 的替代方案在***.com/questions/631598/… 下方回答。 【参考方案1】:

JUnit 5 有 Assertions.assertTimeout(Duration, Executable)/assertTimeoutPreemptively()(请阅读每个的 Javadoc 以了解区别)而 Mockito 有 verify(mock, timeout(millisecs).times(x))

Assertions.assertTimeout(Duration.ofMillis(1000), () -> 
    myReactiveService.doSth().subscribe()
);

还有:

Mockito.verify(myReactiveService, 
    timeout(1000).times(0)).doSth(); // cannot use never() here

超时在管道中可能是不确定的/脆弱的。所以要小心。

【讨论】:

【参考方案2】:

假设你有这个代码:

public void method() 
        CompletableFuture.runAsync(() -> 
            //logic
            //logic
            //logic
            //logic
        );
    

尝试将其重构为如下内容:

public void refactoredMethod() 
    CompletableFuture.runAsync(this::subMethod);


private void subMethod() 
    //logic
    //logic
    //logic
    //logic

之后,以这种方式测试 subMethod:

org.powermock.reflect.Whitebox.invokeMethod(classInstance, "subMethod"); 

这不是一个完美的解决方案,但它会测试异步执行中的所有逻辑。

【讨论】:

如果subMethod被提取到另一个类,这个解决方案会更好,因此可以在没有powermock/reflection的情况下进行测试【参考方案3】:

对于所有 Spring 用户,这就是我现在通常进行集成测试的方式,其中涉及异步行为:

当异步任务(例如 I/O 调用)完成时,在生产代码中触发应用程序事件。在大多数情况下,无论如何都需要此事件来处理生产中异步操作的响应。

有了这个事件,你就可以在你的测试用例中使用以下策略:

    执行被测系统 监听事件并确保事件已触发 做你的断言

要对此进行分解,您首先需要触发某种域事件。我在这里使用 UUID 来标识已完成的任务,但您当然可以随意使用其他东西,只要它是唯一的。

(注意,下面的代码sn-ps也使用Lombok注解去掉样板代码)

@RequiredArgsConstructor
class TaskCompletedEvent() 
  private final UUID taskId;
  // add more fields containing the result of the task if required

生产代码本身通常如下所示:

@Component
@RequiredArgsConstructor
class Production 

  private final ApplicationEventPublisher eventPublisher;

  void doSomeTask(UUID taskId) 
    // do something like calling a REST endpoint asynchronously
    eventPublisher.publishEvent(new TaskCompletedEvent(taskId));
  


然后我可以使用 Spring @EventListener 在测试代码中捕获已发布的事件。事件侦听器涉及更多一点,因为它必须以线程安全的方式处理两种情况:

    生产代码比测试用例快,并且在测试用例检查事件之前事件已经触发,或者 测试用例比生产代码更快,测试用例必须等待事件。

CountDownLatch 用于第二种情况,如此处其他答案中所述。另请注意,事件处理程序方法上的 @Order 注释可确保在生产中使用的任何其他事件侦听器之后调用此事件处理程序方法。

@Component
class TaskCompletionEventListener 

  private Map<UUID, CountDownLatch> waitLatches = new ConcurrentHashMap<>();
  private List<UUID> eventsReceived = new ArrayList<>();

  void waitForCompletion(UUID taskId) 
    synchronized (this) 
      if (eventAlreadyReceived(taskId)) 
        return;
      
      checkNobodyIsWaiting(taskId);
      createLatch(taskId);
    
    waitForEvent(taskId);
  

  private void checkNobodyIsWaiting(UUID taskId) 
    if (waitLatches.containsKey(taskId)) 
      throw new IllegalArgumentException("Only one waiting test per task ID supported, but another test is already waiting for " + taskId + " to complete.");
    
  

  private boolean eventAlreadyReceived(UUID taskId) 
    return eventsReceived.remove(taskId);
  

  private void createLatch(UUID taskId) 
    waitLatches.put(taskId, new CountDownLatch(1));
  

  @SneakyThrows
  private void waitForEvent(UUID taskId) 
    var latch = waitLatches.get(taskId);
    latch.await();
  

  @EventListener
  @Order
  void eventReceived(TaskCompletedEvent event) 
    var taskId = event.getTaskId();
    synchronized (this) 
      if (isSomebodyWaiting(taskId)) 
        notifyWaitingTest(taskId);
       else 
        eventsReceived.add(taskId);
      
    
  

  private boolean isSomebodyWaiting(UUID taskId) 
    return waitLatches.containsKey(taskId);
  

  private void notifyWaitingTest(UUID taskId) 
    var latch = waitLatches.remove(taskId);
    latch.countDown();
  


最后一步是在测试用例中执行被测系统。我在这里使用带有 JUnit 5 的 SpringBoot 测试,但这对于使用 Spring 上下文的所有测试应该是一样的。

@SpringBootTest
class ProductionIntegrationTest 

  @Autowired
  private Production sut;

  @Autowired
  private TaskCompletionEventListener listener;

  @Test
  void thatTaskCompletesSuccessfully() 
    var taskId = UUID.randomUUID();
    sut.doSomeTask(taskId);
    listener.waitForCompletion(taskId);
    // do some assertions like looking into the DB if value was stored successfully
  


请注意,与此处的其他答案相比,如果您并行执行测试并且多个线程同时执行异步代码,此解决方案也将起作用。

【讨论】:

【参考方案4】:

您可以尝试使用Awaitility 库。它使测试您正在谈论的系统变得容易。

【讨论】:

友好的免责声明:Johan 是该项目的主要贡献者。 遭受必须等待的根本问题(单元测试需要快速运行)。理想情况下,您真的不想等待比需要的时间长一毫秒,所以我认为在这方面使用CountDownLatch(参见@Martin 的回答)会更好。 真的很棒。 这是一个完美的库,可以满足我的异步流程集成测试要求。非常棒。该库似乎维护得很好,并且具有从基本到高级的功能,我相信足以满足大多数情况。感谢您的精彩参考! 非常棒的建议。谢谢【参考方案5】:

尽可能避免使用并行线程进行测试(大多数情况下)。这只会使您的测试变得不稳定(有时通过,有时失败)。

仅当您需要调用其他库/系统时,您可能需要等待其他线程,在这种情况下,请始终使用 Awaitility 库而不是 Thread.sleep()

永远不要在您的测试中只调用get()join(),否则您的测试可能会在您的 CI 服务器上永远运行,以防未来永远无法完成。在调用get() 之前,始终在测试中首先声明isDone()。对于 CompletionStage,即.toCompletableFuture().isDone()

当您像这样测试非阻塞方法时:

public static CompletionStage<String> createGreeting(CompletableFuture<String> future) 
    return future.thenApply(result -> "Hello " + result);

那么你不应该只通过在测试中传递一个完整的 Future 来测试结果,你还应该确保你的方法 doSomething() 不会通过调用 join()get() 来阻塞。如果您使用非阻塞框架,这一点尤其重要。

为此,请使用您设置为手动完成的未完成未来进行测试:

@Test
public void testDoSomething() throws Exception 
    CompletableFuture<String> innerFuture = new CompletableFuture<>();
    CompletableFuture<String> futureResult = createGreeting(innerFuture).toCompletableFuture();
    assertFalse(futureResult.isDone());

    // this triggers the future to complete
    innerFuture.complete("world");
    assertTrue(futureResult.isDone());

    // futher asserts about fooResult here
    assertEquals(futureResult.get(), "Hello world");

这样,如果你在 doSomething() 中添加future.join(),测试就会失败。

如果您的 Service 使用 ExecutorService,例如 thenApplyAsync(..., executorService),那么在您的测试中注入一个单线程 ExecutorService,例如来自 guava 的那个:

ExecutorService executorService = Executors.newSingleThreadExecutor();

如果你的代码使用了 thenApplyAsync(...) 这样的 forkJoinPool,重写代码以使用 ExecutorService(有很多好的理由),或者使用 Awaitility。

为了缩短示例,我在测试中将 BarService 设置为实现为 Java8 lambda 的方法参数,通常它是您将模拟的注入引用。

【讨论】:

嘿@tkruse,也许你有一个使用这种技术进行测试的公共git repo? @Christiano:那将违反 SO 哲学。相反,当您将它们粘贴到空的 junit 测试类中时,我将方法更改为无需任何额外代码即可编译(所有导入都是 java8+ 或 junit)。随意投票。 我现在明白了。谢谢。我现在的问题是测试方法何时返回 CompletableFuture 但确实接受其他对象作为 CompletableFuture 以外的参数。 在您的情况下,谁创建了该方法返回的 CompletableFuture?如果它是另一个服务,那可以被嘲笑,我的技术仍然适用。如果方法本身创建了 CompletableFuture,情况会发生很大变化,因此您可以提出一个新问题。然后取决于哪个线程将完成您的方法返回的未来。【参考方案6】:

值得一提的是Concurrency in Practice中有一个非常有用的章节Testing Concurrent Programs描述了一些单元测试方法并给出了问题的解决方案。

【讨论】:

那是哪种方法?你能举个例子吗?【参考方案7】:

这里有很多答案,但一个简单的方法是创建一个完整的 CompletableFuture 并使用它:

CompletableFuture.completedFuture("donzo")

所以在我的测试中:

this.exactly(2).of(mockEventHubClientWrapper).sendASync(with(any(LinkedList.class)));
this.will(returnValue(new CompletableFuture<>().completedFuture("donzo")));

我只是确保无论如何都会调用所有这些东西。如果您使用此代码,则此技术有效:

CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();

当所有 CompletableFutures 完成后,它会直接穿过它!

【讨论】:

【参考方案8】:

我找到了一个库socket.io 来测试异步逻辑。使用LinkedBlockingQueue 看起来简单而简洁。这里是example:

    @Test(timeout = TIMEOUT)
public void message() throws URISyntaxException, InterruptedException 
    final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();

    socket = client();
    socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() 
        @Override
        public void call(Object... objects) 
            socket.send("foo", "bar");
        
    ).on(Socket.EVENT_MESSAGE, new Emitter.Listener() 
        @Override
        public void call(Object... args) 
            values.offer(args);
        
    );
    socket.connect();

    assertThat((Object[])values.take(), is(new Object[] "hello client"));
    assertThat((Object[])values.take(), is(new Object[] "foo", "bar"));
    socket.disconnect();

使用 LinkedBlockingQueue 以 API 阻塞直到获得结果,就像同步方式一样。并设置超时以避免假设等待结果的时间过多。

【讨论】:

很棒的方法!【参考方案9】:

另一种方法是使用CountDownLatch 类。

public class DatabaseTest 

    /**
     * Data limit
     */
    private static final int DATA_LIMIT = 5;

    /**
     * Countdown latch
     */
    private CountDownLatch lock = new CountDownLatch(1);

    /**
     * Received data
     */
    private List<Data> receiveddata;

    @Test
    public void testDataRetrieval() throws Exception 
        Database db = new MockDatabaseImpl();
        db.getData(DATA_LIMIT, new DataCallback() 
            @Override
            public void onSuccess(List<Data> data) 
                receiveddata = data;
                lock.countDown();
            
        );

        lock.await(2000, TimeUnit.MILLISECONDS);

        assertNotNull(receiveddata);
        assertEquals(DATA_LIMIT, receiveddata.size());
    

注意您不能只将 syncronized 与常规对象一起用作锁,因为快速回调可以在调用锁的等待方法之前释放锁。请参阅 Joe Walnes 的 this 博客文章。

EDIT由于来自@jtahlborn 和@Ring 的cmets,删除了CountDownLatch 周围的同步块

【讨论】:

请不要按照这个例子,不正确。您应该在 CountDownLatch 上进行同步,因为它在内部处理线程安全。 在同步部分之前,这是一个很好的建议,它可能占用了将近 3-4 小时的调试时间。 ***.com/questions/11007551/… 为错误道歉。我已经适当地编辑了答案。 如果您正在验证 onSuccess 是否被调用,您应该断言 lock.await 返回 true。 @Martin 是正确的,但这意味着您有一个不同的问题需要解决。【参考方案10】:

测试线程/异步代码本质上没有任何问题,特别是如果线程是您正在测试的代码的重点。测试这些东西的一般方法是:

阻塞主测试线程 从其他线程捕获失败的断言 解除对主测试线程的阻塞 重新抛出任何失败

但对于一项测试来说,这是很多样板文件。更好/更简单的方法是只使用ConcurrentUnit:

  final Waiter waiter = new Waiter();

  new Thread(() -> 
    doSomeWork();
    waiter.assertTrue(true);
    waiter.resume();
  ).start();

  // Wait for resume() to be called
  waiter.await(1000);

CountdownLatch 方法相比,这种方法的好处是它不那么冗长,因为任何线程中发生的断言失败都会正确地报告给主线程,这意味着测试应该在应该失败的时候失败。将 CountdownLatch 方法与 ConcurrentUnit 进行比较的文章是 here。

我还为此主题写了blog post,供那些想了解更多细节的人使用。

【讨论】:

我过去使用的类似解决方案是github.com/MichaelTamm/junit-toolbox,也作为junit.org/junit4 上的第三方扩展程序【参考方案11】:

如果测试结果是异步产生的,这就是我现在使用的。

public class TestUtil 

    public static <R> R await(Consumer<CompletableFuture<R>> completer) 
        return await(20, TimeUnit.SECONDS, completer);
    

    public static <R> R await(int time, TimeUnit unit, Consumer<CompletableFuture<R>> completer) 
        CompletableFuture<R> f = new CompletableFuture<>();
        completer.accept(f);
        try 
            return f.get(time, unit);
         catch (InterruptedException | TimeoutException e) 
            throw new RuntimeException("Future timed out", e);
         catch (ExecutionException e) 
            throw new RuntimeException("Future failed", e.getCause());
        
    

使用静态导入,测试看起来不错。 (注意,在这个例子中我开始一个线程来说明这个想法)

    @Test
    public void testAsync() 
        String result = await(f -> 
            new Thread(() -> f.complete("My Result")).start();
        );
        assertEquals("My Result", result);
    

如果未调用f.complete,则测试将在超时后失败。您也可以使用f.completeExceptionally 提前失败。

【讨论】:

【参考方案12】:

如果您使用CompletableFuture(Java 8 中引入)或SettableFuture(来自Google Guava),您可以在测试完成后立即完成,而不是等待预设的时间.您的测试将如下所示:

CompletableFuture<String> future = new CompletableFuture<>();
executorService.submit(new Runnable()          
    @Override
    public void run() 
        future.complete("Hello World!");                
    
);
assertEquals("Hello World!", future.get());

【讨论】:

【参考方案13】:

我更喜欢使用等待和通知。简单明了。

@Test
public void test() throws Throwable 
    final boolean[] asyncExecuted = false;
    final Throwable[] asyncThrowable= null;

    // do anything async
    new Thread(new Runnable() 
        @Override
        public void run() 
            try 
                // Put your test here.
                fail(); 
            
            // lets inform the test thread that there is an error.
            catch (Throwable throwable)
                asyncThrowable[0] = throwable;
            
            // ensure to release asyncExecuted in case of error.
            finally 
                synchronized (asyncExecuted)
                    asyncExecuted[0] = true;
                    asyncExecuted.notify();
                
            
        
    ).start();

    // Waiting for the test is complete
    synchronized (asyncExecuted)
        while(!asyncExecuted[0])
            asyncExecuted.wait();
        
    

    // get any async error, including exceptions and assertationErrors
    if(asyncThrowable[0] != null)
        throw asyncThrowable[0];
    

基本上,我们需要创建一个最终的 Array 引用,以便在匿名内部类内部使用。我宁愿创建一个 boolean[],因为我可以设置一个值来控制我们是否需要 wait()。当一切都完成后,我们只需释放 asyncExecuted。

【讨论】:

如果你的断言失败,主测试线程不会知道。 感谢您的解决方案,帮助我使用 websocket 连接调试代码。 @Jonathan,我更新了代码以捕获任何断言和异常并将其通知主测试线程。【参考方案14】:

如果您想测试逻辑,请不要异步测试它。

例如测试这段代码,它适用于异步方法的结果。

public class Example 
    private Dependency dependency;

    public Example(Dependency dependency) 
        this.dependency = dependency;            
    

    public CompletableFuture<String> someAsyncMethod()
        return dependency.asyncMethod()
                .handle((r,ex) -> 
                    if(ex != null) 
                        return "got exception";
                     else 
                        return r.toString();
                    
                );
    


public class Dependency 
    public CompletableFuture<Integer> asyncMethod() 
        // do some async stuff       
    

在测试中模拟具有同步实现的依赖项。单元测试完全同步,运行时间为 150ms。

public class DependencyTest 
    private Example sut;
    private Dependency dependency;

    public void setup() 
        dependency = Mockito.mock(Dependency.class);;
        sut = new Example(dependency);
    

    @Test public void success() throws InterruptedException, ExecutionException 
        when(dependency.asyncMethod()).thenReturn(CompletableFuture.completedFuture(5));

        // When
        CompletableFuture<String> result = sut.someAsyncMethod();

        // Then
        assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
        String value = result.get();
        assertThat(value, is(equalTo("5")));
    

    @Test public void failed() throws InterruptedException, ExecutionException 
        // Given
        CompletableFuture<Integer> c = new CompletableFuture<Integer>();
        c.completeExceptionally(new RuntimeException("failed"));
        when(dependency.asyncMethod()).thenReturn(c);

        // When
        CompletableFuture<String> result = sut.someAsyncMethod();

        // Then
        assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
        String value = result.get();
        assertThat(value, is(equalTo("got exception")));
    

您不测试异步行为,但您可以测试逻辑是否正确。

【讨论】:

【参考方案15】:

我发现对测试异步方法非常有用的一种方法是在对象测试的构造函数中注入一个Executor 实例。在生产中,执行器实例被配置为异步运行,而在测试中它可以被模拟为同步运行。

所以假设我正在尝试测试异步方法Foo#doAsync(Callback c)

class Foo 
  private final Executor executor;
  public Foo(Executor executor) 
    this.executor = executor;
  

  public void doAsync(Callback c) 
    executor.execute(new Runnable() 
      @Override public void run() 
        // Do stuff here
        c.onComplete(data);
      
    );
  

在生产中,我会使用Executors.newSingleThreadExecutor() Executor 实例构造Foo,而在测试中我可能会使用执行以下操作的同步执行器来构造它--

class SynchronousExecutor implements Executor 
  @Override public void execute(Runnable r) 
    r.run();
  

现在我对异步方法的 JUnit 测试非常干净 --

@Test public void testDoAsync() 
  Executor executor = new SynchronousExecutor();
  Foo objectToTest = new Foo(executor);

  Callback callback = mock(Callback.class);
  objectToTest.doAsync(callback);

  // Verify that Callback#onComplete was called using Mockito.
  verify(callback).onComplete(any(Data.class));

  // Assert that we got back the data that we expected.
  assertEquals(expectedData, callback.getData());

【讨论】:

如果我想集成测试涉及像 Spring 的 WebClient 这样的异步库调用的东西,则不起作用【参考方案16】:

如何调用SomeObject.waitnotifyAll 如here 或使用Robotiums Solo.waitForCondition(...) 方法或使用class i wrote 来执行此操作(请参阅 cmets 和测试类了解如何使用)

【讨论】:

等待/通知/中断方法的问题是您正在测试的代码可能会干扰等待线程(我已经看到它发生了)。这就是ConcurrentUnit 使用线程可以等待的私有circuit 的原因,它不会被主测试线程的中断无意中干扰。【参考方案17】:

恕我直言,让单元测试创​​建或等待线程等是不好的做法。您希望这些测试在几秒钟内运行。这就是为什么我想提出一种两步法来测试异步进程。

    测试您的异步进程是否正确提交。您可以模拟接受异步请求的对象,并确保提交的作业具有正确的属性等。 测试您的异步回调是否在做正确的事情。在这里,您可以模拟最初提交的作业并假设它已正确初始化并验证您的回调是否正确。

【讨论】:

当然。但有时您需要测试专门用于管理线程的代码。 对于我们这些使用 Junit 或 TestNG 进行集成测试(而不仅仅是单元测试)或用户验收测试(例如 w/ Cucumber)的人来说,等待异步完成并验证结果是绝对必要。 异步进程是一些最复杂的代码,你说你不应该对它们使用单​​元测试而只用一个线程进行测试?这是一个非常糟糕的主意。 模拟测试通常无法证明功能端到端有效。需要以异步方式测试异步功能以确保其正常工作。如果您愿意,可以将其称为集成测试,但它仍然是一个需要的测试。 这不应该是公认的答案。测试超越了单元测试。 OP 将其称为集成测试而不是单元测试。【参考方案18】:

启动进程并使用Future 等待结果。

【讨论】:

以上是关于如何使用 JUnit 测试异步进程的主要内容,如果未能解决你的问题,请参考以下文章

如何在Gradle中为单个测试类并行执行JUnit测试

java Android - 如何使用IdlingResource等待JUnit断言的异步操作(可能是Observables或EvenBus)。

java Android - 如何使用IdlingResource等待JUnit断言的异步操作(可能是Observables或EvenBus)。

java Android - 如何使用IdlingResource等待JUnit断言的异步操作(可能是Observables或EvenBus)。

java Android - 如何使用IdlingResource等待JUnit断言的异步操作(可能是Observables或EvenBus)。

java Android - 如何使用IdlingResource等待JUnit断言的异步操作(可能是Observables或EvenBus)。