如何配置Kafka RPC调用者主题和组

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何配置Kafka RPC调用者主题和组相关的知识,希望对你有一定的参考价值。

我正在尝试使用Kafka作为消息代理来实现RPC架构。使用Kafka而不是另一个消息代理解决方案的决定由当前上下文决定。

实际实现包括两种不同类型的服务:

  • 接收者:该服务从Kafka主题接收消息,该消息使用,处理消息,然后将响应消息发布到响应主题;
  • 调用者:此服务接收HTTP请求,然后将消息发布到接收者主题,使用接收者服务的响应主题作为响应消息,然后将其作为HTTP响应返回。

主题中发布的请求/响应消息与消息密钥相关。

接收器实现相当简单:在启动时,它创建“请求”和“响应”主题,然后开始使用服务组ID的请求主题(接收器的许多实例将共享相同的组ID以实现适当的要求平衡)。当请求到达时,服务处理请求,然后在响应主题中发布响应。

我的问题在于调用者实现,特别是在使用响应队列的响应时。

有以下假设:

  1. 必须同时管理HTTP请求;
  2. 此调用方服务可能有多个实例。

每个线程/服务必须接收响应主题中的所有消息,以便找到具有相应请求密钥的消息。

例如,假设两个接收器服务分别产生两个带有密钥1和2的消息。这些消息将在接收者主题中发布并进行处理。然后,响应将发布在主题接收者 - 响应中。如果两个接收方服务共享相同的组ID,则可能是响应1到达发布消息2的服务,反之亦然,从而导致HTTP超时。

为了避免这个问题,我设法考虑了这些可能的解决方案:

  1. 为每个请求创建一个新组(编辑:但是不能通过代码删除组,因此需要另一个服务来清除这些组中的zookeeper);
  2. 为每个请求创建一个新主题,然后删除它。

希望我自己足够清楚 - 我必须承认我是卡夫卡的初学者 - 我的问题是:

哪种解决方案比另一种更昂贵?或者是否有其他主题/组配置可以实现假设3?

谢谢。

答案

我想我找到了一个可能的解决方案。当一个组的偏移量在一段时间内没有更新时,由一个时间段自动删除一个组,由配置offsets.topic.retention.minutes决定。

可以通过设置配置offsets.retention.check.interval.ms来设置偏移更新时间检查。

这样,当消费者连接到搜索回复消息的响应主题时,可以放弃创建的组,并且动物园管理员稍后将删除它。

以上是关于如何配置Kafka RPC调用者主题和组的主要内容,如果未能解决你的问题,请参考以下文章

autobahn JS,如果 RPC 的被调用者是异步的怎么办?

如何在 OpenStack 中捕获对应于 RPC.call Request 消息的回复 rabbitmq 消息?

基于zookeeper实现rpc注册中心

如何配置从 kafka 到 cassandra 的 kafkaConnect

Kafka Broker配置

如何找到 kafka 配置文件?