类型化消息的一种设计模式

Posted 高可用架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了类型化消息的一种设计模式相关的知识,希望对你有一定的参考价值。

使用消息的时候,我们经常会碰到一种场景:Producer 将消息格式升级了,如果没有通知到 Consumer 方,Consumer 就会获取到不兼容格式的消息,导致应用报错。


鉴于大部分公司上下游联调基本靠吼,所以这种不兼容导致的问题还不少。有没有办法解决这个问题?

业界知名的 Kafka 通过引入 schema registry 来解决这个问题:Kafka 有个配套的 schema registry,可以预先将发送的消息格式 (avro schema) 记录到 schema registry,每次升级时,schema registry 会检查这个 topic 所有的消费者是不是都兼容,如果不兼容,那么就不能升级,你要么修改消息格式,保持兼容;要么联系所有的 consumer 让他们进行升级改造。

好多年前我就想在 QMQ 里引入类似 schema registry 的想法,但是受限于精力和要保持 API 的兼容性,一直没有付诸实践。

另外,Kafka 的 Schema Registry 虽然是一个绝妙的好点子,但还是有点问题:

  1. Kafka 的 Schema registry 必须提前去注册,这个过程有一点点繁琐,然后很多人就不愿意提前去配置,然后干脆不用这种方式,这真是遗憾。所以公共组件一定要做到 API 简洁,使用一定要简单,千万不要繁琐,这些开发人员懒得要命。
  2. Kafka 的检查是运行时发送的时候检查的,这个时候检查虽然能够避免将不兼容的消息发送出去,但是还是有点晚了。

那么有没有什么办法解决这两个问题呢?

前一阵在高可用架构群,@翁伟老板一直在“鼓吹” micronaut 这个框架,翁老板说的是这样的一个例子:

  
    
    
  
import io.micronaut.http.annotation.Get; import io.micronaut.http.client.Client; import io.reactivex.Single;
@Client("/hello") public interface HelloClient {
@Get("/") Single hello(); }

这种通过 annotation 的方式提供强类型的 API 让人眼前一亮(后来经过群友@皆浩的提醒,有个 feign 的框架也提供类似的 API)。

有了这种方式,那 Kafka 的两个问题就迎刃而解了,那么我就可以在 QMQ 里提供类似下面的 API:

  
    
    
  
//发送消息,定义API,这个接口是自动实现的,不需要y用户实现 @QMQProducer public interface Producer {
  @QMQTopic(topic="order.changed")   void orderChagned(OrderEvent event); }
//使用发送消息的API @Resource private Producer producer;
public void pay(Order order){ OrderEvent e = buildEvent(order); producer.orderChanged(e); }
//消费消息 @QMQConsumer(topic = "order.changed") public void onMessage(Message<OrderEvent> msg){   OrderEvent e = msg.Data();   //process }

通过这种 API,配合 Java 里的 annotation processor,我们就可以在编译时对这个 API 进行检查了。

  1.  在编译时我们就可以提取 OrderEvent 这个类型,将其自动的转换成 schema,然后注册到 schema registry
  2. 编译时,我们就可以将 OrderEvent 与 schema registry 里的进行兼容性对比,然后编译时就可以确定是否兼容,不兼容编译时报错,而不是等到发送消息的时候再报错,然后紧急修复。

上述方案,除了解决 Kafka 的 Schema Registry 存在的一些问题外,是不是也得到了一套更好的 API ?欢迎留言讨论。


参考阅读:





高可用架构
改变互联网的构建方式

长按二维码 关注「高可用架构」公众号

以上是关于类型化消息的一种设计模式的主要内容,如果未能解决你的问题,请参考以下文章

替换的片段仍然可见

分布式中间件和消息队列到底是怎么的一种工作模式?

java 代码片段

C#泛型学习

论如何设计一款端对端加密通讯软件

需要示例代码片段帮助