多线程编程模型:Pipeline模式 上

Posted 从头开始自学java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程编程模型:Pipeline模式 上相关的知识,希望对你有一定的参考价值。

简介

核心思想:将一个任务分解为若干个阶段(Stage),前阶段的输出为下阶段的输入,各个阶段由不同的工作者线程负责执行。

各个任务的各个阶段是并行(Parallel)处理的。

具体任务的处理是串行的,即完成一个任务要依次执行各个阶段,但从整体任务上看,不同任务的各个阶段的执行是并行的。

例子

从顾客的迎接和安排座位、点菜、菜肴烹煮、上菜到买单,每个阶段都有相应的工作人员(迎宾、服务员、厨师、传菜员和收银员)负责,而不是由一名工作人员对一桌客人负责到底,从而减少了顾客的等待,提高了服务效率。

Pipeline模式示意图

架构——分类

将任务分解为若干个阶段,并将每个阶段抽象为一个对象。阶段对象由不同工作者线程驱动,并将输出作为下一个阶段的输入。

分类(是否包含并发处理阶段):线性Pipeline、非线性Pipeline。

多线程编程模型:Pipeline模式 上


架构——参与者

多线程编程模型:Pipeline模式 上


● Pipe(输入、输出和处理):对阶段的抽象。负责对输入进行处理,并将输出作为下一阶段的输入。

◇ process:处理前一阶段的输入。

◇ init:初始化Pipe。

◇ shutdown:关闭Pipe。

◇ setNextPipe:设置下一阶段。

● PipeContext:阶段的计算环境,主要用于异常处理。

◇ handleError:对抛出的异常进行处理。

● AbstractPipe:Pipe接口的抽象实现类。

◇ process:调用doProcess方法对输入进行处理,并处理结果提交给下一阶段。

◇ init:保存传递过来的PipeContext实例,子类可根据需要覆盖该方法。

◇ shutdown:空默认实现。

◇ setNextPipe:设置当前阶段的下一处理阶段。

◇ doProcess:留给子类实现的抽象方法。待子类实现的处理逻辑。

● WorkerThreadPipeDecorator:工作者线程的Pipe。将输入存入队列,由指定个数的工作者线程对队列中的输入进行处理。该类自身主要负责工作者线程的生命周期的管理。它通过调用delegate(委托Pipe实例)的相应方法实现Pipe接口中定义的各个方法。

◇ process:将前一个阶段的输入存入队列,由工作者线程运行时取出进行处理。

◇ init:启动工作者线程,并调用委托Pipe的init方法。

◇ shutdown:停止工作者线程,并调用委托Pipe的shutdown方法。

◇ setNextPipe:调用委托Pipe实例的setNextPipe方法。

◇ dispatch:取出队列中的输入并调用委托Pipe的process方法。

● ThreadPoolPipeDecorator:线程池的Pipe。Pipe接口的实现委托给delegate(委托Pipe)。

◇ process:接收前一个阶段的输入,并向线程池提交一个对该输入进行相应处理的任务。

◇ init:调用委托Pipe的init方法。

◇ shutdown:关闭当前服务,并调用委托Pipe的shutdown方法。

◇ setNextPipe:调用委托Pipe的setNextPipe方法。

● AbstractParallelPipe:AbstractPipe的子类,支持并行处理的Pipe实现类。该类对其每个输入(原始任务)生成相应的一组子任务,并以并行的方式去执行这些子任务。各个子任务的执行结果会被合并为相应原始任务的输出结果。

◇ buildTasks:留给子类实现的抽象方法。根据输入构造一组子任务。

◇ combineResults:留给子类实现的抽象方法。对各个并行子任务的处理结果进行合并,形成相应输入的输出结果。

◇ invokeParallel:实现以并行的方式执行一组任务。

◇ doProcess:实现该类对其输入的处理逻辑。

● ConcreteParallelPipe:由应用定义的AbstractParallelPipe的子类。

◇ buildTasks:根据指定的输入构造一组子任务。

◇ combineResults:对各个并行子任务的处理结果进行合并,形成相应输入元素的输出结果。

● Pipeline:复合Pipe,可包含多个Pipe。

◇ addPipe:添加一个Pipe。

● SimplePipeline:基于AbstractPipe的Pipeline。

◇ addPipe:添加一个Pipe。

◇ addAsWorkerThreadBasedPipe:将WorkerThreadPipeDecorator包装后的Pipe加入Pipeline。

◇ addAsThreadPoolBasedPipe:将ThreadPoolPipeDecorator包装后的Pipe加入Pipeline。

序列图


第1步:客户端创建Pipeline。

第2~4步:客户端创建各个Pipe。

第5步:客户端调用Pipeline的add方法添加各个Pipe。

第6步:客户端创建PipeContext。

第7步:客户端调用Pipeline的init方法。

第8~13步:init方法调用各个Pipe的init方法,初始化各个Pipe。

第14步:init方法调用返回。


第1步:客户端调用Pipeline的process方法。

第2、3步:process方法调用第一个Pipe的process方法。

第4步:process方法返回。

第5步:第一个Pipe实例开始处理任务,其dispatch方法被工作者线程(或者线程池中的工作者线程)调用。

第6步:dispatch方法调用pipe1的委托Pipe实例delegate1的process方法。

第7步:delegate1的process方法调用其doProcess方法对输入元素inputA进行处理,相应处理结果为outA。

第8、9步:delegate1的process方法获取其下一个Pipe实例(同时也是pipe1的下一个Pipe实例)pipe2,并调用pipe2的process方法,将outA作为输入元素提交给pipe2处理。

第10步:pipe1的process方法返回。


以上是关于多线程编程模型:Pipeline模式 上的主要内容,如果未能解决你的问题,请参考以下文章

.NET多线程编程

多线程编程 基础篇

并行编程(Future)

Java多线程编程之不可变对象模式

百万年薪python之路 -- 并发编程之 多线程 一

Python网络编程 -- TCP/IP