Django bulk_create 忽略导致 IntegrityError 的行?
Posted
技术标签:
【中文标题】Django bulk_create 忽略导致 IntegrityError 的行?【英文标题】:Django bulk_create with ignore rows that cause IntegrityError? 【发布时间】:2012-09-09 04:26:18 【问题描述】:我正在使用 bulk_create 将数千行或行加载到 postgresql 数据库中。不幸的是,有些行导致 IntegrityError 并停止了 bulk_create 进程。我想知道是否有办法告诉 django 忽略这些行并尽可能多地保存批处理?
【问题讨论】:
这可能是不可能的,因为 PostgreSQL 在第一个错误时中止事务。 Django 需要(a)在每次插入之前创建一个 SAVEPOINT,这会减慢速度并消耗资源;或 (b) 仅当行不存在时才使用过程或查询插入。就个人而言,我会批量插入到一个新的单独表中,可能是UNLOGGED
或TEMPORARY
,然后是INSERT INTO realtable SELECT * FROM temptable WHERE NOT EXISTS (SELECT 1 FROM realtable WHERE temptable.id = realtable.id)
或类似的。
@CraigRinger 好主意,但如果你有更大的模型,那就有点工作
【参考方案1】:
这对我有用 我在线程中使用这个函数。 我的 csv 文件包含 120907 行。
def products_create():
full_path = os.path.join(settings.MEDIA_ROOT,'productcsv')
filename = os.listdir(full_path)[0]
logger.debug(filename)
logger.debug(len(Product.objects.all()))
if len(Product.objects.all()) > 0:
logger.debug("Products Data Erasing")
Product.objects.all().delete()
logger.debug("Products Erasing Done")
csvfile = os.path.join(full_path,filename)
csv_df = pd.read_csv(csvfile,sep=',')
csv_df['HSN Code'] = csv_df['HSN Code'].fillna(0)
row_iter = csv_df.iterrows()
logger.debug(row_iter)
logger.debug("New Products Creating")
for index, row in row_iter:
Product.objects.create(part_number = row[0],
part_description = row[1],
mrp = row[2],
hsn_code = row[3],
gst = row[4],
)
# products_list = [
# Product(
# part_number = row[0] ,
# part_description = row[1],
# mrp = row[2],
# hsn_code = row[3],
# gst = row[4],
# )
# for index, row in row_iter
# ]
# logger.debug(products_list)
# Product.objects.bulk_create(products_list)
logger.debug("Products uploading done")```
【讨论】:
【参考方案2】:Django 2.2 之前的项目的最新答案:
我最近遇到了这种情况,我找到了一个辅助列表数组来检查唯一性的方法。
在我的情况下,模型具有唯一的共同检查,并且由于批量创建的数组中有重复数据,因此批量创建会引发完整性错误异常。
所以我决定在批量创建对象列表之外创建清单。这是示例代码;唯一键是 owner 和 brand,在这个例子中,owner 是一个用户对象实例,brand 是一个字符串实例:
create_list = []
create_list_check = []
for brand in brands:
if (owner.id, brand) not in create_list_check:
create_list_check.append((owner.id, brand))
create_list.append(ProductBrand(owner=owner, name=brand))
if create_list:
ProductBrand.objects.bulk_create(create_list)
【讨论】:
【参考方案3】:这在 Django 2.2 上现在是可能的
Django 2.2 为bulk_create
方法添加了一个新的ignore_conflicts
选项,来自documentation:
在支持它的数据库上(除 PostgreSQL
例子:
Entry.objects.bulk_create([
Entry(headline='This is a test'),
Entry(headline='This is only a test'),
], ignore_conflicts=True)
【讨论】:
看起来,如果打开该选项,几乎没有性能影响。干得好姜戈!并感谢您的回答... 可悲的是,创建的模型上不会有PK:( @int_32 你在使用特殊的PK字段吗?至少在默认情况下,PK 可以很好地创建,除了返回的对象仍然有None
作为它们的 PK,但在数据库中它很好。
@HenryWoody 正确,我的意思是返回的对象没有 PK,在 DB 中当然可以。
当使用这种技术时,你是否得到任何关于哪些行无法插入的返回信息?【参考方案4】:
即使在 Django 1.11 中也无法做到这一点。我找到了比使用 Raw SQL 更好的选择。它使用djnago-query-builder。它有一个upsert 方法
from querybuilder.query import Query
q = Query().from_table(YourModel)
# replace with your real objects
rows = [YourModel() for i in range(10)]
q.upsert(rows, ['unique_fld1', 'unique_fld2'], ['fld1_to_update', 'fld2_to_update'])
注意:该库仅支持 postgreSQL
这是我用于批量插入的gist,它支持忽略 IntegrityErrors 并返回插入的记录。
【讨论】:
【参考方案5】:一个不涉及手动 SQL 和临时表的快速而简单的解决方法是尝试批量插入数据。如果失败,则恢复串行插入。
objs = [(Event), (Event), (Event)...]
try:
Event.objects.bulk_create(objs)
except IntegrityError:
for obj in objs:
try:
obj.save()
except IntegrityError:
continue
如果您有很多错误,这可能不是那么有效(您将花费更多时间连续插入而不是批量插入),但我正在处理一个重复很少的高基数数据集,所以这解决了我的大部分问题。
【讨论】:
在我个人看来,这确实是最好的,因为它允许您捕获最终作为“忽略冲突”的一部分而遗漏的错误。【参考方案6】:或者 5. 分而治之
我没有对此进行彻底的测试或基准测试,但它对我来说表现相当不错。 YMMV,具体取决于您预计在批量操作中会出现多少错误。
def psql_copy(records):
count = len(records)
if count < 1:
return True
try:
pg.copy_bin_values(records)
return True
except IntegrityError:
if count == 1:
# found culprit!
msg = "Integrity error copying record:\n%r"
logger.error(msg % records[0], exc_info=True)
return False
finally:
connection.commit()
# There was an integrity error but we had more than one record.
# Divide and conquer.
mid = count / 2
return psql_copy(records[:mid]) and psql_copy(records[mid:])
# or just return False
【讨论】:
【参考方案7】:(注意:我不使用Django,所以可能有更合适的框架特定的答案)
Django 不可能通过简单地忽略 INSERT
失败来做到这一点,因为 PostgreSQL 在第一个错误时中止整个事务。
Django 需要以下方法之一:
INSERT
每一行在一个单独的事务中并忽略错误(非常慢);
在每次插入之前创建一个SAVEPOINT
(可能存在缩放问题);
仅当行不存在时才使用过程或查询插入(复杂且缓慢);或
将数据批量插入或(更好)COPY
到 TEMPORARY
表中,然后将其合并到服务器端的主表中。
类似 upsert 的方法 (3) 似乎是个好主意,但 upsert and insert-if-not-exists are surprisingly complicated。
就我个人而言,我会采取 (4):我会批量插入到一个新的单独表中,可能是 UNLOGGED
或 TEMPORARY
,然后我会运行一些手动 SQL 来:
LOCK TABLE realtable IN EXCLUSIVE MODE;
INSERT INTO realtable
SELECT * FROM temptable WHERE NOT EXISTS (
SELECT 1 FROM realtable WHERE temptable.id = realtable.id
);
LOCK TABLE ... IN EXCLUSIVE MODE
防止创建行的并发插入导致与上述语句完成的插入发生冲突并失败。它确实不阻止并发SELECT
s,只有SELECT ... FOR UPDATE
、INSERT
、UPDATE
和DELETE
,所以从表中读取正常进行。
如果您无法长时间阻止并发写入,您可以改为使用可写 CTE 将行范围从 temptable
复制到 realtable
,如果失败则重试每个块。
【讨论】:
谢谢@craig-ringer 我最终在将它们插入数据库之前清除了我的python对象列表,类似于你的方法#3,但在纯python中。 rodmtech.net/docs/django/…有(4)的详细例子以上是关于Django bulk_create 忽略导致 IntegrityError 的行?的主要内容,如果未能解决你的问题,请参考以下文章
Django model中数据批量导入bulk_create()