将 SqsListener 与 SNS 和 SQS 一起使用
Posted
技术标签:
【中文标题】将 SqsListener 与 SNS 和 SQS 一起使用【英文标题】:Using SqsListener with SNS and SQS 【发布时间】:2016-08-01 08:08:15 【问题描述】:我正在使用 spring-cloud-aws 的 SqsListener 从 AWS 的简单队列服务 (SQS) 接收 JSON Format 中的 AWS SNS HTTP 通知。
这是监听器的代码:
@SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void handle(final MyObject obj) throws Exception
// ...
上面链接的文档只是关于向队列发送和读取普通序列化对象,我认为接收 SNS 消息应该是开箱即用的。但我最终收到转换错误:
10:45:51.480 [simpleMessageListenerContainer-2] 错误 o.s.c.a.m.l.SimpleMessageListenerContainer - 遇到异常 在处理消息时。 org.springframework.messaging.MessagingException:一个异常 调用处理程序方法时发生;嵌套异常是 org.springframework.messaging.converter.MessageConversionException:否 转换器发现转换为 com.myproject.model.MyObject 类, 消息=通用消息
我还尝试创建一个看起来像上面链接的预期 SNS Json 格式的包装器对象,但我不断收到相同的异常。唯一有效的类型是签名中的字符串。 SNS不应该自动转换吗?
【问题讨论】:
【参考方案1】:是的,应该。它确实如此。
为了在反序列化时调用正确的HandlerMethodArgumentResolver
(在本例中为NotificationMessageArgumentResolver
),进而调用正确的转换器NotificationRequestConverter
,您只需将注释org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage
添加到您的方法签名中。例如
@SqsListener(value = "my-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void handle(final @NotificationMessage MyObject obj) throws Exception
// ...
这样,您的 SNS 的 Message
部分将被提取并转换为 MyObject
。
【讨论】:
如何从 SQS 接收字符串形式的对象? SQS不是只支持字符串作为消息吗? @jtcotton63 这是真的。有效负载以String
的形式出现 - 一个 JSON 对象 - @NotificationMessage
调用一个转换器,该转换器将有效负载转换为所需类的对象。
添加到这个答案,对我来说,简单地添加 @NotificationMessage 注释并没有帮助,因为我用自己的 ArgumentResolver 覆盖了 QueueMessageHandlerFactory。许多人都会遇到这种情况,因为这是自定义 Jackson 映射器所必需的。在这种情况下,必须更改解析器:从 factory.setArgumentResolvers(List.of(new PayloadArgumentResolver(jacksonMessageConverter)));
到 factory.setArgumentResolvers(List.of(new NotificationMessageArgumentResolver(jacksonMessageConverter)));
【参考方案2】:
这在没有@NotificationMessage
的情况下也有效。这样您就不需要发送使用此注释所需的“类型”和“消息”部分。
首先创建一个具有所需属性的类。
public class SqsMessage
private String myTask;
public SqsMessage()
public SqsMessage(@JsonProperty("MyTask") String myTask )
this.myTask = myTask ;
//Getter + Setter
接下来设置监听器
@SqsListener(value = "MyQueue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(SqsMessage payload, @Headers Map<String, Object> header)
logger.info("Got message with task: " + payload.getTask()
+ " with custom attribute " + header.get("CustomAttribute").toString());
现在你可以发送 JSON 格式
"MyTask":"My task"
POJO 构造函数中的@JsonProperty("MyTask")
注释可以是可选的,具体取决于您的 spring 版本以及您的属性是否与 Json 字符串中的名称相同。例如,如果您的属性名为 task
而您的 Json 字符串为 "task":"My task"
,则没有必要。
【讨论】:
但问题是关于接收 SNS 消息,其中Type
和 Message
是 the format 的一部分
这是一段时间以前,但据我记得,Message
映射到 POJO,Type
信息是标题的一部分。我的答案与接受的答案完全相同,但没有使用 @NotificationMessage
注释,在我的情况下这会导致其他解析问题。 JSON 的东西只是一个奖励。以上是关于将 SqsListener 与 SNS 和 SQS 一起使用的主要内容,如果未能解决你的问题,请参考以下文章