订阅者的Spring pubsub过滤消息
Posted
技术标签:
【中文标题】订阅者的Spring pubsub过滤消息【英文标题】:Spring pubsub Filter Messages for subscriber 【发布时间】:2021-12-19 14:28:07 【问题描述】:我尝试使用 pubsub 实现请求异步响应模式。我正在通过使用弹簧集成来做到这一点。我在两端定义了一个主题和两个订阅(用于异步响应的事件发送者和用于请求的事件消费者)。但是如果消费者发送响应,它会发送给发送者和消费者。但是消费者事件是空的。到目前为止一切都很好。我的问题是如何为 springs pubsub 集成中的消息定义过滤器。它是 Google pubsub 中的一项功能。
【问题讨论】:
有机会看到你的一些代码以更好地了解发生了什么吗? 抱歉,下个工作日可以创建快速入门。到目前为止,我没有任何可用的过滤代码。目前只实现了事件发送。 对,但是到目前为止,您已经确定了要添加过滤的位置。可能简单的 Spring IntegrationFilter
对你来说就足够了:docs.spring.io/spring-integration/docs/current/reference/html/…
看起来很有希望。我会试一试,然后回来。
这个答案对你有帮助吗?
【参考方案1】:
我建议您使用 Spring 过滤器来让您采取行动,无论是应该丢弃消息还是将消息传递到消息通道。你可以看到更多information。
您可以查看这些示例。
@Filter(inputChannel = "inputChannel", outputChannel = "secretChannel")
boolean filter(Message<?> message)
String msg = message.getPayload().toString();
return msg.contains("secret");
您可以查看这些示例,了解如何在 spring 中实现过滤。你可以看到更多examples。
package com.zj.node.contentcenter.controller.content;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Producer
*
* @author 01
* @date 2019-08-03
**/
@RestController
@RequiredArgsConstructor
public class TestProducerController
private final Source source;
@GetMapping("/stream-send-msg")
public String streamSendMsg(String flagHeader)
source.output().send(
MessageBuilder.withPayload("Message Body")
// Setting the header for filtering messages
.setHeader("flag-header", flagHeader)
.build()
);
return "send message success!";
【讨论】:
以上是关于订阅者的Spring pubsub过滤消息的主要内容,如果未能解决你的问题,请参考以下文章
使用 Google Cloud Pub/Sub 消息订阅者实现时无法创建 Spring-Boot bean
spring bootredisspring boot 集成redis的发布订阅机制