我是流的新手,我需要从 ftp 下载文件并作为流发送到 azure stageblock(npm @azure/storage-blob)
Posted
技术标签:
【中文标题】我是流的新手,我需要从 ftp 下载文件并作为流发送到 azure stageblock(npm @azure/storage-blob)【英文标题】:I am new to streams and i have requirement to download file from ftp and send as a stream to azure stageblock(npm @azure/storage-blob) 【发布时间】:2020-10-05 20:35:32 【问题描述】:ftp 在可写流中发送数据,而 azure stageblock 方法接受可读流中的数据,当我尝试保存数据时出现错误。 错误:body 必须是字符串、Blob、ArrayBuffer、ArrayBufferView 或返回 NodeJS.ReadableStream 的函数。
const ftp = require("basic-ftp");
var fs = require("fs");
const BlobServiceClient ,BlockBlobClient = require('@azure/storage-blob');
var Transform=require('stream');
var stream = require('stream');
var fileWriteStream = new stream.Transform();
async function connectFTP(retry = 1, recon = false)
const blobServiceClient = await BlobServiceClient.fromConnectionString("****");
var containerClient = await blobServiceClient.getContainerClient("***");//Establishing ContainerClient
const blockBlobClient = await containerClient.getBlockBlobClient("Myblock.txt");
var fileSize=0;
const client = new ftp.Client(0);
client.ftp.verbose = false;
var ftpCredentials =
host: "****",
user: "***",
password: "***$",
secure: ***
;
try
let fileSize=0;
fileWriteStream._transform = async function (fileChunk, encoding, done)
fileSize += fileChunk.length;
this.push(fileChunk)
//HERE I AM GETTING ERROR WHILE SENDING DATA AS TRANSORM AS IT CAN BE READABLE OR WRITABLE STREAM
blockBlobClient.stageBlock(Buffer.from('id').toString('base64'), fileWriteStream, fileSize)
;
return await client.access(ftpCredentials).then((conRes) =>
conRes.ftpClient = client;
client.downloadTo(fileWriteStream,"/Demo Test 3/Patch16 Items.txt").then((res)=> console.log("success",res)
blockBlobClient.commitBlockList(["ALLBLOCKIDS"]).then((res)=>console.log("Res",res)).catch((err)=>console.log("ERR",err))
).catch((err)=>console.log(err))
console.log("client",client);
return conRes;
).catch(async (conRes) =>
//console.log('retry catch ', retry);
console.log(conRes);
if (conRes.code != 220)
if (client.closed && retry <= 3)
retry++;
console.log("retry")
return await connectFTP(retry)
else
return client;
else
return conRes;
);
catch (err)
console.log('err client ', err);
return err;
;
connectFTP();
【问题讨论】:
你能告诉我你为什么使用方法stageblock
吗?
存储大文件。使用stage block我们可以推送少量数据,最后我们可以提交所有数据。
如果你在nodejs运行时运行你的应用,我们可以使用uploadStream
上传文件:docs.microsoft.com/en-us/javascript/api/@azure/storage-blob/…
好的理解。但是,ftp downloadTo 方法发送写入流,uploadStream 接受可读流。因为我收到了上面提到的错误。如何将可写流转换为可读流。我试过了正在做转换流,但这不起作用。感谢您对此进行调查。
【参考方案1】:
根据我的测试,我们可以使用以下代码从 FTP 服务器下载文件,然后将文件上传到 Azure blob
var stream = require('stream');
var ftp = require("basic-ftp");
var process =require('process')
const BlobServiceClient = require('@azure/storage-blob');
var uuid = require("uuid");
example()
async function example()
const client = await new ftp.Client()
client.ftp.verbose = true
try
await client.access(
host: "13.76.32.70",
user: "testqw",
password: "Password0123!"
)
const blobServiceClient = await BlobServiceClient.fromConnectionString("*");
var containerClient = await blobServiceClient.getContainerClient("output");//Establishing ContainerClient
const blockBlobClient = await containerClient.getBlockBlobClient("sample.csv");
const blockList=[]
const lowCaseTransform = await new stream.Transform();
lowCaseTransform._transform= async function (fileChunk, encoding, callback)
var id = uuid.v4();
const blockId=Buffer.from(id).toString("base64")
blockList.push(blockId)
await blockBlobClient.stageBlock(blockId,fileChunk,fileChunk.length);
callback();
lowCaseTransform.pipe(process.stdout)
await client.downloadTo(lowCaseTransform,"/home/testqw/sample.csv")
const result= await blockBlobClient.commitBlockList(blockList);
console.log(result._response.status)
catch(err)
console.log(err)
client.close()
更新
var containerClient = await blobServiceClient.getContainerClient("output");//Establishing ContainerClient
const blockBlobClient = await containerClient.getBlockBlobClient("sample1.csv");
const blockList=[]
const lowCaseTransform = await new stream.Transform();
const pass= await new stream.PassThrough()
const r=[]
lowCaseTransform._transform= async function (fileChunk, encoding, callback)
//lowCaseTransform.push(fileChunk)
r.push(fileChunk)
callback();
lowCaseTransform.pipe(process.stdout)
await client.downloadTo(lowCaseTransform,"/home/testqw/sample.csv")
var t = Buffer.concat(r)
const result= await blockBlobClient.upload(t,t.byteLength);
console.log(result._response.status)
【讨论】:
是的。很好。但是我希望减少与 azure 的交互,因此不需要更多时间将其保存在 azure 存储帐户中,这就是我使用 stagedblock 方法的原因,所以在超出一些大小之后应该分阶段。不是每个文件块(因为这会导致更频繁地与 azure 交互并且需要时间)。感谢您对此进行调查。以上是关于我是流的新手,我需要从 ftp 下载文件并作为流发送到 azure stageblock(npm @azure/storage-blob)的主要内容,如果未能解决你的问题,请参考以下文章
CMD/FTP 使用到今天的日期创建文件夹并将 ftp 下载连接到创建的文件夹