模拟 AWS 服务和 Lambda 最佳实践

Posted

技术标签:

【中文标题】模拟 AWS 服务和 Lambda 最佳实践【英文标题】:Mocking AWS services and Lambda best practices 【发布时间】:2021-01-17 20:47:04 【问题描述】:

我正在开发一个简单的 AWS lambda 函数,该函数由 DynamoDB Streams 事件触发,并且应该将除 REMOVE 事件之外的所有记录转发到 SQS 队列。该功能按预期工作,这并不奇怪。

我想编写一个单元测试来测试在发生DELETE 事件时不向 SQS 提交任何内容的行为。我首先使用 aws-sdk-mock 进行了尝试。正如您在函数代码中看到的,我尝试通过在处理程序代码之外初始化 SQS 客户端来遵守 lambda 最佳实践。显然这会阻止 aws-sdk-mock 能够模拟 SQS 服务(GitHub 上有一个关于此的问题:https://github.com/dwyl/aws-sdk-mock/issues/206)。

然后我尝试使用 jest 模拟 SQS,这需要更多代码才能正确处理,但我最终遇到了同样的问题,需要将 SQS 的初始化放在违反的处理函数中lambda 最佳实践。

我如何为这个函数编写单元测试同时让 SQS 客户端 (const sqs: SQS = new SQS()) 在处理程序之外进行初始化?我是在以错误的方式模拟服务还是要更改处理程序的结构以使其更易于测试?

我知道这个 lambda 函数非常简单,可能不需要单元测试,但我将不得不编写更多逻辑更复杂的 lambda,我认为这个非常适合演示问题。

index.ts

import DynamoDBStreamEvent, DynamoDBStreamHandler from "aws-lambda";
import SQS = require("aws-sdk/clients/sqs");
import DynamoDB = require("aws-sdk/clients/dynamodb");

const sqs: SQS = new SQS()

export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => 
    const QUEUE_URL = process.env.TARGET_QUEUE_URL
    if (QUEUE_URL.length == 0) 
        throw new Error('TARGET_QUEUE_URL not set or empty')
    
    await Promise.all(
        event.Records
            .filter(_ => _.eventName !== "REMOVE")
            .map((record) => 
                const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
                let request: SQS.SendMessageRequest = 
                    MessageAttributes: 
                        "EVENT_NAME": 
                            DataType: "String",
                            StringValue: record.eventName
                        
                    ,
                    MessageBody: JSON.stringify(unmarshalled),
                    QueueUrl: QUEUE_URL,
                
                return sqs.sendMessage(request).promise()
            )
    );

index.spec.ts

import DynamoDBRecord, DynamoDBStreamEvent, StreamRecord from "aws-lambda";
import AttributeValue from "aws-lambda/trigger/dynamodb-stream";
import handleDynamoDbEvent from "./index";
import AWSError from "aws-sdk/lib/error";
import PromiseResult, Request from "aws-sdk/lib/request";
import * as SQS from "aws-sdk/clients/sqs";
import mocked from "ts-jest/utils";
import DynamoDB = require("aws-sdk/clients/dynamodb");


jest.mock('aws-sdk/clients/sqs', () => 
    return jest.fn().mockImplementation(() => 
        return 
            sendMessage: (params: SQS.Types.SendMessageRequest, callback?: (err: AWSError, data: SQS.Types.SendMessageResult) => void): Request<SQS.Types.SendMessageResult, AWSError> => 
                // @ts-ignore
                const Mock = jest.fn<Request<SQS.Types.SendMessageResult, AWSError>>(()=>
                    return 
                        promise: (): Promise<PromiseResult<SQS.Types.SendMessageResult, AWSError>> => 
                            return new Promise<PromiseResult<SQS.SendMessageResult, AWSError>>(resolve => 
                                resolve(null)
                            )
                        
                    
                )
                return new Mock()
            
        
    )
);


describe.only('Handler test', () => 

    const mockedSqs = mocked(SQS, true)

    process.env.TARGET_QUEUE_URL = 'test'
    const OLD_ENV = process.env;

    beforeEach(() => 
        mockedSqs.mockClear()
        jest.resetModules();
        process.env = ...OLD_ENV;
    );

    it('should write INSERT events to SQS', async () => 
        console.log('Starting test')
        await handleDynamoDbEvent(createEvent(), null, null)
        expect(mockedSqs).toHaveBeenCalledTimes(1)
    );
)

【问题讨论】:

【参考方案1】:

我添加了一个从处理函数内部调用的初始化方法。如果之前已经调用过它,它会立即返回,否则将初始化 SQS 客户端。它可以很容易地扩展到初始化其他客户端。

这符合 lambda 最佳实践,并使测试代码有效。

let sqs: SQS = null
let initialized = false

export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => 
    init()
    const QUEUE_URL = process.env.TARGET_QUEUE_URL
    if (QUEUE_URL.length == 0) 
        throw new Error('TARGET_QUEUE_URL not set or empty')
    
    await Promise.all(
        event.Records
            .filter(_ => _.eventName !== "REMOVE")
            .map((record) => 
                const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
                let request: SQS.SendMessageRequest = 
                    MessageAttributes: 
                        "EVENT_NAME": 
                            DataType: "String",
                            StringValue: record.eventName
                        
                    ,
                    MessageBody: JSON.stringify(unmarshalled),
                    QueueUrl: QUEUE_URL,
                
                return sqs.sendMessage(request).promise()
            )
    );


function init() 
    if (initialized) 
        return
    
    console.log('Initializing...')
    initialized = true
    sqs = new SQS()

【讨论】:

【参考方案2】:

我将如何处理这个问题的粗略想法:

我不会在主函数中进行实际的 SQS 发送/操作,而是为消息客户端创建一个接口。像这样:
interface QueueClient 
    send(eventName: string, body: string): Promise<any>;

并创建一个实现该接口以与 SQS 交互的实际类:
class SQSQueueClient implements QueueClient 
    queueUrl: string
    sqs: SQS

    constructor() 
        this.queueUrl = process.env.TARGET_QUEUE_URL;
        if (this.queueUrl.length == 0) 
            throw new Error('TARGET_QUEUE_URL not set or empty')
        
        this.sqs = new SQS();
    

    send(eventName: string, body: string): Promise<any> 
        let request: SQS.SendMessageRequest = 
            MessageAttributes: 
                "EVENT_NAME": 
                    DataType: "String",
                    StringValue: eventName
                
            ,
            MessageBody: body,
            QueueUrl: this.queueUrl,
        
        return this.sqs.sendMessage()
    

本课程了解如何将数据转换为 SQS 格式的详细信息

然后我将main函数分成2个。入口点只是解析队列url,创建一个实际的sqs队列客户端实例并调用process()。主要逻辑在process()
const queueClient = new SQSQueueClient();

export const handleDynamoDbEvent: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent, context, callback) => 
    return process(queueClient, event);


export const process = async (queueClient: QueueClient, event: DynamoDBStreamEvent) => 
    return await Promise.all(
        event.Records
            .filter(_ => _.eventName !== "REMOVE")
            .map((record) => 
                const unmarshalled = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);
                return queueClient.send(record.eventName, JSON.stringify(unmarshalled));
            )
    );

现在测试process() 中的主逻辑要容易得多。您可以提供一个模拟实例,通过手写实现接口QueueClient 或使用您喜欢的任何模拟框架 对于SQSQueueClient 类,对它进行单元测试并没有太多好处,所以我将更多地依赖集成测试(例如,使用类似 localstack 的东西)

我现在没有真正的 IDE,如果这里和那里有语法错误,请原谅我

【讨论】:

本例中,每次调用new SQSQueueClient(QUEUE_URL),都会调用handler,这会导致构造函数中的new SQS()被调用。这不符合 lambda 最佳实践。 对,在这种情况下,SQSQueueClient 的创建可以移到外面,队列 url 可能需要作为参数传递给send()。但想法可能还是一样 @mheck 我只是编辑代码以将SQSQueueClient 的创建移到外面。从环境变量解析队列url的逻辑也可以移到SQSQueueClient的构造函数中 你基本上是在实现依赖注入。顶层处理函数解析依赖关系并将它们作为参数注入到实际的处理程序实现中。我想,我也可以在没有 QueueClient 接口的情况下做到这一点。模拟 SQS 不是问题,而是让处理程序使用它。这样,它应该可以工作。

以上是关于模拟 AWS 服务和 Lambda 最佳实践的主要内容,如果未能解决你的问题,请参考以下文章

在 AWS 上设置数据管道的最佳实践? (Lambda/EMR/Redshift/雅典娜)

AWS 中的 cloudformation 最佳实践

AWS 架构最佳实践概述

最佳实践:带文件处理的 AWS ftp

更新 AWS ECS 服务任务的最佳实践

启用缓存时模拟服务人员和 Apollo 客户端的最佳实践