如何在 AWS CDK 2.0 中编写稀疏的全局二级索引行?

Posted

技术标签:

【中文标题】如何在 AWS CDK 2.0 中编写稀疏的全局二级索引行?【英文标题】:How to write sparse Global Secondary Index rows in AWS CDK 2.0? 【发布时间】:2022-01-18 02:49:18 【问题描述】:

我正在尝试使用 AWS CDK 实现类似 this 的东西,其中我有一个每约 30 分钟写入一次的表,以及一个汇总每天的值的聚合函数。对表的原始写入将包含以下列:player, timestamp, skills, activities。我想要一个稀疏的 GSI 来每天汇总,所以这些行将包含 player, date, skills, activities 列。

这是我的 CDK 代码:

class TrackerStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Construct a TrackerStack."""

        super().__init__(scope, construct_id, **kwargs)

        table = ddb.Table(
            self,
            "GranularTable",
            partition_key=ddb.Attribute(name="player", type=ddb.AttributeType.STRING),
            sort_key=ddb.Attribute(name="timestamp", type=ddb.AttributeType.STRING),
            encryption=ddb.TableEncryption.AWS_MANAGED,
            read_capacity=5,
            write_capacity=5,
            removal_policy=RemovalPolicy.DESTROY,
            stream=ddb.StreamViewType.NEW_IMAGE,
        )
        table.add_global_secondary_index(
            index_name="DailyAggregate",
            partition_key=ddb.Attribute(name="player", type=ddb.AttributeType.STRING),
            sort_key=ddb.Attribute(name="date", type=ddb.AttributeType.STRING),
            read_capacity=3,
            write_capacity=3,
        )
        aggregation_lambda = _lambda.Function(
            self,
            "DailyAggregatorLambda",
            handler="aggregator.handler",
            code=_lambda.Code.from_asset("lambda/aggregator"),
            runtime=_lambda.Runtime.PYTHON_3_8,
            environment="TABLE_NAME": table.table_name,
        )
        table.grant_read_write_data(aggregation_lambda)
        aggregation_lambda.add_event_source(
            lambda_event_sources.DynamoEventSource(
                hiscores_table,
                starting_position=_lambda.StartingPosition.TRIM_HORIZON,
                batch_size=1,
            )
        )

这是我的 lambda 代码:

ddb = boto3.resource("dynamodb")
table = ddb.Table(os.environ["TABLE_NAME"])


def _timestamp_to_date(timestamp):
    return timestamp.split()[0]


def _image_map(_map):
    return _map["M"]


def _image_num(_map):
    return _map["N"]


def _image_str(_map):
    return _map["S"]


def handler(event, context):
    event_name = event["Records"][0]["eventName"]
    event_source = event["Records"][0]["eventSource"]
    logger.info(f"Processing Event 'event_name' from source 'event_source'.")

    new_image = event["Records"][0]["dynamodb"]["NewImage"]
    logger.info(f"Received image: new_image")

    if event_name == "INSERT":
        player_id = _image_str(new_image["player"])
        timestamp = _image_str(new_image["timestamp"])
        date = _timestamp_to_date(timestamp)

        # Increment divisor
        logger.debug(f"Incrementing divisor for player_id:date")
        table.update_item(
            Key="player": player_id, "date": date,
            UpdateExpression="ADD divisor :incr",
            ExpressionAttributeValues=":incr": 1,
        )

当我写入表时,聚合器被正确调用,但它无法写入新的全局二级索引:

[ERROR] ClientError: An error occurred (ValidationException) when calling the UpdateItem operation: The provided key element does not match the schema
Traceback (most recent call last):
  File "/var/task/aggregator.py", line 47, in handler
    table.update_item(
  File "/var/runtime/boto3/resources/factory.py", line 520, in do_action
    response = action(self, *args, **kwargs)
  File "/var/runtime/boto3/resources/action.py", line 83, in __call__
    response = getattr(parent.meta.client, operation_name)(*args, **params)
  File "/var/runtime/botocore/client.py", line 386, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/var/runtime/botocore/client.py", line 705, in _make_api_call
    raise error_class(parsed_response, operation_name)

这是有道理的,因为date 属性不包含在写入原始表的行中。但是,Table API (reference) 似乎没有提供指定 AttributeDefinitions 的选项。我尝试在创建记录时向表中写入一个空的“日期”列,以便在架构中推断它,但出现以下错误(此错误用于写入空字符串;写入空值时出现类似错误) :

[ERROR] ClientError: An error occurred (ValidationException) when calling the PutItem operation: One or more parameter values are not valid. A value specified for a secondary index key is not supported. The AttributeValue for a key attribute cannot contain an empty string value. IndexName: DailyAggregate, IndexKey: date
Traceback (most recent call last):
  File "/var/task/get_and_parse_hiscores.py", line 47, in handler
    table.put_item(Item=payload)
  File "/var/runtime/boto3/resources/factory.py", line 520, in do_action
    response = action(self, *args, **kwargs)
  File "/var/runtime/boto3/resources/action.py", line 83, in __call__
    response = getattr(parent.meta.client, operation_name)(*args, **params)
  File "/var/runtime/botocore/client.py", line 386, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/var/runtime/botocore/client.py", line 705, in _make_api_call
    raise error_class(parsed_response, operation_name)

有没有办法使用这些工具来实现这个功能?

编辑:虽然Table API 不允许用户指定架构,但CfnTable API 允许 (reference)。我尝试使用CfnTable 来实现这一点:

class TrackerStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Construct a TrackerStack."""

        super().__init__(scope, construct_id, **kwargs)

        cfn_table = ddb.CfnTable(
            self,
            "GranularCfnTable",
            attribute_definitions=[
                ddb.CfnTable.AttributeDefinitionProperty(
                    attribute_name="date",
                    attribute_type="S",
                )
            ],
            key_schema=[
                ddb.CfnTable.KeySchemaProperty(
                    attribute_name="player", key_type="HASH"
                ),
                ddb.CfnTable.KeySchemaProperty(
                    attribute_name="timestamp", key_type="RANGE"
                ),
            ],
            global_secondary_indexes=[
                ddb.CfnTable.GlobalSecondaryIndexProperty(
                    index_name="DailyAggregate",
                    key_schema=[
                        ddb.CfnTable.KeySchemaProperty(
                            attribute_name="player", key_type="HASH"
                        ),
                        ddb.CfnTable.KeySchemaProperty(
                            attribute_name="date", key_type="RANGE"
                        ),
                    ],
                    projection=ddb.CfnTable.ProjectionProperty(projection_type="ALL"),
                    provisioned_throughput=ddb.CfnTable.ProvisionedThroughputProperty(
                        read_capacity_units=3,
                        write_capacity_units=3,
                    ),
                )
            ],
            provisioned_throughput=ddb.CfnTable.ProvisionedThroughputProperty(
                read_capacity_units=5,
                write_capacity_units=5,
            ),
            sse_specification=ddb.CfnTable.SSESpecificationProperty(sse_enabled=True),
            stream_specification=ddb.CfnTable.StreamSpecificationProperty(
                stream_view_type="NEW_IMAGE"
            ),
        )
        cfn_table.apply_removal_policy(RemovalPolicy.DESTROY)
        table = ddb.Table.from_table_arn(self, "GranularTable", cfn_table.attr_arn)
        aggregation_lambda = _lambda.Function(
            self,
            "DailyAggregatorLambda",
            handler="aggregator.handler",
            code=_lambda.Code.from_asset("lambda/aggregator"),
            runtime=_lambda.Runtime.PYTHON_3_8,
            environment=
                "TABLE_NAME": table.table_name,
            ,
        )
        table.grant_read_write_data(aggregation_lambda)
        aggregation_lambda.add_event_source(
            lambda_event_sources.DynamoEventSource(
                table,
                starting_position=_lambda.StartingPosition.TRIM_HORIZON,
                batch_size=1,
            )
        )

但是,cdk synth 失败并出现以下错误。我在协调 1 级 CloudFormation API 和 2 级 CDK API 时遇到了一些麻烦。

jsii.errors.JSIIError: DynamoDB Streams must be enabled on the table TrackerStack/GranularTable

【问题讨论】:

【参考方案1】:

您在表格设计和使用流事件聚合方面取得了不错的成绩。许多人努力做到这一点。 有几个问题需要排序才能使事情正常进行。好消息是修复涉及 消除当前设置的复杂性。

[编辑] 首先要做的事情是:您的更新操作失败不是因为 CDK 或架构问题,而是因为 update_item 缺少必需的 SK timestamp 字段。 Dynamo 需要主键的唯一值 - 您只提供了 player 值,而不是 timestamp。索引 SK 字段date 不是必需的。这就是稀疏索引中的“稀疏”!

接下来,“模式”。 DynamoDB 几乎是无模式的——几乎是它需要一个显式的简单或复合主键(PK 或 PK+SK)。 L1 构造 CfnTable.KeySchemaPropertyAttributeDefinition 设置这些。但是使用 L2 Table partition_keysort_key(以及它们的索引等效项)要容易得多,它们做同样的事情。

最后,关于餐桌设计的思考。使用复合键模式(也使用 OP 链接),您可以在没有 GSI 的情况下实现您的每日得分模式。 您可以通过查询PK=player1 AND begins_with(SK, "Daily")(使用Limit=1ScanIndexForward=False)获取player1 的最新一天得分。

PK SK
player1 PlayerInfo
player1 Daily#20211214
player1 Daily#20211215
player2 PlayerInfo
player2 Daily#20211214
player2 Daily#20211215

总之,回到你的问题:

AWS CDK 2.0 如何编写稀疏的全局二级索引行?

你没有。您在 CDK 中定义 GSI 及其密钥,但使用 SDK/Console/etc。实际写入行。*

有没有办法使用这些工具来实现这个功能?

是的。修复查询,将 Table Construct 回滚到 L2,一切都会为您准备就绪。


* 您可以使用 CDK Custom Resource 来播种初始行,但这是一个高级的必备功能,不是必须具备的。

【讨论】:

非常感谢您的快速回复!几个问题: 1. 我在table.update_item 下没有看到任何提及IndexName;我误会了吗? boto3.amazonaws.com/v1/documentation/api/latest/reference/… 2. 如何使用 L2 Table 构造设置多个排序键?是否像传递列表一样简单?这与我在docs.aws.amazon.com/cdk/api/latest/python/aws_cdk.aws_dynamodb/… 3 中看到的内容相矛盾。这是一个很好的观点,我现在将尝试这样做! 很高兴为您提供帮助!回复:1:我的错,我修复了答案。不要问我怎么做的,但我把update_item 误读为query...不过故事情节保持不变。 Re 2:从概念上讲,您不会在任何地方“提供多个排序键”。每个表(和每个索引)有 0 或 1 个排序键。也许这会有所帮助:在 Dynamo-land 中,将键列一般命名为 PKSK 是很常见的。此键命名约定对于所有酷孩子都使用的single table design pattern 中的列重载至关重要。 Re 2:如果仍然不相信,请打开 DynamoDB 控制台并尝试创建具有多个排序键的表。 啊,好吧。我感到困惑的是,在插入您希望使用 GSI 查询的行时,您仍然必须包含基表中的 PK 和 SK。所以我的“日期”行在这个模式中仍然需要一个“时间戳”。非常感谢您的澄清。我同意使用 DAILY_SENTINEL 来表示每日聚合行并重用时间戳排序键更容易,并以这种方式实现。我很快就会用我更新的代码发布一个答案,并接受你的答案:)。再次感谢您的帮助!

以上是关于如何在 AWS CDK 2.0 中编写稀疏的全局二级索引行?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 AWS CDK 中指定源安全组 ID?

如何在 AWS CDK 中使用 CfnParameter 而无需在运行时填写值

如何在 Cloudformation 模板/CDK 中添加 AWS IoT 配置模板

如何在 AWS CDK 的“CodeBuildAction”中指定区域?

如何在部署前从@aws-cdk/aws-appsync 检索 schema.graphql 文件

如何将现有资源从不同的 AWS 账户导入 AWS CDK