Pipeline 设计模式的优缺点和实践案例

Posted 明明如月学长

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pipeline 设计模式的优缺点和实践案例相关的知识,希望对你有一定的参考价值。

一、概述

之前我们在《Java 中的 Pipeline 设计模式》 一文中介绍了 Pipeline 设计模式。其核心思想是创建一组操作(管道)并将数据在这些操作中传递,每个操作可以独立工作,也可以支持同时处理多个数据流。

有同学提到几个不错的相关问题,本文简单探讨下。

  • (1)例子中 Pipeline 的代码使用硬编码也可以实现,为什么要用这个模式,有什么好处?
  • (2)Pipeline 设计模式在实际的编码中是怎样体现的?
  • (3)Pipeline 设计模式有什么缺点?如何解决?

二、答疑

2.1 为什么要用 Pipeline 设计模式,而不是硬编码?

“为什么用 XXX 设计模式,而不是硬编码?” 这个问题对其他设计模式都适用。
这个问题分为两个维度来回答:

  • (1) 该设计模式有什么优点?
  • (2) 该设计模式的适用场景有哪些?

2.1.1 Pipeline 设计模式 的优点

Pipeline 设计模式的优点主要有以下三点:降低耦合、增加灵活性、提高性能。

  • (1)降低耦合度
    高内聚、弱耦合:Pipeline 设计模式将不同的处理逻辑封装成独立的阶段,每个阶段只关注自己的输入和输出,不需要知道其他阶段的细节。这样可以方便地增加、删除或修改阶段,而不影响整个流程的运行。
    排查问题方便:对于适合该设计模式相对复杂或较长的代码,如果不采用 Pipeline 设计模式,直接编码,中间某个步骤出错时,通常还需要理解上下文才敢动手修改;采用 Pipeline 设计模式之后,由于不同的步骤之间解耦,出错只需要关注该步骤即可。
    可测试性强:由于不同的步骤之间相对独立,耦合较低,更符合单一职责原则,可以更方便地对每个步骤编写单测;编写单测时,更容易全面地覆盖代码逻辑。
  • (2)增加灵活性。Pipeline设计模式可以通过配置化来实现不同的业务走不同的流程,而不需要修改代码。这样可以根据需求变化快速地调整流程,提高开发效率和可维护性。
  • (3)提高性能。Pipeline设计模式可以利用多线程或异步机制来并行执行不同的阶段,从而提高整个流程的吞吐量和响应时间。

如果采用硬编码的方式来实现类似的功能:代码之间的耦合度更高,排查问题需要读懂更多代码;编写高质量的单测也很困难;无法灵活地实现不同步骤之间的组合复用;虽然有些步骤也可以自己使用线程池等方式实现异步化,但这种能力不能复用,换个场景又要自己写一遍。

2.1.2 Pipeline 设计模式的常见场景

一般来说,某个处理流程可以拆分成多个处理步骤,不同的步骤之间相对独立,数据在不同的步骤之间传递,可以通过特定编排来完成一个复杂的任务,此时可以考虑使用 Pipeline 设计模式。

下面给出一些常见的场景:

  • (1)数据处理:当需要对大量数据进行处理时,通常需要将处理过程分为多个阶段。例如,数据清洗、转换、归一化、特征提取等阶段都可以作为 Pipeline 中的一部分。

  • (2)图像处理:在图像处理中,需要对图像进行多个处理阶段,例如颜色空间转换、滤波、边缘检测、特征提取等。这些处理步骤可以被组合成一个 Pipeline,以便可以轻松地处理整个图像数据集。

  • (3)构建 DevOps 流水线:在软件开发过程中,需要对代码进行多个处理阶段,例如代码编译、单元测试、代码分析、代码部署等。这些步骤可以被组合成一个 Pipeline,以构成整个开发过程。

2.2 实际工作中怎样落地?

可能很多人觉得上面讲的 Pipeline 设计模式场景不够接地气,那么实际工作中 Pipeline 有哪些常见的落地方式?

2.2.1 Java Function API

我们可以使用 Function 来实现简单易用的 Pipeline。
示例代码:

Function<Integer, Integer> square = s -> s * s;
    Function<Integer, Integer> half = s -> s / 2;
    Function<Integer, String> toString = Object::toString;
    Function<Integer, String> pipeline = square.andThen(half)
        .andThen(toString);
    String result = pipeline.apply(5);

    String expected = "12";
    assertEquals(expected, result);

我们可以使用 BiFunction 拓展 Function 的功能,支持两个对象转为一个对象。
示例代码:

    BiFunction<Integer, Integer, Integer> add = Integer::sum;
    BiFunction<Integer, Integer, Integer> mul = (a, b) -> a * b;
    Function<Integer, String> toString = Object::toString;
    BiFunction<Integer, Integer, String> pipeline = add.andThen(a -> mul.apply(a, 2))
        .andThen(toString);
    String result = pipeline.apply(1, 2);
    String expected = "6";
    assertEquals(expected, result);

2.2.2 Java Stream API

Java Stream API 就是一种典型的流水线落地方式。
下面是一个简单的Java Stream的示例代码,它使用了 filtermapcollect操作,从一个字符串列表中筛选出以字母"A"开头的字符串,并转换为大写,然后收集到一个新的列表中。

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamExample 

    public static void main(String[] args) 
        // Create a list of strings
        List<String> list = Arrays.asList("Apple", "Banana", "Orange", "Pear", "Avocado");

        // Create a stream from the list
        // Filter the strings that start with "A"
        // Map the strings to upper case
        // Collect the results into a new list
        List<String> result = list.stream()
                .filter(s -> s.startsWith("A"))
                .map(s -> s.toUpperCase())
                .collect(Collectors.toList());

        // Print the result
        System.out.println(result); // [APPLE, AVOCADO]
    

日常开发中,通常查询出底层数据,通过筛选和映射,转为所需的结构。由于 Java 的 Stream 比较简单和常用,在这里就不作过多陈述。

2.2.3 业务编排

比如你工作中有一个这种需求:需要做一个物料(新闻资讯、短视频等)推荐系统,有以下几个步骤:物料召回(根据业务需求从 mysql 、ES 或二方接口中查询候选物料)、黑名单过滤(有些物料不允许透出)、观看记录过滤(观看过的不能透出,需要过滤掉)、按类型粗排(同个类目或主题只能保留 M 个)、算法精排(调用算法系统进行打分)、业务置顶(根据业务需要对某些物料置顶)、按 size 截断(返回请求所需的 size)等步骤。

伪代码如下:

// 定义一个Pipeline接口,表示一个流水线
public interface Pipeline<T> 
    // 添加一个阶段到流水线
    void addStage(Stage<T> stage);
    // 执行流水线
    void execute(T input);


// 定义一个Stage接口,表示一个阶段
public interface Stage<T> 
    // 处理输入数据,并返回输出数据
    T process(T input);


// 定义一个PipelineContext类,表示流水线的上下文
public class PipelineContext<T> 
    // 存储流水线的阶段列表
    private List<Stage<T>> stages;
    // 存储流水线的当前索引
    private int index;

    public PipelineContext() 
        stages = new ArrayList<>();
        index = 0;
    

    // 添加一个阶段到上下文
    public void addStage(Stage<T> stage) 
        stages.add(stage);
    

    // 执行上下文中的下一个阶段
    public void invokeNext(T input) 
        if (index < stages.size()) 
            Stage<T> stage = stages.get(index++);
            stage.process(input);

         
    
     


// 定义一个RecContext类,表示推荐的上下文
public class RecContext<T> 
    // 存储推荐中的物料列表
    private List<T> items; 

    // 其他属性

    public PipelineContext() 
        items = new ArrayList<>();
    

    // 省略其他方法
     


// 定义一个DefaultPipeline类,实现Pipeline接口
public class DefaultPipeline<T> implements Pipeline<T> 
    // 创建一个PipelineContext对象
    private PipelineContext<T> context;

    public DefaultPipeline() 
        context = new PipelineContext<>();
    

    @Override
    public void addStage(Stage<T> stage) 
        context.addStage(stage);
    

    @Override
    public void execute(T input) 
        context.invokeNext(input);
    


// 定义一个物料类,表示推荐系统的输入和输出数据
public class Material 
    // 物料的id
    private String id;
    
    // 物料的类型(资讯、视频等)
    private String type;
    
    // 物料的评分(算法精排后的结果)
    private double score;
    
    // 省略构造方法、getters和setters




// 定义一个物料召回阶段类,实现Stage接口
public class MaterialRecallStage implements Stage<RecContext<Material>> 

    @Override
    public RecContext<Material> process(RecContext<Material> context) 
        // 根据用户的兴趣、行为等特征,从物料库(如 MySQL、Es存储或二方接口)中召回一批候选物料,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    


// 定义一个黑名单过滤阶段类,实现Stage接口
public class BlacklistFilterStage implements Stage<RecContext<Material>> 

    @Override
    public RecContext<Material> process(RecContext<Material> context) 
        // 根据用户的黑名单设置,过滤掉不符合条件的物料,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    



// 定义一个观看记录过滤阶段类,实现Stage接口
public class WatchRecordFilterStage implements Stage<RecContext<Material>> 

    @Override
    public RecContext<Material> process(RecContext<Material> context) 
        // 根据用户的观看记录,过滤掉已经观看过的物料,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    


// 定义一个按类型粗排阶段类,实现Stage接口
public class TypeSortStage implements Stage<RecContext<Material>> 

    @Override
    public RecContext<Material> process(RecContext<Material> context) 
        // 根据用户的偏好和物料的类型,按照一定的规则对物料进行粗排,并设置到 context 的 items中
       
        // 省略具体实现细节
        return context;
    


// 定义一个算法精排阶段类,实现Stage接口
public class AlgorithmSortStage implements Stage<RecContext<Material>> 

    @Override
    public RecContext<Material> process(RecContext<Material> context) 
        // 根据用户的特征和物料的特征,使用机器学习模型对物料进行打分,排序后设置到 context 的 items中
       
        // 省略具体实现细节
        return context;
    


// 定义一个业务置顶阶段类,实现Stage接口
public class BusinessTopStage implements Stage<RecContext<Material>> 

    @Override
    public RecContext<Material> process(RecContext<Material> context) 
        // 根据业务的需求,对部分物料进行置顶操作,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    


// 定义一个按size截断阶段类,实现Stage接口
public class SizeCutStage implements Stage<RecContext<Material>> 

    @Override
    public RecContext<Material> process(RecContext<Material> context) 
        // 根据请求中的 size 数量,对物料的数量进行截断,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    


// 定义一个测试类,用来创建和执行流水线
public class Test 

    public static void main(String[] args) 
        // 创建一个物料对象,作为流水线的输入数据
        RecContext<Material> recContext = new RecContext<Material>();
        
        // 创建一个流水线对象
        Pipeline<RecContext<Material>> pipeline = new DefaultPipeline<>();
        
        // 添加各个阶段到流水线中
        pipeline.addStage(new MaterialRecallStage());
        pipeline.addStage(new BlacklistFilterStage());
        pipeline.addStage(new WatchRecordFilterStage());
        pipeline.addStage(new TypeSortStage());
        pipeline.addStage(new AlgorithmSortStage());
        pipeline.addStage(new BusinessTopStage());
        pipeline.addStage(new SizeCutStage());
        
        // 执行流水线
        pipeline.execute(recContext);
        
        // 输出流水线的结果
        System.out.println(material);
    


每个流程(Stage)可以配置为 Spring 的 Bean, 流程的编排可以使用动态配置进行控制,这样就可以比较灵活调整。
比如粗排、置顶等步骤有多种方式可选,可以根据业务需要通过修改动态配置进行替换。
还可以自研框架或者自己编码去解析这些步骤,可以让某些步骤并行执行,比如将上述 Stage 的 bean 的 name 进行动态配置,其中中括号的部分解析后并行执行来提高性能:

[videoRecall, newsRecall,topicRecal],blacklist,
    recordFilter,typeSorce,algorithmSort,businessTop,sizeCut

希望这个例子,能够帮助大家更好地理解 Pipeline 设计模式的优点:不同的步骤可以相互独立降低耦合,灵活组合复用,部分步骤之间可以采用并行/并发执行的方式来提高性能等

2.3 Pipeline 模式有哪些缺点?

每种设计模式都有自己的局限性,下面给出 Pipeline 设计模式的几个缺点:

  • (1)可读性不强。因为 Pipeline 设计模式是可配置化的,且配置经常在外部(比如数据库里的一个 JSON 、携程的 Apollo 动态配置),所以不容易看出整个流程的逻辑和细节。
  • (2)调试困难。因为 Pipeline 设计模式涉及多个阶段的协作,如果某个阶段出现问题,不容易快速定位和修复。
  • (3)性能损耗。因为 Pipeline 设计模式需要在每个阶段之间传递数据,如果每个阶段是跨机器的,会增加内存和网络的开销。

当然这些缺点也是可以通过一些技巧解决的:

  • (1)针对可读性不强的问题,我们可以在请求的入口处贴出配置的地址,方便代码和配置关联。由于每个步骤非常独立,做好每个步骤的代码可读性也可以在一定程度上解决问题。
  • (2)针对调试和排查问题困难的问题。使用 Pipeline 设计模式时,我们可以对关键地方打好日志,方便快速定位、排查问题。
  • (3)针对性能损耗的问题。我们可以通过一些调整提高性能,比如上述物料推荐业务而言,需要调用算法平台的服务去打分,我们可以在打分前进行粗排,只将粗排分数较高的传给算法平台,用户和物料特征不需要传递给算法平台,算法平台自己去查询相关物料和用户特征再打分等。有些内存占用不管是采用哪种方式都是不可避免,不必纠结。

三、总结

学习的目的还是为了应用,大家在学习设计模式时,要主动和 JDK 源码,和自己使用的二方和三方框架的设计相结合,要主动和日常的业务场景相结合,以便更好的做到学以致用
每种设计模式都有自己的适用场景、优点和缺点,我们要注重掌握,并且不仅要了解某种设计模式存在的问题,还要积极思考如何解决。
通常来说,对于非常简单的场景,直接编码即可;对于复杂场景,建议优先考虑遵循设计原则,使用经典的设计模式,以提高代码的可重用性、可读性、灵活性、可拓展性、安全性和降低代码复杂度等。

以上是关于Pipeline 设计模式的优缺点和实践案例的主要内容,如果未能解决你的问题,请参考以下文章

案例分析:设计模式与代码的结构特性

Verilog十大基本功1(流水线设计Pipeline Design)

设计模式 行为型模式 -- 职责链模式(定义结构纯与不纯的职责链模具体案例)

设计模式之工厂模式详解和应用

大数据仓库架构设计实践案例分享

原型设计工具比较及实践