设置手动确认 SQS 消息的 Spring Cloud AWS 问题

Posted

技术标签:

【中文标题】设置手动确认 SQS 消息的 Spring Cloud AWS 问题【英文标题】:Spring Cloud AWS Issue with setting manual acknowledge of SQS message 【发布时间】:2019-03-22 05:33:07 【问题描述】:

我正在尝试使用 spring-cloud-aws-messaging 手动删除 AWS SQS 消息来实现逻辑。此功能在this ticketfrom the example in tests范围内实现

@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment) 

    LOGGER.info("Received message ", message.getFoo());
    try 
        acknowledgment.acknowledge().get();
     catch (InterruptedException e) 
        LOGGER.error("Opps", e);
     catch (ExecutionException e) 
        LOGGER.error("Opps", e);
    

但面对意外的异常

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance oforg.springframework.cloud.aws.messaging.listener.Acknowledgment(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

SqsMessageDeletionPolicy.ON_SUCCESS 的解决方案有效,但我想避免引发异常。

我在配置中遗漏了什么?

【问题讨论】:

你有没有想过这个问题?我也遇到了同样的问题... @***s 我现在使用 Solution with SqsMessageDeletionPolicy.ON_SUCCESS 并保持这个问题开放以获得其他解决方案 我有解决办法,今天下午用代码sn-ps贴出来。本来打算前几天做的,但是找不到这个帖子,谢谢回复! @***s 会很棒的! 【参考方案1】:

问题作者没有提到的是他试图自定义杰克逊ObjectMapper。因此,他实例化了MappingJackson2MessageConverter,将其包装在PayloadArgumentResolver 中,并将其设置为QueueMessageHandlerFactory.setArgumentResolvers() 上的单个HandlerMethodArgumentResolver。这样做会覆盖在QueueMessageHandler.initArgumentResolvers() 中定义的默认参数解析器列表(在QueueMessageHandlerFactory 中创建QueueMessageHandler 的实例时调用)。

例如,当只有PayloadArgumentResolver 设置为单参数解析器,Acknowledgement 参数不能再绑定。

因此,比覆盖参数解析器列表以自定义 Jackson 消息转换器更好的解决方案是在 QueueMessageHandlerFactory 上设置消息转换器列表:

    @Bean
    fun queueMessageHandlerFactory(objectMapper: ObjectMapper): QueueMessageHandlerFactory 
        val factory = QueueMessageHandlerFactory()

        val messageConverter = MappingJackson2MessageConverter()
        messageConverter.objectMapper = objectMapper

        factory.setMessageConverters(listOf(messageConverter)) // <-- this is the important line.
        return factory
    

注册的MessageConvertersQueueMessageHandler.initArgumentResolvers()里面用作PayloadArgumentResolvers

因此,这是一个侵入性较小的更改。

【讨论】:

遗憾的是,文档准确地推荐了问题作者所做的事情。在找到这篇文章之前,我自己一直在努力解决这个问题……【参考方案2】:

花了一些时间摆弄并尝试与其他 SO 答案不同的东西。

这是我的代码,我会尽力解释。我包含了我用于 SQS 消费者的所有内容。

我的配置类在下面。下面唯一需要注意的是在 queueMessageHandlerFactory 方法中实例化的转换器和解析器对象。 MappingJackson2MessageConverter(以防从非常明显的类名中看不出来)类处理来自 SQS 的有效负载的反序列化。

将严格的内容类型匹配设置为 false 也很重要。

此外,MappingJackson2MessageConverter 允许您设置自己的 Jackson ObjectMapper,但是如果您这样做,则需要按如下方式对其进行配置:

objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

你可能不想这样做,所以你可以让它为空,它会创建自己的 ObjectMapper。

我认为其余的代码是不言自明的......?如果没有,请告诉我。

我们的用例之间的一个区别是,您似乎正在映射您自己的自定义对象 (SqsEventDTO),而我认为这是可行的?在这种情况下,我认为您不需要 MappingJackson2MessageConverter,但我可能是错的。

@Configuration
public class AppConfig 

@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory) 
    return queueMessageHandlerFactory.createQueueMessageHandler();


@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient) 

    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    factory.setAmazonSqs(sqsClient);

    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setSerializedPayloadClass(String.class);

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);

    // Uses the MappingJackson2MessageConverter object to resolve/map 
    // the payload against the Message/S3EventNotification argument.
    PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);

    // Extract the acknowledgment data from the payload's headers, 
    // which then gets deserialized into the Acknowledgment object.  
    AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");

    // I don't remember the specifics of WHY, however there is 
    // something important about the order of the argument resolvers 
    // in the list
    factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));

    return factory;


@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
    @Autowired ThreadPoolTaskExecutor threadPoolExecutor) 

    SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
    smlc.setWaitTimeOut(20);
    smlc.setAmazonSqs(amazonSQSAsync);
    smlc.setMessageHandler(queueMessageHandler);
    smlc.setBeanName("ConsumerBean");
    smlc.setMaxNumberOfMessages(sqsMaxMessages);
    smlc.setTaskExecutor(threadPoolExecutor);

    return smlc;


@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor() 
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(corePoolSize);
    executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setKeepAliveSeconds(threadTimeoutSeconds);
    executor.setThreadNamePrefix(threadName);
    executor.initialize();

    return executor;


下面是我的 SQS 消费者服务类。

@Service
public class RawConsumer 

@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "$input.sqs.queuename")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception 
    // Handle event here

希望对您有所帮助,如果您有任何问题,请告诉我。

【讨论】:

以上是关于设置手动确认 SQS 消息的 Spring Cloud AWS 问题的主要内容,如果未能解决你的问题,请参考以下文章

AWS SQS JMS确认

如何使用 spring 集成来调整 sqs 队列的消耗

Spring AMQP 源码分析 06 - 手动消息确认

AWS SQS Boto3 手动将消息发送到死信

如何修改 Spring Cloud AWS 用来反序列化 SQS 消息的对象映射器?

如何使用 Amazon SQS Spring 云注释 @SqsListener 轮询特定数量的消息