Bulk insert/update with PetaPoco
Posted: 2011-09-29 12:30:43

Question: I'm using the Save() method to insert or update records, but I'd like it to perform one bulk insert and one bulk update, with only a single database hit. How do I do this?
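For context, the pattern being replaced is one Save() call per record, each a separate round trip. A minimal sketch (the `records` collection and its POCO type are hypothetical):

```csharp
// One database hit per record; this is what the answers below try to avoid.
foreach (var record in records)
{
    db.Save(record);   // PetaPoco issues a separate INSERT or UPDATE each time
}
```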
Answer 1:

In my case, I took advantage of the database.Execute() method.

I created a Sql object containing the first part of my insert:
var sql = new Sql("insert into myTable(Name, Age, Gender) values");
for (int i = 0; i < pocos.Count; ++i)
{
    var p = pocos[i];
    sql.Append("(@0, @1, @2)", p.Name, p.Age, p.Gender);
    if (i != pocos.Count - 1)
        sql.Append(",");
}
Database.Execute(sql);
Comments:

If the list has many items, this generates a StackOverflowException inside PetaPoco's Sql.Append() method.

Answer 2:

I tried two different ways of inserting a large number of rows faster than the default Insert (which is quite slow when you have many rows).
1) Assemble a List of POCOs first, then insert them inside a single transaction:
using (var tr = PetaPocoDb.GetTransaction())
{
    foreach (var record in listOfRecords)
        PetaPocoDb.Insert(record);
    tr.Complete();
}
2) SqlBulkCopy a DataTable:
var bulkCopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.TableLock);
bulkCopy.DestinationTableName = "SomeTable";
bulkCopy.WriteToServer(dt);
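The snippet above assumes `dt` is a DataTable already populated from the list; a minimal sketch of that step (the column names are hypothetical, reusing the earlier example):

```csharp
// Build the DataTable that SqlBulkCopy writes; the columns must line up
// with the destination table's schema.
var dt = new DataTable("SomeTable");
dt.Columns.Add("Name", typeof(string));
dt.Columns.Add("Age", typeof(int));
dt.Columns.Add("Gender", typeof(string));
foreach (var r in listOfRecords)
{
    dt.Rows.Add(r.Name, r.Age, r.Gender);
}
```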
For getting my list into the database, SqlBulkCopy was the fastest: around 50% faster than the transaction approach in the (quick) performance tests I ran on roughly 1,000 rows.
Comments:

I think your first approach still hits the database once per insert. — Yes, it would be interesting to compare the speed against a combined-insert TSQL statement. In my case, once I found I was only ~50% slower than SqlBulkCopy, I stopped digging for more performance.

Answer 3:

Inserting in one SQL query is much faster.
Here is a custom method for the PetaPoco.Database class that adds the ability to bulk insert any collection:
public void BulkInsertRecords<T>(IEnumerable<T> collection)
{
    try
    {
        OpenSharedConnection();
        using (var cmd = CreateCommand(_sharedConnection, ""))
        {
            var pd = Database.PocoData.ForType(typeof(T));
            var tableName = EscapeTableName(pd.TableInfo.TableName);
            string cols = string.Join(", ", (from c in pd.QueryColumns select tableName + "." + EscapeSqlIdentifier(c)).ToArray());
            var pocoValues = new List<string>();
            var index = 0;
            foreach (var poco in collection)
            {
                var values = new List<string>();
                foreach (var i in pd.Columns)
                {
                    values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                    AddParam(cmd, i.Value.GetValue(poco), _paramPrefix);
                }
                pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")");
            }
            var sql = string.Format("INSERT INTO {0} ({1}) VALUES {2}", tableName, cols, string.Join(", ", pocoValues));
            cmd.CommandText = sql;
            cmd.ExecuteNonQuery();
        }
    }
    finally
    {
        CloseSharedConnection();
    }
}
Comments:

Very nice. It would be interesting to compare performance with the transaction approach I described (I believe yours is faster, but by how much?). Also — AFAIK — you should gain a little extra performance by wrapping the inserts in a transaction (blog.staticvoid.co.nz/2012/4/26/…).

Answer 4:

Here is an updated version of Steve Jansen's answer that splits the input into chunks so the 2,100-parameter limit is not exceeded.
I commented out the following code because it was producing duplicates in the database...
//using (var reader = cmd.ExecuteReader())
//{
//    while (reader.Read())
//    {
//        inserted.Add(reader[0]);
//    }
//}
Updated code:
/// <summary>
/// Performs an SQL Insert against a collection of pocos
/// </summary>
/// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted. Assumes that every POCO is of the same type.</param>
/// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
public object BulkInsert(IEnumerable<object> pocos)
{
    Sql sql;
    IList<PocoColumn> columns = new List<PocoColumn>();
    IList<object> parameters;
    IList<object> inserted;
    PocoData pd;
    Type primaryKeyType;
    object template;
    string commandText;
    string tableName;
    string primaryKeyName;
    bool autoIncrement;
    int maxBulkInsert;

    if (null == pocos)
        return new object[] { };

    template = pocos.First<object>();

    if (null == template)
        return null;

    pd = PocoData.ForType(template.GetType());
    tableName = pd.TableInfo.TableName;
    primaryKeyName = pd.TableInfo.PrimaryKey;
    autoIncrement = pd.TableInfo.AutoIncrement;

    // Calculate the maximum chunk size
    maxBulkInsert = 2100 / pd.Columns.Count;
    IEnumerable<object> pacosToInsert = pocos.Take(maxBulkInsert);
    IEnumerable<object> pacosremaining = pocos.Skip(maxBulkInsert);

    try
    {
        OpenSharedConnection();
        try
        {
            var names = new List<string>();
            var values = new List<string>();
            var index = 0;

            foreach (var i in pd.Columns)
            {
                // Don't insert result columns
                if (i.Value.ResultColumn)
                    continue;

                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                {
                    primaryKeyType = i.Value.PropertyInfo.PropertyType;

                    // Setup auto increment expression
                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                    if (autoIncExpression != null)
                    {
                        names.Add(i.Key);
                        values.Add(autoIncExpression);
                    }
                    continue;
                }

                names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                columns.Add(i.Value);
            }

            string outputClause = String.Empty;

            if (autoIncrement)
                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);

            commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                _dbType.EscapeTableName(tableName),
                string.Join(",", names.ToArray()),
                outputClause
            );

            sql = new Sql(commandText);
            parameters = new List<object>();
            string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
            bool isFirstPoco = true;
            var parameterCounter = 0;

            foreach (object poco in pacosToInsert)
            {
                parameterCounter++;
                parameters.Clear();

                foreach (PocoColumn column in columns)
                {
                    parameters.Add(column.GetValue(poco));
                }

                sql.Append(valuesText, parameters.ToArray<object>());

                if (isFirstPoco && pocos.Count() > 1)
                {
                    valuesText = "," + valuesText;
                    isFirstPoco = false;
                }
            }

            inserted = new List<object>();

            using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
            {
                if (!autoIncrement)
                {
                    DoPreExecute(cmd);
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);

                    PocoColumn pkColumn;
                    if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                    {
                        foreach (object poco in pocos)
                            inserted.Add(pkColumn.GetValue(poco));
                    }

                    return inserted.ToArray<object>();
                }

                object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                if (pacosremaining.Any())
                    return BulkInsert(pacosremaining);

                return id;

                //using (var reader = cmd.ExecuteReader())
                //{
                //    while (reader.Read())
                //    {
                //        inserted.Add(reader[0]);
                //    }
                //}

                //object[] primaryKeys = inserted.ToArray<object>();

                //// Assign the ID back to the primary key property
                //if (primaryKeyName != null)
                //{
                //    PocoColumn pc;
                //    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                //    {
                //        index = 0;
                //        foreach (object poco in pocos)
                //        {
                //            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
                //            index++;
                //        }
                //    }
                //}

                //return primaryKeys;
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
        return null;
    }
}
Answer 5:

Below is a BulkInsert method for PetaPoco that expands on taylonr's very clever idea of using the SQL technique of inserting multiple rows in one statement: INSERT INTO tab(col1, col2) OUTPUT inserted.[ID] VALUES (@0, @1), (@2, @3), ..., (@n-1, @n).

It also returns the auto-increment (identity) values of the inserted records, which I don't believe happens in IvoTops' implementation.
Note: SQL Server 2012 (and below) has a limit of 2,100 parameters per query. (This is likely the source of the stack-overflow exception referenced by Zelid's comment.) You will need to manually split your batches based on the number of columns that are not decorated as Ignore or Result. For example, a POCO with 21 columns should be sent in batch sizes of 99, i.e. (2100 - 1) / 21. I may refactor this to dynamically split batches based on this SQL Server limit; however, you will always see the best results by managing the batch size external to this method.
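As a quick sanity check on that arithmetic (a sketch; the column count is a hypothetical example, not part of the original answer):

```csharp
// SQL Server allows at most 2,100 parameters per query. Reserving one
// parameter, a POCO with 21 insertable columns fits 99 rows per batch.
const int parameterLimit = 2100;
int insertableColumns = 21;                                // columns not marked Ignore or Result
int batchSize = (parameterLimit - 1) / insertableColumns;  // 2099 / 21 = 99 (integer division)
```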
This method improved execution time by roughly 50% over my previous technique of using a shared connection in a single transaction for all inserts.

This is one area where Massive really shines — Massive has a Save(params object[] things) that builds an array of IDbCommands and executes each one on a shared connection. It works out of the box and doesn't run into parameter limits.
/// <summary>
/// Performs an SQL Insert against a collection of pocos
/// </summary>
/// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted. Assumes that every POCO is of the same type.</param>
/// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
/// <remarks>
/// NOTE: As of SQL Server 2012, there is a limit of 2100 parameters per query. This limitation does not seem to apply on other platforms, so
/// this method will allow more than 2100 parameters. See http://msdn.microsoft.com/en-us/library/ms143432.aspx
/// The name of the table, its primary key and whether it's an auto-allocated primary key are retrieved from the attributes of the first POCO in the collection
/// </remarks>
public object[] BulkInsert(IEnumerable<object> pocos)
{
    Sql sql;
    IList<PocoColumn> columns = new List<PocoColumn>();
    IList<object> parameters;
    IList<object> inserted;
    PocoData pd;
    Type primaryKeyType;
    object template;
    string commandText;
    string tableName;
    string primaryKeyName;
    bool autoIncrement;

    if (null == pocos)
        return new object[] { };

    template = pocos.First<object>();

    if (null == template)
        return null;

    pd = PocoData.ForType(template.GetType());
    tableName = pd.TableInfo.TableName;
    primaryKeyName = pd.TableInfo.PrimaryKey;
    autoIncrement = pd.TableInfo.AutoIncrement;

    try
    {
        OpenSharedConnection();
        try
        {
            var names = new List<string>();
            var values = new List<string>();
            var index = 0;

            foreach (var i in pd.Columns)
            {
                // Don't insert result columns
                if (i.Value.ResultColumn)
                    continue;

                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                {
                    primaryKeyType = i.Value.PropertyInfo.PropertyType;

                    // Setup auto increment expression
                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                    if (autoIncExpression != null)
                    {
                        names.Add(i.Key);
                        values.Add(autoIncExpression);
                    }
                    continue;
                }

                names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                columns.Add(i.Value);
            }

            string outputClause = String.Empty;

            if (autoIncrement)
                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);

            commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                _dbType.EscapeTableName(tableName),
                string.Join(",", names.ToArray()),
                outputClause
            );

            sql = new Sql(commandText);
            parameters = new List<object>();
            string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
            bool isFirstPoco = true;

            foreach (object poco in pocos)
            {
                parameters.Clear();

                foreach (PocoColumn column in columns)
                {
                    parameters.Add(column.GetValue(poco));
                }

                sql.Append(valuesText, parameters.ToArray<object>());

                if (isFirstPoco)
                {
                    valuesText = "," + valuesText;
                    isFirstPoco = false;
                }
            }

            inserted = new List<object>();

            using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
            {
                if (!autoIncrement)
                {
                    DoPreExecute(cmd);
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);

                    PocoColumn pkColumn;
                    if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                    {
                        foreach (object poco in pocos)
                            inserted.Add(pkColumn.GetValue(poco));
                    }

                    return inserted.ToArray<object>();
                }

                // BUG: the following line reportedly causes duplicate inserts; need to confirm
                //object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                using (var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        inserted.Add(reader[0]);
                    }
                }

                object[] primaryKeys = inserted.ToArray<object>();

                // Assign the ID back to the primary key property
                if (primaryKeyName != null)
                {
                    PocoColumn pc;
                    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                    {
                        index = 0;
                        foreach (object poco in pocos)
                        {
                            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
                            index++;
                        }
                    }
                }

                return primaryKeys;
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
        return null;
    }
}
Comments:

Another answer here points out a serious bug in the auto-increment path that causes every record to be inserted twice — but it kind of messes everything else up in the process. All I needed to do was delete the line object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);. I would go ahead and edit your post, but you should probably check to make sure said line is actually safe to remove.

Hi @Aaronaught, thanks for the heads-up. I've commented out that line until I can get to a Windows box with VS to test. The bug doesn't surprise me, since the database I created this code for is very forgiving of duplicate inserts.

Answer 6:
Here is the code for a BulkInsert that you can add to v5.01 of PetaPoco.cs.

You can paste it somewhere close to the regular Insert, around line 1098.

You give it an IEnumerable of POCOs and it sends them to the database in batches of x at a time. The code is 90% taken from the regular Insert.

I don't have a performance comparison yet — let me know :)
/// <summary>
/// Bulk inserts multiple rows to SQL
/// </summary>
/// <param name="tableName">The name of the table to insert into</param>
/// <param name="primaryKeyName">The name of the primary key column of the table</param>
/// <param name="autoIncrement">True if the primary key is automatically allocated by the DB</param>
/// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>
/// <param name="batchSize">The number of POCOs to be grouped together for each database roundtrip</param>
public void BulkInsert(string tableName, string primaryKeyName, bool autoIncrement, IEnumerable<object> pocos, int batchSize = 25)
{
    try
    {
        OpenSharedConnection();
        try
        {
            using (var cmd = CreateCommand(_sharedConnection, ""))
            {
                var pd = PocoData.ForObject(pocos.First(), primaryKeyName);

                // Create list of column names only once
                var names = new List<string>();
                foreach (var i in pd.Columns)
                {
                    // Don't insert result columns
                    if (i.Value.ResultColumn)
                        continue;

                    // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                    if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                    {
                        // Setup auto increment expression
                        string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                        if (autoIncExpression != null)
                        {
                            names.Add(i.Key);
                        }
                        continue;
                    }
                    names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                }
                var namesArray = names.ToArray();

                var values = new List<string>();
                int count = 0;
                do
                {
                    cmd.CommandText = "";
                    cmd.Parameters.Clear();
                    var index = 0;
                    foreach (var poco in pocos.Skip(count).Take(batchSize))
                    {
                        values.Clear();
                        foreach (var i in pd.Columns)
                        {
                            // Don't insert result columns
                            if (i.Value.ResultColumn) continue;

                            // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                            if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                            {
                                // Setup auto increment expression
                                string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                                if (autoIncExpression != null)
                                {
                                    values.Add(autoIncExpression);
                                }
                                continue;
                            }

                            values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                            AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
                        }

                        string outputClause = String.Empty;
                        if (autoIncrement)
                        {
                            outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
                        }

                        cmd.CommandText += string.Format("INSERT INTO {0} ({1}){2} VALUES ({3})", _dbType.EscapeTableName(tableName),
                                                         string.Join(",", namesArray), outputClause, string.Join(",", values.ToArray()));
                    }
                    // Are we done?
                    if (cmd.CommandText == "") break;
                    count += batchSize;
                    DoPreExecute(cmd);
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);
                }
                while (true);
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
    }
}

/// <summary>
/// Performs a SQL Bulk Insert
/// </summary>
/// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>
/// <param name="batchSize">The number of POCOs to be grouped together for each database roundtrip</param>
public void BulkInsert(IEnumerable<object> pocos, int batchSize = 25)
{
    if (!pocos.Any()) return;
    var pd = PocoData.ForType(pocos.First().GetType());
    // Pass batchSize through (the flattened original dropped this argument)
    BulkInsert(pd.TableInfo.TableName, pd.TableInfo.PrimaryKey, pd.TableInfo.AutoIncrement, pocos, batchSize);
}
Answer 7:

Along the same lines, here is a BulkUpdate if you want one:
public void BulkUpdate<T>(string tableName, string primaryKeyName, IEnumerable<T> pocos, int batchSize = 25)
{
    try
    {
        object primaryKeyValue = null;

        OpenSharedConnection();
        try
        {
            using (var cmd = CreateCommand(_sharedConnection, ""))
            {
                var pd = PocoData.ForObject(pocos.First(), primaryKeyName);

                int count = 0;
                do
                {
                    cmd.CommandText = "";
                    cmd.Parameters.Clear();
                    var index = 0;

                    var cmdText = new StringBuilder();

                    foreach (var poco in pocos.Skip(count).Take(batchSize))
                    {
                        var sb = new StringBuilder();
                        var colIdx = 0;
                        foreach (var i in pd.Columns)
                        {
                            // Don't update the primary key, but grab the value if we don't have it
                            if (string.Compare(i.Key, primaryKeyName, true) == 0)
                            {
                                primaryKeyValue = i.Value.GetValue(poco);
                                continue;
                            }

                            // Don't update result only columns
                            if (i.Value.ResultColumn)
                                continue;

                            // Build the sql
                            if (colIdx > 0)
                                sb.Append(", ");
                            sb.AppendFormat("{0} = {1}{2}", _dbType.EscapeSqlIdentifier(i.Key), _paramPrefix, index++);

                            // Store the parameter in the command
                            AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
                            colIdx++;
                        }

                        // Find the property info for the primary key
                        PropertyInfo pkpi = null;
                        if (primaryKeyName != null)
                        {
                            pkpi = pd.Columns[primaryKeyName].PropertyInfo;
                        }

                        cmdText.Append(string.Format("UPDATE {0} SET {1} WHERE {2} = {3}{4};\n",
                            _dbType.EscapeTableName(tableName), sb.ToString(),
                            _dbType.EscapeSqlIdentifier(primaryKeyName), _paramPrefix,
                            index++));
                        AddParam(cmd, primaryKeyValue, pkpi);
                    }

                    if (cmdText.Length == 0) break;

                    if (_providerName.IndexOf("oracle", StringComparison.OrdinalIgnoreCase) >= 0)
                    {
                        cmdText.Insert(0, "BEGIN\n");
                        cmdText.Append("\n END;");
                    }

                    DoPreExecute(cmd);
                    cmd.CommandText = cmdText.ToString();
                    count += batchSize;
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);
                }
                while (true);
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
    }
}
Answer 8:

Here is a nice 2018 update, using FastMember from NuGet:
private static void SqlBulkCopyPoco<T>(PetaPoco.Database db, IEnumerable<T> data)
{
    var pd = PocoData.ForType(typeof(T), db.DefaultMapper);
    using (var bcp = new SqlBulkCopy(db.ConnectionString))
    using (var reader = ObjectReader.Create(data))
    {
        // set up a mapping from the property names to the column names
        var propNames = typeof(T).GetProperties().Select(propertyInfo => propertyInfo.Name).ToArray();
        foreach (var propName in propNames)
        {
            bcp.ColumnMappings.Add(propName, "[" + pd.GetColumnName(propName) + "]");
        }
        bcp.DestinationTableName = pd.TableInfo.TableName;
        bcp.WriteToServer(reader);
    }
}
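A hypothetical usage sketch; the connection string, provider name, and the Person POCO are assumptions, not part of the original answer:

```csharp
// Bulk-copy 1,000 POCOs to the mapped table in a single round trip.
var db = new PetaPoco.Database("Server=.;Database=Demo;Trusted_Connection=True;",
                               "System.Data.SqlClient");
var people = Enumerable.Range(1, 1000)
                       .Select(i => new Person { Name = "Person " + i, Age = i % 90 })
                       .ToList();
SqlBulkCopyPoco(db, people);
```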
Answer 9:

You can do a foreach over your records:
foreach (var record in records)
{
    db.Save(record);
}
Comments:

Will this create only one database hit? — No, it hits the database once per record. How would you expect to do it in a single database hit, unless you generate one combined update statement and execute that?
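The "one combined update statement" the last comment alludes to could look like the following sketch, which batches many UPDATEs into a single Execute call (the table, columns, and the `people` collection are hypothetical):

```csharp
// One parameterized command containing many UPDATE statements,
// executed in a single database round trip.
var sql = new PetaPoco.Sql();
foreach (var p in people)
{
    sql.Append("UPDATE Person SET Name = @0, Age = @1 WHERE Id = @2;", p.Name, p.Age, p.Id);
}
db.Execute(sql);
```

As the first answer's comments note, very large batches can overflow Sql.Append and exceed the parameter limit, so chunking is still advisable.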