在 C# 中有效地将数据插入 MySQL 中的多个表中

Posted

技术标签:

【中文标题】在 C# 中有效地将数据插入 MySQL 中的多个表中【英文标题】:Insert Data into MySQL in multiple Tables in C# efficiently 【发布时间】:2014-10-12 04:42:09 【问题描述】:

我需要将一个巨大的 CSV 文件插入到 mysql 数据库中具有 1:n 关系的 2 个表中。

CSV 文件每周生成一次,大小约为 1GB,需要附加到现有数据中。 它们每个都有 2 个表有一个自动增量主键。

我试过了:

实体框架(在所有方法中花费的时间最多) 数据集(相同) 批量上传(不支持多表) 带参数的MySqlCommand(需要嵌套,我目前的做法) MySqlCommand 和 StoredProcedure 包括一个事务

还有什么建议吗?

假设这是我的数据结构:

public class User

    public string FirstName  get; set; 
    public string LastName  get; set; 
    public List<string> Codes  get; set; 

我需要从 csv 插入这个数据库:

       User   (1-n)   Code     
+---+-----+-----+ +---+---+-----+        
|PID|FName|LName| |CID|PID|Code | 
+---+-----+-----+ +---+---+-----+
| 1 |Jon  | Foo | | 1 | 1 | ed3 | 
| 2 |Max  | Foo | | 2 | 1 | wst | 
| 3 |Paul | Foo | | 3 | 2 | xsd | 
+---+-----+-----+ +---+---+-----+ 

这里是 CSV 文件的示例行

Jon;Foo;ed3,wst

LOAD DATA LOCAL INFILE 这样的批量加载是不可能的,因为我的写作权限受到限制

【问题讨论】:

你能详细说明一下“(不支持多表)。为什么不能一次批量插入一个文件?顺便说一下,INSERT INTO with params 与批量相比非常慢插入。 我建议批量上传到临时文件夹,这可以很快完成,然后使用 MySQL 存储过程,您可以将数据拆分到 UserCode 表。使用存储过程将比任何 C# 解决方案快得多。 当你说动态量是指数据的1-N关系?如果是这样,我看不出问题是什么。您能否提供您提供的样本的源数据? @fubo 你能改变表格结构吗?喜欢添加和删除列? @SirajMansour:如果您要删除一列,那么您还将删除表中已经存在的该列的数据。这违背了所有目的。 【参考方案1】:

1 - 将列 VirtualId 添加到 User 表和类。

已编辑 2 - 在循环中为每个User 对象中的 VirtualId 字段分配数字(使用从 -1 开始的负数以避免在最后一步中发生冲突)字段。对于属于User u 对象的每个Code c 对象,设置c.UserId = u.VirtualId

3 - 将用户批量加载到User 表中,将代码批量加载到Code 表中。

4-UPDATE CODE C,USER U SET C.UserId = U.Id WHERE C.UserId = U.VirtualId.

注意:如果您对 Code.UserId 有 FK 约束,您可以删除它并在插入后重新添加它。

public class User

    public int Id  get; set; 
    public string FirstName  get; set; 
    public string LastName  get; set; 
    public int VirtualId  get; set; 



public class Code

    public int Id  get; set; 
    public string Code  get; set; 
    public string UserId  get; set; 

【讨论】:

这就是为什么你应该添加一个 VirtualID 列,你可能错过了我的意思,你把 ID 留在原样,你不要碰它,你只需要添加 VirtualID 列和给它赋值 @fubo 我们已经在生产中使用了这种方法来处理比你更复杂的案例,试试吧,当涉及到大型数据集时,它是我们能找到的最快的。 @fubo Code Object/Table 中的 UserId 字段与 'User' 对象中的不一样,看看上面的类结构。请参阅我对第 2 步的编辑 @fubo 你没看懂我的方法,我给你画个数据表作为例子。 @fubo 太好了,这是所有建议中最快的方法。如果您尝试过,请给我您的反馈【参考方案2】:

鉴于数据量很大,最好的方法(性能方面)是将尽可能多的数据处理留给数据库而不是应用程序。

创建一个临时表,用于临时保存 .csv 文件中的数据。

CREATE TABLE `imported` (
    `id` int(11) NOT NULL,
    `firstname` varchar(45) DEFAULT NULL,
    `lastname` varchar(45) DEFAULT NULL,
    `codes` varchar(450) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

将数据从.csv 加载到该表中非常简单。我建议使用MySqlCommand(这也是您当前的方法)。此外,对所有 INSERT 语句使用相同的 MySqlConnection 对象将减少总执行时间

然后为了进一步处理数据,您可以创建一个存储过程来处理它。

假设这两个表(取自您的简化示例):

CREATE TABLE `users` (
  `PID` int(11) NOT NULL AUTO_INCREMENT,
  `FName` varchar(45) DEFAULT NULL,
  `LName` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`PID`)
) ENGINE=InnoDB AUTO_INCREMENT=3737 DEFAULT CHARSET=utf8;

CREATE TABLE `codes` (
  `CID` int(11) NOT NULL AUTO_INCREMENT,
  `PID` int(11) DEFAULT NULL,
  `code` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`CID`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;

你可以有下面的存储过程。

CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`()
BEGIN
    DECLARE fname VARCHAR(255);
    DECLARE lname VARCHAR(255);
    DECLARE codesstr VARCHAR(255);
    DECLARE splitted_value VARCHAR(255);
    DECLARE done INT DEFAULT 0;
    DECLARE newid INT DEFAULT 0;
    DECLARE occurance INT DEFAULT 0;
    DECLARE i INT DEFAULT 0;

    DECLARE cur CURSOR FOR SELECT firstname,lastname,codes FROM imported;
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;

    OPEN cur;

    import_loop: 
        LOOP FETCH cur INTO fname, lname, codesstr;
            IF done = 1 THEN
                LEAVE import_loop;
            END IF;

            INSERT INTO users (FName,LName) VALUES (fname, lname);
            SET newid = LAST_INSERT_ID();

            SET i=1;
            SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);

            WHILE i <= occurance DO
                SET splitted_value =
                    (SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
                    LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));

                INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
                SET i = i + 1;
            END WHILE;
        END LOOP;
    CLOSE cur;
END

对于源数据中的每一行,它为user 表创建一个INSERT 语句。然后有一个WHILE 循环来拆分逗号分隔的代码,并为每个代码创建一个INSERTcodes 表的语句。

关于 LAST_INSERT_ID() 的使用,它在 PER CONNECTION 基础上是可靠的 (see doc here)。如果用于运行此存储过程的 MySQL 连接没有被其他事务使用,则使用LAST_INSERT_ID() 是安全的。

生成的 ID 在每个连接的基础上在服务器中维护。这意味着函数返回给给定客户端的值是为影响该客户端的 AUTO_INCREMENT 列的最新语句生成的第一个 AUTO_INCREMENT 值。此值不受其他客户端的影响,即使它们生成自己的 AUTO_INCREMENT 值。这种行为确保每个客户端都可以检索自己的 ID,而无需担心其他客户端的活动,也不需要锁或事务。

编辑:这是省略临时表imported 的OP 变体。您无需将 .csv 中的数据插入到imported 表中,而是调用 SP 将它们直接存储到您的数据库中。

CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`(IN fname VARCHAR(255), IN lname VARCHAR(255),IN codesstr VARCHAR(255))
BEGIN
    DECLARE splitted_value VARCHAR(255);
    DECLARE done INT DEFAULT 0;
    DECLARE newid INT DEFAULT 0;
    DECLARE occurance INT DEFAULT 0;
    DECLARE i INT DEFAULT 0;

    INSERT INTO users (FName,LName) VALUES (fname, lname);
    SET newid = LAST_INSERT_ID();

    SET i=1;
    SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);

    WHILE i <= occurance DO
        SET splitted_value =
            (SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
            LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));

        INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
        SET i = i + 1;
    END WHILE;
END

注意:拆分代码的代码取自here(MySQL不提供字符串拆分功能)。

【讨论】:

太棒了!如果没有临时表CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`(IN fname VARCHAR(255), IN lname VARCHAR(255),IN codesstr VARCHAR(255)),这也是可能的,甚至更快 这样安全吗? SET newid = LAST_INSERT_ID(); @Byyo 这是一个很好的观点。它是线程安全的,我用相关信息更新了我的答案 @fubo 这是个好主意!是的,省略临时表可以使执行更快。不过,我相信,如果您可以选择批量导入 .csv 数据,我的 SP 会执行得更快。但在任何情况下,都应该测试并查看这两种方法的效果。【参考方案3】:

当您说“高效”时,您指的是记忆还是时间?

在提高插入速度方面,如果每个插入语句可以做多个值块,可以得到500%的速度提升。我在这个问题上做了一些基准测试:Which is faster: multiple single INSERTs or one multiple-row INSERT?

答案中描述了我的方法,但简单地说,一次读取多达 50 个“行”(要插入)并将它们捆绑到单个 INSERT INTO(...), VALUES(...),(...),(...)...(...),(...) 类型语句中似乎真的可以加快速度。至少,如果您被限制无法批量加载。

另一种方法是,如果您在上传期间无法删除索引的实时数据,则在没有索引的 mysql 服务器上创建一个内存表,将数据转储到那里,然后执行INSERT INTO live SELECT * FROM mem。虽然这在服务器上使用了更多内存,但因此在这个答案开头的问题是“你所说的'高效'是什么意思?” :)

哦,遍历文件并首先插入所有第一个表,然后再插入第二个表,这可能没有什么问题。除非数据被实时使用,我猜。在这种情况下,您肯定仍然可以使用捆绑方法,但执行此操作的应用程序逻辑要复杂得多。

更新: OP 请求了多值插入块的示例 C# 代码。

注意:此代码假定您已经配置了许多结构:

    tables List - 要插入的表名 fieldslist Dictionary> - 每个表的字段名称列表 typeslist Dictionary> - 每个表的MySqlDbTypes 列表,与字段名称的顺序相同。 nullslist Dictionary> - 用于指示每个表的字段是否可为空的标志列表(与字段名称的顺序相同)。 prikey Dictionary - 每个表的主键字段名称列表(注意:这不支持多个字段主键,但如果你需要它你可能会破解它 - 我想在某个地方我有一个支持这个的版本,但是......嗯)。 theData Dictionary>> - 实际数据,作为每个表的 fieldnum-value 字典列表。

哦,是的,本地命令是通过在本地 MySqlConnection 对象上使用 CreateCommand() 创建的 MySqlCommand。

进一步说明:我在很久以前写这篇文章的时候还想开始。如果这导致你的眼睛或大脑流血,我提前道歉:)

const int perinsert = 50;
foreach (string table in tables)

    string[] fields = fieldslist[table].ToArray();
    MySqlDbType[] types = typeslist[table].ToArray();
    bool[] nulls = nullslist[table].ToArray();

    int thisblock = perinsert;
    int rowstotal = theData[table].Count;
    int rowsremainder = rowstotal % perinsert;
    int rowscopied = 0;

    // Do the bulk (multi-VALUES block) INSERTs, but only if we have more rows than there are in a single bulk insert to perform:
    while (rowscopied < rowstotal)
    
        if (rowstotal - rowscopied < perinsert)
            thisblock = rowstotal - rowscopied;
        // Generate a 'perquery' multi-VALUES prepared INSERT statement:
        List<string> extravals = new List<string>();
        for (int j = 0; j < thisblock; j++)
            extravals.Add(String.Format("(@0_1)", j, String.Join(String.Format(", @0_", j), fields)));
        localcmd.CommandText = String.Format("INSERT INTO 0 VALUES1", tmptable, String.Join(",", extravals.ToArray()));
        // Now create the parameters to match these:
        for (int j = 0; j < thisblock; j++)
            for (int i = 0; i < fields.Length; i++)
                localcmd.Parameters.Add(String.Format("0_1", j, fields[i]), types[i]).IsNullable = nulls[i];

        // Keep doing bulk INSERTs until there's less rows left than we need for another one:
        while (rowstotal - rowscopied >= thisblock)
        
            // Queue up all the VALUES for this block INSERT:
            for (int j = 0; j < thisblock; j++)
            
                Dictionary<int, object> row = theData[table][rowscopied++];
                for (int i = 0; i < fields.Length; i++)
                    localcmd.Parameters[String.Format("0_1", j, fields[i])].Value = row[i];
            
            // Run the query:
            localcmd.ExecuteNonQuery();
        
        // Clear all the paramters - we're done here:
        localcmd.Parameters.Clear();
    

【讨论】:

您使用什么语言?基准是在 C# 中,所以...是的。基本上它涉及一个循环,我将 fieldname.i 作为参数添加到每个字段的查询中,然后为每一行 (i),然后我去为要插入的每个实际数据行填写这些字段名。然后我有一个包含多个值的大量准备好的查询。 这么多的工作,没有一个独角兽美元的回报! :D【参考方案4】:

AFAIK 在表中完成的插入是顺序的,而在不同表中的插入可以并行完成。打开两个单独的 new 连接到同一个数据库,然后可能使用任务并行库并行插入。

但是,如果表之间存在1:n关系的完整性约束,那么:

    插入可能会失败,因此任何并行插入方法都是错误的。显然,最好的办法是只进行顺序插入,一张接着一张。 您可以尝试对两个表的数据进行排序,编写下面编写的InsertInto 方法,这样只有在第一个表中插入数据后才会在第二个表中插入。

编辑:既然您提出了要求,如果您可以并行执行插入,以下是您可以使用的代码模板

private void ParallelInserts()

    ..
    //Other code in the method
    ..

    //Read first csv into memory. It's just a GB so should be fine
    ReadFirstCSV();

    //Read second csv into memory...
    ReadSecondCSV();

    //Because the inserts will last more than a few CPU cycles...
    var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None)

    //An array to hold the two parallel inserts
    var insertTasks = new Task[2];

    //Begin insert into first table...
    insertTasks[0] = taskFactory.StartNew(() => InsertInto(commandStringFirst, connectionStringFirst));

    //Begin insert into second table...
    insertTasks[1] = taskFactory.StartNew(() => InsertInto(commandStringSecond, connectionStringSecond));

    //Let them be done...
    Task.WaitAll(insertTasks);

    Console.WriteLine("Parallel insert finished.");



//Defining the InsertInto method which we are passing to the tasks in the method above
private static void InsertInto(string commandString, string connectionString)

    using (/*open a new connection using the connectionString passed*/)
    
        //In a while loop, iterate until you have 100/200/500 rows
        while (fileIsNotExhausted)
        
            using (/*commandString*/)
            
                //Execute command to insert in bulk
            
        
    

【讨论】:

如果 1:n 关系存在完整性约束,这可能不起作用 @JonKloske:在这种情况下,只能顺序插入表。当我们在表中插入时,插入总是排队的。因此,用户所能做的就是等待。感谢您指出这一点。 没问题 - 你的建议仍然可以工作,你只需要确保在父表中的相应行被插入之后严格地插入到依赖表中。 @fubo:如果你又被卡住了,请告诉我。我会尝试找到一些解决方法。我真的很喜欢并行方法。 @fubo:我在 InsertInto 方法中的 while 循环上方的注释,用于获取 100/200/500 行,用于批量插入。为了保持一对n的关系,可以修改这个数字,这样我们首先在主表中插入1部分,然后在引用表中插入n部分。【参考方案5】:

参考你的回答我会替换

using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection))

    foreach (string Code in item.Codes)
    
        myCmdNested.Parameters.Add(new MySqlParameter("@UserID", UID));
        myCmdNested.Parameters.Add(new MySqlParameter("@Code", Code));
        myCmdNested.ExecuteNonQuery();
    

List<string> lCodes = new List<string>();
foreach (string code in item.Codes)

    lCodes.Add(String.Format("('0','1')", UID, MySqlHelper.EscapeString(code)));

string cCommand = "INSERT INTO Code (UserID, Code) VALUES " + string.Join(",", lCodes);
using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection))

    myCmdNested.ExecuteNonQuery();

生成一个插入语句而不是item.Count

【讨论】:

@fubo:这个答案描述了InsertInto 方法的样子。【参考方案6】:

我使用实体框架开发了我的 WPF 应用程序应用程序并使用了 SQL Server 数据库,并且需要从 excel 文件中读取数据,并且必须将该数据插入到它们之间有关系的 2 个表中。对于大约 15000 行的 excel,它过去需要大约 4 小时的时间。然后我所做的是每次插入使用 500 行的块,这将我的插入速度加快到难以置信的速度,现在只需 3-5 秒即可导入相同的数据。

因此,我建议您一次将行添加到 100/200/500 之类的上下文中,然后调用 SaveChanges 方法(如果您真的想使用 EF)。还有其他有用的技巧可以加快 EF 的性能。请阅读this 供您参考。

var totalRecords = TestPacksData.Rows.Count;
var totalPages = (totalRecords / ImportRecordsPerPage) + 1;
while (count <= totalPages)

     var pageWiseRecords = TestPacksData.Rows.Cast<DataRow>().Skip(count * ImportRecordsPerPage).Take(ImportRecordsPerPage);
     count++;
     Project.CreateNewSheet(pageWiseRecords.ToList());
     Project.CreateNewSpool(pageWiseRecords.ToList());

这里是 CreateNewSheet 方法

/// <summary>
/// Creates a new Sheet record in the database
/// </summary>
/// <param name="row">DataRow containing the Sheet record</param>
public void CreateNewSheet(List<DataRow> rows)

     var tempSheetsList = new List<Sheet>();
     foreach (var row in rows)
     
         var sheetNo = row[SheetFields.Sheet_No.ToString()].ToString();
         if (string.IsNullOrWhiteSpace(sheetNo))
              continue;
         var testPackNo = row[SheetFields.Test_Pack_No.ToString()].ToString();
         TestPack testPack = null;
         if (!string.IsNullOrWhiteSpace(testPackNo))
              testPack = GetTestPackByTestPackNo(testPackNo);

         var existingSheet = GetSheetBySheetNo(sheetNo);
         if (existingSheet != null)
         
             UpdateSheet(existingSheet, row);
             continue;
         

         var isometricNo = GetIsometricNoFromSheetNo(sheetNo);
         var newSheet = new Sheet
         
             sheet_no = sheetNo,
             isometric_no = isometricNo,
             ped_rev = row[SheetFields.PED_Rev.ToString()].ToString(),
             gpc_rev = row[SheetFields.GPC_Rev.ToString()].ToString()
         ;
         if (testPack != null)
         
             newSheet.test_pack_id = testPack.id;
             newSheet.test_pack_no = testPack.test_pack_no;
         
         if (!tempSheetsList.Any(l => l.sheet_no == newSheet.sheet_no))
         
              DataStore.Context.Sheets.Add(newSheet);
              tempSheetsList.Add(newSheet);
         
   
   try
   
        DataStore.Context.SaveChanges();
        **DataStore.Dispose();** This is very important. Dispose the context
   
   catch (DbEntityValidationException ex)
   
       // Create log for the exception here
   

CreateNewSpool 是同上的方法,除了字段名和表名,因为它更新一个子表。但是思路是一样的

【讨论】:

第二张表的插入需要第一张表的ID/外键。 EF 如何在不单独插入每一行的情况下进行内部处理? +1 参考其他问题 在父表中插入时,获取id/外键值,然后用它在子表中插入值。这正是我所做的,我的表现很合我的胃口(4-5 秒)【参考方案7】:

你能把 CSV 分成两个文件吗?

例如假设您的文件具有以下列:

... A ... | ... B ... 
a0 | b0
a0 | b1
a0 | b2    <-- data
a1 | b3
a1 | b4

所以一组 A 可能有多个 B 条目。拆开后,你会得到:

... A ...
a0
a1

... B ...
b0
b1
b2
b3
b4

然后你单独批量插入它们。

编辑:伪代码

根据对话,类似:

DataTable tableA = ...; // query schema for TableA
DataTable tableB = ...; // query schmea for TableB

List<String> usernames = select distinct username from TableA;
Hashtable htUsername = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
foreach (String username in usernames)
     htUsername[username] = "";

int colUsername = ...;
foreach (String[] row in CSVFile) 
    String un = row[colUsername] as String;
    if (htUsername[un] == null) 
        // add new row to tableA
        DataRow row = tableA.NewRow();
        row["Username"] = un;
        // etc.
        tableA.Rows.Add(row);
        htUsername[un] = "";
    


// bulk insert TableA

select userid, username from TableA
Hashtable htUserId = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
// htUserId[username] = userid;

int colUserId = ...;

foreach (String[] row in CSVFile) 

    String un = row[colUsername] as String;
    int userid = (int) htUserId[un];
    DataRow row = tableB.NewRow();
    row[colUserId] = userId;
    // fill in other values
    tableB.Rows.Add(row);
    if (table.Rows.Count == 65000) 
        // bulk insert TableB
        var t = tableB.Clone();
        tableB.Dispose();
        tableB = t;
    


if (tableB.Rows.Count > 0)
    // bulk insert TableB

【讨论】:

基于表A的主键。如果没有主键则决定哪些列形成唯一的A。 所以你有作为主键的自动增量列? 表在批量插入之前是否被截断?还是将 csv 附加到现有数据中? 自动增量列 - 是的,在批量插入之前被截断的表 - 否

以上是关于在 C# 中有效地将数据插入 MySQL 中的多个表中的主要内容,如果未能解决你的问题,请参考以下文章

尝试在 C# 中添加反斜杠作为 MySql 插入语句的转义

如何有效地将元素插入数组的任意位置?

如何有效地将数据插入索引丰富的 oracle db?

如何根据非常大的df中的名称有效地将唯一ID分配给具有多个条目的个人

如何有效地将 hadoop 与大型 MySQL 数据库一起使用?

在 C# 中有效地将字符串转换为字节数组(不使用编码)[重复]