Amazon SQS 消息队列服务

Posted helloz

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Amazon SQS 消息队列服务相关的知识,希望对你有一定的参考价值。

Amazon SQS 是亚马逊提供的线上消息队列服务,可以实现应用程序解耦,并提供可靠性保证。SQS 提供了两种消息队列:一种是标准队列,一种是先进先出(FIFO)队列。两者的区别在于:FIFO 队列是严格有序的,即消息被接收的顺序与发送的顺序一致;而标准队列只是尽最大努力保持有序,并不保证一定有序。此外,FIFO 队列还会在去重时间间隔(5 分钟)内对消息去重:同一条消息即使被重复发送,也不会再次进入队列。

队列操作

创建队列
// Client built with the SDK's default settings.
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

// Request a queue named QUEUE_NAME with DelaySeconds=60 and
// MessageRetentionPeriod=86400 (one day, if the value is in seconds).
CreateQueueRequest create_request = new CreateQueueRequest(QUEUE_NAME)
        .addAttributesEntry("DelaySeconds", "60")
        .addAttributesEntry("MessageRetentionPeriod", "86400");

try {
    sqs.createQueue(create_request);
} catch (AmazonSQSException e) {
    // An already-existing queue is acceptable; every other failure is rethrown.
    // The literal is the receiver of equals() so that a null error code falls
    // through to the rethrow instead of raising a NullPointerException that
    // would hide the original service error.
    if (!"QueueAlreadyExists".equals(e.getErrorCode())) {
        throw e;
    }
}
列出队列
// Print the queue URLs returned by a single listQueues() call.
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
// Fetch the listing before printing the header, so the header only appears
// once the service call has succeeded.
ListQueuesResult listing = sqs.listQueues();
System.out.println("Your SQS Queue URLs:");
listing.getQueueUrls().forEach(System.out::println);
获取队列Url
// Look up the URL of the queue named QUEUE_NAME.
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
String queue_url = sqs
        .getQueueUrl(QUEUE_NAME)   // service lookup by queue name
        .getQueueUrl();            // extract the URL string from the result
删除队列
// Delete the queue addressed by queue_url.
final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
sqs.deleteQueue(queue_url);

消息操作

发送消息
SendMessageRequest send_msg_request = new SendMessageRequest()
        .withQueueUrl(queueUrl)
        .withMessageBody("hello world")
        .withDelaySeconds(5);
sqs.sendMessage(send_msg_request);
批量发送消息
// Two batch entries, each built from a batch-local id and a message body.
SendMessageBatchRequestEntry firstEntry =
        new SendMessageBatchRequestEntry("msg_1", "Hello from message 1");
// Only the second entry has DelaySeconds set (to 10).
SendMessageBatchRequestEntry secondEntry =
        new SendMessageBatchRequestEntry("msg_2", "Hello from message 2")
                .withDelaySeconds(10);

// Submit both entries to queueUrl in a single batch call.
sqs.sendMessageBatch(
        new SendMessageBatchRequest()
                .withQueueUrl(queueUrl)
                .withEntries(firstEntry, secondEntry));
获取消息
// Issue one receive call against queueUrl and keep the messages it returned.
List<Message> messages = sqs
        .receiveMessage(queueUrl)
        .getMessages();
删除消息
// Delete each received message from queueUrl using its receipt handle.
for (Message received : messages) {
    String receiptHandle = received.getReceiptHandle();
    sqs.deleteMessage(queueUrl, receiptHandle);
}

使用JMS方法

发送消息
/**
 * Console example that reads lines from standard input and sends each one as a
 * JMS text message to the configured SQS queue.
 */
public class TextMessageSender {
    /**
     * Parses the configuration, opens an SQS JMS connection, sends the messages
     * typed on standard input, and closes the connection.
     *
     * @param args command-line arguments handed to {@code ExampleConfiguration.parseConfig}
     * @throws JMSException if creating the connection, session or producer fails,
     *                      or if closing the connection fails
     */
    public static void main(String args[]) throws JMSException {
        ExampleConfiguration config = ExampleConfiguration.parseConfig("TextMessageSender", args);

        ExampleCommon.setupLogging();

        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                new ProviderConfiguration(),
                AmazonSQSClientBuilder.standard()
                        .withRegion(config.getRegion().getName())
                        .withCredentials(config.getCredentialsProvider())
                );

        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();

        try {
            // Create the queue if needed
            ExampleCommon.ensureQueueExists(connection, config.getQueueName());

            // Create the session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer( session.createQueue( config.getQueueName() ) );

            sendMessages(session, producer);
        } finally {
            // Close the connection even if setup above failed, so it is never
            // leaked. This closes the session automatically
            connection.close();
            System.out.println( "Connection closed" );
        }
    }

    /**
     * Prompts on standard output and sends every non-empty line read from
     * standard input as a text message. Returns on an empty line, on end of
     * input, or after the first I/O or JMS failure (which is reported on
     * standard error).
     *
     * @param session  session used to create the text messages
     * @param producer producer used to send them
     */
    private static void sendMessages( Session session, MessageProducer producer ) {
        BufferedReader inputReader = new BufferedReader(
            new InputStreamReader( System.in, Charset.defaultCharset() ) );

        try {
            String input;
            while( true ) {
                System.out.print( "Enter message to send (leave empty to exit): " );
                input = inputReader.readLine();
                // readLine() returns null at end of stream; treat it like an empty line
                if( input == null || input.equals("" ) ) break;

                TextMessage message = session.createTextMessage(input);
                producer.send(message);
                System.out.println( "Send message " + message.getJMSMessageID() );
            }
        } catch (EOFException e) {
            // Just return on EOF
        } catch (IOException e) {
            System.err.println( "Failed reading input: " + e.getMessage() );
        } catch (JMSException e) {
            System.err.println( "Failed sending message: " + e.getMessage() );
            e.printStackTrace();
        }
    }
}
同步接收消息
/**
 * Console example that synchronously receives messages from the configured SQS
 * queue through JMS and acknowledges each one after handling it.
 */
public class SyncMessageReceiver {
    /**
     * Parses the configuration, opens an SQS JMS connection, receives messages
     * until one minute passes without any, and closes the connection.
     *
     * @param args command-line arguments handed to {@code ExampleConfiguration.parseConfig}
     * @throws JMSException if creating or starting the connection, session or
     *                      consumer fails, or if closing the connection fails
     */
    public static void main(String args[]) throws JMSException {
        ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiver", args);

        ExampleCommon.setupLogging();

        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                new ProviderConfiguration(),
                AmazonSQSClientBuilder.standard()
                        .withRegion(config.getRegion().getName())
                        .withCredentials(config.getCredentialsProvider())
                );

        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();

        try {
            // Create the queue if needed
            ExampleCommon.ensureQueueExists(connection, config.getQueueName());

            // Create the session
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );

            connection.start();

            receiveMessages(session, consumer);
        } finally {
            // Close the connection even if setup above failed, so it is never
            // leaked. This closes the session automatically
            connection.close();
            System.out.println( "Connection closed" );
        }
    }

    /**
     * Receives messages in a loop, handing each to
     * {@code ExampleCommon.handleMessage} and then acknowledging it. Returns
     * when a receive call times out after one minute, or after the first JMS
     * failure (which is reported on standard error).
     *
     * @param session  session the consumer belongs to (not used directly here)
     * @param consumer consumer to receive from
     */
    private static void receiveMessages( Session session, MessageConsumer consumer ) {
        try {
            while( true ) {
                System.out.println( "Waiting for messages");
                // Wait 1 minute for a message
                Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1));
                if( message == null ) {
                    System.out.println( "Shutting down after 1 minute of silence" );
                    break;
                }
                ExampleCommon.handleMessage(message);
                // The session uses CLIENT_ACKNOWLEDGE, so acknowledge explicitly
                message.acknowledge();
                System.out.println( "Acknowledged message " + message.getJMSMessageID() );
            }
        } catch (JMSException e) {
            System.err.println( "Error receiving from SQS: " + e.getMessage() );
            e.printStackTrace();
        }
    }
}
异步接收消息
/**
 * Console example that receives messages from the configured SQS queue through
 * a JMS {@code MessageListener} and stops after one minute without messages.
 */
public class AsyncMessageReceiver {
    /**
     * Parses the configuration, opens an SQS JMS connection, registers a
     * listener, waits until one minute passes without a handled message, and
     * closes the connection.
     *
     * @param args command-line arguments handed to {@code ExampleConfiguration.parseConfig}
     * @throws JMSException         if creating or starting the connection, session
     *                              or consumer fails, or if closing the connection fails
     * @throws InterruptedException if the wait for silence is interrupted
     */
    public static void main(String args[]) throws JMSException, InterruptedException {
        ExampleConfiguration config = ExampleConfiguration.parseConfig("AsyncMessageReceiver", args);

        ExampleCommon.setupLogging();

        // Create the connection factory based on the config
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(
                new ProviderConfiguration(),
                AmazonSQSClientBuilder.standard()
                        .withRegion(config.getRegion().getName())
                        .withCredentials(config.getCredentialsProvider())
                );

        // Create the connection
        SQSConnection connection = connectionFactory.createConnection();

        try {
            // Create the queue if needed
            ExampleCommon.ensureQueueExists(connection, config.getQueueName());

            // Create the session
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );

            ReceiverCallback callback = new ReceiverCallback();
            consumer.setMessageListener( callback );

            // No messages are processed until this is called
            connection.start();

            callback.waitForOneMinuteOfSilence();
            System.out.println( "Returning after one minute of silence" );
        } finally {
            // Close the connection even if setup failed or the wait was
            // interrupted, so it is never leaked. This closes the session automatically
            connection.close();
            System.out.println( "Connection closed" );
        }
    }


    /**
     * Listener that handles and acknowledges each message and records when the
     * last one was successfully processed.
     */
    private static class ReceiverCallback implements MessageListener {
        // Used to listen for message silence.
        // NOTE(review): volatile presumably because onMessage runs on a
        // different thread than waitForOneMinuteOfSilence - confirm.
        private volatile long timeOfLastMessage = System.nanoTime();

        /**
         * Blocks until at least one minute has elapsed since the last
         * successfully processed message (or since this callback was created).
         *
         * @throws InterruptedException if the sleeping thread is interrupted
         */
        public void waitForOneMinuteOfSilence() throws InterruptedException {
            for(;;) {
                long timeSinceLastMessage = System.nanoTime() - timeOfLastMessage;
                long remainingTillOneMinuteOfSilence =
                    TimeUnit.MINUTES.toNanos(1) - timeSinceLastMessage;
                if( remainingTillOneMinuteOfSilence < 0 ) {
                    break;
                }
                // Sleep only for the remainder; the loop re-checks afterwards
                // because a message may have arrived during the sleep.
                TimeUnit.NANOSECONDS.sleep(remainingTillOneMinuteOfSilence);
            }
        }


        /**
         * Hands the message to {@code ExampleCommon.handleMessage}, acknowledges
         * it, and refreshes the last-message timestamp. A JMS failure is
         * reported on standard error and leaves the timestamp unchanged.
         *
         * @param message the delivered message
         */
        @Override
        public void onMessage(Message message) {
            try {
                ExampleCommon.handleMessage(message);
                message.acknowledge();
                System.out.println( "Acknowledged message " + message.getJMSMessageID() );
                timeOfLastMessage = System.nanoTime();
            } catch (JMSException e) {
                System.err.println( "Error processing message: " + e.getMessage() );
                e.printStackTrace();
            }
        }
    }
}

https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs-messages.html
https://docs.amazonaws.cn/en_us/AWSSimpleQueueService/latest/SQSDeveloperGuide/code-examples.html


以上是关于Amazon SQS 消息队列服务的主要内容,如果未能解决你的问题,请参考以下文章

探索 Amazon SQS 的架构

如何真正快速地从 Amazon SQS(简单队列服务)队列中删除事件?

Amazon SQS 消息多路传送

Amazon-SQS + Django-Celery 创建了数千个队列(每条消息一个队列)

使用 Node 轮询 Amazon SQS 队列的最有效方法

Amazon SQS 延迟队列