设置手动确认 SQS 消息的 Spring Cloud AWS 问题
Posted
技术标签:
【中文标题】设置手动确认 SQS 消息的 Spring Cloud AWS 问题【英文标题】:Spring Cloud AWS Issue with setting manual acknowledge of SQS message 【发布时间】:2019-03-22 05:33:07 【问题描述】:我正在尝试使用 spring-cloud-aws-messaging 手动删除 AWS SQS 消息来实现逻辑。此功能在this ticketfrom the example in tests范围内实现
@SqsListener(value = "queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(SqsEventDTO message, Acknowledgment acknowledgment)
LOGGER.info("Received message ", message.getFoo());
try
acknowledgment.acknowledge().get();
catch (InterruptedException e)
LOGGER.error("Opps", e);
catch (ExecutionException e)
LOGGER.error("Opps", e);
但面对意外的异常
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of
org.springframework.cloud.aws.messaging.listener.Acknowledgment(no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
SqsMessageDeletionPolicy.ON_SUCCESS
的解决方案有效,但我想避免引发异常。
我在配置中遗漏了什么?
【问题讨论】:
你有没有想过这个问题?我也遇到了同样的问题... @***s 我现在使用 Solution with SqsMessageDeletionPolicy.ON_SUCCESS 并保持这个问题开放以获得其他解决方案 我有解决办法,今天下午用代码sn-ps贴出来。本来打算前几天做的,但是找不到这个帖子,谢谢回复! @***s 会很棒的! 【参考方案1】:问题作者没有提到的是他试图自定义杰克逊ObjectMapper
。因此,他实例化了MappingJackson2MessageConverter
,将其包装在PayloadArgumentResolver
中,并将其设置为QueueMessageHandlerFactory.setArgumentResolvers()
上的单个HandlerMethodArgumentResolver
。这样做会覆盖在QueueMessageHandler.initArgumentResolvers()
中定义的默认参数解析器列表(在QueueMessageHandlerFactory
中创建QueueMessageHandler
的实例时调用)。
例如,当只有PayloadArgumentResolver
设置为单参数解析器,Acknowledgement
参数不能再绑定。
因此,比覆盖参数解析器列表以自定义 Jackson 消息转换器更好的解决方案是在 QueueMessageHandlerFactory
上设置消息转换器列表:
@Bean
fun queueMessageHandlerFactory(objectMapper: ObjectMapper): QueueMessageHandlerFactory
val factory = QueueMessageHandlerFactory()
val messageConverter = MappingJackson2MessageConverter()
messageConverter.objectMapper = objectMapper
factory.setMessageConverters(listOf(messageConverter)) // <-- this is the important line.
return factory
注册的MessageConverters
在QueueMessageHandler.initArgumentResolvers()
里面用作PayloadArgumentResolvers
。
因此,这是一个侵入性较小的更改。
【讨论】:
遗憾的是,文档准确地推荐了问题作者所做的事情。在找到这篇文章之前,我自己一直在努力解决这个问题……【参考方案2】:花了一些时间摆弄并尝试与其他 SO 答案不同的东西。
这是我的代码,我会尽力解释。我包含了我用于 SQS 消费者的所有内容。
我的配置类在下面。下面唯一需要注意的是在 queueMessageHandlerFactory 方法中实例化的转换器和解析器对象。 MappingJackson2MessageConverter(以防从非常明显的类名中看不出来)类处理来自 SQS 的有效负载的反序列化。
将严格的内容类型匹配设置为 false 也很重要。
此外,MappingJackson2MessageConverter 允许您设置自己的 Jackson ObjectMapper,但是如果您这样做,则需要按如下方式对其进行配置:
objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
你可能不想这样做,所以你可以让它为空,它会创建自己的 ObjectMapper。
我认为其余的代码是不言自明的......?如果没有,请告诉我。
我们的用例之间的一个区别是,您似乎正在映射您自己的自定义对象 (SqsEventDTO),而我认为这是可行的?在这种情况下,我认为您不需要 MappingJackson2MessageConverter,但我可能是错的。
@Configuration
public class AppConfig
@Bean
@Primary
public QueueMessageHandler queueMessageHandler(@Autowired QueueMessageHandlerFactory queueMessageHandlerFactory)
return queueMessageHandlerFactory.createQueueMessageHandler();
@Bean
@Primary
public QueueMessageHandlerFactory queueMessageHandlerFactory(@Autowired AmazonSQSAsync sqsClient)
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setAmazonSqs(sqsClient);
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setSerializedPayloadClass(String.class);
//set strict content type match to false
messageConverter.setStrictContentTypeMatch(false);
// Uses the MappingJackson2MessageConverter object to resolve/map
// the payload against the Message/S3EventNotification argument.
PayloadArgumentResolver payloadResolver = new PayloadArgumentResolver(messageConverter);
// Extract the acknowledgment data from the payload's headers,
// which then gets deserialized into the Acknowledgment object.
AcknowledgmentHandlerMethodArgumentResolver acknowledgmentResolver = new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");
// I don't remember the specifics of WHY, however there is
// something important about the order of the argument resolvers
// in the list
factory.setArgumentResolvers(Arrays.asList(acknowledgmentResolver, payloadResolver));
return factory;
@Bean("ConsumerBean")
@Primary
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Autowired AmazonSQSAsync amazonSQSAsync, @Autowired QueueMessageHandler queueMessageHandler,
@Autowired ThreadPoolTaskExecutor threadPoolExecutor)
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
smlc.setWaitTimeOut(20);
smlc.setAmazonSqs(amazonSQSAsync);
smlc.setMessageHandler(queueMessageHandler);
smlc.setBeanName("ConsumerBean");
smlc.setMaxNumberOfMessages(sqsMaxMessages);
smlc.setTaskExecutor(threadPoolExecutor);
return smlc;
@Bean
@Primary
public ThreadPoolTaskExecutor threadPoolTaskExecutor()
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setAllowCoreThreadTimeOut(coreThreadsTimeout);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setMaxPoolSize(maxPoolSize);
executor.setKeepAliveSeconds(threadTimeoutSeconds);
executor.setThreadNamePrefix(threadName);
executor.initialize();
return executor;
下面是我的 SQS 消费者服务类。
@Service
public class RawConsumer
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "$input.sqs.queuename")
public void sqsListener(S3EventNotification event, Acknowledgment ack) throws Exception
// Handle event here
希望对您有所帮助,如果您有任何问题,请告诉我。
【讨论】:
以上是关于设置手动确认 SQS 消息的 Spring Cloud AWS 问题的主要内容,如果未能解决你的问题,请参考以下文章