使用RabbitMQ插件在Grails中创建队列运行时

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用RabbitMQ插件在Grails中创建队列运行时相关的知识,希望对你有一定的参考价值。

我有一个系统,外部系统可以订阅我的系统生成的事件。系统用Grails 2编写,使用RabbitMQ plugin进行内部消息传递。外部系统的事件通过HTTP传递。

我想为每个订户创建一个队列,以防止慢速订户端点减慢向其他订户的消息。订阅可以在运行时发生,这就是为什么在应用程序配置中定义队列是不可取的。

如何使用Grails RabbitMQ插件创建具有主题绑定运行时的队列?

由于从RabbitMQ队列读取消息直接耦合到服务,因此创建队列运行时的一个副问题可能是拥有该Grails服务的多个实例。有任何想法吗?

答案

我没有为你准备好的解决方案,但如果你遵循RabbitmqGrailsPlugin Descriptor中的代码,特别是doWithSpring部分你应该能够重新创建在运行时动态初始化新的Queue和相关的Listener所需的步骤。

这一切都归结为传递所需的参数,注册必要的spring bean并启动监听器。

为了回答你的第二个问题,我认为你可以提出一些命名约定并为每个队列创建一个新的队列处理程序。如何动态创建spring bean的示例可以在这里找到:dynamically declare beans

只是一个简短的例子,我如何快速注册一个队列,它需要更多的布线等...

def createQ(queueName) {
    def queuesConfig = {
        "${queueName}"(durable: true, autoDelete: false,)
    }
    def queueBuilder = new RabbitQueueBuilder()
    queuesConfig.delegate = queueBuilder
    queuesConfig.resolveStrategy = Closure.DELEGATE_FIRST
    queuesConfig()

    queueBuilder.queues?.each { queue ->
        if (log.debugEnabled) {
            log.debug "Registering queue '${queue.name}'"
        }
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(Queue.class);
        builder.addConstructorArgValue(queue.name)
        builder.addConstructorArgValue(Boolean.valueOf(queue.durable))
        builder.addConstructorArgValue(Boolean.valueOf(queue.exclusive))
        builder.addConstructorArgValue(Boolean.valueOf(queue.autoDelete))
        builder.addConstructorArgValue(queue.arguments)
        DefaultListableBeanFactory factory = (DefaultListableBeanFactory) grailsApplication.mainContext.getBeanFactory();
        factory.registerBeanDefinition("grails.rabbit.queue.${queue.name}", builder.getBeanDefinition());
    }
}
另一答案

我最终使用了Grails RabbitMQ插件使用的Spring AMQP。删除了一些方法/参数,因为它们与示例无关:

class MyUpdater {
  void handleMessage(Object message) {
    String content = new String(message)

    // do whatever you need with the message
  }
}


import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.amqp.support.converter.SimpleMessageConverter
import org.springframework.amqp.rabbit.connection.ConnectionFactory

class ListenerInitiator {

  // autowired
  ConnectionFactory   rabbitMQConnectionFactory

  protected void initiateListener() {
    RabbitAdmin admin = new RabbitAdmin(rabbitMQConnectionFactory)

    // normally passed to this method, moved to local vars for simplicity
    String queueName = "myQueueName"
    String routingKey = "#"
    String exchange = "myExchange"

    Queue queue = new Queue(queueName)
    admin.declareQueue(queue)
    TopicExchange exchange = new TopicExchange(exchange)
    admin.declareExchange(exchange)

    admin.declareBinding( BindingBuilder.bind(queue).to(exchange).with(routingKey) )

    // normally passed to this method, moved to local var for simplicity
    MyUpdater listener = new MyUpdater()
    SimpleMessageListenerContainer container =
        new SimpleMessageListenerContainer(rabbitMQConnectionFactory)
    MessageListenerAdapter adapter = new MessageListenerAdapter(listener)
    adapter.setMessageConverter(new SimpleMessageConverter())

    container.setMessageListener(adapter)
    container.setQueueNames(queueName)
    container.start()
}
另一答案

我强烈建议检查一下(https://www.infoq.com/presentations/api-io-state)。我为Grails 3.1提交了一个API插件,以便从控制器共享IO状态和抽象API功能,这样它就可以与分布式架构中的服务共享,但Grails团队似乎希望对社区应用不同的规则。 。

它会解决你的问题,但它可能永远不会被释放。

以上是关于使用RabbitMQ插件在Grails中创建队列运行时的主要内容,如果未能解决你的问题,请参考以下文章

如何在Rabbitmq中创建队列

Grails、Grails RabbitMQ 插件、Java 、Spring AMQP - 多个客户端

Mule 不会在 Rabbitmq 上的某些队列中创建消费者

如何在c#中使用RabbitMQ

如何在grails中自定义spring security插件登录页面

带有 AMQP 和 RabbitMQ 的 Spring,带有可选 x-dead-letter-exchange 的队列