Lambda 函数中的过滤条件

Posted

技术标签:

【中文标题】Lambda 函数中的过滤条件【英文标题】:Filter Criteria in Lambda Function 【发布时间】:2022-01-18 15:42:07 【问题描述】:

我想使用我能够做到的 AWS CDK 在我的 lambda 上启用 DynamoDB 流,但我也想在 lambda 上启用 filter criteria

但我收到此错误:

过滤器模式定义无效。 (服务:AWSLambda;状态码:400;错误码:InvalidParameterValueException

这是我从 DynamoDB 流中获得的事件:


    "input": 
        "Records": [
            
                "eventID": "e92e0072a661a06df0e62e411f",
                "eventName": "INSERT",
                "eventVersion": "1.1",
                "eventSource": "aws:dynamodb",
                "awsRegion": "<region>",
                "dynamodb": 
                    "ApproximateCreationDateTime": 1639500357,
                    "Keys": 
                        "service": 
                            "S": "service"
                        ,
                        "key": 
                            "S": "key"
                        
                    ,
                    "NewImage": 
                        "service": 
                            "S": "service"
                        ,
                        "channel": 
                            "S": "email"
                        ,
                        "key": 
                            "S": "key"
                        
                    ,
                    "SequenceNumber": "711500000000015864417",
                    "SizeBytes": 168,
                    "StreamViewType": "NEW_IMAGE"
                ,
                "eventSourceARN": "arn:aws:dynamodb:<region>:<account>:table/table-name/stream/2021-12-14T13:00:29.888"
            
        ]
    ,
    "env": 
        "lambdaContext": 
            "callbackWaitsForEmptyEventLoop": true,
            "functionVersion": "$LATEST",
            "functionName": "functionName",
            "memoryLimitInMB": "128",
            "logGroupName": "/aws/lambda/functionName",
            "logStreamName": "2021/12/14/[$LATEST]028531c7b489b8ec69bace700acc0",
            "invokedFunctionArn": "arn:aws:lambda:<region>:<account>:function:functionName",
            "awsRequestId": "c72e80252-4722-b9f0-a03b7f8b820e"
        ,
        "region": "<region-name>"
    

事件源映射代码为:

const mapping = new lambda.CfnEventSourceMapping(this, 'event', 
  functionName: "functionName,
  batchSize: 1,
  bisectBatchOnFunctionError: true,
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  eventSourceArn: <stream-arn>,
  filterCriteria: filter,
);

我想将 eventName 设为 INSERT 并将频道设为 email 此处。过滤条件的值应该是多少?它不适合我

【问题讨论】:

filter 是如何定义的? 过滤器定义的例子可以在这里查看docs.aws.amazon.com/lambda/latest/dg/… 我说的是变量filter。共享您定义的代码。 我正在使用这个 const filter = "Filters": [ "Pattern": " \"eventName\": [ \"INSERT\" ] " ] 而不是工作,我也想从 dynamodb 获取 channel = email 【参考方案1】:

这是 DynamoDB 流过滤器 Pattern 语法,用于新记录的 channel email

` \"eventName\": [\"INSERT\"], \"dynamodb\":  \"NewImage\": \"channel\":  \"S\" : [\"email\"]  `

换句话说,Pattern 是带有转义引号的字符串化 JSON filter rule。该模式应用于每个流记录。

这是完整的 CDK 语法。代码以通常的 L2 EventSourceMapping 开头。然后它使用escape hatch 语法在underlying L1 上设置FilterCriteria CfnEventSourceMapping

// start with the L2 type - Note: the OP code starts with a L1 `CfnEventSourceMapping`
const source: EventSourceMapping = new lambda.EventSourceMapping(this, 'EventSourceMapping', 
  target: func,
  eventSourceArn: table.tableStreamArn,
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
);

// escape hatch - get a L1 reference
const cfnSouce = source.node.defaultChild as lambda.CfnEventSourceMapping;

cfnSouce.addPropertyOverride('FilterCriteria', 
  Filters: [
    
      Pattern: ` \"eventName\": [\"INSERT\"], \"dynamodb\":  \"NewImage\": \"channel\":  \"S\" : [\"email\"]  `,
    ,
  ],
);

【讨论】:

它最初仍然无法正常工作,它在 cfnSource.addPropertyOverride 中给了我错误,无法访问未定义的属性,所以我将其更改为 cfnSource?.addPropertyOverride 并且我认为 cfnSource 未定义,这就是导致 cloudformation 的原因准备没有变化。它对你有用吗? . 是的,答案的代码按预期部署。您的错误可能是由于混淆了L1 (Cfn*) and L2 types。我的答案从 L2 source: EventSourceMapping 开始,并获得对其 L1 子节点 cfnSouce: CfnEventSourceMapping 的引用。您 L1 构造开始:mapping: CfnEventSourceMapping。如果做得好,这两种方法都可以正常工作。我编辑了答案,以使我们的不同起点更清晰。 为了简单起见:如果您的mapping 已经是CfnEventSourceMapping,您可以在没有.node.defaultChild 行的情况下调用mapping.addPropertyOverride 是的,非常感谢它的工作

以上是关于Lambda 函数中的过滤条件的主要内容,如果未能解决你的问题,请参考以下文章

每天一点小进步:lambda实现列表过滤&trim函数实现

JDK 1.8 Lambda表达式集合分组条件过滤组装去重排序转换求和最值

PySpark 中的窗口函数和条件过滤器

Lambda表达式where过滤数据

过滤 php 中的数组,同时具有值​​和键相关条件

如何在过滤器和 lambda 函数 pyspark 中使用多列