使用 FluentFTP 从 FTP 并发下载多个文件,最大值
Posted
技术标签:
【中文标题】使用 FluentFTP 从 FTP 并发下载多个文件,最大值【英文标题】:Download multiple files concurrently from FTP using FluentFTP with a maximum value 【发布时间】:2021-05-20 07:05:14 【问题描述】:我想从 FTP 目录递归下载多个下载文件,为此我使用 FluentFTP 库,我的代码是这样的:
private async Task downloadRecursively(string src, string dest, FtpClient ftp)
foreach(var item in ftp.GetListing(src))
if (item.Type == FtpFileSystemObjectType.Directory)
if (item.Size != 0)
System.IO.Directory.CreateDirectory(Path.Combine(dest, item.Name));
downloadRecursively(Path.Combine(src, item.Name), Path.Combine(dest, item.Name), ftp);
else if (item.Type == FtpFileSystemObjectType.File)
await ftp.DownloadFileAsync(Path.Combine(dest, item.Name), Path.Combine(src, item.Name));
我知道你每次下载都需要一个 FtpClient,但是我怎样才能使用一定数量的连接作为最大值,我想这个想法是创建、连接、下载和关闭我找到的每个文件,但只是同时有 X 个下载文件。另外我不确定我是否应该使用异步创建任务,线程和我最大的问题,如何实现所有这些。
@Bradley 的回答似乎不错,但问题确实读取了必须从外部文件下载的每个文件,并且它没有最大并发下载值,所以我不确定如何应用这两者要求。
【问题讨论】:
【参考方案1】:用途:
ConcurrentBag
class实现连接池;
Parallel
class 并行化操作;
ParallelOptions.MaxDegreeOfParallelism
限制并发线程数。
var clients = new ConcurrentBag<FtpClient>();
var opts = new ParallelOptions MaxDegreeOfParallelism = maxConnections ;
Parallel.ForEach(files, opts, file =>
file = Path.GetFileName(file);
string thread = $"Thread Thread.CurrentThread.ManagedThreadId";
if (!clients.TryTake(out var client))
Console.WriteLine($"thread Opening connection...");
client = new FtpClient(host, user, pass);
client.Connect();
Console.WriteLine($"thread Opened connection client.GetHashCode().");
string remotePath = sourcePath + "/" + file;
string localPath = Path.Combine(destPath, file);
string desc =
$"thread, Connection client.GetHashCode(), " +
$"File remotePath => localPath";
Console.WriteLine($"desc - Starting...");
client.DownloadFile(localPath, remotePath);
Console.WriteLine($"desc - Done.");
clients.Add(client);
);
Console.WriteLine($"Closing clients.Count connections");
foreach (var client in clients)
Console.WriteLine($"Closing connection client.GetHashCode()");
client.Dispose();
另一种方法是启动固定数量的线程,每个线程都有一个连接,然后让它们从队列中挑选文件。
有关实现的示例,请参阅我关于 WinSCP .NET 程序集的文章:Automating transfers in parallel connections over SFTP/FTP protocol
关于 SFTP 的类似问题:Processing SFTP files using C# Parallel.ForEach loop not processing downloads
【讨论】:
+1 用于正确选择 rarely useful,并被大量误用ConcurrentBag<T>
类!【参考方案2】:
这是TPL Dataflow 方法。 BufferBlock<FtpClient>
用作FtpClient
对象池。递归枚举采用IEnumerable<string>
类型的参数,该参数包含一个文件路径的段。在构建本地和远程文件路径时,这些段的组合方式不同。作为调用递归枚举的副作用,远程文件的路径被发送到ActionBlock<IEnumerable<string>>
。该块处理文件的并行下载。它的Completion
属性最终包含了整个操作过程中可能发生的所有异常。
public static Task FtpDownloadDeep(string ftpHost, string ftpRoot,
string targetDirectory, string username = null, string password = null,
int maximumConnections = 1)
// Arguments validation omitted
if (!Directory.Exists(targetDirectory))
throw new DirectoryNotFoundException(targetDirectory);
var fsLocker = new object();
var ftpClientPool = new BufferBlock<FtpClient>();
async Task<TResult> UsingFtpAsync<TResult>(Func<FtpClient, Task<TResult>> action)
var client = await ftpClientPool.ReceiveAsync();
try return await action(client);
finally ftpClientPool.Post(client); // Return to the pool
var downloader = new ActionBlock<IEnumerable<string>>(async path =>
var remotePath = String.Join("/", path);
var localPath = Path.Combine(path.Prepend(targetDirectory).ToArray());
var localDir = Path.GetDirectoryName(localPath);
lock (fsLocker) Directory.CreateDirectory(localDir);
var status = await UsingFtpAsync(client =>
client.DownloadFileAsync(localPath, remotePath));
if (status == FtpStatus.Failed) throw new InvalidOperationException(
$"Download of 'remotePath' failed.");
, new ExecutionDataflowBlockOptions()
MaxDegreeOfParallelism = maximumConnections,
BoundedCapacity = maximumConnections,
);
async Task Recurse(IEnumerable<string> path)
if (downloader.Completion.IsCompleted) return; // The downloader has failed
var listing = await UsingFtpAsync(client =>
client.GetListingAsync(String.Join("/", path)));
foreach (var item in listing)
if (item.Type == FtpFileSystemObjectType.Directory)
if (item.Size != 0) await Recurse(path.Append(item.Name));
else if (item.Type == FtpFileSystemObjectType.File)
var accepted = await downloader.SendAsync(path.Append(item.Name));
if (!accepted) break; // The downloader has failed
// Move on to the thread pool, to avoid ConfigureAwait(false) everywhere
return Task.Run(async () =>
// Fill the FtpClient pool
for (int i = 0; i < maximumConnections; i++)
var client = new FtpClient(ftpHost);
if (username != null && password != null)
client.Credentials = new NetworkCredential(username, password);
ftpClientPool.Post(client);
try
// Enumerate the files to download
await Recurse(new[] ftpRoot );
downloader.Complete();
catch (Exception ex) ((IDataflowBlock)downloader).Fault(ex);
try
// Await the downloader to complete
await downloader.Completion;
catch (OperationCanceledException)
when (downloader.Completion.IsCanceled) throw;
catch downloader.Completion.Wait(); // Propagate AggregateException
finally
// Clean up
if (ftpClientPool.TryReceiveAll(out var clients))
foreach (var client in clients) client.Dispose();
);
使用示例:
await FtpDownloadDeep("ftp://ftp.test.com", "", @"C:\FtpTest",
"username", "password", maximumConnections: 10);
注意: 上面的实现会按照下载过程的节奏懒惰地枚举远程目录。如果您喜欢急切地枚举它,尽快收集有关远程列表的所有可用信息,只需从下载文件的ActionBlock
中删除BoundedCapacity = maximumConnections
配置。请注意,这样做可能会导致高内存消耗,以防远程目录具有较深的子文件夹层次结构,累积包含大量小文件。
【讨论】:
【参考方案3】:我会把它分成三个部分。
-
递归构建源和目标对列表。
创建所需的目录。
同时下载文件。
这是最后一部分,速度很慢,应该并行完成。
代码如下:
private async Task DownloadRecursively(string src, string dest, FtpClient ftp)
/* 1 */
IEnumerable<(string source, string destination)> Recurse(string s, string d)
foreach (var item in ftp.GetListing(s))
if (item.Type == FtpFileSystemObjectType.Directory)
if (item.Size != 0)
foreach(var pair in Recurse(Path.Combine(s, item.Name), Path.Combine(d, item.Name)))
yield return pair;
else if (item.Type == FtpFileSystemObjectType.File)
yield return (Path.Combine(s, item.Name), Path.Combine(d, item.Name));
var pairs = Recurse(src, dest).ToArray();
/* 2 */
foreach (var d in pairs.Select(x => x.destination).Distinct())
System.IO.Directory.CreateDirectory(d);
/* 3 */
var downloads =
pairs
.AsParallel()
.Select(x => ftp.DownloadFileAsync(x.source, x.destination))
.ToArray();
await Task.WhenAll(downloads);
代码应该干净、整洁且易于推理。
【讨论】:
除非我错了,否则此解决方案将同时下载所有文件。但是 OP 只想同时下载 X 个文件。顺便说一句,PLINQ 对异步不友好。 @TheodorZoulias - 当然,我需要添加最大并行选项。在任何情况下,我都没有在我的代码中使用带有 PLINQ 的async
。
没错,这不是async void
代表的情况。问题是DownloadFileAsync
返回一个Task
,而PLINQ 对任务一无所知。所以DownloadFileAsync
方法创建的所有任务都被忽略了,它们不是await
ed,所以它们变成了即发即弃的任务。
默认情况下,您不能在一个FtpClient
实例上运行多个并行传输。你可以,如果你设置EnableThreadSafeDataConnections
,但是每次文件传输都会打开一个新的连接,这将是非常低效的(尤其是在下载大量小文件时)。
@TheodorZoulias - 是的,但我不会等待他们,直到我创建了最终的任务数组。我已经将 PLINQ 和 await
完全分开了。以上是关于使用 FluentFTP 从 FTP 并发下载多个文件,最大值的主要内容,如果未能解决你的问题,请参考以下文章