使用 FluentFTP 从 FTP 并发下载多个文件,最大值

Posted

技术标签:

【中文标题】使用 FluentFTP 从 FTP 并发下载多个文件,最大值【英文标题】:Download multiple files concurrently from FTP using FluentFTP with a maximum value 【发布时间】:2021-05-20 07:05:14 【问题描述】:

我想从 FTP 目录递归下载多个下载文件,为此我使用 FluentFTP 库,我的代码是这样的:

private async Task downloadRecursively(string src, string dest, FtpClient ftp)


    foreach(var item in ftp.GetListing(src))
    
        if (item.Type == FtpFileSystemObjectType.Directory)
        
            if (item.Size != 0)
            
                System.IO.Directory.CreateDirectory(Path.Combine(dest, item.Name));
                downloadRecursively(Path.Combine(src, item.Name), Path.Combine(dest, item.Name), ftp);
            
        
        else if (item.Type == FtpFileSystemObjectType.File)
        
            await ftp.DownloadFileAsync(Path.Combine(dest, item.Name), Path.Combine(src, item.Name));
        
    

我知道你每次下载都需要一个 FtpClient,但是我怎样才能使用一定数量的连接作为最大值,我想这个想法是创建、连接、下载和关闭我找到的每个文件,但只是同时有 X 个下载文件。另外我不确定我是否应该使用异步创建任务,线程和我最大的问题,如何实现所有这些。

@Bradley 的回答似乎不错,但问题确实读取了必须从外部文件下载的每个文件,并且它没有最大并发下载值,所以我不确定如何应用这两者要求。

【问题讨论】:

【参考方案1】:

用途:

ConcurrentBag class实现连接池; Parallel class 并行化操作; ParallelOptions.MaxDegreeOfParallelism 限制并发线程数。
var clients = new ConcurrentBag<FtpClient>();

var opts = new ParallelOptions  MaxDegreeOfParallelism = maxConnections ;
Parallel.ForEach(files, opts, file =>

    file = Path.GetFileName(file);

    string thread = $"Thread Thread.CurrentThread.ManagedThreadId";
    if (!clients.TryTake(out var client))
    
        Console.WriteLine($"thread Opening connection...");
        client = new FtpClient(host, user, pass);
        client.Connect();
        Console.WriteLine($"thread Opened connection client.GetHashCode().");
    

    string remotePath = sourcePath + "/" + file;
    string localPath = Path.Combine(destPath, file);
    string desc =
        $"thread, Connection client.GetHashCode(), " +
        $"File remotePath => localPath";
    Console.WriteLine($"desc - Starting...");
    client.DownloadFile(localPath, remotePath);
    Console.WriteLine($"desc - Done.");

    clients.Add(client);
);

Console.WriteLine($"Closing clients.Count connections");
foreach (var client in clients)

    Console.WriteLine($"Closing connection client.GetHashCode()");
    client.Dispose();


另一种方法是启动固定数量的线程,每个线程都有一个连接,然后让它们从队列中挑选文件。

有关实现的示例,请参阅我关于 WinSCP .NET 程序集的文章:Automating transfers in parallel connections over SFTP/FTP protocol


关于 SFTP 的类似问题:Processing SFTP files using C# Parallel.ForEach loop not processing downloads

【讨论】:

+1 用于正确选择 rarely useful,并被大量误用 ConcurrentBag&lt;T&gt; 类!【参考方案2】:

这是TPL Dataflow 方法。 BufferBlock&lt;FtpClient&gt; 用作FtpClient 对象池。递归枚举采用IEnumerable&lt;string&gt; 类型的参数,该参数包含一个文件路径的段。在构建本地和远程文件路径时,这些段的组合方式不同。作为调用递归枚举的副作用,远程文件的路径被发送到ActionBlock&lt;IEnumerable&lt;string&gt;&gt;。该块处理文件的并行下载。它的Completion 属性最终包含了整个操作过程中可能发生的所有异常。

public static Task FtpDownloadDeep(string ftpHost, string ftpRoot,
    string targetDirectory, string username = null, string password = null,
    int maximumConnections = 1)

    // Arguments validation omitted            
    if (!Directory.Exists(targetDirectory))
        throw new DirectoryNotFoundException(targetDirectory);
    var fsLocker = new object();

    var ftpClientPool = new BufferBlock<FtpClient>();

    async Task<TResult> UsingFtpAsync<TResult>(Func<FtpClient, Task<TResult>> action)
    
        var client = await ftpClientPool.ReceiveAsync();
        try  return await action(client); 
        finally  ftpClientPool.Post(client);  // Return to the pool
    

    var downloader = new ActionBlock<IEnumerable<string>>(async path =>
    
        var remotePath = String.Join("/", path);
        var localPath = Path.Combine(path.Prepend(targetDirectory).ToArray());
        var localDir = Path.GetDirectoryName(localPath);
        lock (fsLocker) Directory.CreateDirectory(localDir);
        var status = await UsingFtpAsync(client =>
            client.DownloadFileAsync(localPath, remotePath));
        if (status == FtpStatus.Failed) throw new InvalidOperationException(
            $"Download of 'remotePath' failed.");
    , new ExecutionDataflowBlockOptions()
    
        MaxDegreeOfParallelism = maximumConnections,
        BoundedCapacity = maximumConnections,
    );

    async Task Recurse(IEnumerable<string> path)
    
        if (downloader.Completion.IsCompleted) return; // The downloader has failed
        var listing = await UsingFtpAsync(client =>
            client.GetListingAsync(String.Join("/", path)));
        foreach (var item in listing)
        
            if (item.Type == FtpFileSystemObjectType.Directory)
            
                if (item.Size != 0) await Recurse(path.Append(item.Name));
            
            else if (item.Type == FtpFileSystemObjectType.File)
            
                var accepted = await downloader.SendAsync(path.Append(item.Name));
                if (!accepted) break; // The downloader has failed
            
        
    

    // Move on to the thread pool, to avoid ConfigureAwait(false) everywhere
    return Task.Run(async () =>
    
        // Fill the FtpClient pool
        for (int i = 0; i < maximumConnections; i++)
        
            var client = new FtpClient(ftpHost);
            if (username != null && password != null)
                client.Credentials = new NetworkCredential(username, password);
            ftpClientPool.Post(client);
        

        try
        
            // Enumerate the files to download
            await Recurse(new[]  ftpRoot );
            downloader.Complete();
        
        catch (Exception ex)  ((IDataflowBlock)downloader).Fault(ex); 

        try
        
            // Await the downloader to complete
            await downloader.Completion;
        
        catch (OperationCanceledException)
            when (downloader.Completion.IsCanceled)  throw; 
        catch  downloader.Completion.Wait();  // Propagate AggregateException
        finally
        
            // Clean up
            if (ftpClientPool.TryReceiveAll(out var clients))
                foreach (var client in clients) client.Dispose();
        
    );

使用示例:

await FtpDownloadDeep("ftp://ftp.test.com", "", @"C:\FtpTest",
    "username", "password", maximumConnections: 10);

注意: 上面的实现会按照下载过程的节奏懒惰地枚举远程目录。如果您喜欢急切地枚举它,尽快收集有关远程列表的所有可用信息,只需从下载文件的ActionBlock 中删除BoundedCapacity = maximumConnections 配置。请注意,这样做可能会导致高内存消耗,以防远程目录具有较深的子文件夹层次结构,累积包含大量小文件。

【讨论】:

【参考方案3】:

我会把它分成三个部分。

    递归构建源和目标对列表。 创建所需的目录。 同时下载文件。

这是最后一部分,速度很慢,应该并行完成。

代码如下:

private async Task DownloadRecursively(string src, string dest, FtpClient ftp)

    /* 1 */
    IEnumerable<(string source, string destination)> Recurse(string s, string d)
    
        foreach (var item in ftp.GetListing(s))
        
            if (item.Type == FtpFileSystemObjectType.Directory)
            
                if (item.Size != 0)
                
                    foreach(var pair in Recurse(Path.Combine(s, item.Name), Path.Combine(d, item.Name)))
                    
                        yield return pair;
                    
                
            
            else if (item.Type == FtpFileSystemObjectType.File)
            
                yield return (Path.Combine(s, item.Name), Path.Combine(d, item.Name));
            
        
    

    var pairs = Recurse(src, dest).ToArray();
    
    /* 2 */
    foreach (var d in pairs.Select(x => x.destination).Distinct())
    
        System.IO.Directory.CreateDirectory(d);
    

    /* 3 */
    var downloads =
        pairs
            .AsParallel()
            .Select(x => ftp.DownloadFileAsync(x.source, x.destination))
            .ToArray();
    
    await Task.WhenAll(downloads);

代码应该干净、整洁且易于推理。

【讨论】:

除非我错了,否则此解决方案将同时下载所有文件。但是 OP 只想同时下载 X 个文件。顺便说一句,PLINQ 对异步不友好。 @TheodorZoulias - 当然,我需要添加最大并行选项。在任何情况下,我都没有在我的代码中使用带有 PLINQ 的 async 没错,这不是async void 代表的情况。问题是DownloadFileAsync 返回一个Task,而PLINQ 对任务一无所知。所以DownloadFileAsync方法创建的所有任务都被忽略了,它们不是awaited,所以它们变成了即发即弃的任务。 默认情况下,您不能在一个FtpClient 实例上运行多个并行传输。你可以,如果你设置EnableThreadSafeDataConnections,但是每次文件传输都会打开一个新的连接,这将是非常低效的(尤其是在下载大量小文件时)。 @TheodorZoulias - 是的,但我不会等待他们,直到我创建了最终的任务数组。我已经将 PLINQ 和 await 完全分开了。

以上是关于使用 FluentFTP 从 FTP 并发下载多个文件,最大值的主要内容,如果未能解决你的问题,请参考以下文章

c#使用FluentFtp实现一行代码实现ftp上传下载等

c#使用FluentFtp实现一行代码实现ftp上传下载等

快速高效的C#FTP文件传输库FluentFTP

如何使用FluentFTP列出文件夹

FluentFTP证书错误

带有命令的 FTP 到大型机数据集