使用通过 Flow API 实现的处理器转换数据流

Posted

技术标签:

【中文标题】使用通过 Flow API 实现的处理器转换数据流【英文标题】:Transform data stream using processor implemented via Flow APIs 【发布时间】:2018-08-05 05:38:31 【问题描述】:

我正在阅读来自 Oracle 的 Community#DOC-1006738 与 Flow.PublisherFlow.Subscriber 的并发概念相关的内容。上面可以找到使用处理器转换数据流的示例代码,其中有这两行代码,这让我有点困惑。

//Create Processor and Subscriber  
MyFilterProcessor<String, String> filterProcessor = 
                                      new MyFilterProcessor<>(s -> s.equals("x")); 

问题 1. MyFilterProcessor 怎么可能是 &lt;String, String&gt; 类型?

我最初的想法是,这些可能是&lt;String, Boolean&gt;,但这会违反下一行中对订阅者定义的进一步定义:-

MyTransformProcessor<String, Integer> transformProcessor = 
                              new MyTransformProcessor<>(s -> Integer.parseInt(s));  

附加说明,除非我明确地将上述内容转换为(更正)

MyTransformProcessor<String, Integer>(s -> Integer.parseInt(s))

我在读取parseInt时遇到错误,无法应用于Object

-- 为什么我需要在此处显式转换 RHS? --


虽然代码大部分出现在共享链接中,但我使用的有用的构造函数定义是

public class MyTransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R>        
    private Function function;
    MyTransformProcessor(Function<? super T, ? extends R> function)   
        super();  
        this.function = function;  
     
    ...
 

filterProcessor 相同的一个:-

public class MyFilterProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> 
    private Function function;
    MyFilterProcessor(Function<? super T, ? extends R> function) 
        super();
        this.function = function;
    
    ...

问题。现在有了这些更改(一个在解决问题 1 之后,另一个来自附加说明),如何正确实施示例?还是我只是错过了一些非常基本的东西?

【问题讨论】:

示例/可重现代码在following package on github 中。信息 - 使用 Java-8 标记 Function 实现,使用 Java-9 标记在上下文中使用 Flow API。 欢迎任何反对的理由 :) 我没有投反对票,但我想知道您为什么以单线程的“方式”使用Flow API?也许是为了更好地理解?我建议仔细阅读 JDK-9 API 文档...(拉力赛,我懒得这样做)。祝你好运,朋友。 我也没有投反对票,但您的帖子似乎只是博客中的一个错误,我分享您的困惑。不过,这是一篇在 Java 9 发布前一年发表的关于 JDK 9 Flow API 的文章,文章作者甚至没有提供 MyFilterProcessor 的源代码。也许我误解了你的担心,但这真的值得担心吗?为什么不直接选择一个更有效的教程呢? @MarkoPacak GitHub 代码现在应该很清楚了。确保只有相关的类存在,并且命名法与共享包中的问题相同。要查看的类是TransformDataStreamUsingProcessor.java 【参考方案1】:

我认为您的主要错误是将MyFilterProcessor 实现为MyTransformProcessor 的(几乎)完全相同的副本。

由于作者没有贴出该类的代码,我尝试猜测它的行为基于:

... = new MyFilterProcessor<>(s -> s.equals("x"));

名称Filter 表明该组件旨在接受然后仅重新发布某些值。此时,计算结果为boolean(或Predicate&lt;T&gt;)的函数在上下文中是完全可以接受的(因此s -&gt; s.equals("x"))。

页尾的初始数据流

String[] items = "1", "x", "2", "x", "3", "x";  

似乎证实了我的假设。作者只是想过滤掉"x" 值,并将此任务交给MyFilterProcessorMyFilterProcessor 必须先评估每种类型,然后再将其发布到管道的其余部分;并且输出类型必须与输入类型相同。


构造函数应该如下所示:

MyFilterProcessor(Predicate<? super R> predicate)  /* ... */ 
// or
MyFilterProcessor(Function<? super R, Boolean> function)  /* ... */ 

onNext 据说只转发某些元素:

if (! predicate.test(item)) 
    int max = submit(item); // get the estimated maximum lag
    subscription.request(max);

我对@9​​87654335@的定义有两个想法:

1) public class MyFilterProcessor&lt;T, R&gt; extends SubmissionPublisher&lt;R&gt; implements Flow.Processor&lt;R, R&gt;

因为Flow.Processor是为了接受和转发相同的类型。

我似乎无法在任何地方适应T 类型。这就是我被阻止的地方。

2) public class MyFilterProcessor&lt;T, R&gt; extends SubmissionPublisher&lt;R&gt; implements Flow.Processor&lt;T, R&gt;

但是,在onNext 中,您必须将&lt;T&gt; 转换为&lt;R&gt; (丑陋,非常丑陋)

if (! predicate.test(item)) 
    int max = submit( (R) item);
    subscription.request(max);

在这种情况下,您将测试 Predicate&lt;? super T&gt;

如果你愿意稍微重构一下,因为SubmissionPublisher 已经继承了Flow.Publisher 的行为,你可以让这个类实现Flow.Subscriber

public class MyFilterProcessor<R> extends SubmissionPublisher<R> implements Flow.Subscriber<R>

等等

MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
// or, if you follow my example:
MyFilterProcessor<String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));

终于成功了。


如果您打印 MyFilterProcessorMySubscriber 中的值,您将得到以下输出:

Publishing Items...
FilterProcessor: Receiving: 1
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 2
FilterProcessor: Receiving: x
FilterProcessor: Receiving: 3
FilterProcessor: Receiving: x
Got: 1
Got: 2
Got: 3

这是预期的结果。


测试时,请记住等待管道完成后再退出应用程序,因为SubmissionPublisher 会在另一个Thread 中发出元素。

另外,与文章相反,请有常识改变

private Function function; 
// ...
submit((R) function.apply(item));  

private Function<? super T, ? extends R> function;
// ...
submit(function.apply(item));

为什么我需要在此处显式转换 RHS?

我仍在努力理解您是如何得到 cannot be applied to Object 错误的。您使用的是哪个jdk 号码和IDE?

【讨论】:

@nullpointer 我对你的观点感兴趣。这是您要寻找的答案吗?

以上是关于使用通过 Flow API 实现的处理器转换数据流的主要内容,如果未能解决你的问题,请参考以下文章

Docusign REST API:使用 Microsoft Flow 的 JWT Grant 实现 OAuth 身份验证

ktor websocket flow api是如何工作的?

第三章 flink流处理API - map和flatmap

RDD的处理过程

SSIS教程:创建简单的ETL包 -- 4. 增加错误处理流程(Adding Error Flow Redirection)

Spring Cloud Data Flow初探