Django 批处理/批量更新或创建?

Posted

技术标签:

【中文标题】Django 批处理/批量更新或创建?【英文标题】:Django batching/bulk update_or_create? 【发布时间】:2015-01-18 18:55:21 【问题描述】:

我在数据库中有需要定期更新的数据。数据源会返回当时可用的所有内容,因此将包括数据库中尚未包含的新数据。

当我遍历源数据时,如果可能的话,我不想进行 1000 次单独的写入。

有没有像update_or_create这样但批量工作的东西?

一种想法是将update_or_create 与手动事务结合使用,但我不确定这是否只是将单个写入排队,或者是否会将它们全部组合到一个 SQL 插入中?

或者类似地可以在循环内使用update_or_create 的函数上使用@commit_on_success()

除了翻译数据并将其保存到模型之外,我没有对数据做任何事情。没有任何东西依赖于循环期间存在的模型

【问题讨论】:

我认为在大多数 sql server 中没有一个单独的更新或创建查询。 postgres 9.5 中有一个,但 django 不支持它。事务不会导致“单一”查询。它只会确保如果一个查询失败,所有查询都会失败。事实上,它会减慢所有查询。 更新。我对交易的看法是错误的。对所有操作使用单个事务将加快您的写入速度。这至少对于 postgres 和 sqlite 是正确的:github.com/coderholic/django-cities/pull/… 【参考方案1】:

批量更新将是一个 upsert 命令,就像@imposeren 所说,Postgres 9.5 为您提供了这种能力。我认为 mysql 5.7 也可以(请参阅http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html),具体取决于您的具体需求。也就是说,只使用 db 游标可能是最简单的。这没什么错,当 ORM 不够用时,它就在那里。

这些方面的东西应该可以工作。这是伪代码,所以不要只是剪切-粘贴,但这个概念是为你准备的。

class GroupByChunk(object):
    def __init__(self, size):
        self.count = 0
        self.size = size
        self.toggle = False

    def __call__(self, *args, **kwargs):
        if self.count >= self.size:  # Allows for size 0
            self.toggle = not self.toggle
            self.count = 0
        self.count += 1
        return self.toggle

def batch_update(db_results, upsert_sql):
    with transaction.atomic():
        cursor = connection.cursor()   
        for chunk in itertools.groupby(db_results, GroupByChunk(size=1000)):
            cursor.execute_many(upsert_sql, chunk)

这里的假设是:

db_results 是某种结果迭代器,在列表或字典中 db_results 的结果可以直接输入到原始 sql exec 语句中 如果任何批量更新失败,您将全部回滚。如果你想为每个块移动它,只需将 with 块向下推一点

【讨论】:

【参考方案2】:

由于 Django 添加了对 bulk_update 的支持,现在这在一定程度上是可能的,尽管您需要每批执行 3 次数据库调用(一次获取、一次批量创建和一次批量更新)。在这里为通用功能创建一个良好的接口有点挑战性,因为您希望该功能既支持高效查询又支持更新。这是我为批量 update_or_create 设计的一种方法,在该方法中,您有许多通用标识键(可能为空)和一个标识键,该键因批次而异。

这是作为基础模型上的方法实现的,但可以独立使用。这也假设基础模型在名为updated_on 的模型上具有auto_now 时间戳;如果不是这种情况,则假定此情况的代码行已被注释以便于修改。

为了批量使用它,在调用它之前将你的更新分批。这也是一种绕过数据的方法,这些数据可能具有少量的辅助标识符值之一,而无需更改接口。

class BaseModel(models.Model):
    updated_on = models.DateTimeField(auto_now=True)
    
    @classmethod
    def bulk_update_or_create(cls, common_keys, unique_key_name, unique_key_to_defaults):
        """
        common_keys: field_name: field_value
        unique_key_name: field_name
        unique_key_to_defaults: field_value: field_name: field_value
        
        ex. Event.bulk_update_or_create(
            "organization": organization, "external_id", 1234: "started": True
        )
        """
        with transaction.atomic():
            filter_kwargs = dict(common_keys)
            filter_kwargs[f"unique_key_name__in"] = unique_key_to_defaults.keys()
            existing_objs = 
                getattr(obj, unique_key_name): obj
                for obj in cls.objects.filter(**filter_kwargs).select_for_update()
            
            
            create_data = 
                k: v for k, v in unique_key_to_defaults.items() if k not in existing_objs
            
            for unique_key_value, obj in create_data.items():
                obj[unique_key_name] = unique_key_value
                obj.update(common_keys)
            creates = [cls(**obj_data) for obj_data in create_data.values()]
            if creates:
                cls.objects.bulk_create(creates)

            # This set should contain the name of the `auto_now` field of the model
            update_fields = "updated_on"
            updates = []
            for key, obj in existing_objs.items():
                obj.update(unique_key_to_defaults[key], save=False)
                update_fields.update(unique_key_to_defaults[key].keys())
                updates.append(obj)
            if existing_objs:
                cls.objects.bulk_update(updates, update_fields)
        return len(creates), len(updates)

    def update(self, update_dict=None, save=True, **kwargs):
        """ Helper method to update objects """
        if not update_dict:
            update_dict = kwargs
        # This set should contain the name of the `auto_now` field of the model
        update_fields = "updated_on"
        for k, v in update_dict.items():
            setattr(self, k, v)
            update_fields.add(k)
        if save:
            self.save(update_fields=update_fields)

示例用法:

class Event(BaseModel):
    organization = models.ForeignKey(Organization)
    external_id = models.IntegerField()
    started = models.BooleanField()


organization = Organization.objects.get(...)
updates_by_external_id = 
    1234: "started": True,
    2345: "started": True,
    3456: "started": False,

Event.bulk_update_or_create(
    "organization": organization, "external_id", updates_by_external_id
)

【讨论】:

太棒了!文笔不错。

以上是关于Django 批处理/批量更新或创建?的主要内容,如果未能解决你的问题,请参考以下文章

在 Django Rest Framework 中批量更新数据

Django在批量插入/更新/删除时“模拟”数据库触发行为

Django 批量插入数据自定义分页器多表关系的建立及Form组件(待更新。。。)

Grails批量处理锁定在桌子上

正确使用 Django 的批量更新追加到列表

Django 批量更新/插入性能