Java 9 - 发布者和订阅者如何工作

Posted

技术标签:

【中文标题】Java 9 - 发布者和订阅者如何工作【英文标题】:Java 9 - how publisher and subscriber works 【发布时间】:2019-05-07 00:57:48 【问题描述】:

我试图了解SubscriberPublisherjava 9 中的工作原理。

我在这里创建了一个subscriber 并使用SubmissionPublisher 发布项目。

我正在尝试将 100 个字符串发布到 subscriber。如果我没有将Client 程序变为sleep请参阅MyReactiveApp 中的注释代码),我不会看到所有项目都已发布。

为什么不等待这里处理的所有字符串:

strs.stream().forEach(i -> publisher.submit(i)); // what happens here? 

如果我替换上面的代码,我会看到所有的字符串都打印在控制台中

strs.stream().forEach(System.out::println);

使用SubmissionPublisher 发布的客户端程序。

import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MyReactiveApp 

    public static void main(String args[]) throws InterruptedException 

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);


        List<String> strs = getStrs();

        System.out.println("Publishing Items to Subscriber");
        strs.stream().forEach(i -> publisher.submit(i));

        /*while (strs.size() != subs.getCounter()) 
            Thread.sleep(10);
        */

        //publisher.close();

        System.out.println("Exiting the app");

    

    private static List<String> getStrs()

        return Stream.generate(new Supplier<String>() 
            int i =1;
            @Override
            public String get() 
                return "name "+ (i++);
            
        ).limit(100).collect(Collectors.toList());
    


订阅者

import java.util.concurrent.Flow.Subscription;

public class MySubscriber implements java.util.concurrent.Flow.Subscriber<String>

    private Subscription subscription;

    private int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) 
        this.subscription = subscription;
        subscription.request(100);

    

    @Override
    public void onNext(String item) 
        System.out.println(this.getClass().getSimpleName()+" item "+item);
        //subscription.request(1);
        counter++;

    

    @Override
    public void onError(Throwable throwable) 
        System.out.println(this.getClass().getName()+ " an error occured "+throwable);

    

    @Override
    public void onComplete() 
        System.out.println("activity completed");

    
    public int getCounter() 
        return counter;
    


输出:

Publishing Items to Subscriber
MySubscriber item name 1
MySubscriber item name 2
MySubscriber item name 3
MySubscriber item name 4
MySubscriber item name 5
Exiting the app
MySubscriber item name 6
MySubscriber item name 7
MySubscriber item name 8
MySubscriber item name 9
MySubscriber item name 10
MySubscriber item name 11
MySubscriber item name 12

【问题讨论】:

看起来与 Back-Pressure 相关。 【参考方案1】:
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

使用 ForkJoinPool.commonPool() 创建一个新的 SubmissionPublisher 以异步交付给订阅者

见:https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/SubmissionPublisher.html#SubmissionPublisher--

其实

    strs.stream().forEach(i -> publisher.submit(i));

将所有提交排入队列并在另一个线程上异步传递它们。但随后应用程序被终止。这与工作线程的进度无关。这意味着无论工作线程已经交付了多少元素,应用程序都会终止。

每次运行都可能不同。在最坏的情况下,应用程序可能会在第一个项目交付之前终止。

线程

如果您想验证 MyReactiveApp 的 main 方法和 MySubscriber 的 onNext 中的传递是否发生在不同的线程上,您可以打印出相应线程的名称,例如在 MyReactiveApp 的主目录中:

System.out.println(Thread.currentThread().getName()) 

将输出main 作为线程名称。

而 MySubscriber 的 onNext 方法将例如输出类似ForkJoinPool.commonPool-worker-1 的东西。

用户和守护线程

为什么我们还有一个正在运行的线程,但应用程序会终止?

Java中有两种线程:

用户线程 守护线程

当不再有任何用户线程在运行时,Java 程序将终止,即使守护线程仍在运行。

主线程是用户线程。 SubmissionPublisher 在这里使用来自 ForkJoinPool.commonPool() 的工作线程。这些是守护线程。

所有工作线程都使用 Thread.isDaemon() 设置为 true 进行初始化。

https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ForkJoinPool.html

【讨论】:

感谢您的回答,但您能否进一步了解将所有提交排队并在另一个线程上异步交付 ..您指的是publisher.submit吗?据我所知,创建的线程可以在主线程退出后运行。 我已将有关用户和工作线程的信息添加到答案中。 谢谢,您提供了我需要的详细信息。接受作为答案。 在您回答之后,我尝试了 fork join 并发现了类似的行为。但是我想知道如果邮件线程退出,提交到 fork-join 池的线程还没有完成。如果您发现它们在这里相关,请查看是否添加它的详细信息.. 我不确定我是否理解了这个问题。你是说通过实现 ForkJoinWorkerThreadFactory 的newThread 方法并传递用户线程来创建自己的线程工厂?为什么这些线程还没有完成?还是问题不同?

以上是关于Java 9 - 发布者和订阅者如何工作的主要内容,如果未能解决你的问题,请参考以下文章

编写简单的发布者和订阅者(C++)---ROS学习第9篇

编写简单的发布者和订阅者(C++)---ROS学习第9篇

发布者-订阅者配置如何包含在单个 app.config 中?

如何在 React 应用中处理订阅

pycups 或 cups 中的订阅如何工作?

Activemq 中一个主题的多个侦听器如何工作?