如何将 SQS 队列订阅到 Java 中的 SNS 主题

Posted

技术标签:

【中文标题】如何将 SQS 队列订阅到 Java 中的 SNS 主题【英文标题】:How to subscribe a SQS queue to a SNS topic in Java 【发布时间】:2019-07-02 15:03:33 【问题描述】:

当我创建一个新队列并将其订阅到 Java 中的主题时,没有消息出现。通过 AWS Web 控制台也可以正常工作。

我想我必须以某种方式确认订阅,但 sns.confirmSubscription 方法需要一个令牌 - 我应该从哪里得到它?

这是我的 Java 代码:

String queueURL = sqs.createQueue("my-queue").getQueueUrl();

sns.subscribe(myTopicARN, "sqs", queueURL);

sns.publish(myTopicARN, "\"payload\":\"test\"");

sqs.receiveMessage(queueURL).getMessages()
        .forEach(System.out::println);  // nothing

我做错了什么?

【问题讨论】:

您的 IAM 用户在从控制台执行此操作时可能需要权限。您可能需要检查 SDK 使用的凭据是否具有正确的权限。 @A.Khan 我实际上创建了一个管理员用户并通过设置AWS_PROFILE=user-from-credentials使用其凭据 @A.Khan 无论如何,在这种情况下我会期待一个例外...... 真的。您是否在队列中启用了长轮询? 不,我只写了您可以在上面看到的实际代码。当消息立即发送和接收时,我需要这个吗? 【参考方案1】:

看看这个:https://aws.amazon.com/blogs/developer/subscribing-queues-to-topics/

你应该这样订阅:

Topics.subscribeQueue(sns, sqs, myTopicARN, queueURL);

这种方便的方法为订阅创建了一个策略,以允许主题向队列发送消息。

【讨论】:

【参考方案2】:

将队列订阅到 sns 不会自动创建允许 sns 向队列发送消息的策略(根据我对 sns/sqs 的经验),因此您需要自己创建策略并授予 sns 向队列发送消息的权限你的队列这是一个关于如何使用队列 url、队列 arn 和主题 arn 的示例

import static com.amazonaws.auth.policy.Principal.All;
import static com.amazonaws.auth.policy.Statement.Effect.Allow;
import static com.amazonaws.auth.policy.actions.SQSActions.SendMessage;
import static com.amazonaws.auth.policy.conditions.ArnCondition.ArnComparisonType.ArnEquals;

final Statement mainQueueStatements = new Statement(Allow) //imported above
        .withActions(SendMessage) //imported above
            .withPrincipals(All) //imported above
            .withResources(new Resource(queueArn)) // your queue arn
            .withConditions(
                    new Condition()
                            .withType(ArnEquals.name()) //imported above
                            .withConditionKey(SOURCE_ARN_CONDITION_KEY) //imported above
                            .withValues(topicArn) // your topic arn
            );
    final Policy mainQueuePolicy = ()
            .withId("MainQueuePolicy")
            .withStatements(mainQueueStatements);
    final HashMap<QueueAttributeName, String> attributes = new HashMap<>();
     attributes.put(QueueAttributeName.Policy.toString(), mainQueuePolicy.toJson());
    amazonSQS.setQueueAttributes(new SetQueueAttributesRequest().withAttributes(attributes).withQueueUrl(queueUrl)); // your queue url

【讨论】:

以上是关于如何将 SQS 队列订阅到 Java 中的 SNS 主题的主要内容,如果未能解决你的问题,请参考以下文章

Cloudformation - 如何在代码中设置 SNS 订阅的过滤策略?

将 S3 事件订阅到 SQS 队列而不向世界公开 SQS?

Amazon SQS/SNS 策略错误

aws cloudformation 模板 sns sqs

在发送到 SNS 之前,我可以使用 Amazon SQS 作为延迟队列吗?

Serverless 订阅其他区域的 SNS