Pipeline 设计模式的优缺点和实践案例
Posted 明明如月学长
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pipeline 设计模式的优缺点和实践案例相关的知识,希望对你有一定的参考价值。
一、概述
之前我们在《Java 中的 Pipeline 设计模式》 一文中介绍了 Pipeline 设计模式。其核心思想是创建一组操作(管道)并将数据在这些操作中传递,每个操作可以独立工作,也可以支持同时处理多个数据流。
有同学提到几个不错的相关问题,本文简单探讨下。
- (1)例子中 Pipeline 的代码使用硬编码也可以实现,为什么要用这个模式,有什么好处?
- (2)Pipeline 设计模式在实际的编码中是怎样体现的?
- (3)Pipeline 设计模式有什么缺点?如何解决?
二、答疑
2.1 为什么要用 Pipeline 设计模式,而不是硬编码?
“为什么用 XXX 设计模式,而不是硬编码?” 这个问题对其他设计模式都适用。
这个问题分为两个维度来回答:
- (1) 该设计模式有什么优点?
- (2) 该设计模式的适用场景有哪些?
2.1.1 Pipeline 设计模式 的优点
Pipeline 设计模式的优点主要有以下三点:降低耦合、增加灵活性、提高性能。
- (1)降低耦合度。
高内聚、弱耦合:Pipeline 设计模式将不同的处理逻辑封装成独立的阶段,每个阶段只关注自己的输入和输出,不需要知道其他阶段的细节。这样可以方便地增加、删除或修改阶段,而不影响整个流程的运行。
排查问题方便:对于适合该设计模式相对复杂或较长的代码,如果不采用 Pipeline 设计模式,直接编码,中间某个步骤出错时,通常还需要理解上下文才敢动手修改;采用 Pipeline 设计模式之后,由于不同的步骤之间解耦,出错只需要关注该步骤即可。
可测试性强:由于不同的步骤之间相对独立,耦合较低,更符合单一职责原则,可以更方便地对每个步骤编写单测;编写单测时,更容易全面地覆盖代码逻辑。 - (2)增加灵活性。Pipeline设计模式可以通过配置化来实现不同的业务走不同的流程,而不需要修改代码。这样可以根据需求变化快速地调整流程,提高开发效率和可维护性。
- (3)提高性能。Pipeline设计模式可以利用多线程或异步机制来并行执行不同的阶段,从而提高整个流程的吞吐量和响应时间。
如果采用硬编码的方式来实现类似的功能:代码之间的耦合度更高,排查问题需要读懂更多代码;编写高质量的单测也很困难;无法灵活地实现不同步骤之间的组合复用;虽然有些步骤也可以自己使用线程池等方式实现异步化,但这种能力不能复用,换个场景又要自己写一遍。
2.1.2 Pipeline 设计模式的常见场景
一般来说,某个处理流程可以拆分成多个处理步骤,不同的步骤之间相对独立,数据在不同的步骤之间传递,可以通过特定编排来完成一个复杂的任务,此时可以考虑使用 Pipeline 设计模式。
下面给出一些常见的场景:
-
(1)数据处理:当需要对大量数据进行处理时,通常需要将处理过程分为多个阶段。例如,数据清洗、转换、归一化、特征提取等阶段都可以作为 Pipeline 中的一部分。
-
(2)图像处理:在图像处理中,需要对图像进行多个处理阶段,例如颜色空间转换、滤波、边缘检测、特征提取等。这些处理步骤可以被组合成一个 Pipeline,以便可以轻松地处理整个图像数据集。
-
(3)构建 DevOps 流水线:在软件开发过程中,需要对代码进行多个处理阶段,例如代码编译、单元测试、代码分析、代码部署等。这些步骤可以被组合成一个 Pipeline,以构成整个开发过程。
2.2 实际工作中怎样落地?
可能很多人觉得上面讲的 Pipeline 设计模式场景不够接地气,那么实际工作中 Pipeline 有哪些常见的落地方式?
2.2.1 Java Function API
我们可以使用 Function
来实现简单易用的 Pipeline。
示例代码:
Function<Integer, Integer> square = s -> s * s;
Function<Integer, Integer> half = s -> s / 2;
Function<Integer, String> toString = Object::toString;
Function<Integer, String> pipeline = square.andThen(half)
.andThen(toString);
String result = pipeline.apply(5);
String expected = "12";
assertEquals(expected, result);
我们可以使用 BiFunction
拓展 Function
的功能,支持两个对象转为一个对象。
示例代码:
BiFunction<Integer, Integer, Integer> add = Integer::sum;
BiFunction<Integer, Integer, Integer> mul = (a, b) -> a * b;
Function<Integer, String> toString = Object::toString;
BiFunction<Integer, Integer, String> pipeline = add.andThen(a -> mul.apply(a, 2))
.andThen(toString);
String result = pipeline.apply(1, 2);
String expected = "6";
assertEquals(expected, result);
2.2.2 Java Stream API
Java Stream API 就是一种典型的流水线落地方式。
下面是一个简单的Java Stream
的示例代码,它使用了 filter
、map
和 collect
操作,从一个字符串列表中筛选出以字母"A"开头的字符串,并转换为大写,然后收集到一个新的列表中。
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class StreamExample
public static void main(String[] args)
// Create a list of strings
List<String> list = Arrays.asList("Apple", "Banana", "Orange", "Pear", "Avocado");
// Create a stream from the list
// Filter the strings that start with "A"
// Map the strings to upper case
// Collect the results into a new list
List<String> result = list.stream()
.filter(s -> s.startsWith("A"))
.map(s -> s.toUpperCase())
.collect(Collectors.toList());
// Print the result
System.out.println(result); // [APPLE, AVOCADO]
日常开发中,通常查询出底层数据,通过筛选和映射,转为所需的结构。由于 Java 的 Stream 比较简单和常用,在这里就不作过多陈述。
2.2.3 业务编排
比如你工作中有一个这种需求:需要做一个物料(新闻资讯、短视频等)推荐系统,有以下几个步骤:物料召回(根据业务需求从 mysql 、ES 或二方接口中查询候选物料)、黑名单过滤(有些物料不允许透出)、观看记录过滤(观看过的不能透出,需要过滤掉)、按类型粗排(同个类目或主题只能保留 M 个)、算法精排(调用算法系统进行打分)、业务置顶(根据业务需要对某些物料置顶)、按 size 截断(返回请求所需的 size)等步骤。
伪代码如下:
// 定义一个Pipeline接口,表示一个流水线
public interface Pipeline<T>
// 添加一个阶段到流水线
void addStage(Stage<T> stage);
// 执行流水线
void execute(T input);
// 定义一个Stage接口,表示一个阶段
public interface Stage<T>
// 处理输入数据,并返回输出数据
T process(T input);
// 定义一个PipelineContext类,表示流水线的上下文
public class PipelineContext<T>
// 存储流水线的阶段列表
private List<Stage<T>> stages;
// 存储流水线的当前索引
private int index;
public PipelineContext()
stages = new ArrayList<>();
index = 0;
// 添加一个阶段到上下文
public void addStage(Stage<T> stage)
stages.add(stage);
// 执行上下文中的下一个阶段
public void invokeNext(T input)
if (index < stages.size())
Stage<T> stage = stages.get(index++);
stage.process(input);
// 定义一个RecContext类,表示推荐的上下文
public class RecContext<T>
// 存储推荐中的物料列表
private List<T> items;
// 其他属性
public PipelineContext()
items = new ArrayList<>();
// 省略其他方法
// 定义一个DefaultPipeline类,实现Pipeline接口
public class DefaultPipeline<T> implements Pipeline<T>
// 创建一个PipelineContext对象
private PipelineContext<T> context;
public DefaultPipeline()
context = new PipelineContext<>();
@Override
public void addStage(Stage<T> stage)
context.addStage(stage);
@Override
public void execute(T input)
context.invokeNext(input);
// 定义一个物料类,表示推荐系统的输入和输出数据
public class Material
// 物料的id
private String id;
// 物料的类型(资讯、视频等)
private String type;
// 物料的评分(算法精排后的结果)
private double score;
// 省略构造方法、getters和setters
// 定义一个物料召回阶段类,实现Stage接口
public class MaterialRecallStage implements Stage<RecContext<Material>>
@Override
public RecContext<Material> process(RecContext<Material> context)
// 根据用户的兴趣、行为等特征,从物料库(如 MySQL、Es存储或二方接口)中召回一批候选物料,并设置到 context 的 items中
// 省略具体实现细节
return context;
// 定义一个黑名单过滤阶段类,实现Stage接口
public class BlacklistFilterStage implements Stage<RecContext<Material>>
@Override
public RecContext<Material> process(RecContext<Material> context)
// 根据用户的黑名单设置,过滤掉不符合条件的物料,并设置到 context 的 items中
// 省略具体实现细节
return context;
// 定义一个观看记录过滤阶段类,实现Stage接口
public class WatchRecordFilterStage implements Stage<RecContext<Material>>
@Override
public RecContext<Material> process(RecContext<Material> context)
// 根据用户的观看记录,过滤掉已经观看过的物料,并设置到 context 的 items中
// 省略具体实现细节
return context;
// 定义一个按类型粗排阶段类,实现Stage接口
public class TypeSortStage implements Stage<RecContext<Material>>
@Override
public RecContext<Material> process(RecContext<Material> context)
// 根据用户的偏好和物料的类型,按照一定的规则对物料进行粗排,并设置到 context 的 items中
// 省略具体实现细节
return context;
// 定义一个算法精排阶段类,实现Stage接口
public class AlgorithmSortStage implements Stage<RecContext<Material>>
@Override
public RecContext<Material> process(RecContext<Material> context)
// 根据用户的特征和物料的特征,使用机器学习模型对物料进行打分,排序后设置到 context 的 items中
// 省略具体实现细节
return context;
// 定义一个业务置顶阶段类,实现Stage接口
public class BusinessTopStage implements Stage<RecContext<Material>>
@Override
public RecContext<Material> process(RecContext<Material> context)
// 根据业务的需求,对部分物料进行置顶操作,并设置到 context 的 items中
// 省略具体实现细节
return context;
// 定义一个按size截断阶段类,实现Stage接口
public class SizeCutStage implements Stage<RecContext<Material>>
@Override
public RecContext<Material> process(RecContext<Material> context)
// 根据请求中的 size 数量,对物料的数量进行截断,并设置到 context 的 items中
// 省略具体实现细节
return context;
// 定义一个测试类,用来创建和执行流水线
public class Test
public static void main(String[] args)
// 创建一个物料对象,作为流水线的输入数据
RecContext<Material> recContext = new RecContext<Material>();
// 创建一个流水线对象
Pipeline<RecContext<Material>> pipeline = new DefaultPipeline<>();
// 添加各个阶段到流水线中
pipeline.addStage(new MaterialRecallStage());
pipeline.addStage(new BlacklistFilterStage());
pipeline.addStage(new WatchRecordFilterStage());
pipeline.addStage(new TypeSortStage());
pipeline.addStage(new AlgorithmSortStage());
pipeline.addStage(new BusinessTopStage());
pipeline.addStage(new SizeCutStage());
// 执行流水线
pipeline.execute(recContext);
// 输出流水线的结果
System.out.println(material);
每个流程(Stage)可以配置为 Spring 的 Bean, 流程的编排可以使用动态配置进行控制,这样就可以比较灵活调整。
比如粗排、置顶等步骤有多种方式可选,可以根据业务需要通过修改动态配置进行替换。
还可以自研框架或者自己编码去解析这些步骤,可以让某些步骤并行执行,比如将上述 Stage 的 bean 的 name 进行动态配置,其中中括号的部分解析后并行执行来提高性能:
[videoRecall, newsRecall,topicRecal],blacklist,
recordFilter,typeSorce,algorithmSort,businessTop,sizeCut
希望这个例子,能够帮助大家更好地理解 Pipeline 设计模式的优点:不同的步骤可以相互独立降低耦合,灵活组合复用,部分步骤之间可以采用并行/并发执行的方式来提高性能等。
2.3 Pipeline 模式有哪些缺点?
每种设计模式都有自己的局限性,下面给出 Pipeline 设计模式的几个缺点:
- (1)可读性不强。因为 Pipeline 设计模式是可配置化的,且配置经常在外部(比如数据库里的一个 JSON 、携程的 Apollo 动态配置),所以不容易看出整个流程的逻辑和细节。
- (2)调试困难。因为 Pipeline 设计模式涉及多个阶段的协作,如果某个阶段出现问题,不容易快速定位和修复。
- (3)性能损耗。因为 Pipeline 设计模式需要在每个阶段之间传递数据,如果每个阶段是跨机器的,会增加内存和网络的开销。
当然这些缺点也是可以通过一些技巧解决的:
- (1)针对可读性不强的问题,我们可以在请求的入口处贴出配置的地址,方便代码和配置关联。由于每个步骤非常独立,做好每个步骤的代码可读性也可以在一定程度上解决问题。
- (2)针对调试和排查问题困难的问题。使用 Pipeline 设计模式时,我们可以对关键地方打好日志,方便快速定位、排查问题。
- (3)针对性能损耗的问题。我们可以通过一些调整提高性能,比如上述物料推荐业务而言,需要调用算法平台的服务去打分,我们可以在打分前进行粗排,只将粗排分数较高的传给算法平台,用户和物料特征不需要传递给算法平台,算法平台自己去查询相关物料和用户特征再打分等。有些内存占用不管是采用哪种方式都是不可避免,不必纠结。
三、总结
学习的目的还是为了应用,大家在学习设计模式时,要主动和 JDK 源码,和自己使用的二方和三方框架的设计相结合,要主动和日常的业务场景相结合,以便更好的做到学以致用。
每种设计模式都有自己的适用场景、优点和缺点,我们要注重掌握,并且不仅要了解某种设计模式存在的问题,还要积极思考如何解决。
通常来说,对于非常简单的场景,直接编码即可;对于复杂场景,建议优先考虑遵循设计原则,使用经典的设计模式,以提高代码的可重用性、可读性、灵活性、可拓展性、安全性和降低代码复杂度等。
以上是关于Pipeline 设计模式的优缺点和实践案例的主要内容,如果未能解决你的问题,请参考以下文章
Verilog十大基本功1(流水线设计Pipeline Design)