如何在不创建大缓冲区的情况下将 .NET 对象的大图序列化为 SQL Server BLOB?

Posted

技术标签:

【中文标题】如何在不创建大缓冲区的情况下将 .NET 对象的大图序列化为 SQL Server BLOB?【英文标题】:How to I serialize a large graph of .NET object into a SQL Server BLOB without creating a large buffer? 【发布时间】:2011-01-07 06:04:28 【问题描述】:

我们有如下代码:

ms = New IO.MemoryStream
bin = New System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
bin.Serialize(ms, largeGraphOfObjects)
dataToSaveToDatabase = ms.ToArray()
// put dataToSaveToDatabase in a Sql server BLOB

但是内存流从给我们带来问题的大内存堆中分配了一个大缓冲区。那么我们如何在不需要足够的空闲内存来保存序列化对象的情况下流式传输数据。

我正在寻找一种从 SQL 服务器获取 Stream 的方法,然后可以将其传递给 bin.Serialize() 以避免将所有数据保留在我的进程内存中。

同样用于读回数据...


更多背景知识。

这是一个复杂的数值处理系统的一部分,它近乎实时地处理数据以查找设备问题等,序列化是为了在数据馈送等数据质量出现问题时允许重新启动。(我们存储数据馈送,并且可以在操作员编辑掉错误值后重新运行它们。)

因此,我们序列化对象的频率要比反序列化的频率高很多。

我们正在序列化的对象包括非常大的数组,主要是双精度数以及许多小的“更正常”的对象。我们正在推动 32 位系统的内存限制,并使垃圾收集器非常努力地工作。 (系统中的其他地方正在改进这一点,例如重用大数组而不是创建新数组。)

通常状态的序列化是导致内存不足异常的last straw;我们的内存使用高峰总是在 这个序列化步骤。

认为我们在反序列化对象时会出现大内存池碎片,我预计考虑到数组的大小,大内存池碎片还会存在其他问题。 (这个还没有调查,因为第一次看到这个的人是数字处理专家,而不是内存管理专家。)

我们的客户混合使用 SQL Server 2000、2005 和 2008,如果可能,我们不希望每个版本的 SQL Server 都有不同的代码路径。

我们可以同时拥有多个活动模型(在不同的进程中,跨多台机器),每个模型可以有多个已保存的状态。因此,保存的状态存储在数据库 blob 中,而不是文件中。

由于保存状态的传播很重要,我宁愿不将对象序列化为文件,然后将文件一次一个块地放入 BLOB 中。

我提出的其他相关问题

How to Stream data from/to SQL Server BLOB fields? Is there a SqlFileStream like class that works with Sql Server 2005?

【问题讨论】:

“但是内存流从给我们带来问题的大内存堆中分配了一个大缓冲区” - 你能扩展一下吗?什么问题?内存不足等? @Mitch,我们在序列化对象时内存不足,我们认为在对对象进行反灭菌时也会遇到大内存池碎片的问题。有些对象是非常大的数组。 【参考方案1】:

没有内置的 ADO.Net 功能可以真正优雅地处理大数据。问题有两个方面:

没有 API 可以将 SQL 命令或参数“写入”到流中。接受流的参数类型(如FileStream)接受流以从中READ,这不符合write 到流中的序列化语义。无论您采用哪种方式,最终都会得到整个序列化对象的内存副本,很糟糕。 即使上面的问题可以解决(而且不能解决),TDS 协议和 SQL Server 接受参数的方式也不适用于大参数,因为在启动执行之前必须首先接收整个请求这将在 SQL Server 中创建对象的其他副本。

所以你真的必须从不同的角度来解决这个问题。幸运的是,有一个相当简单的解决方案。诀窍是使用高效的UPDATE .WRITE 语法并在一系列T-SQL 语句中逐个传递数据块。这是 MSDN 推荐的方式,见Modifying Large-Value (max) Data in ADO.NET。这看起来很复杂,但实际上很容易做到并插入 Stream 类。


BlobStream 类

这是解决方案的基础。将 Write 方法实现为对 T-SQL BLOB WRITE 语法的调用的 Stream 派生类。直截了当,唯一有趣的是它必须跟踪第一次更新,因为UPDATE ... SET blob.WRITE(...) 语法在 NULL 字段上会失败:

class BlobStream: Stream

    private SqlCommand cmdAppendChunk;
    private SqlCommand cmdFirstChunk;
    private SqlConnection connection;
    private SqlTransaction transaction;

    private SqlParameter paramChunk;
    private SqlParameter paramLength;

    private long offset;

    public BlobStream(
        SqlConnection connection,
        SqlTransaction transaction,
        string schemaName,
        string tableName,
        string blobColumn,
        string keyColumn,
        object keyValue)
    
        this.transaction = transaction;
        this.connection = connection;
        cmdFirstChunk = new SqlCommand(String.Format(@"
UPDATE [0].[1]
    SET [2] = @firstChunk
    WHERE [3] = @key"
            ,schemaName, tableName, blobColumn, keyColumn)
            , connection, transaction);
        cmdFirstChunk.Parameters.AddWithValue("@key", keyValue);
        cmdAppendChunk = new SqlCommand(String.Format(@"
UPDATE [0].[1]
    SET [2].WRITE(@chunk, NULL, NULL)
    WHERE [3] = @key"
            , schemaName, tableName, blobColumn, keyColumn)
            , connection, transaction);
        cmdAppendChunk.Parameters.AddWithValue("@key", keyValue);
        paramChunk = new SqlParameter("@chunk", SqlDbType.VarBinary, -1);
        cmdAppendChunk.Parameters.Add(paramChunk);
    

    public override void Write(byte[] buffer, int index, int count)
    
        byte[] bytesToWrite = buffer;
        if (index != 0 || count != buffer.Length)
        
            bytesToWrite = new MemoryStream(buffer, index, count).ToArray();
        
        if (offset == 0)
        
            cmdFirstChunk.Parameters.AddWithValue("@firstChunk", bytesToWrite);
            cmdFirstChunk.ExecuteNonQuery();
            offset = count;
        
        else
        
            paramChunk.Value = bytesToWrite;
            cmdAppendChunk.ExecuteNonQuery();
            offset += count;
        
    

    // Rest of the abstract Stream implementation
 

使用 BlobStream

要使用这个新创建的 blob 流类,您需要插入 BufferedStream。该类有一个简单的设计,只处理将流写入表的列。我将重用另一个示例中的表格:

CREATE TABLE [dbo].[Uploads](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [FileName] [varchar](256) NULL,
    [ContentType] [varchar](256) NULL,
    [FileData] [varbinary](max) NULL)

我将添加一个要序列化的虚拟对象:

[Serializable]
class HugeSerialized

    public byte[] theBigArray  get; set; 

最后是实际的序列化。我们将首先在Uploads 表中插入一条新记录,然后在新插入的 Id 上创建一个BlobStream,并将序列化直接调用到此流中:

using (SqlConnection conn = new SqlConnection(Settings.Default.connString))

    conn.Open();
    using (SqlTransaction trn = conn.BeginTransaction())
    
        SqlCommand cmdInsert = new SqlCommand(
@"INSERT INTO dbo.Uploads (FileName, ContentType)
VALUES (@fileName, @contentType);
SET @id = SCOPE_IDENTITY();", conn, trn);
        cmdInsert.Parameters.AddWithValue("@fileName", "Demo");
        cmdInsert.Parameters.AddWithValue("@contentType", "application/octet-stream");
        SqlParameter paramId = new SqlParameter("@id", SqlDbType.Int);
        paramId.Direction = ParameterDirection.Output;
        cmdInsert.Parameters.Add(paramId);
        cmdInsert.ExecuteNonQuery();

        BlobStream blob = new BlobStream(
            conn, trn, "dbo", "Uploads", "FileData", "Id", paramId.Value);
        BufferedStream bufferedBlob = new BufferedStream(blob, 8040);

        HugeSerialized big = new HugeSerialized  theBigArray = new byte[1024 * 1024] ;
        BinaryFormatter bf = new BinaryFormatter();
        bf.Serialize(bufferedBlob, big);

        trn.Commit();
    


如果您监控这个简单示例的执行,您会发现没有创建大型序列化流。该示例将分配 [1024*1024] 的数组,但这是出于演示目的而需要序列化的内容。此代码以缓冲方式逐块序列化,使用 SQL Server BLOB 建议的一次更新大小为 8040 字节。

【讨论】:

谢谢,没想到用 BufferedStream 做了这么辛苦的缓冲工作。 什么是“bytesToWrite = new MemoryStream(buffer, index, count).ToArray();”为了?我遗漏了什么,或者是否可以分配一个字节数组? 警告:我已经使用了这种技术,它工作了大约一年左右,但现在它拒绝工作(SQL 超时),因为我们的 BLOB 表的大小约为 12GB。问题似乎在于这种方法导致 SQL Server 一次分配小块空间的方式,迫使它大量复制数据。我想知道我们是否可以在开始时发出命令将 blob 初始化为正确的长度,但用零填充,然后使用UPDATE .WRITE 用真实数据填充它。也许这可以解决这个问题。仍在努力。保持发布。 很好的答案。附加说明:您可以在附加线程的帮助下将写入/推送流转换为读取/拉取流。写入器将推入拉流可以从中读取的缓冲区的有界队列。这会产生真正的流媒体。 我发现了这个解决方案的另一个问题。 如果你在追加字节,SQL Server 会忽略长度参数(即使总是传递偏移量),以及完整的缓冲区数据是写的,虽然length/count参数更小!【参考方案2】:

您只需要 .NET Framework 4.5 和流式传输。假设我们在 HDD 上有一个大文件,我们想上传这个文件。

SQL 代码:

CREATE TABLE BigFiles 
(
    [BigDataID] [int] IDENTITY(1,1) NOT NULL,
    [Data] VARBINARY(MAX) NULL
)

C#代码:

using (FileStream sourceStream = new FileStream(filePath, FileMode.Open))

    using (SqlCommand cmd = new SqlCommand(string.Format("UPDATE BigFiles SET Data=@Data WHERE BigDataID = @BigDataID"), _sqlConn))
    
        cmd.Parameters.AddWithValue("@Data", sourceStream);
        cmd.Parameters.AddWithValue("@BigDataID", entryId);

        cmd.ExecuteNonQuery();
    

对我有好处。我已经成功上传了 400mb 的文件,但是当我尝试将这个文件加载到内存中时,MemoryStream 抛出了异常。

UPD:此代码在 Windows 7 上有效,但在 Windows XP 和 2003 Server 上失败。

【讨论】:

sourceStream 应该是 fs 吗?数据列的类型是什么? 数据为 VARBINARY(MAX)。 sourceStream 等于 fs,对不起,我的错误,会更新帖子 为什么这个答案被否决了?根据文档msdn.microsoft.com/en-us/library/hh556234(v=vs.110).aspx,您可以将 SqlParameter.Value 设置为流。这是 .NET Fx 4.5 中引入的新功能 @vladimirkhozeyev 谢谢。 post 的另一个改进是包含表模式,即 sql 脚本。 请注意,这会在服务器上创建一个文件大小参数。大文件最终可能会占用tempdb 中的空间。【参考方案3】:

您始终可以使用 Microsoft 从一开始就使用的有线协议 TDS(表格数据流)在较低级别写入 SQL Server。即使 SQLAzure 使用它,他们也不太可能立即更改它!

您可以从 Mono 项目和 freetds 项目中查看其工作原理的源代码

查看tds_blob

http://www.mono-project.com/TDS_Generic

http://www.mono-project.com/SQLClient

http://www.freetds.org/

【讨论】:

【参考方案4】:

图表是什么样子的?

这里的一个问题是流; SQL 2005 的要求很痛苦,否则您可以直接写入 SqlFileStream,但是,我认为编写自己的缓冲 8040(或多个)字节并写入的 Stream 实现并不难它逐渐地。但是,我不确定是否值得这种额外的复杂性 - 我会非常倾向于只使用一个文件作为暂存缓冲区和 then(一旦序列化)循环在文件插入/附加块上。我不认为文件系统会损害你的整体性能,它会帮助你开始编写注定要失败的数据——也就是说,在你已经知道要写入什么数据之前,你不会与数据库交谈。它还可以帮助您最大限度地缩短连接打开的时间。

下一个问题是序列化本身。我个人建议使用BinaryFormatter 写入持久存储(仅用于传输),因为它在编码器本身和您的类型中都是特定于实现的(即,如果您对您的数据类型进行看似无辜的更改)。

如果您的数据可以充分表示为(而不是完整的图),我会很想尝试协议缓冲区/protobuf-net。这种编码(由 Google 设计)小于BinaryFormatter 输出,读取和写入速度更快,并且是基于合同而不是基于字段的,因此您可以稍后再次可靠地对其进行再水化(即使您完全切换平台) .

默认选项意味着它必须在每个对象之前写入对象长度(在您的情况下这可能很昂贵),但如果您有大型(深)对象的嵌套列表,您可以使用分组编码来避免这种需要- 允许它以仅向前的单通道方式写入流;这是一个使用分组编码的简短简单示例,但如果您想向我抛出更复杂的场景,请告诉我...

using System;
using System.Collections.Generic;
using System.IO;
using ProtoBuf;
[ProtoContract]
public class Foo 
    private readonly List<Bar> bars = new List<Bar>();
    [ProtoMember(1, DataFormat = DataFormat.Group)]
    public List<Bar> Bars  get  return bars;

[ProtoContract]
public class Bar 
    [ProtoMember(1)]
    public int Id  get; set; 
    [ProtoMember(2)]
    public string Name  get; set; 

static class Program 
    static void Main() 
        var obj = new Foo  Bars = 
            new Bar  Id = 123, Name = "abc",
            new Bar  Id = 456, Name = "def",
         ;
        // write it and show it
        using (MemoryStream ms = new MemoryStream()) 
            Serializer.Serialize(ms, obj);
            Console.WriteLine(BitConverter.ToString(ms.ToArray()));
        
    

注意:我确实一些关于如何破解 Google 的有线格式以支持完整图表的理论,但需要一些时间来尝试。哦,关于“非常大的数组”——对于原始类型(不是对象),你可以为此使用“打包”编码; [DataMember(..., Options = MemberSerializationOptions.Packed)] - 可能有用,但如果没有模型的可见性就很难说。

【讨论】:

至少我们不会遇到任何版本控制问题,因为保存的状态不需要能够被新版本的软件读取,因此我们可以应对BinaryFormatter 很脆弱的事实。【参考方案5】:

为什么不实现自己的 system::io:stream 派生类?这将允许您直接通过UpdateText 将其附加到 SQL 列进行写入。

例如(伪代码)

插入带有 blob 列的数据库记录 “初始化”(见上面的 UpdateText 文章) 创建您的流类型 / 将数据库连接与 流 将流传递给 序列化调用

它可以对它的调用进行分块(我假设一次是多个 8040 字节),并在每个完整的缓冲区上将其传递给具有适当偏移量的 DB UpdateText 调用。

在流关闭时,您将通过 UpdateText 刷新未完全填满缓冲区的剩余内容。

同样,您可以使用相同/相似的派生流来允许从 DB 列中读取数据,并将其传递给反序列化。

创建一个派生流并不是那么多工作——我已经在 C++/CLI 中完成它以提供与 IStream 的互操作性——如果我能做到的话:)...(我可以为您提供 C++/CLI如果有帮助,我将流代码作为示例完成)

如果您将整个操作(插入初始行、调用以通过流更新 blob)放入事务中,如果序列化步骤失败,您将避免任何潜在的数据库不一致。

【讨论】:

谢谢,我是这么想的;但是.net iostream 有很多方法,比如“seek”,很难实现。我希望有一个预先编写好的、经过良好测试 (nunit) 的开源实现,它可以使用后台线程将块写入数据库。 是的,当我最初为 IStreams 进行调查时,我就是这么想的——但实际上只有少数调用通常被实际使用——所以你几乎可以肯定地摆脱对硬调用的影响(比如只是抛出一个未实现的异常)显然不是一般情况 - 但您可能会发现在您的特定情况下它会工作得很好。序列化实际上是一个单向线性过程 - 我相信您会发现实际上只调用了 Read & Write。 A System.IO.Stream 不必支持搜索(网络流等)这就是为什么有 CanSeek 属性。 (或者你的意思是其他类型的流?) @SealedSun,但是你怎么知道从 API 到 3rd 方方法是现在需要寻求支持还是下一个版本需要寻求支持?我只是不喜欢带有可选方法的接口! IIRC SQL2K5 驱动程序有效地为整个 BLOb 分配了内存(缓存以便重新读取?),所以这是一个双刃剑问题。不确定 SQL2K8 驱动程序如何工作?在尝试此曲目之前值得检查【参考方案6】:

我会选择文件。基本上将文件系统用作 SQL Server 和您的应用程序之间的中介。

    序列化大对象时,将其序列化为FileStream

    将其导入数据库指示数据库在保存数据时直接使用该文件。可能看起来像这样:

    插入到 MyTable ( [我的专栏] ) SELECT b.BulkColumn, FROM OPENROWSET(BULK N'C:\Path To My File\File.ext', SINGLE_BLOB) as b

    读回数据时,指示SQL将大列作为临时文件保存回文件系统,反序列化到内存后删除(不用马上删除,尽可能缓存可以在这里完成)。不太确定 sql 命令的用途是什么,因为我肯定不是数据库专家,但我很确定一定有一个。

    再次使用 FileStream 对象将其反序列化回内存中。

这个过程可以概括为一个帮助类来完成它,它会知道什么时候删除那些临时文件,如果你确定sql数据记录的值没有改变,你可以重复使用它们。

【讨论】:

这行不通,因为数据库可能位于远程计算机上,并且试图让客户设置文件共享也很痛苦。 如何使用 CLR 存储过程来克服对文件共享的需求?将序列化文件保存在数据库中很容易,您只需要写入文件的权限 - 例如 Windows 临时文件?并且为了读取它,使用 CLR 存储过程并将临时文件流回反序列化的应用程序?【参考方案7】:

请注意,从 SQL Server 2012 开始,还有类似于 FILESTREAM 的 FileTable,除了它也允许非事务性访问。

https://msdn.microsoft.com/en-us/library/hh403405.aspx#CompareFileTable

【讨论】:

请解释为什么答案被否决。这是对问题的有效答案,因为允许将大量数据保存到 SQL Server(唯一的限制是磁盘空间)。数据是流式传输的,因此内存使用量最小。也适用于阅读。自 2010 年发布问题以来,OP 可能已从 SQL Server 2005 升级,无论如何,答案对于不受 SQL Server 版本限制的其他人绝对有用。

以上是关于如何在不创建大缓冲区的情况下将 .NET 对象的大图序列化为 SQL Server BLOB?的主要内容,如果未能解决你的问题,请参考以下文章

如何在不创建架构的情况下将 CSV 文件加载到 BigQuery

如何在不循环的情况下将数组(Range.Value)传递给本机 .NET 类型?

如何在不复制列标签的情况下将多个数据框写入同一张表

如何在不跟踪索引的情况下将元素附加到列表?

我们如何在不将 ViewController 对象推入其中的情况下将对象分配给 `UINavigationController`。

如何在不写入磁盘的情况下将 XML 从 Delphi 传递到 C#?