如何使用不同的接收器创建多个通道? Spring Redis 发布/订阅

Posted

技术标签:

【中文标题】如何使用不同的接收器创建多个通道? Spring Redis 发布/订阅【英文标题】:How to create multiple channels with different receivers? Spring Redis pub/sub 【发布时间】:2014-12-05 16:09:24 【问题描述】:

我正在通过 Spring Data 使用 Redis 发布订阅,但我无法添加超过 1 个频道。

目前我正在关注通过添加与 Receiver 类关联的 MessageListenerAdapter 来配置 MessageListenerContainer 的典型示例,如下所示:

以前的工作完美,我能够推送和接收消息。 但是,我尝试添加第二个侦听器适配器来创建“具有不同接收器的通道,但我得到了 NullPointerException。

错误附在下面。添加新适配器有不同的方法吗?一般来说,我想动态添加频道。

可以通过在 addMessageListener 方法中提供 PatternTopic 列表来添加与一个特定接收器关联的多个通道。

感谢您的帮助

【问题讨论】:

【参考方案1】:

我认为在添加 MessageListenerAdapter 时 Spring Redis 存在一个重要的错误。

如果 Receiver 类没有从 MessageListener 扩展(因此实现了 onMessage),则 MessageListenerAdapter 类的内部方法 MethodInvoker() 专门询问 Receiver 是否是MessageListener(见下图最后一行)。

要解决这个问题,只需从 MessageListener 扩展,然后您可以直接添加额外的适配器。

很遗憾 spring-data-redis 团队没有在他们的 github 页面中启用 issue 来发布这个 bug。 https://github.com/spring-projects/spring-data-redis

【讨论】:

我已经检查了它的源代码,而 AFIAK 并不是他们真正的 bug。您可以自定义 MessageListenerAdapter 或适配器的限定符列表以添加到 container 中,如下面的答案【参考方案2】:

如果有人还在寻找,请使用 Spring Boot 1.5.X 的以下配置

RedisConfig 多频道类:

@Configuration
public class RedisConfig 

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            @Qualifier("notificationListenerAdapter") MessageListenerAdapter notificationListenerAdapter,
                                            @Qualifier("countListenerAdapter") MessageListenerAdapter countListenerAdapter) 

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(notificationListenerAdapter,  new PatternTopic("notification"));
        container.addMessageListener(countListenerAdapter, new PatternTopic("count"));
        return container;
    

    @Bean("notificationListenerAdapter")
    MessageListenerAdapter notificationListenerAdapter(RedisReceiver redisReceiver) 
        return new MessageListenerAdapter(redisReceiver, "receiveNotificationMessage");
    

    @Bean("countListenerAdapter")
    MessageListenerAdapter countListenerAdapter(RedisReceiver redisReceiver) 
        return new MessageListenerAdapter(redisReceiver, "receiveCountMessage");
    

    @Bean
    RedisReceiver receiver() 
        return new RedisReceiver();
    

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) 
        return new StringRedisTemplate(connectionFactory);
    


RedisReceiver 接收来自频道的消息。

注意:确保方法名称与上面定义的方法名称匹配。

public class RedisReceiver 

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisReceiver.class);


    public void receiveNotificationMessage(String message) 
        LOGGER.info("Message Received from notification channel: <" + message + ">");

    

    public void receiveCountMessage(String message) 
        LOGGER.info("Message Received from count channel: <" + message + ">");
    

测试流程:

public class TestMessages 

    private static final Logger LOG = LoggerFactory.getLogger(TestMessages.class);

    private final StringRedisTemplate redisTemplate;

    public TestMessages(StringRedisTemplate redisTemplate) 
        this.redisTemplate = redisTemplate;
    

    public void sendNotification(String message) 

        redisTemplate.convertAndSend("notification", message);

    

    public void sendCount(String message) 

        redisTemplate.convertAndSend("count", message);

    

【讨论】:

以上是关于如何使用不同的接收器创建多个通道? Spring Redis 发布/订阅的主要内容,如果未能解决你的问题,请参考以下文章

如何在同一本地/src 地址上创建多个 UDP 数据报通道/流

单个通道上的多个接收器。谁得到数据?

Spring使用Spring和AMQP发送接收消息(上)

一个通道 - RabbitMQ 中的一个队列?

如何在 Spring Integration DSL 中为通道设置多个消息处理程序?

Spring AMQP RPC使用者如何确定发布者通道是不是已关闭