在 C# 中有效地将数据插入 MySQL 中的多个表中
Posted
技术标签:
【中文标题】在 C# 中有效地将数据插入 MySQL 中的多个表中【英文标题】:Insert Data into MySQL in multiple Tables in C# efficiently 【发布时间】:2014-10-12 04:42:09 【问题描述】:我需要将一个巨大的 CSV 文件插入到 mysql 数据库中具有 1:n 关系的 2 个表中。
CSV 文件每周生成一次,大小约为 1GB,需要附加到现有数据中。 它们每个都有 2 个表有一个自动增量主键。
我试过了:
实体框架(在所有方法中花费的时间最多) 数据集(相同) 批量上传(不支持多表) 带参数的MySqlCommand(需要嵌套,我目前的做法) MySqlCommand 和 StoredProcedure 包括一个事务还有什么建议吗?
假设这是我的数据结构:
public class User
public string FirstName get; set;
public string LastName get; set;
public List<string> Codes get; set;
我需要从 csv 插入这个数据库:
User (1-n) Code
+---+-----+-----+ +---+---+-----+
|PID|FName|LName| |CID|PID|Code |
+---+-----+-----+ +---+---+-----+
| 1 |Jon | Foo | | 1 | 1 | ed3 |
| 2 |Max | Foo | | 2 | 1 | wst |
| 3 |Paul | Foo | | 3 | 2 | xsd |
+---+-----+-----+ +---+---+-----+
这里是 CSV 文件的示例行
Jon;Foo;ed3,wst
像LOAD DATA LOCAL INFILE
这样的批量加载是不可能的,因为我的写作权限受到限制
【问题讨论】:
你能详细说明一下“(不支持多表)。为什么不能一次批量插入一个文件?顺便说一下,INSERT INTO
with params 与批量相比非常慢插入。
我建议批量上传到临时文件夹,这可以很快完成,然后使用 MySQL 存储过程,您可以将数据拆分到 User
和 Code
表。使用存储过程将比任何 C# 解决方案快得多。
当你说动态量是指数据的1-N
关系?如果是这样,我看不出问题是什么。您能否提供您提供的样本的源数据?
@fubo 你能改变表格结构吗?喜欢添加和删除列?
@SirajMansour:如果您要删除一列,那么您还将删除表中已经存在的该列的数据。这违背了所有目的。
【参考方案1】:
1 - 将列 VirtualId 添加到 User
表和类。
已编辑
2 - 在循环中为每个User
对象中的 VirtualId 字段分配数字(使用从 -1 开始的负数以避免在最后一步中发生冲突)字段。对于属于User u
对象的每个Code c
对象,设置c.UserId = u.VirtualId
。
3 - 将用户批量加载到User
表中,将代码批量加载到Code
表中。
4-UPDATE CODE C,USER U SET C.UserId = U.Id WHERE C.UserId = U.VirtualId.
注意:如果您对 Code.UserId 有 FK 约束,您可以删除它并在插入后重新添加它。
public class User
public int Id get; set;
public string FirstName get; set;
public string LastName get; set;
public int VirtualId get; set;
public class Code
public int Id get; set;
public string Code get; set;
public string UserId get; set;
【讨论】:
这就是为什么你应该添加一个 VirtualID 列,你可能错过了我的意思,你把 ID 留在原样,你不要碰它,你只需要添加 VirtualID 列和给它赋值 @fubo 我们已经在生产中使用了这种方法来处理比你更复杂的案例,试试吧,当涉及到大型数据集时,它是我们能找到的最快的。 @fubo Code Object/Table 中的 UserId 字段与 'User' 对象中的不一样,看看上面的类结构。请参阅我对第 2 步的编辑 @fubo 你没看懂我的方法,我给你画个数据表作为例子。 @fubo 太好了,这是所有建议中最快的方法。如果您尝试过,请给我您的反馈【参考方案2】:鉴于数据量很大,最好的方法(性能方面)是将尽可能多的数据处理留给数据库而不是应用程序。
创建一个临时表,用于临时保存 .csv 文件中的数据。
CREATE TABLE `imported` (
`id` int(11) NOT NULL,
`firstname` varchar(45) DEFAULT NULL,
`lastname` varchar(45) DEFAULT NULL,
`codes` varchar(450) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
将数据从.csv
加载到该表中非常简单。我建议使用MySqlCommand
(这也是您当前的方法)。此外,对所有 INSERT
语句使用相同的 MySqlConnection
对象将减少总执行时间。
然后为了进一步处理数据,您可以创建一个存储过程来处理它。
假设这两个表(取自您的简化示例):
CREATE TABLE `users` (
`PID` int(11) NOT NULL AUTO_INCREMENT,
`FName` varchar(45) DEFAULT NULL,
`LName` varchar(45) DEFAULT NULL,
PRIMARY KEY (`PID`)
) ENGINE=InnoDB AUTO_INCREMENT=3737 DEFAULT CHARSET=utf8;
和
CREATE TABLE `codes` (
`CID` int(11) NOT NULL AUTO_INCREMENT,
`PID` int(11) DEFAULT NULL,
`code` varchar(45) DEFAULT NULL,
PRIMARY KEY (`CID`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;
你可以有下面的存储过程。
CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`()
BEGIN
DECLARE fname VARCHAR(255);
DECLARE lname VARCHAR(255);
DECLARE codesstr VARCHAR(255);
DECLARE splitted_value VARCHAR(255);
DECLARE done INT DEFAULT 0;
DECLARE newid INT DEFAULT 0;
DECLARE occurance INT DEFAULT 0;
DECLARE i INT DEFAULT 0;
DECLARE cur CURSOR FOR SELECT firstname,lastname,codes FROM imported;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
OPEN cur;
import_loop:
LOOP FETCH cur INTO fname, lname, codesstr;
IF done = 1 THEN
LEAVE import_loop;
END IF;
INSERT INTO users (FName,LName) VALUES (fname, lname);
SET newid = LAST_INSERT_ID();
SET i=1;
SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);
WHILE i <= occurance DO
SET splitted_value =
(SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));
INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
SET i = i + 1;
END WHILE;
END LOOP;
CLOSE cur;
END
对于源数据中的每一行,它为user
表创建一个INSERT
语句。然后有一个WHILE
循环来拆分逗号分隔的代码,并为每个代码创建一个INSERT
codes
表的语句。
关于 LAST_INSERT_ID()
的使用,它在 PER CONNECTION 基础上是可靠的 (see doc here)。如果用于运行此存储过程的 MySQL 连接没有被其他事务使用,则使用LAST_INSERT_ID()
是安全的。
生成的 ID 在每个连接的基础上在服务器中维护。这意味着函数返回给给定客户端的值是为影响该客户端的 AUTO_INCREMENT 列的最新语句生成的第一个 AUTO_INCREMENT 值。此值不受其他客户端的影响,即使它们生成自己的 AUTO_INCREMENT 值。这种行为确保每个客户端都可以检索自己的 ID,而无需担心其他客户端的活动,也不需要锁或事务。
编辑:这是省略临时表imported
的OP 变体。您无需将 .csv 中的数据插入到imported
表中,而是调用 SP 将它们直接存储到您的数据库中。
CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`(IN fname VARCHAR(255), IN lname VARCHAR(255),IN codesstr VARCHAR(255))
BEGIN
DECLARE splitted_value VARCHAR(255);
DECLARE done INT DEFAULT 0;
DECLARE newid INT DEFAULT 0;
DECLARE occurance INT DEFAULT 0;
DECLARE i INT DEFAULT 0;
INSERT INTO users (FName,LName) VALUES (fname, lname);
SET newid = LAST_INSERT_ID();
SET i=1;
SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);
WHILE i <= occurance DO
SET splitted_value =
(SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));
INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
SET i = i + 1;
END WHILE;
END
注意:拆分代码的代码取自here(MySQL不提供字符串拆分功能)。
【讨论】:
太棒了!如果没有临时表CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`(IN fname VARCHAR(255), IN lname VARCHAR(255),IN codesstr VARCHAR(255))
,这也是可能的,甚至更快
这样安全吗? SET newid = LAST_INSERT_ID();
@Byyo 这是一个很好的观点。它是线程安全的,我用相关信息更新了我的答案
@fubo 这是个好主意!是的,省略临时表可以使执行更快。不过,我相信,如果您可以选择批量导入 .csv 数据,我的 SP 会执行得更快。但在任何情况下,都应该测试并查看这两种方法的效果。【参考方案3】:
当您说“高效”时,您指的是记忆还是时间?
在提高插入速度方面,如果每个插入语句可以做多个值块,可以得到500%的速度提升。我在这个问题上做了一些基准测试:Which is faster: multiple single INSERTs or one multiple-row INSERT?
答案中描述了我的方法,但简单地说,一次读取多达 50 个“行”(要插入)并将它们捆绑到单个 INSERT INTO(...), VALUES(...),(...),(...)...(...),(...)
类型语句中似乎真的可以加快速度。至少,如果您被限制无法批量加载。
另一种方法是,如果您在上传期间无法删除索引的实时数据,则在没有索引的 mysql 服务器上创建一个内存表,将数据转储到那里,然后执行INSERT INTO live SELECT * FROM mem
。虽然这在服务器上使用了更多内存,但因此在这个答案开头的问题是“你所说的'高效'是什么意思?” :)
哦,遍历文件并首先插入所有第一个表,然后再插入第二个表,这可能没有什么问题。除非数据被实时使用,我猜。在这种情况下,您肯定仍然可以使用捆绑方法,但执行此操作的应用程序逻辑要复杂得多。
更新: OP 请求了多值插入块的示例 C# 代码。
注意:此代码假定您已经配置了许多结构:
-
tables List
MySqlDbType
s 列表,与字段名称的顺序相同。
nullslist Dictionary哦,是的,本地命令是通过在本地 MySqlConnection 对象上使用 CreateCommand() 创建的 MySqlCommand。
进一步说明:我在很久以前写这篇文章的时候还想开始。如果这导致你的眼睛或大脑流血,我提前道歉:)
const int perinsert = 50;
foreach (string table in tables)
string[] fields = fieldslist[table].ToArray();
MySqlDbType[] types = typeslist[table].ToArray();
bool[] nulls = nullslist[table].ToArray();
int thisblock = perinsert;
int rowstotal = theData[table].Count;
int rowsremainder = rowstotal % perinsert;
int rowscopied = 0;
// Do the bulk (multi-VALUES block) INSERTs, but only if we have more rows than there are in a single bulk insert to perform:
while (rowscopied < rowstotal)
if (rowstotal - rowscopied < perinsert)
thisblock = rowstotal - rowscopied;
// Generate a 'perquery' multi-VALUES prepared INSERT statement:
List<string> extravals = new List<string>();
for (int j = 0; j < thisblock; j++)
extravals.Add(String.Format("(@0_1)", j, String.Join(String.Format(", @0_", j), fields)));
localcmd.CommandText = String.Format("INSERT INTO 0 VALUES1", tmptable, String.Join(",", extravals.ToArray()));
// Now create the parameters to match these:
for (int j = 0; j < thisblock; j++)
for (int i = 0; i < fields.Length; i++)
localcmd.Parameters.Add(String.Format("0_1", j, fields[i]), types[i]).IsNullable = nulls[i];
// Keep doing bulk INSERTs until there's less rows left than we need for another one:
while (rowstotal - rowscopied >= thisblock)
// Queue up all the VALUES for this block INSERT:
for (int j = 0; j < thisblock; j++)
Dictionary<int, object> row = theData[table][rowscopied++];
for (int i = 0; i < fields.Length; i++)
localcmd.Parameters[String.Format("0_1", j, fields[i])].Value = row[i];
// Run the query:
localcmd.ExecuteNonQuery();
// Clear all the paramters - we're done here:
localcmd.Parameters.Clear();
【讨论】:
您使用什么语言?基准是在 C# 中,所以...是的。基本上它涉及一个循环,我将 fieldname.i 作为参数添加到每个字段的查询中,然后为每一行 (i),然后我去为要插入的每个实际数据行填写这些字段名。然后我有一个包含多个值的大量准备好的查询。 这么多的工作,没有一个独角兽美元的回报! :D【参考方案4】:AFAIK 在表中完成的插入是顺序的,而在不同表中的插入可以并行完成。打开两个单独的 new 连接到同一个数据库,然后可能使用任务并行库并行插入。
但是,如果表之间存在1:n关系的完整性约束,那么:
-
插入可能会失败,因此任何并行插入方法都是错误的。显然,最好的办法是只进行顺序插入,一张接着一张。
您可以尝试对两个表的数据进行排序,编写下面编写的
InsertInto
方法,这样只有在第一个表中插入数据后才会在第二个表中插入。
编辑:既然您提出了要求,如果您可以并行执行插入,以下是您可以使用的代码模板。
private void ParallelInserts()
..
//Other code in the method
..
//Read first csv into memory. It's just a GB so should be fine
ReadFirstCSV();
//Read second csv into memory...
ReadSecondCSV();
//Because the inserts will last more than a few CPU cycles...
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None)
//An array to hold the two parallel inserts
var insertTasks = new Task[2];
//Begin insert into first table...
insertTasks[0] = taskFactory.StartNew(() => InsertInto(commandStringFirst, connectionStringFirst));
//Begin insert into second table...
insertTasks[1] = taskFactory.StartNew(() => InsertInto(commandStringSecond, connectionStringSecond));
//Let them be done...
Task.WaitAll(insertTasks);
Console.WriteLine("Parallel insert finished.");
//Defining the InsertInto method which we are passing to the tasks in the method above
private static void InsertInto(string commandString, string connectionString)
using (/*open a new connection using the connectionString passed*/)
//In a while loop, iterate until you have 100/200/500 rows
while (fileIsNotExhausted)
using (/*commandString*/)
//Execute command to insert in bulk
【讨论】:
如果 1:n 关系存在完整性约束,这可能不起作用 @JonKloske:在这种情况下,只能顺序插入表。当我们在表中插入时,插入总是排队的。因此,用户所能做的就是等待。感谢您指出这一点。 没问题 - 你的建议仍然可以工作,你只需要确保在父表中的相应行被插入之后严格地插入到依赖表中。 @fubo:如果你又被卡住了,请告诉我。我会尝试找到一些解决方法。我真的很喜欢并行方法。 @fubo:我在InsertInto
方法中的 while 循环上方的注释,用于获取 100/200/500 行,用于批量插入。为了保持一对n的关系,可以修改这个数字,这样我们首先在主表中插入1
部分,然后在引用表中插入n
部分。【参考方案5】:
参考你的回答我会替换
using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection))
foreach (string Code in item.Codes)
myCmdNested.Parameters.Add(new MySqlParameter("@UserID", UID));
myCmdNested.Parameters.Add(new MySqlParameter("@Code", Code));
myCmdNested.ExecuteNonQuery();
与
List<string> lCodes = new List<string>();
foreach (string code in item.Codes)
lCodes.Add(String.Format("('0','1')", UID, MySqlHelper.EscapeString(code)));
string cCommand = "INSERT INTO Code (UserID, Code) VALUES " + string.Join(",", lCodes);
using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection))
myCmdNested.ExecuteNonQuery();
生成一个插入语句而不是item.Count
【讨论】:
@fubo:这个答案描述了InsertInto
方法的样子。【参考方案6】:
我使用实体框架开发了我的 WPF 应用程序应用程序并使用了 SQL Server 数据库,并且需要从 excel 文件中读取数据,并且必须将该数据插入到它们之间有关系的 2 个表中。对于大约 15000 行的 excel,它过去需要大约 4 小时的时间。然后我所做的是每次插入使用 500 行的块,这将我的插入速度加快到难以置信的速度,现在只需 3-5 秒即可导入相同的数据。
因此,我建议您一次将行添加到 100/200/500 之类的上下文中,然后调用 SaveChanges 方法(如果您真的想使用 EF)。还有其他有用的技巧可以加快 EF 的性能。请阅读this 供您参考。
var totalRecords = TestPacksData.Rows.Count;
var totalPages = (totalRecords / ImportRecordsPerPage) + 1;
while (count <= totalPages)
var pageWiseRecords = TestPacksData.Rows.Cast<DataRow>().Skip(count * ImportRecordsPerPage).Take(ImportRecordsPerPage);
count++;
Project.CreateNewSheet(pageWiseRecords.ToList());
Project.CreateNewSpool(pageWiseRecords.ToList());
这里是 CreateNewSheet 方法
/// <summary>
/// Creates a new Sheet record in the database
/// </summary>
/// <param name="row">DataRow containing the Sheet record</param>
public void CreateNewSheet(List<DataRow> rows)
var tempSheetsList = new List<Sheet>();
foreach (var row in rows)
var sheetNo = row[SheetFields.Sheet_No.ToString()].ToString();
if (string.IsNullOrWhiteSpace(sheetNo))
continue;
var testPackNo = row[SheetFields.Test_Pack_No.ToString()].ToString();
TestPack testPack = null;
if (!string.IsNullOrWhiteSpace(testPackNo))
testPack = GetTestPackByTestPackNo(testPackNo);
var existingSheet = GetSheetBySheetNo(sheetNo);
if (existingSheet != null)
UpdateSheet(existingSheet, row);
continue;
var isometricNo = GetIsometricNoFromSheetNo(sheetNo);
var newSheet = new Sheet
sheet_no = sheetNo,
isometric_no = isometricNo,
ped_rev = row[SheetFields.PED_Rev.ToString()].ToString(),
gpc_rev = row[SheetFields.GPC_Rev.ToString()].ToString()
;
if (testPack != null)
newSheet.test_pack_id = testPack.id;
newSheet.test_pack_no = testPack.test_pack_no;
if (!tempSheetsList.Any(l => l.sheet_no == newSheet.sheet_no))
DataStore.Context.Sheets.Add(newSheet);
tempSheetsList.Add(newSheet);
try
DataStore.Context.SaveChanges();
**DataStore.Dispose();** This is very important. Dispose the context
catch (DbEntityValidationException ex)
// Create log for the exception here
CreateNewSpool 是同上的方法,除了字段名和表名,因为它更新一个子表。但是思路是一样的
【讨论】:
第二张表的插入需要第一张表的ID/外键。 EF 如何在不单独插入每一行的情况下进行内部处理? +1 参考其他问题 在父表中插入时,获取id/外键值,然后用它在子表中插入值。这正是我所做的,我的表现很合我的胃口(4-5 秒)【参考方案7】:你能把 CSV 分成两个文件吗?
例如假设您的文件具有以下列:
... A ... | ... B ...
a0 | b0
a0 | b1
a0 | b2 <-- data
a1 | b3
a1 | b4
所以一组 A 可能有多个 B 条目。拆开后,你会得到:
... A ...
a0
a1
... B ...
b0
b1
b2
b3
b4
然后你单独批量插入它们。
编辑:伪代码
根据对话,类似:
DataTable tableA = ...; // query schema for TableA
DataTable tableB = ...; // query schmea for TableB
List<String> usernames = select distinct username from TableA;
Hashtable htUsername = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
foreach (String username in usernames)
htUsername[username] = "";
int colUsername = ...;
foreach (String[] row in CSVFile)
String un = row[colUsername] as String;
if (htUsername[un] == null)
// add new row to tableA
DataRow row = tableA.NewRow();
row["Username"] = un;
// etc.
tableA.Rows.Add(row);
htUsername[un] = "";
// bulk insert TableA
select userid, username from TableA
Hashtable htUserId = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
// htUserId[username] = userid;
int colUserId = ...;
foreach (String[] row in CSVFile)
String un = row[colUsername] as String;
int userid = (int) htUserId[un];
DataRow row = tableB.NewRow();
row[colUserId] = userId;
// fill in other values
tableB.Rows.Add(row);
if (table.Rows.Count == 65000)
// bulk insert TableB
var t = tableB.Clone();
tableB.Dispose();
tableB = t;
if (tableB.Rows.Count > 0)
// bulk insert TableB
【讨论】:
基于表A的主键。如果没有主键则决定哪些列形成唯一的A。 所以你有作为主键的自动增量列? 表在批量插入之前是否被截断?还是将 csv 附加到现有数据中? 自动增量列 - 是的,在批量插入之前被截断的表 - 否以上是关于在 C# 中有效地将数据插入 MySQL 中的多个表中的主要内容,如果未能解决你的问题,请参考以下文章
如何根据非常大的df中的名称有效地将唯一ID分配给具有多个条目的个人