从rabbitmq消费消息并通过其客户端连接转发它们的“扭曲”方式是啥?

Posted

技术标签:

【中文标题】从rabbitmq消费消息并通过其客户端连接转发它们的“扭曲”方式是啥?【英文标题】:What is the 'twisted' way of consuming messages from rabbitmq and forwarding them through its client connections?从rabbitmq消费消息并通过其客户端连接转发它们的“扭曲”方式是什么? 【发布时间】:2016-12-12 11:12:29 【问题描述】:

我正在twisted写一个websocket服务器来学习框架。它将接收来自rabbitmq 代理的消息,并向连接的客户端发送更新。如果我想通过多个客户端连接一次广播/多播多条消息,那么调用(仅作为示例)deferToThread(channel.basic_consume, queue)callInThread(" ") 是一个很好的选择吗?

如果不是,twisted 使用来自rabbitmq 的消息并将它们转发到连接的客户端的方式是什么?

到目前为止,我的策略是:

reactor_thread: 监听端口(x)以建立和维护客户端连接

其他线程: 订阅rabbitmq队列并消费消息(如果有) (永远持续下去)

【问题讨论】:

您应该添加标签websocketautobahncrossbar,以便Tavendo 的异步websocket 开发人员也可以帮助您。他们或许能够提供更好的解决方案。 【参考方案1】:

调用 deferToThread(channel.basic_consume, queue) 或 callInThread(" ") 是一个很好的选择吗?

在这种情况下使用线程并不会真正提供太多好处,因为消息已经在 RabbitMQ 中排队。我过去也遇到过类似的情况,我可以为您提供一个高级概述,了解我在不使用线程的情况下为解决问题所做的工作。免责声明:我已经有一年或 2 年没有使用 RabbitMQ 或 Websockets 了,所以我的知识可能有点模糊。

列出已连接的客户端

假设您将 autobahn 用于 websockets,您可以在工厂类 (autobahn.twisted.websocket.WebSocketServerFactory) 中添加一个变量,该变量将跟踪连接的客户端。 listdict 都可以正常工作。

factory = WebSocketServerFactory()
factory.connection_list = []

connection_list 变量将在建立连接后存储协议对象 (autobahn.twisted.websocket.WebSocketServerProtocol)。在协议中,您需要重载connectionMade 函数以将协议(在本例中为self)附加到self.factory.connection_list

def connectionMade(self):
    super(WSProtocol, self).connectionMade()
    self.factory.connection_list.append(self)

为了灵活性,最好创建类似“onConnect deferred”的东西,但这就是它的要点。也许autobahn 提供了一个接口来做到这一点。

RabbitMQ

使用pika,您可以使用此example 异步消费消息。根据需要对通道和交换名称进行更改,以使其适用于您的设置。然后我们将进行 2 处更改。首先我们将factory.connection_list 传递给回调,然后当一条消息被消费时,我们会将它写入连接的客户端的协议。

@defer.inlineCallbacks
def run(connection, proto_list):
    #...
    l = task.LoopingCall(read, queue_object, proto_list)
    l.start(0.01)

@defer.inlineCallbacks
def read(queue_object, proto_list):
    #...
    if body:
        print(body)
        for client in sorted(proto_list):
            yield client.write(body)

    yield ch.basic_ack(delivery_tag=method.delivery_tag)

#...
d.addCallback(run, factory.connection_list)
reactor.run()

read回调函数中,每消费一条消息,循环任务就会遍历已连接的客户端列表,并将消息发送给他们。

【讨论】:

谢谢;您能否添加服务器在端口上侦听传入客户端连接的部分,以及它们将如何协同工作?我想使用另一个线程的原因是因为我打算使用“consume”而不是“get”来接收来自 rabbitmq 的消息。他们的文档推荐它,因为执行 get 时显然会使用更多资源。 感谢您接受我的回答(以及赏金 :)) 抱歉回复延迟,我没​​有注意到您发表了评论。你还需要连接代码吗?我想你已经把那部分记下来了。至于线程,我建议你学会在没有它们的情况下这样做,因为它们将是一个共享变量 (connection_list),然后你继承了共享状态带来的问题。线程也会延迟学习 Twisted 的大量异步功能(在我个人看来)。我建议您学习异步模型,然后在您感到舒适后使用 crochet 之类的东西。 当然,没问题,这是一个内容丰富的答案,但我还是新手,我想真正了解所有这些是如何组合在一起的。我可以简单地添加“reactor.listenTCP(8989, wsfactory)”,其中“wsfactory”是 websocket 协议工厂吗?至于loopingcall,不如创建一个rabbitmq消费者呢? 是的,您只需reactor.listenTCP(8‌​989, wsfactory)。我还没有看到任何使用 rabbit's consumer 和 Twisted 的例子,所以很遗憾我不确定如何做到这一点。

以上是关于从rabbitmq消费消息并通过其客户端连接转发它们的“扭曲”方式是啥?的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习第一记:用java连接RabbitMQ

rabbitMQ 基本概念

RabbitMQ消息应答重新入队

RabbitMQ集群和失败处理

RabbitMQ

RabbitMQ 相关概念