.Net HttpListener 的多线程
Posted
技术标签:
【中文标题】.Net HttpListener 的多线程【英文标题】:Multi-threading with .Net HttpListener 【发布时间】:2011-06-08 00:07:39 【问题描述】:我有一个听众:
listener = new HttpListener();
listener.Prefixes.Add(@"http://+:8077/");
listener.Start();
listenerThread = new Thread(HandleRequests);
listenerThread.Start();
我正在处理请求:
private void HandleRequests()
while (listener.IsListening)
var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener);
context.AsyncWaitHandle.WaitOne();
private void ListenerCallback(IAsyncResult ar)
var listener = ar.AsyncState as HttpListener;
var context = listener.EndGetContext(ar);
//do some stuff
我想这样写void Stop()
:
-
它将一直阻塞,直到所有当前处理的请求结束(即等待所有线程“做一些事情”)。
虽然它会等待已经开始的请求,但它不会允许更多的请求(即在
ListenerCallback
的开头返回)。
之后它将调用listener.Stop()
(listener.IsListening
变为假)。
怎么写?
编辑:您如何看待这个解决方案?安全吗?
public void Stop()
lock (this)
isStopping = true;
resetEvent.WaitOne(); //initially set to true
listener.Stop();
private void ListenerCallback(IAsyncResult ar)
lock (this)
if (isStopping)
return;
resetEvent.Reset();
numberOfRequests++;
var listener = ar.AsyncState as HttpListener;
var context = listener.EndGetContext(ar);
//do some stuff
lock (this)
if (--numberOfRequests == 0)
resetEvent.Set();
【问题讨论】:
【参考方案1】:为了完整起见,如果您管理自己的工作线程,如下所示:
class HttpServer : IDisposable
private readonly HttpListener _listener;
private readonly Thread _listenerThread;
private readonly Thread[] _workers;
private readonly ManualResetEvent _stop, _ready;
private Queue<HttpListenerContext> _queue;
public HttpServer(int maxThreads)
_workers = new Thread[maxThreads];
_queue = new Queue<HttpListenerContext>();
_stop = new ManualResetEvent(false);
_ready = new ManualResetEvent(false);
_listener = new HttpListener();
_listenerThread = new Thread(HandleRequests);
public void Start(int port)
_listener.Prefixes.Add(String.Format(@"http://+:0/", port));
_listener.Start();
_listenerThread.Start();
for (int i = 0; i < _workers.Length; i++)
_workers[i] = new Thread(Worker);
_workers[i].Start();
public void Dispose()
Stop();
public void Stop()
_stop.Set();
_listenerThread.Join();
foreach (Thread worker in _workers)
worker.Join();
_listener.Stop();
private void HandleRequests()
while (_listener.IsListening)
var context = _listener.BeginGetContext(ContextReady, null);
if (0 == WaitHandle.WaitAny(new[] _stop, context.AsyncWaitHandle ))
return;
private void ContextReady(IAsyncResult ar)
try
lock (_queue)
_queue.Enqueue(_listener.EndGetContext(ar));
_ready.Set();
catch return;
private void Worker()
WaitHandle[] wait = new[] _ready, _stop ;
while (0 == WaitHandle.WaitAny(wait))
HttpListenerContext context;
lock (_queue)
if (_queue.Count > 0)
context = _queue.Dequeue();
else
_ready.Reset();
continue;
try ProcessRequest(context);
catch (Exception e) Console.Error.WriteLine(e);
public event Action<HttpListenerContext> ProcessRequest;
【讨论】:
这太棒了 - 它是测试 HttpListener 吞吐量的绝佳候选者。 非常感谢您提供的那段代码!有两个小问题:1. ProcessRequest 可能为 null 2. HttpListenerContext 不是线程安全的,除非它是静态的 @MartinMeeser 感谢您的评论。 1. 我们可以使用ProcessRequest?.Invoke(context);
,而不是将其包装在 try catch 块中。对于 2. 但是,如果静态不是一个选项,您有什么建议?
@JohnTube 对于上面的#1 或#2,这里没有问题。继续前进。
@topdog 当然 AutoResetEvent 是从等待池中释放线程时更常见的方法。这只是个人喜好,我不是他们的粉丝。【参考方案2】:
有几种方法可以解决这个问题...这是一个简单的示例,它使用信号量来跟踪正在进行的工作,并在所有工作人员完成时发出信号。这应该会给你一个基本的工作思路。
下面的解决方案并不理想,理想情况下我们应该在调用 BeginGetContext 之前获取信号量。这使得关机更加困难,所以我选择使用这种更简化的方法。如果我是“真正地”这样做,我可能会编写自己的线程管理,而不是依赖线程池。这将允许更可靠的关机。
无论如何这里是完整的例子:
class TestHttp
static void Main()
using (HttpServer srvr = new HttpServer(5))
srvr.Start(8085);
Console.WriteLine("Press [Enter] to quit.");
Console.ReadLine();
class HttpServer : IDisposable
private readonly int _maxThreads;
private readonly HttpListener _listener;
private readonly Thread _listenerThread;
private readonly ManualResetEvent _stop, _idle;
private readonly Semaphore _busy;
public HttpServer(int maxThreads)
_maxThreads = maxThreads;
_stop = new ManualResetEvent(false);
_idle = new ManualResetEvent(false);
_busy = new Semaphore(maxThreads, maxThreads);
_listener = new HttpListener();
_listenerThread = new Thread(HandleRequests);
public void Start(int port)
_listener.Prefixes.Add(String.Format(@"http://+:0/", port));
_listener.Start();
_listenerThread.Start();
public void Dispose()
Stop();
public void Stop()
_stop.Set();
_listenerThread.Join();
_idle.Reset();
//aquire and release the semaphore to see if anyone is running, wait for idle if they are.
_busy.WaitOne();
if(_maxThreads != 1 + _busy.Release())
_idle.WaitOne();
_listener.Stop();
private void HandleRequests()
while (_listener.IsListening)
var context = _listener.BeginGetContext(ListenerCallback, null);
if (0 == WaitHandle.WaitAny(new[] _stop, context.AsyncWaitHandle ))
return;
private void ListenerCallback(IAsyncResult ar)
_busy.WaitOne();
try
HttpListenerContext context;
try
context = _listener.EndGetContext(ar);
catch (HttpListenerException)
return;
if (_stop.WaitOne(0, false))
return;
Console.WriteLine("0 1", context.Request.HttpMethod, context.Request.RawUrl);
context.Response.SendChunked = true;
using (TextWriter tw = new StreamWriter(context.Response.OutputStream))
tw.WriteLine("<html><body><h1>Hello World</h1>");
for (int i = 0; i < 5; i++)
tw.WriteLine("<p>0 @ 1</p>", i, DateTime.Now);
tw.Flush();
Thread.Sleep(1000);
tw.WriteLine("</body></html>");
finally
if (_maxThreads == 1 + _busy.Release())
_idle.Set();
【讨论】:
【参考方案3】:我在问题的编辑部分查阅了我的代码,我决定接受它并进行一些修改:
public void Stop()
lock (locker)
isStopping = true;
resetEvent.WaitOne(); //initially set to true
listener.Stop();
private void ListenerCallback(IAsyncResult ar)
lock (locker) //locking on this is a bad idea, but I forget about it before
if (isStopping)
return;
resetEvent.Reset();
numberOfRequests++;
try
var listener = ar.AsyncState as HttpListener;
var context = listener.EndGetContext(ar);
//do some stuff
finally //to make sure that bellow code will be executed
lock (locker)
if (--numberOfRequests == 0)
resetEvent.Set();
【讨论】:
【参考方案4】:只需调用 listener.Stop() 就可以了。这不会终止任何已经建立的连接,但会阻止任何新的连接。
【讨论】:
这不是真的。如果您在执行ListenerCallback
期间调用listener.Stop()
,您将收到异常,例如。当调用EndGetContext
甚至更晚,当使用输出流时。我当然可以捕获异常,但我不想这样做。
在我的代码中,我使用了一个标志,并且在对其调用 stop 之后不再引用侦听器,但是关闭侦听器并不会关闭已经接受的连接,而只是关闭侦听器。
我不知道你说的“我使用旗帜”是什么意思。问题是,在ListenerCallback
中,我正在使用侦听器,如果另一个线程关闭它,而我正在使用它,我最终会遇到我提到的异常。
在您的 Stop 方法中,如果您获取 _busy 信号量然后调用 listener.Stop 怎么样?这应该允许任何挂起的 ListenerCallback 调用在您销毁侦听器对象之前完成。【参考方案5】:
这使用 BlockingCollection 类型的队列来服务请求。可以直接使用。您应该从该类派生一个类并覆盖 Response。
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Threading;
namespace Service
class HttpServer : IDisposable
private HttpListener httpListener;
private Thread listenerLoop;
private Thread[] requestProcessors;
private BlockingCollection<HttpListenerContext> messages;
public HttpServer(int threadCount)
requestProcessors = new Thread[threadCount];
messages = new BlockingCollection<HttpListenerContext>();
httpListener = new HttpListener();
public virtual int Port get; set; = 80;
public virtual string[] Prefixes
get return new string[] string.Format(@"http://+:0/", Port );
public void Start(int port)
listenerLoop = new Thread(HandleRequests);
foreach( string prefix in Prefixes ) httpListener.Prefixes.Add( prefix );
listenerLoop.Start();
for (int i = 0; i < requestProcessors.Length; i++)
requestProcessors[i] = StartProcessor(i, messages);
public void Dispose() Stop();
public void Stop()
messages.CompleteAdding();
foreach (Thread worker in requestProcessors) worker.Join();
httpListener.Stop();
listenerLoop.Join();
private void HandleRequests()
httpListener.Start();
try
while (httpListener.IsListening)
Console.WriteLine("The Linstener Is Listening!");
HttpListenerContext context = httpListener.GetContext();
messages.Add(context);
Console.WriteLine("The Linstener has added a message!");
catch(Exception e)
Console.WriteLine (e.Message);
private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages)
Thread thread = new Thread(() => Processor(number, messages));
thread.Start();
return thread;
private void Processor(int number, BlockingCollection<HttpListenerContext> messages)
Console.WriteLine ("Processor 0 started.", number);
try
for (;;)
Console.WriteLine ("Processor 0 awoken.", number);
HttpListenerContext context = messages.Take();
Console.WriteLine ("Processor 0 dequeued message.", number);
Response (context);
catch
Console.WriteLine ("Processor 0 terminated.", number);
public virtual void Response(HttpListenerContext context)
SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>") );
public static void SendReply(HttpListenerContext context, StringBuilder responseString )
byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString());
context.Response.ContentLength64 = buffer.Length;
System.IO.Stream output = context.Response.OutputStream;
output.Write(buffer, 0, buffer.Length);
output.Close();
这是一个如何使用它的示例。无需使用事件或任何锁块。 BlockingCollection 解决了所有这些问题。
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
namespace Service
class Server
public static void Main (string[] args)
HttpServer Service = new QuizzServer (8);
Service.Start (80);
for (bool coninute = true; coninute ;)
string input = Console.ReadLine ().ToLower();
switch (input)
case "stop":
Console.WriteLine ("Stop command accepted.");
Service.Stop ();
coninute = false;
break;
default:
Console.WriteLine ("Unknown Command: '0'.",input);
break;
【讨论】:
以上是关于.Net HttpListener 的多线程的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 IE/.Net/C# 进行真正的多线程 Web 挖掘?
那些年我们一起追逐的多线程(ThreadThreadPool委托异步调用Task/TaskFactoryParallerlasync和await)