pubsub 消息没有被 poller 和 serviceactivator 拉取

Posted

技术标签:

【中文标题】pubsub 消息没有被 poller 和 serviceactivator 拉取【英文标题】:pubsub messages not being pulled with poller and serviceactivator 【发布时间】:2021-06-04 06:14:41 【问题描述】:

我一直在尝试让 pubsub 在 Spring 应用程序中工作。为了启动和运行,我一直在阅读教程和文档,例如 this

我可以构建和启动东西,但如果我通过云控制台向测试订阅发送消息,它永远不会到达。

这就是我的代码现在的样子:

@Configuration
@Import(GcpPubSubAutoConfiguration.class)
public class PubSubConfigurator 

@Bean
public GcpProjectIdProvider projectIdProvider()
    return () -> "project-id";


@Bean
public CredentialsProvider credentialsProvider()
    return GoogleCredentials::getApplicationDefault;


@Bean
public MessageChannel inputMessageChannel() 
   return new PublishSubscribeChannel();


@Bean
@InboundChannelAdapter(channel = "inputMessageChannel", poller = @Poller(fixedDelay = "5"))
public MessageSource<Object> pubsubAdapter(PubSubTemplate pubSubTemplate) 
    PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate,  "tst-sandbox");
    messageSource.setAckMode(AckMode.MANUAL);
    messageSource.setPayloadType(String.class);
    messageSource.setBlockOnPull(false);
    messageSource.setMaxFetchSize(10);
    //pubSubTemplate.pull("tst-sandbox", 10, true);
    return messageSource;


// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) 
    System.out.println("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
    message.ack();


我的想法是轮询器注释会启动一个轮询器以每隔一段时间运行一次以检查消息并将它们发送到使用服务激活器注释的方法,但显然情况并非如此,因为它永远不会被命中。

有趣的是,如果我在“return messageSource”之前放置一个断点并检查模板的结果。pull 调用返回的消息,所以这似乎不是连接本身的问题。

我在这里缺少什么?教程和文档在这一点上并没有多大帮助,因为它们都使用与上面几乎相同的教程代码......

我尝试了上述代码的变体,例如创建适配器而不是消息源,如下所示:

@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) 
    PubSubInboundChannelAdapter adapter =
            new PubSubInboundChannelAdapter(pubSubTemplate, "tst-sandbox");
    adapter.setOutputChannel(messageChannel);
    adapter.setAckMode(AckMode.MANUAL);
    adapter.setPayloadType(String.class);
    return adapter;

无济于事。欢迎提出任何建议。

【问题讨论】:

【参考方案1】:

从头创建spring boot项目后发现问题(主项目为普通spring)。在调试输出中注意到它正在自动启动服务激活器 bean 和其他一些事情,例如实际订阅它在主项目中没有执行的通道。

快速谷歌后解决方法很简单,不得不添加

@EnableIntegration

类级别的注释和消息开始传入。

【讨论】:

以上是关于pubsub 消息没有被 poller 和 serviceactivator 拉取的主要内容,如果未能解决你的问题,请参考以下文章

为啥 PubSub 订阅在保留期到期后向死信主题发布消息

从 Google PubSub 中提取消息不起作用 - 权限被拒绝

如何编写云函数来接收、解析和发布 PubSub 消息?

GCP Pubsub 中的消息丢失和重复

如何使用 Google PubSub 确认 (@google-cloud/pubsub)

Dapr PubSub 与 dotnet SDK