Django bulk_create 忽略导致 IntegrityError 的行?

Posted

技术标签:

【中文标题】Django bulk_create 忽略导致 IntegrityError 的行?【英文标题】:Django bulk_create with ignore rows that cause IntegrityError? 【发布时间】:2012-09-09 04:26:18 【问题描述】:

我正在使用 bulk_create 将数千行或行加载到 postgresql 数据库中。不幸的是,有些行导致 IntegrityError 并停止了 bulk_create 进程。我想知道是否有办法告诉 django 忽略这些行并尽可能多地保存批处理?

【问题讨论】:

这可能是不可能的,因为 PostgreSQL 在第一个错误时中止事务。 Django 需要(a)在每次插入之前创建一个 SAVEPOINT,这会减慢速度并消耗资源;或 (b) 仅当行不存在时才使用过程或查询插入。就个人而言,我会批量插入到一个新的单独表中,可能是UNLOGGEDTEMPORARY,然后是INSERT INTO realtable SELECT * FROM temptable WHERE NOT EXISTS (SELECT 1 FROM realtable WHERE temptable.id = realtable.id) 或类似的。 @CraigRinger 好主意,但如果你有更大的模型,那就有点工作 【参考方案1】:

这对我有用 我在线程中使用这个函数。 我的 csv 文件包含 120907 行。

def products_create():
    full_path = os.path.join(settings.MEDIA_ROOT,'productcsv')
    filename = os.listdir(full_path)[0]
    logger.debug(filename)
    logger.debug(len(Product.objects.all()))
    if len(Product.objects.all()) > 0:
        logger.debug("Products Data Erasing")
        Product.objects.all().delete()
        logger.debug("Products Erasing Done")

    csvfile = os.path.join(full_path,filename)
    csv_df = pd.read_csv(csvfile,sep=',')
    csv_df['HSN Code'] = csv_df['HSN Code'].fillna(0)
    row_iter = csv_df.iterrows()
    logger.debug(row_iter)

    logger.debug("New Products Creating")

    for index, row in row_iter:
        Product.objects.create(part_number = row[0],
                               part_description = row[1],
                               mrp = row[2],
                               hsn_code = row[3],
                               gst = row[4],
                               )

    # products_list = [
    #           Product(
    #               part_number = row[0] ,
    #               part_description = row[1],
    #               mrp = row[2],
    #               hsn_code = row[3],
    #               gst = row[4],
    #           )
    #           for index, row in row_iter

    #       ]
    # logger.debug(products_list)

    # Product.objects.bulk_create(products_list)
    logger.debug("Products uploading done")```

【讨论】:

【参考方案2】:

Django 2.2 之前的项目的最新答案:

我最近遇到了这种情况,我找到了一个辅助列表数组来检查唯一性的方法。

在我的情况下,模型具有唯一的共同检查,并且由于批量创建的数组中有重复数据,因此批量创建会引发完整性错误异常。

所以我决定在批量创建对象列表之外创建清单。这是示例代码;唯一键是 ownerbrand,在这个例子中,owner 是一个用户对象实例,brand 是一个字符串实例:

create_list = []
create_list_check = []
for brand in brands:
    if (owner.id, brand) not in create_list_check:
        create_list_check.append((owner.id, brand))
        create_list.append(ProductBrand(owner=owner, name=brand))

if create_list:
    ProductBrand.objects.bulk_create(create_list)

【讨论】:

【参考方案3】:

这在 Django 2.2 上现在是可能的

Django 2.2 为bulk_create 方法添加了一个新的ignore_conflicts 选项,来自documentation:

在支持它的数据库上(除 PostgreSQL

例子:

Entry.objects.bulk_create([
    Entry(headline='This is a test'),
    Entry(headline='This is only a test'),
], ignore_conflicts=True)

【讨论】:

看起来,如果打开该选项,几乎没有性能影响。干得好姜戈!并感谢您的回答... 可悲的是,创建的模型上不会有PK:( @int_32 你在使用特殊的PK字段吗?至少在默认情况下,PK 可以很好地创建,除了返回的对象仍然有 None 作为它们的 PK,但在数据库中它很好。 @HenryWoody 正确,我的意思是返回的对象没有 PK,在 DB 中当然可以。 当使用这种技术时,你是否得到任何关于哪些行无法插入的返回信息?【参考方案4】:

即使在 Django 1.11 中也无法做到这一点。我找到了比使用 Raw SQL 更好的选择。它使用djnago-query-builder。它有一个upsert 方法

from querybuilder.query import Query
q = Query().from_table(YourModel)
# replace with your real objects
rows = [YourModel() for i in range(10)] 
q.upsert(rows, ['unique_fld1', 'unique_fld2'], ['fld1_to_update', 'fld2_to_update'])

注意:该库仅支持 postgreSQL

这是我用于批量插入的gist,它支持忽略 IntegrityErrors 并返回插入的记录。

【讨论】:

【参考方案5】:

一个不涉及手动 SQL 和临时表的快速而简单的解决方法是尝试批量插入数据。如果失败,则恢复串行插入。

objs = [(Event), (Event), (Event)...]

try:
    Event.objects.bulk_create(objs)

except IntegrityError:
    for obj in objs:
        try:
            obj.save()
        except IntegrityError:
            continue

如果您有很多错误,这可能不是那么有效(您将花费更多时间连续插入而不是批量插入),但我正在处理一个重复很少的高基数数据集,所以这解决了我的大部分问题。

【讨论】:

在我个人看来,这确实是最好的,因为它允许您捕获最终作为“忽略冲突”的一部分而遗漏的错误。【参考方案6】:

或者 5. 分而治之

我没有对此进行彻底的测试或基准测试,但它对我来说表现相当不错。 YMMV,具体取决于您预计在批量操作中会出现多少错误。

def psql_copy(records):
    count = len(records)
    if count < 1:
        return True
    try:
        pg.copy_bin_values(records)
        return True
    except IntegrityError:
        if count == 1:
            # found culprit!
            msg = "Integrity error copying record:\n%r"
            logger.error(msg % records[0], exc_info=True)
            return False
    finally:
        connection.commit()

    # There was an integrity error but we had more than one record.
    # Divide and conquer.
    mid = count / 2
    return psql_copy(records[:mid]) and psql_copy(records[mid:])
    # or just return False

【讨论】:

【参考方案7】:

(注意:我不使用Django,所以可能有更合适的框架特定的答案)

Django 不可能通过简单地忽略 INSERT 失败来做到这一点,因为 PostgreSQL 在第一个错误时中止整个事务。

Django 需要以下方法之一:

    INSERT 每一行在一个单独的事务中并忽略错误(非常慢); 在每次插入之前创建一个SAVEPOINT(可能存在缩放问题); 仅当行不存在时才使用过程或查询插入(复杂且缓慢);或 将数据批量插入或(更好)COPYTEMPORARY 表中,然后将其合并到服务器端的主表中。

类似 upsert 的方法 (3) 似乎是个好主意,但 upsert and insert-if-not-exists are surprisingly complicated。

就我个人而言,我会采取 (4):我会批量插入到一个新的单独表中,可能是 UNLOGGEDTEMPORARY,然后我会运行一些手动 SQL 来:

LOCK TABLE realtable IN EXCLUSIVE MODE;

INSERT INTO realtable 
SELECT * FROM temptable WHERE NOT EXISTS (
    SELECT 1 FROM realtable WHERE temptable.id = realtable.id
);

LOCK TABLE ... IN EXCLUSIVE MODE 防止创建行的并发插入导致与上述语句完成的插入发生冲突并失败。它确实阻止并发SELECTs,只有SELECT ... FOR UPDATEINSERTUPDATEDELETE,所以从表中读取正常进行。

如果您无法长时间阻止并发写入,您可以改为使用可写 CTE 将行范围从 temptable 复制到 realtable,如果失败则重试每个块。

【讨论】:

谢谢@craig-ringer 我最终在将它们插入数据库之前清除了我的python对象列表,类似于你的方法#3,但在纯python中。 rodmtech.net/docs/django/…有(4)的详细例子

以上是关于Django bulk_create 忽略导致 IntegrityError 的行?的主要内容,如果未能解决你的问题,请参考以下文章

Django model中数据批量导入bulk_create()

django批量创建忽略重复项[重复]

django 的 bulk_create 是原子的吗?

django-pyodbc bulk_create 坏了

从 django bulk_create 中选择/查询对象?

如何获取使用 django bulk_create 创建的对象的主键