我是流的新手,我需要从 ftp 下载文件并作为流发送到 azure stageblock(npm @azure/storage-blob)

Posted

技术标签:

【中文标题】我是流的新手,我需要从 ftp 下载文件并作为流发送到 azure stageblock(npm @azure/storage-blob)【英文标题】:I am new to streams and i have requirement to download file from ftp and send as a stream to azure stageblock(npm @azure/storage-blob) 【发布时间】:2020-10-05 20:35:32 【问题描述】:

ftp 在可写流中发送数据,而 azure stageblock 方法接受可读流中的数据,当我尝试保存数据时出现错误。 错误:body 必须是字符串、Blob、ArrayBuffer、ArrayBufferView 或返回 NodeJS.ReadableStream 的函数。

    const ftp = require("basic-ftp");
    var fs = require("fs");
    // Destructure the named exports; the scraped original had the `{ }` stripped,
    // which would have assigned `require(...)` to BlockBlobClient only.
    const { BlobServiceClient, BlockBlobClient } = require('@azure/storage-blob');
    // Transform is a named export of the stream module, not the module itself.
    var { Transform } = require('stream');
    var stream = require('stream');
    // A Transform with no _transform implementation yet; the question's code
    // assigns `_transform` later inside connectFTP().
    var fileWriteStream = new stream.Transform();
// Connects to the FTP server (retrying up to 3 times on failure) and streams a
// file into an Azure block blob. NOTE(review): as posted, this code has two
// known problems beyond the syntax: (1) stageBlock is passed the Transform
// stream itself plus a fileSize captured before download completes, and every
// chunk reuses the same block id 'id'; (2) commitBlockList is given the literal
// ["ALLBLOCKIDS"] instead of the ids actually staged. The accepted answer below
// fixes both. Reconstructed here with the braces the scraper stripped.
//
// @param {number} retry - current retry attempt (1-based).
// @param {boolean} recon - unused in the visible code.
// @returns the FTP access result, the client (on exhausted retries), or the error.
async function connectFTP(retry = 1, recon = false) {

    const blobServiceClient = await BlobServiceClient.fromConnectionString("****");
    var containerClient = await blobServiceClient.getContainerClient("***"); // Establishing ContainerClient
    const blockBlobClient = await containerClient.getBlockBlobClient("Myblock.txt");
    var fileSize = 0;
    const client = new ftp.Client(0);
    client.ftp.verbose = false;

    var ftpCredentials = {
        host: "****",
        user: "***",
        password: "***$",
        secure: true // value was redacted ("***") in the original post — confirm
    };

    try {
        let fileSize = 0; // shadows the outer fileSize above
        fileWriteStream._transform = async function (fileChunk, encoding, done) {
            fileSize += fileChunk.length;

            this.push(fileChunk);

            // HERE THE AUTHOR GETS THE ERROR: a Transform stream is not an
            // accepted body type for stageBlock (it wants string/Blob/
            // ArrayBuffer/ArrayBufferView or a factory returning a Readable).
            blockBlobClient.stageBlock(Buffer.from('id').toString('base64'), fileWriteStream, fileSize);
        };
        return await client.access(ftpCredentials).then((conRes) => {
            conRes.ftpClient = client;
            // Floating promise: download result is only logged, not awaited.
            client.downloadTo(fileWriteStream, "/Demo Test 3/Patch16 Items.txt").then((res) => {
                console.log("success", res);
                blockBlobClient.commitBlockList(["ALLBLOCKIDS"]).then((res) => console.log("Res", res)).catch((err) => console.log("ERR", err));
            }).catch((err) => console.log(err));
            console.log("client", client);
            return conRes;
        }).catch(async (conRes) => {
            //console.log('retry catch ', retry);
            console.log(conRes);

            if (conRes.code != 220) {
                if (client.closed && retry <= 3) {
                    retry++;
                    console.log("retry");
                    return await connectFTP(retry);
                }
                else {
                    return client;
                }
            }
            else
                return conRes;
        });
    }
    catch (err) {
        console.log('err client ', err);
        return err;
    }
};
connectFTP();

【问题讨论】:

你能告诉我你为什么使用方法stageblock吗? 存储大文件。使用stage block我们可以推送少量数据,最后我们可以提交所有数据。 如果你在nodejs运行时运行你的应用,我们可以使用uploadStream上传文件:docs.microsoft.com/en-us/javascript/api/@azure/storage-blob/… 好的理解。但是,ftp downloadTo 方法发送写入流,uploadStream 接受可读流。因为我收到了上面提到的错误。如何将可写流转换为可读流。我试过了正在做转换流,但这不起作用。感谢您对此进行调查。 【参考方案1】:

根据我的测试,我们可以使用以下代码从 FTP 服务器下载文件,然后将文件上传到 Azure blob

var stream = require('stream');
var ftp = require("basic-ftp");
var process = require('process');
// Named export — the scraped original lost the destructuring braces.
const { BlobServiceClient } = require('@azure/storage-blob');
var uuid = require("uuid");

example();
// Downloads a file from FTP and uploads it to an Azure block blob by staging
// one block per stream chunk inside a Transform's _transform hook, then
// committing the collected block ids. Reconstructed with the braces the
// scraper stripped. NOTE(review): awaiting stageBlock inside _transform before
// calling callback() applies backpressure but makes one Azure round-trip per
// chunk — see the discussion below for the batching concern.
async function example() {

    const client = await new ftp.Client();
    client.ftp.verbose = true;
    try {
        await client.access({
            host: "13.76.32.70",
            user: "testqw",
            password: "Password0123!"
        });
        const blobServiceClient = await BlobServiceClient.fromConnectionString("*");
        var containerClient = await blobServiceClient.getContainerClient("output"); // Establishing ContainerClient
        const blockBlobClient = await containerClient.getBlockBlobClient("sample.csv");
        const blockList = [];
        const lowCaseTransform = await new stream.Transform();
        lowCaseTransform._transform = async function (fileChunk, encoding, callback) {
            // One fresh base64 block id per chunk; ids are remembered for commit.
            var id = uuid.v4();
            const blockId = Buffer.from(id).toString("base64");
            blockList.push(blockId);
            await blockBlobClient.stageBlock(blockId, fileChunk, fileChunk.length);
            callback();
        };
        lowCaseTransform.pipe(process.stdout);
        await client.downloadTo(lowCaseTransform, "/home/testqw/sample.csv");

        // Commit only after the download (and therefore all stageBlock calls)
        // has finished.
        const result = await blockBlobClient.commitBlockList(blockList);
        console.log(result._response.status);
    }
    catch (err) {
        console.log(err);
    }
    client.close();
}

更新

// UPDATE variant (fragment — runs inside the async example() above, relying on
// `client` and `blobServiceClient` from that scope): instead of staging one
// block per chunk, buffer all chunks in memory and upload once. Reconstructed
// with the braces the scraper stripped.
var containerClient = await blobServiceClient.getContainerClient("output"); // Establishing ContainerClient
        const blockBlobClient = await containerClient.getBlockBlobClient("sample1.csv");
        const blockList = [];
        const lowCaseTransform = await new stream.Transform();
        const pass = await new stream.PassThrough();

        const r = [];
        lowCaseTransform._transform = async function (fileChunk, encoding, callback) {
            //lowCaseTransform.push(fileChunk)
            r.push(fileChunk); // accumulate chunks; concatenated after download
            callback();
        };
        lowCaseTransform.pipe(process.stdout);
        await client.downloadTo(lowCaseTransform, "/home/testqw/sample.csv");
        var t = Buffer.concat(r);

        // Single round-trip upload of the whole buffered file.
        const result = await blockBlobClient.upload(t, t.byteLength);
        console.log(result._response.status);

【讨论】:

是的。很好。但是我希望减少与 azure 的交互,因此不需要更多时间将其保存在 azure 存储帐户中,这就是我使用 stagedblock 方法的原因,所以在超出一些大小之后应该分阶段。不是每个文件块(因为这会导致更频繁地与 azure 交互并且需要时间)。感谢您对此进行调查。

以上是关于我是流的新手,我需要从 ftp 下载文件并作为流发送到 azure stageblock(npm @azure/storage-blob)的主要内容,如果未能解决你的问题,请参考以下文章

怎么从linux下载文件到win中?

CMD/FTP 使用到今天的日期创建文件夹并将 ftp 下载连接到创建的文件夹

从内存中的 FTP 下载 Zip 文件并解压缩

如何使用节点连接ftp并上传文件

如何从 FTP 下载文件并使用 PHP 将其流式传输到客户端

使用 Python 从 FTP 列出所有子目录中的所有文件