使用队列解耦程序

Posted

技术标签:

【中文标题】使用队列解耦程序【英文标题】:Decouple programs using queues 【发布时间】:2012-02-02 08:28:02 【问题描述】:

Rich Hickey 在他的 talk 在 54:53 分钟时谈到了使用队列作为解耦依赖程序部分的一种手段。您能否举个例子说明如何将以下 Java 伪代码解耦以改进其设计和/或灵活性:

// Warning: Java-pseudo-code ahead
class Job 
    public void doRun(A a) 
        saveObjectToDatabase(a);

        B b = computeB(a);
        saveObjectToDatabase(b);

        C c = computeC(b);
        logToFile(c);
    

saveObjectToDatabasesaveObjectToDatabase 可以看作是一种有副作用的方法,而computeBcomputeC 的输出只依赖于a

我知道这个问题相当模糊/广泛。我想了解如何利用排队机制而不会使我的程序变得非常复杂,并且仍然确保它以正确的顺序做正确的事情。感谢任何指向正确方向的指针。

【问题讨论】:

不是很清楚。您展示的示例有一个独立的方法,它不使用任何外部依赖项。没有什么可以解耦,因为 Job 没有与任何东西耦合。 我的猜测是@Matt 想让不同的线程执行 computeB() 和 computeC() 方法,使用队列在不同线程之间移动工作单元。 查看Enterprise Integration Patterns。 感谢书的链接,去看看。 【参考方案1】:

嗯,这不是一个很好的例子,但是(在最直接的设计中)您基本上有两个队列,并且(取决于所涉及的数据量)您可能会省略数据库。

第一个进程将从“外部世界”接收您的 a 对象并将它们排入队列 1。第二个进程将从队列 1 中取出对象,执行 computeB,并将结果排入队列 2。第三个进程将从队列 2 中取出对象,执行computeC,并记录结果或其他内容。

正如我所说,根据所涉及的数据量(可能还有其他一些因素),队列中传递的“对象”可能是您的实际 ab 对象,或者只是令牌/键查找数据库中的数据。

队列本身可以通过多种方式实现。可以使用数据库实现队列,例如,尽管细节有点混乱。 “进程”可以是单个 Java 进程中的 Java 任务,也可以是单独的 OS 进程,甚至可能在不同的机器上。

当您在 Unix 上使用“管道”时,您实际上是以这种方式使用队列。

【讨论】:

【参考方案2】:

这正是我正在使用的 java 库所使用的原理。 这个想法是将组件分配给程序中的各个任务(记录器就是一个很好的例子)。现在每个组件都需要独立于其他组件运行,要么作为线程要么作为事件处理程序。

在事件驱动的情况下,每个组件都会通知他想要监听哪些类型的事件\消息。您有一个调度程序,它收集传入的消息并将它们插入接收者的队列中。接收者进程,并最终产生新的消息。等等……

在你的情况下,是这样的:

class SaveObjectHandler
//
void handle(Event e, Object o)
  if(e instanceof SaveEvent)
      saveObjectToDatabase(o);


;

class TransformObject
//
 void handle(Event e,Object o)
   if(e instanceof TransformEvent)
      B result = compute(o);
      send(new SaveEvent(),result)
   

 

;

class Logger

   void handle(Event e, Object o)
      if(o instanceof B)
        //perform computeC
        logEvent((B)o);
   

;

;

有问题的图书馆是SEDA。

【讨论】:

【参考方案3】:

我担心 saveObject 方法有副作用,你不能很好地解耦它,或者至少不容易解耦。

但是假设您需要快速将一些对象写入数据库。我的观点是,使用关系数据库最快的方法应该是由多个客户端将对象保存到一个队列中,而不是由一两个非常快速的写入者尽可能快地将数据推送到数据库中来获取它们。

【讨论】:

【参考方案4】:

为了完整起见,我想在 Hot Licks 的回答中添加更多信息:

我一直在对这个主题进行更多研究,最后得出结论,解开方法是要走的路。我将使用生产者/消费者/主题的 kafka 术语。如需更多信息,请参阅The Log: What every software engineer should know about real-time data's unifying abstraction,尤其是此图:

关于我发布的示例的具体问题,有两种方法可以解决:

解决方案 1

消费者 1: 从主题a消费 保存到数据库。 消费者 2: 从主题a消费 计算b 保存到数据库。 消费者3:从主题a消费 计算b 计算c 保存到数据库

这有计算b 两次的缺点。在伪代码中:

class ConsumerA  
    public void consume(A a) 
        saveObjectToDatabase(a);
    


class ConsumerB  
    public void consume(A a) 
        B b = computeB(a);
        saveObjectToDatabase(b);
    


class ConsumerLog  
    public void consume(A a) 
        B b = computeB(a);
        C c = computeC(b);
        logToFile(c);
    

解决方案 2

消费者 1: 从主题a消费 保存到数据库。 消费者 2: 从主题a消费 计算b,保存到数据库 将b 发布到单独的主题b。 消费者 3: 从主题b消费 计算c logToFile c

在伪代码中:

class ConsumerA  
    public void consume(A a) 
        saveObjectToDatabase(a);
    


class ConsumerB  
    public void consume(A a) 
        B b = computeB(a);
        saveObjectToDatabase(b);
        publish(b); // republish computed information to another topic b
    


class ConsumerLog  
    public void consume(B b) 
        C c = computeC(b);
        logToFile(c);
    

【讨论】:

以上是关于使用队列解耦程序的主要内容,如果未能解决你的问题,请参考以下文章

将数据库与消息队列解耦的最佳实践

程序员之消息队列

Python队列queue模块

消息队列相关

分布式消息队列应用场景之异步处理应用解耦流量削锋和消息通讯理解分析

通过阻塞队列实现生产者和消费者异步解耦