使用 SemaphoreSlim 和 Continuewith 下载 URL

Posted

技术标签:

【中文标题】使用 SemaphoreSlim 和 Continuewith 下载 URL【英文标题】:Using SemaphoreSlim and Continuewith for downloading Urls 【发布时间】:2015-04-07 06:40:52 【问题描述】:

我正在尝试使用SemaphoreSlimContinueWith 来限制我正在运行的并发任务的数量。但是运行时的行为与我的预期大相径庭。

我为 ServicePointManager.DefaultConnectionLimit 设置的值等于 288,因为我已经初始化了 SemaphoreSlim(100),我对运行时行为的期望是代码应该首先产生 100 个线程,然后开始一个新任务当第一个任务完成时。

var sr =
    new StreamReader(
        @"UrlList.tsv");

var urlList = new List<string>();
for (int i = 0; i < 1000; i++)

    string line = sr.ReadLine();
    string[] tokens = line.Split('\t');
    string url = tokens[4];
    urlList.Add(url);


ServicePointManager.DefaultConnectionLimit = 12*Environment.ProcessorCount;
Console.WriteLine(DateTime.Now + "\t" + ServicePointManager.DefaultConnectionLimit);

var tasks = new Task[urlList.Count];
var semaphore = new SemaphoreSlim(100);
var client = new HttpClient();
int cnt = 0;
for (int i = 0; i < urlList.Count; i++)

    int i1 = i;
    tasks[i] = semaphore.WaitAsync().ContinueWith(task =>
    
        Console.WriteLine(DateTime.Now + "\t" + ++cnt);
        var t = client.GetStringAsync(urlList[i1]);
        Console.WriteLine(t.Result);
        semaphore.Release();
        return t.Result;
    );

Task.WhenAll(tasks).GetAwaiter().GetResult();

输出看起来像这样:

4/6/2015 11:36:12 PM    288
4/6/2015 11:36:12 PM    1
4/6/2015 11:36:12 PM    7
4/6/2015 11:36:12 PM    10
4/6/2015 11:36:12 PM    11
4/6/2015 11:36:12 PM    12
4/6/2015 11:36:12 PM    3
4/6/2015 11:36:12 PM    4
4/6/2015 11:36:12 PM    2
4/6/2015 11:36:12 PM    5
4/6/2015 11:36:12 PM    8
4/6/2015 11:36:12 PM    6
4/6/2015 11:36:12 PM    9
4/6/2015 11:36:12 PM    21
4/6/2015 11:36:12 PM    17
4/6/2015 11:36:12 PM    14
4/6/2015 11:36:12 PM    15
4/6/2015 11:36:12 PM    13
4/6/2015 11:36:12 PM    22
4/6/2015 11:36:12 PM    16
4/6/2015 11:36:12 PM    23
4/6/2015 11:36:12 PM    20
4/6/2015 11:36:12 PM    19
4/6/2015 11:36:12 PM    24
4/6/2015 11:36:12 PM    25
4/6/2015 11:36:12 PM    18
4/6/2015 11:36:13 PM    26
4/6/2015 11:36:14 PM    27
4/6/2015 11:36:15 PM    28

所以看起来线程没有以我期望的方式产生,也没有显示 Url 内容。我的代码到底有什么问题?

【问题讨论】:

您尝试手动操作是否有原因? TPL 和 PLINQ 都有并行循环机制,您可以在其中添加最大并行度作为参数。 @nvoigt:感谢您的评论,但如果我错了,请纠正我。我认为基本上我应该只在我的工作受 CPU 限制时使用 Parallel.ForEach,而在我的工作受 I/O 限制时使用 async-await? 【参考方案1】:

试试这样的:

async Task<IEnumerable<string>> DoItAsync(int threads, IEnumerable<string> urls)

    ServicePointManager.DefaultConnectionLimit = 12*Environment.ProcessorCount;
    Console.WriteLine("0:HH:mm:ss.ffffff\t1", DateTime.Now, ServicePointManager.DefaultConnectionLimit);

    var semaphore = new SemaphoreSlim(threads);
    var client = new HttpClient();
    var cnt = 0;
    var tasks = new List<Task<string>>();
    foreach (var url in urls)
    
        tasks.Add(((Func<Task<string>>)(async () =>
            
                await semaphore.WaitAsync();

                var c = ++cnt;
                Console.WriteLine("0:HH:mm:ss.ffffff\t1\t2", DateTime.Now, c, url);
                var s = await client.GetStringAsync(url);
                Console.WriteLine("0:HH:mm:ss.ffffff\t1\t2\t3", DateTime.Now, c, url, s.Substring(0, 20));
                semaphore.Release();
                return s;
            ))());
    

    return await Task.WhenAll(tasks);

【讨论】:

async/await 的使用是对 OP 代码的改进,但你不希望 Task.Run(你已经从你的异步委托那里得到了一个“热”任务),我认为你最后可以简单地return await Task.WhenAll(...) 根据我以前的 cmets 编辑。 其实 Todd Menier,我更喜欢前者。看到这两个链接:blogs.msdn.com/b/pfxteam/archive/2012/02/08/10265476.aspxblogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx @ToddMenier,我更喜欢可读性。异步委托不是“热”任务,而是在调用时返回热任务的委托。在您的更正中,您没有调用它,代码甚至没有编译。 @PauloMorgado 是的,我的错。 (我应该尝试构建它!)我确实认为你已经到达这里(调用委托)比强制另一个线程与Task.Run 混合更可取。我可能会将委托提取到异步方法以提高可读性(避免尴尬的演员表等),但从正确性来看,这是一种改进。 +1。

以上是关于使用 SemaphoreSlim 和 Continuewith 下载 URL的主要内容,如果未能解决你的问题,请参考以下文章

多线程之信号量——SemaphoreSlim

具有动态 maxCount 的 SemaphoreSlim

跳过 SemaphoreSlim 而不是等待

SemaphoreSlim.WaitAsync 在尝试块之前/之后

测试在内部使用 SemaphoreSlim 以实现并行化的异步方法

多线程10-SemaphoreSlim