Azure 数据湖:将数据从 Blob 移动到 ADLS 时面临问题

Posted

技术标签:

【中文标题】Azure 数据湖:将数据从 Blob 移动到 ADLS 时面临问题【英文标题】:Azure Data Lake : Facing issue while moving data from Blob to ADLS 【发布时间】:2018-08-19 03:31:34 【问题描述】:

我正在 C# 中创建一个 Azure 函数,它执行以下操作:

从 blob 中提取压缩文件, 解压缩并将其复制到 Azure Data Lake Store。

我能够使用 UploadFromStreamAsync(stream) 函数解压缩文件并将其上传到另一个 blob。

但是,我在为 ADLS 做同样的事情时遇到了问题

我参考了以下链接 Upload to ADLS from file stream 并尝试首先使用 adlsFileSystemClient.FileSystem.Create 创建文件,然后在数据湖中使用 adlsFileSystemClient.FileSystem.Append 附加流,但它不起作用。 - create 方法创建了一个 零字节 文件,但 append 什么也不做, azure 函数仍然成功完成,没有任何错误。另外,尝试使用adlsFileSystemClient.FileSystem.AppendAsync 仍然是同样的问题。

代码:

 // Save blob(zip file) contents to a Memory Stream.
    using (var zipBlobFileStream = new MemoryStream())
    
        await blockBlob.DownloadToStreamAsync(zipBlobFileStream);
        await zipBlobFileStream.FlushAsync(); 
        zipBlobFileStream.Position = 0;

        //use ZipArchive from System.IO.Compression to extract all the files from zip file
        using (var zip = new ZipArchive(zipBlobFileStream))
        
        //Each entry here represents an individual file or a folder
        foreach (var entry in zip.Entries)
        
            string destfilename = $"destcontanierPath2/"+entry.FullName;
            log.Info($"DestFilename: destfilename");
            //creating an empty file (blobkBlob) for the actual file with the same name of file
            var blob = extractcontainer.GetBlockBlobReference($"destfilename");
            using (var stream = entry.Open())
            
                //check for file or folder and update the above blob reference with actual content from stream
                if (entry.Length > 0)
                
                    await blob.UploadFromStreamAsync(stream);

                    //Creating a file and then append
                    adlsFileSystemClient.FileSystem.Create(_adlsAccountName, "/raw/Hello.txt",overwrite:true); 
                    // Appending the stream to Azure Data Lake 
                    using(var ms = new MemoryStream())
                    
                        stream.CopyTo(ms);
                        ms.Position = 0; // rewind
                        log.Info($"**********MemoryStream: ms");
                        // do something with ms
                        await adlsFileSystemClient.FileSystem.AppendAsync(_adlsAccountName, "/raw/Hello.txt",ms,0);
                    
                
                                       
        
        
    

新的临时解决方案:

    using (var zipBlobFileStream = new MemoryStream())
    
        await blockBlob.DownloadToStreamAsync(zipBlobFileStream);
        using (var zip = new ZipArchive(zipBlobFileStream))
        
            //Each entry here represents an individual file or a folder
            foreach (var entry in zip.Entries)
               
                    entry.ExtractToFile(directoryPath + entry.FullName, true);
                    //Upload the File to ADLS
                    var parameters = new UploadParameters(directoryPath + entry.FullName, "/raw/" + md5, _adlsAccountName, isOverwrite: true, maxSegmentLength: 268435456 * 2);
                    var frontend = new Microsoft.Azure.Management.DataLake.StoreUploader.DataLakeStoreFrontEndAdapter(_adlsAccountName, adlsFileSystemClient);
                    var uploader = new DataLakeStoreUploader(parameters, frontend);
                    uploader.Execute();
                    File.Delete(directoryPath + entry.FullName);

            
        
    

【问题讨论】:

能否请您发布您的代码? @Thomas :我已经发布了代码。 【参考方案1】:

在您的情况下,您可以按以下方式更改您的代码,然后它应该可以工作。您应该从 foreach 子句中删除创建文件代码。

//Creating a file and then append
adlsFileSystemClient.FileSystem.Create(_adlsAccountName, "/raw/Hello.txt",overwrite:true); 

 using (var zipBlobFileStream = new MemoryStream())
    
        await blockBlob.DownloadToStreamAsync(zipBlobFileStream);
        await zipBlobFileStream.FlushAsync(); 
        zipBlobFileStream.Position = 0;

        //use ZipArchive from System.IO.Compression to extract all the files from zip file
        using (var zip = new ZipArchive(zipBlobFileStream))
        
        //Each entry here represents an individual file or a folder
        foreach (var entry in zip.Entries)
        
            string destfilename = $"destcontanierPath2/"+entry.FullName;
            log.Info($"DestFilename: destfilename");
            //creating an empty file (blobkBlob) for the actual file with the same name of file
            var blob = extractcontainer.GetBlockBlobReference($"destfilename");
            using (var stream = entry.Open())
            
                //check for file or folder and update the above blob reference with actual content from stream
                if (entry.Length > 0)
                
                    using (MemoryStream ms = new MemoryStream())
                            
                                stream.CopyTo(ms);
                                ms.Position = 0;
                                blob.UploadFromStream(ms);
                                ms.Position = 0;                                   
                      adlsFileSystemClient.FileSystem.Append(adlsAccountName, "/raw/Hello.txt", ms);

                            
                  
                
                                       
        
        
    

【讨论】:

非常感谢您提供上述解决方案。我已经尝试了上面的代码,但由于内存不足异常而失败。未压缩的文件在 180-250 MB 之间 我还更新了答案中的代码。如果您的压缩文件内存不足。我建议你使用 FileStream。您需要将其保存为临时文件。然后将其读取为 Filetream 。 作为临时解决方案,我做了同样的事情。不是最有效的方式,因为有一个额外的跃点。用新代码更新了我的问题

以上是关于Azure 数据湖:将数据从 Blob 移动到 ADLS 时面临问题的主要内容,如果未能解决你的问题,请参考以下文章

如何有效地将大数据从数据中心移动到 Azure Blob 存储,以便以后通过 HDInsight 进行处理?

将 Azure 数据工厂上的数据管道从 SQL Server 复制到 Blob 存储

何时使用 Azure Blob 存储与 Azure 文件共享?

使用 Azure 数据工厂将数据从 Google Big Query 移动到 Azure Data Lake Store

将 Parquet 文件从 Azure 数据湖存储帐户复制到 Synapse 数据仓库表失败

Azure 数据工厂 - 删除活动时出错