如何配置Kafka RPC调用者主题和组
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何配置Kafka RPC调用者主题和组相关的知识,希望对你有一定的参考价值。
我正在尝试使用Kafka作为消息代理来实现RPC架构。使用Kafka而不是另一个消息代理解决方案的决定由当前上下文决定。
实际实现包括两种不同类型的服务:
- 接收者:该服务从Kafka主题接收消息,该消息使用,处理消息,然后将响应消息发布到响应主题;
- 调用者:此服务接收HTTP请求,然后将消息发布到接收者主题,使用接收者服务的响应主题作为响应消息,然后将其作为HTTP响应返回。
主题中发布的请求/响应消息与消息密钥相关。
接收器实现相当简单:在启动时,它创建“请求”和“响应”主题,然后开始使用服务组ID的请求主题(接收器的许多实例将共享相同的组ID以实现适当的要求平衡)。当请求到达时,服务处理请求,然后在响应主题中发布响应。
我的问题在于调用者实现,特别是在使用响应队列的响应时。
有以下假设:
- 必须同时管理HTTP请求;
- 此调用方服务可能有多个实例。
每个线程/服务必须接收响应主题中的所有消息,以便找到具有相应请求密钥的消息。
例如,假设两个接收器服务分别产生两个带有密钥1和2的消息。这些消息将在接收者主题中发布并进行处理。然后,响应将发布在主题接收者 - 响应中。如果两个接收方服务共享相同的组ID,则可能是响应1到达发布消息2的服务,反之亦然,从而导致HTTP超时。
为了避免这个问题,我设法考虑了这些可能的解决方案:
- 为每个请求创建一个新组(编辑:但是不能通过代码删除组,因此需要另一个服务来清除这些组中的zookeeper);
- 为每个请求创建一个新主题,然后删除它。
希望我自己足够清楚 - 我必须承认我是卡夫卡的初学者 - 我的问题是:
哪种解决方案比另一种更昂贵?或者是否有其他主题/组配置可以实现假设3?
谢谢。
我想我找到了一个可能的解决方案。当一个组的偏移量在一段时间内没有更新时,由一个时间段自动删除一个组,由配置offsets.topic.retention.minutes决定。
可以通过设置配置offsets.retention.check.interval.ms来设置偏移更新时间检查。
这样,当消费者连接到搜索回复消息的响应主题时,可以放弃创建的组,并且动物园管理员稍后将删除它。
以上是关于如何配置Kafka RPC调用者主题和组的主要内容,如果未能解决你的问题,请参考以下文章
autobahn JS,如果 RPC 的被调用者是异步的怎么办?
如何在 OpenStack 中捕获对应于 RPC.call Request 消息的回复 rabbitmq 消息?