将 SqsListener 与 SNS 和 SQS 一起使用

Posted

技术标签:

【中文标题】将 SqsListener 与 SNS 和 SQS 一起使用【英文标题】:Using SqsListener with SNS and SQS 【发布时间】:2016-08-01 08:08:15 【问题描述】:

我正在使用 spring-cloud-aws 的 SqsListener 从 AWS 的简单队列服务 (SQS) 接收 JSON Format 中的 AWS SNS HTTP 通知。

这是监听器的代码:

@SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void handle(final MyObject obj) throws Exception 
// ...

上面链接的文档只是关于向队列发送和读取普通序列化对象,我认为接收 SNS 消息应该是开箱即用的。但我最终收到转换错误:

10:45:51.480 [simpleMessageListenerContainer-2] 错误 o.s.c.a.m.l.SimpleMessageListenerContainer - 遇到异常 在处理消息时。 org.springframework.messaging.MessagingException:一个异常 调用处理程序方法时发生;嵌套异常是 org.springframework.messaging.converter.MessageConversionException:否 转换器发现转换为 com.myproject.model.MyObject 类, 消息=通用消息

我还尝试创建一个看起来像上面链接的预期 SNS Json 格式的包装器对象,但我不断收到相同的异常。唯一有效的类型是签名中的字符串。 SNS不应该自动转换吗?

【问题讨论】:

【参考方案1】:

是的,应该。它确实如此。

为了在反序列化时调用正确的HandlerMethodArgumentResolver(在本例中为NotificationMessageArgumentResolver),进而调用正确的转换器NotificationRequestConverter,您只需将注释org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage 添加到您的方法签名中。例如

@SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void handle(final @NotificationMessage MyObject obj) throws Exception 
// ...

这样,您的 SNS 的 Message 部分将被提取并转换为 MyObject

【讨论】:

如何从 SQS 接收字符串形式的对象? SQS不是只支持字符串作为消息吗? @jtcotton63 这是真的。有效负载以 String 的形式出现 - 一个 JSON 对象 - @NotificationMessage 调用一个转换器,该转换器将有效负载转换为所需类的对象。 添加到这个答案,对我来说,简单地添加 @NotificationMessage 注释并没有帮助,因为我用自己的 ArgumentResolver 覆盖了 QueueMessageHandlerFactory。许多人都会遇到这种情况,因为这是自定义 Jackson 映射器所必需的。在这种情况下,必须更改解析器:从 factory.setArgumentResolvers(List.of(new PayloadArgumentResolver(jacksonMessageConverter)));factory.setArgumentResolvers(List.of(new NotificationMessageArgumentResolver(jacksonMessageConverter)));【参考方案2】:

这在没有@NotificationMessage 的情况下也有效。这样您就不需要发送使用此注释所需的“类型”和“消息”部分。

首先创建一个具有所需属性的类。

public class SqsMessage 

   private String myTask;

   public SqsMessage() 
   

   public SqsMessage(@JsonProperty("MyTask") String myTask ) 
       this.myTask = myTask ;
   

   //Getter + Setter 

接下来设置监听器

@SqsListener(value = "MyQueue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(SqsMessage payload, @Headers Map<String, Object> header) 
   logger.info("Got message with task: " + payload.getTask() 
    + " with custom attribute " + header.get("CustomAttribute").toString());

现在你可以发送 JSON 格式

"MyTask":"My task"

POJO 构造函数中的@JsonProperty("MyTask") 注释可以是可选的,具体取决于您的 spring 版本以及您的属性是否与 Json 字符串中的名称相同。例如,如果您的属性名为 task 而您的 Json 字符串为 "task":"My task",则没有必要。

【讨论】:

但问题是关于接收 SNS 消息,其中 TypeMessage 是 the format 的一部分 这是一段时间以前,但据我记得,Message 映射到 POJO,Type 信息是标题的一部分。我的答案与接受的答案完全相同,但没有使用 @NotificationMessage 注释,在我的情况下这会导致其他解析问题。 JSON 的东西只是一个奖励。

以上是关于将 SqsListener 与 SNS 和 SQS 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

在Python中使用aws的sns和sqs

如何将 SQS 队列订阅到 Java 中的 SNS 主题

SQS 和 SNS 对比分析

Amazon SQS/SNS 策略错误

AWS 默认访问策略 SNS 主题和 SQS q

使用SQS接收SES消息