订阅者的Spring pubsub过滤消息

Posted

技术标签:

【中文标题】订阅者的Spring pubsub过滤消息【英文标题】:Spring pubsub Filter Messages for subscriber 【发布时间】:2021-12-19 14:28:07 【问题描述】:

我尝试使用 pubsub 实现请求异步响应模式。我正在通过使用弹簧集成来做到这一点。我在两端定义了一个主题和两个订阅(用于异步响应的事件发送者和用于请求的事件消费者)。但是如果消费者发送响应,它会发送给发送者和消费者。但是消费者事件是空的。到目前为止一切都很好。我的问题是如何为 springs pubsub 集成中的消息定义过滤器。它是 Google pubsub 中的一项功能。

【问题讨论】:

有机会看到你的一些代码以更好地了解发生了什么吗? 抱歉,下个工作日可以创建快速入门。到目前为止,我没有任何可用的过滤代码。目前只实现了事件发送。 对,但是到目前为止,您已经确定了要添加过滤的位置。可能简单的 Spring Integration Filter 对你来说就足够了:docs.spring.io/spring-integration/docs/current/reference/html/… 看起来很有希望。我会试一试,然后回来。 这个答案对你有帮助吗? 【参考方案1】:

我建议您使用 Spring 过滤器来让您采取行动,无论是应该丢弃消息还是将消息传递到消息通道。你可以看到更多information。

您可以查看这些示例。

@Filter(inputChannel = "inputChannel", outputChannel = "secretChannel")
boolean filter(Message<?> message) 
    String msg = message.getPayload().toString();
    return msg.contains("secret");

您可以查看这些示例,了解如何在 spring 中实现过滤。你可以看到更多examples。

package com.zj.node.contentcenter.controller.content;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * Producer
 *
 * @author 01
 * @date 2019-08-03
 **/
@RestController
@RequiredArgsConstructor
public class TestProducerController 
    private final Source source;
    @GetMapping("/stream-send-msg")
    public String streamSendMsg(String flagHeader) 
        source.output().send(
                MessageBuilder.withPayload("Message Body")
                        // Setting the header for filtering messages
                        .setHeader("flag-header", flagHeader)
                        .build()
        );
        return "send message success!";
    

【讨论】:

以上是关于订阅者的Spring pubsub过滤消息的主要内容,如果未能解决你的问题,请参考以下文章

pubsub 订阅者出错:超出最大消息大小

基于特定电子邮件 ID 的 Google Pubsub 订阅

如何在使用 triggerTopic 创建云功能时设置发布订阅消息过滤器

为啥 PubSub 订阅在保留期到期后向死信主题发布消息

javascript发布订阅pubsub模式

如何使用当前的 pubsub 订阅者从 google Pub/Sub 系统获取消息