如何等待多个服务完成?

Posted

技术标签:

【中文标题】如何等待多个服务完成?【英文标题】:How to wait for multiple services to complete? 【发布时间】:2022-01-15 05:29:39 【问题描述】:

我正在运行多个这样的服务: (例如多线程读取文件)

for (int i = 0; i < 3; i++) 
        ReadService readService = new ReadService();
        readService.start();


//wait until all services have been completed
System.out.println("All services done!");

ReadService 是一个扩展 Service 类的类,它正在做一些事情,比如读取文件。 它是从另一个不是 JavaFX 应用程序线程的线程调用的。

我怎样才能等到所有这些服务都完成后再致电System.out.println

可重现的例子:

import javafx.application.Application;
import javafx.concurrent.Service;
import javafx.concurrent.Task;
import javafx.stage.Stage;

public class HelloApplication extends Application 

    @Override
    public void start(Stage stage) 
        for (int i = 0; i < 3; i++) 
            ReadService readService = new ReadService();
            readService.start();
        
        // wait until all services have been completed
        System.out.println("Services done");
        // continue with some other code, for example check the read file on anything
    

    public static void main(String[] args) 
        launch();
    


class ReadService extends Service<Boolean> 

    @Override
    protected Task<Boolean> createTask() 
        return new Task<>() 
            @Override
            protected synchronized Boolean call() throws Exception 
                // do something, for example read a file
                System.out.println("wait...");
                wait(5000);
                System.out.println("waiting done");
                return null;
            
        ;
    

【问题讨论】:

这个问题需要更多的上下文。 ReadService 是什么?这段代码在哪里被调用(例如,因为这个问题标记为 javafx,它在 FX 应用程序线程上)?完成所有服务后,您想做什么? (更新用户界面?还有别的吗?)理想情况下,创建并发布minimal reproducible example。 @James_D 我用一个可重现的例子更新了我的帖子。我不是从 JavaFX 线程调用服务,而是从 JavaFX 线程调用的线程调用服务。服务完成后,我正在对读取的文件做一些事情(比如验证、创建对象......)。希望现在更清楚了 不要使用服务,使用三个任务来完成工作,任务是futuretask的子类,所以你可以对它们调用get()来等待它们完成。但是不要在 FX 线程上调用 get(),而是创建一个运行 3 个子任务的第 4 个控制任务,并在所有子任务上调用 get 以暂停直到完成。使用来自执行者的工作窃取池执行您的任务。 【参考方案1】:

您正在从 FX 应用程序线程(执行 start() 方法的位置)启动服务,并且您不能阻塞该线程。因此,一种方法是为已完成的服务数量创建一个计数器,并在达到零时做出响应。

请注意,这里的所有新内容(创建和更新 servicesPending 属性,以及服务完成时执行的代码)都在 FX 应用程序线程上执行,因此如果您在服务很齐全。

@Override
public void start(Stage stage) 

    int numServices = 3 ;

    IntegerProperty servicesPending = new SimpleIntegerProperty(numServices);

    servicesPending.addListener((obs, oldValue, newValue) -> 
         if (newValue == 0) 
             // code to execute when all services are complete
             System.out.println("Services done");
         
    );

    for (int i = 0; i < numServices; i++) 
        ReadService readService = new ReadService();
        readService.setOnSucceeded(e -> servicesPending.set(servicesPending.get() - 1));
        readService.start();
    
    

另一方面,如果您在服务完成时所做的工作与 UI 无关,那么您可以创建一个新线程来阻塞直到服务完成,然后在该后台线程上执行工作。实现此目的的一种方法是使用CountDownLatch

@Override
public void start(Stage stage) 

    int numServices = 3 ;

    CountDownLatch latch = new CountDownLatch(numServices);

    for (int i = 0; i < numServices; i++) 
        ReadService readService = new ReadService();
        readService.setOnSucceeded(e -> latch.countDown());
        readService.start();
    

    Thread onServicesCompleted = new Thread(() -> 
        try 
            latch.await();
         catch(InterruptedException exc) 
            Thread.currentThread().interrupt();
        
        System.out.println("Services done");
        // other work to do when services are complete...
    );
    onServicesCompleted.start();
    

@jewelsea 在评论中提出了类似的解决方案。如果你使用Tasks 而不是Services,你可以调用get(),它会阻塞直到任务完成:

@Override
public void start(Stage stage) 

    int numServices = 3 ;

    List<Task<Boolean>> tasks = new ArrayList<>();
    ExecutorService exec = Executors.newCachedThreadPool();


    for (int i = 0; i < numServices; i++) 
        ReadService readService = new ReadService();
        tasks.add(readService);
    

    exec.invokeAll(tasks);

    Task<Void> onServicesCompleted = new Task<>() 
        @Override
        protected Void call() throws Exception 
            for (Task<Boolean> task : tasks) 
                task.get();
            
            System.out.println("Services Done");
            // other work to be done...
        
    ;
    exec.execute(onServicesCompleted);


class ReadService extends Task<Boolean> 
    @Override
    protected synchronized Boolean call() throws Exception 
        // do something, for example read a file
        System.out.println("wait...");
        wait(5000);
        System.out.println("waiting done");
        return null;
    

如果您想在单个 ReadServices 都完成后做更多的后台工作,然后想在此之后做 UI 工作(只需使用 onServicesCompleted.setOnSucceeded(...)),这个解决方案就很好。

【讨论】:

谢谢。最后一个例子不起作用。首先,我必须在onServicesCompletedcall() 方法中return null。其次,exec.invokeAll(tasks); 表示'invokeAll(java.util.Collection&lt;? extends java.util.concurrent.Callable&lt;T&gt;&gt;)' in 'java.util.concurrent.ExecutorService' cannot be applied to '(java.util.List&lt;javafx.concurrent.Task&lt;java.lang.Boolean&gt;&gt;)', reason: no instance(s) of type variable(s) T exist so that Task&lt;Boolean&gt; conforms to Callable&lt;T&gt;

以上是关于如何等待多个服务完成?的主要内容,如果未能解决你的问题,请参考以下文章

如何等待多个 goroutine 完成?

在 iOS 中等待多个网络异步调用

等待多个异步调用完成?

如何在 Java 中等待多个任务完成?

等待多个承诺完成

如何等待多个异步功能完成?