多线程编程模型:Pipeline模式 上
Posted 从头开始自学java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程编程模型:Pipeline模式 上相关的知识,希望对你有一定的参考价值。
简介
核心思想:将一个任务分解为若干个阶段(Stage),前阶段的输出为下阶段的输入,各个阶段由不同的工作者线程负责执行。
各个任务的各个阶段是并行(Parallel)处理的。
具体任务的处理是串行的,即完成一个任务要依次执行各个阶段,但从整体任务上看,不同任务的各个阶段的执行是并行的。
例子
从顾客的迎接和安排座位、点菜、菜肴烹煮、上菜到买单,每个阶段都有相应的工作人员(迎宾、服务员、厨师、传菜员和收银员)负责,而不是由一名工作人员对一桌客人负责到底,从而减少了顾客的等待,提高了服务效率。
Pipeline模式示意图
架构——分类
将任务分解为若干个阶段,并将每个阶段抽象为一个对象。阶段对象由不同工作者线程驱动,并将输出作为下一个阶段的输入。
分类(是否包含并发处理阶段):线性Pipeline、非线性Pipeline。
架构——参与者
● Pipe(输入、输出和处理):对阶段的抽象。负责对输入进行处理,并将输出作为下一阶段的输入。
◇ process:处理前一阶段的输入。
◇ init:初始化Pipe。
◇ shutdown:关闭Pipe。
◇ setNextPipe:设置下一阶段。
● PipeContext:阶段的计算环境,主要用于异常处理。
◇ handleError:对抛出的异常进行处理。
● AbstractPipe:Pipe接口的抽象实现类。
◇ process:调用doProcess方法对输入进行处理,并处理结果提交给下一阶段。
◇ init:保存传递过来的PipeContext实例,子类可根据需要覆盖该方法。
◇ shutdown:空默认实现。
◇ setNextPipe:设置当前阶段的下一处理阶段。
◇ doProcess:留给子类实现的抽象方法。待子类实现的处理逻辑。
● WorkerThreadPipeDecorator:工作者线程的Pipe。将输入存入队列,由指定个数的工作者线程对队列中的输入进行处理。该类自身主要负责工作者线程的生命周期的管理。它通过调用delegate(委托Pipe实例)的相应方法实现Pipe接口中定义的各个方法。
◇ process:将前一个阶段的输入存入队列,由工作者线程运行时取出进行处理。
◇ init:启动工作者线程,并调用委托Pipe的init方法。
◇ shutdown:停止工作者线程,并调用委托Pipe的shutdown方法。
◇ setNextPipe:调用委托Pipe实例的setNextPipe方法。
◇ dispatch:取出队列中的输入并调用委托Pipe的process方法。
● ThreadPoolPipeDecorator:线程池的Pipe。Pipe接口的实现委托给delegate(委托Pipe)。
◇ process:接收前一个阶段的输入,并向线程池提交一个对该输入进行相应处理的任务。
◇ init:调用委托Pipe的init方法。
◇ shutdown:关闭当前服务,并调用委托Pipe的shutdown方法。
◇ setNextPipe:调用委托Pipe的setNextPipe方法。
● AbstractParallelPipe:AbstractPipe的子类,支持并行处理的Pipe实现类。该类对其每个输入(原始任务)生成相应的一组子任务,并以并行的方式去执行这些子任务。各个子任务的执行结果会被合并为相应原始任务的输出结果。
◇ buildTasks:留给子类实现的抽象方法。根据输入构造一组子任务。
◇ combineResults:留给子类实现的抽象方法。对各个并行子任务的处理结果进行合并,形成相应输入的输出结果。
◇ invokeParallel:实现以并行的方式执行一组任务。
◇ doProcess:实现该类对其输入的处理逻辑。
● ConcreteParallelPipe:由应用定义的AbstractParallelPipe的子类。
◇ buildTasks:根据指定的输入构造一组子任务。
◇ combineResults:对各个并行子任务的处理结果进行合并,形成相应输入元素的输出结果。
● Pipeline:复合Pipe,可包含多个Pipe。
◇ addPipe:添加一个Pipe。
● SimplePipeline:基于AbstractPipe的Pipeline。
◇ addPipe:添加一个Pipe。
◇ addAsWorkerThreadBasedPipe:将WorkerThreadPipeDecorator包装后的Pipe加入Pipeline。
◇ addAsThreadPoolBasedPipe:将ThreadPoolPipeDecorator包装后的Pipe加入Pipeline。
序列图
第1步:客户端创建Pipeline。
第2~4步:客户端创建各个Pipe。
第5步:客户端调用Pipeline的add方法添加各个Pipe。
第6步:客户端创建PipeContext。
第7步:客户端调用Pipeline的init方法。
第8~13步:init方法调用各个Pipe的init方法,初始化各个Pipe。
第14步:init方法调用返回。
第1步:客户端调用Pipeline的process方法。
第2、3步:process方法调用第一个Pipe的process方法。
第4步:process方法返回。
第5步:第一个Pipe实例开始处理任务,其dispatch方法被工作者线程(或者线程池中的工作者线程)调用。
第6步:dispatch方法调用pipe1的委托Pipe实例delegate1的process方法。
第7步:delegate1的process方法调用其doProcess方法对输入元素inputA进行处理,相应处理结果为outA。
第8、9步:delegate1的process方法获取其下一个Pipe实例(同时也是pipe1的下一个Pipe实例)pipe2,并调用pipe2的process方法,将outA作为输入元素提交给pipe2处理。
第10步:pipe1的process方法返回。
以上是关于多线程编程模型:Pipeline模式 上的主要内容,如果未能解决你的问题,请参考以下文章