GCP Datastore Python - InvalidArgument:400 非事务性提交可能不包含影响同一实体的多个突变

Posted

技术标签:

【中文标题】GCP Datastore Python - InvalidArgument:400 非事务性提交可能不包含影响同一实体的多个突变【英文标题】:GCP Datastore Python - InvalidArgument: 400 A non-transactional commit may not contain multiple mutations affecting the same entity 【发布时间】:2021-12-13 15:56:36 【问题描述】:

我有一个 Cloud Run 应用程序,它会定期将记录写入 Cloud Datastore。每条记录每次都有相同的键,所以我在写的时候更新记录。 问题是这个应用程序会收到很多请求,因此会自动缩放。当所有这些自动缩放的实例写入 Cloud Datastore 时,有时它们都会同时尝试,这就是我看到下面提到的异常的时候:

google.api_core.exceptions.InvalidArgument: 400 A non-transactional commit may not contain multiple mutations affecting the same entity.

下面是上传功能的骨架代码。

def datastore_upload(records: list):
    client = datastore.Client()
    kind = "some_kind"
    entities = []
    for record in records:
        name = record['name']
        key = client.key(kind, name)
        task = datastore.Entity(key=key)
        task['x'] = record['x']
        task['y'] = record['y']
        entities.append(task)
    client.put_multi(entities)

【问题讨论】:

可能值得研究 PubSub 主题以挂钩消息请求队列 【参考方案1】:

您可以通过跟踪正在更新的实体来轻松解决此问题,例如

def datastore_upload(records: list):
    client = datastore.Client()
    kind = "some_kind"
    entities = 
    for record in records:
        name = record['name']
        key = client.key(kind, name)
        task = datastore.Entity(key=key)
        task['x'] = record['x']
        task['y'] = record['y']
        # Keep the last update for each task.
        entities[name] = task
    client.put_multi(list(entities.values()))

【讨论】:

这可能适用于其他情况,但不适用于我的,因为此函数同时被许多自动缩放实例的工作人员调用。 为什么不行。错误是您将同一个实体两次放入同一个批处理写入调用中。写入中的重复数据删除修复了 RPC 的问题。如果您的错误与 RPC 之间的冲突有关,那么这将需要不同的解决方案(例如事务)。 我的问题是后者。如何使用事务解决这个问题?

以上是关于GCP Datastore Python - InvalidArgument:400 非事务性提交可能不包含影响同一实体的多个突变的主要内容,如果未能解决你的问题,请参考以下文章

以增量方式将数据从 GCP Datastore 移动到 BigQuery 的最佳做法

在不覆盖实时数据库的情况下查询 GCP Datastore 备份

Spring+GCP Datastore:我使用存储库,并希望在开发或测试期间将其配置为使用数据存储模拟器

比较 GCP 数据存储区查询性能

GCP 数据存储模拟器不会安装在 OpenJDK 10 上

如何从BigQuery导出到Datastore?