如何聚合 AWS SQS ApproximateNumberOfMessages

Posted

技术标签:

【中文标题】如何聚合 AWS SQS ApproximateNumberOfMessages【英文标题】:How to aggregate AWS SQS ApproximateNumberOfMessages 【发布时间】:2016-12-07 21:49:02 【问题描述】:

给定 X 个 SQS 队列,我如何将 ApproximateNumberOfMessages 聚合到一个 CloudWatch 指标中?

我希望根据队列中的消息数量自动调整组规模。使用多个 CloudWatch 警报(每个队列一个)会导致问题,因为一个队列将为空,而其他队列为“满”。

【问题讨论】:

【参考方案1】:

我实现这一点的方法是使用 AWS Lambda 和 Node.js。我添加了一个 CloudWatch 事件触发器来每分钟运行一次 lambda 函数。这会查询 sqs 队列,然后创建一个自定义 CloudWatch 指标,然后您可以将其用于自动缩放组进行缩放。

var AWS = require('aws-sdk');
var sqs = new AWS.SQS();
var cloudWatch = new AWS.CloudWatch();

var queueUrls = ['https://sqs.REGION.amazonaws.com/ACCOUNT-NUMBER/queueUrl1','https://sqs.REGION.amazonaws.com/ACCOUNT-NUMBER/queueUrl2'];

exports.handler = (event, context, callback) => 
    var fn = function (url) 
        return new Promise(resolve => 
            var sqsParams = 
                AttributeNames: ['ApproximateNumberOfMessages'],
                QueueUrl: url
            ;

            sqs.getQueueAttributes(sqsParams, function(err,data)
                if(err)
                
                    console.log(err,err.stack);
                    context.fail(err);
                
                else
                
                    resolve(name: url.split('/').pop(), messageCount: parseInt(data.Attributes.ApproximateNumberOfMessages));
                
            ); 
        );
    ;

    var actions = queueUrls.map(fn);
    Promise.all(actions).then(function(queues) 
        var messageCount = queues.map(function(m)return m.messageCount;);
        var queueNames = queues.map(function(n)return n.name;).join();

        var metricParams = 
            MetricData:[
                MetricName: 'ApproximateNumberOfMessages',
                Dimensions:[
                    Name: 'QueueName',
                    Value: queueNames
                ],
                Unit: 'Count',
                StatisticValues: 
                    Maximum: Math.max.apply(Math, messageCount),
                    Minimum: Math.min.apply(Math, messageCount),
                    SampleCount: queues.length,
                    Sum: messageCount.reduce((pv, cv) => pv+cv, 0)
                
            ],
            Namespace: 'AWS/SQS'
        ;
        cloudWatch.putMetricData(metricParams, function(err, metricData)
            if(err) console.log(err,err.stack);
            else console.log(metricData);
        );
    );
;

这段代码显然可以优化为处理超过 2 个队列,并且可能受益于异步瀑布。

编辑:更新为使用承诺。

EDIT2:连接 CloudWatch 指标的队列名称

【讨论】:

以上是关于如何聚合 AWS SQS ApproximateNumberOfMessages的主要内容,如果未能解决你的问题,请参考以下文章

AWS SQS - CDK - 如何创建主题过滤器

AWS Lambda 在向 SQS 发送消息之前完成

如何修改 Spring Cloud AWS 用来反序列化 SQS 消息的对象映射器?

AWS SQS JMS确认

AWS Beanstalk:SQS 的指数退避?

到 Kafka 的 AWS Sqs 源连接器