ActorMailbox:
/// <summary>
/// Scheduling states of an <see cref="ActorMailbox"/>, stored in an <c>int</c> so they can be
/// swapped with <see cref="Interlocked"/> operations.
/// </summary>
internal static class MailboxStatus
{
    /// <summary>No run is scheduled or executing.</summary>
    public const int Idle = 0;

    /// <summary>A run has been scheduled or is currently executing.</summary>
    public const int Busy = 1;
}

/// <summary>
/// Unbounded FIFO message queue backed by a <see cref="ConcurrentQueue{T}"/>.
/// </summary>
public class UnboundedMailboxQueue
{
    private readonly ConcurrentQueue<object> _messages = new ConcurrentQueue<object>();

    /// <summary>Appends <paramref name="message"/> to the end of the queue.</summary>
    /// <param name="message">The message to enqueue.</param>
    public void Push(object message)
    {
        _messages.Enqueue(message);
    }

    /// <summary>Removes and returns the oldest message.</summary>
    /// <returns>The dequeued message, or <c>null</c> when the queue is empty.</returns>
    public object Pop()
    {
        object message;
        return _messages.TryDequeue(out message) ? message : null;
    }

    /// <summary>Gets a value indicating whether at least one message is queued.</summary>
    public bool HasMessages
    {
        get { return !_messages.IsEmpty; }
    }
}

/// <summary>
/// Receives the messages drained from a mailbox and the failures raised while handling them.
/// </summary>
public interface IMessageInvoker
{
    /// <summary>Handles a single message.</summary>
    /// <param name="msg">The message taken from the mailbox.</param>
    /// <returns>A task that completes when the message has been handled.</returns>
    Task InvokeMessageAsync(object msg);

    /// <summary>Reports a failure that occurred while handling <paramref name="message"/>.</summary>
    /// <param name="reason">The exception that was thrown or that faulted the message task.</param>
    /// <param name="message">The message being handled when the failure occurred.</param>
    void EscalateFailure(Exception reason, object message);
}

/// <summary>
/// Runs mailbox processing passes and bounds how many messages a single pass may handle.
/// </summary>
public interface IDispatcher
{
    /// <summary>Gets the maximum number of messages processed in one mailbox run.</summary>
    int Throughput { get; }

    /// <summary>Queues <paramref name="runner"/> for execution.</summary>
    /// <param name="runner">The mailbox run to execute.</param>
    void Schedule(Func<Task> runner);
}

/// <summary>
/// Dispatcher that executes mailbox runs on the .NET thread pool.
/// </summary>
public sealed class ThreadPoolDispatcher : IDispatcher
{
    /// <summary>Creates a dispatcher with a throughput of 300 messages per run.</summary>
    public ThreadPoolDispatcher()
    {
        Throughput = 300;
    }

    /// <summary>Gets or sets the maximum number of messages processed in one mailbox run.</summary>
    public int Throughput { get; set; }

    /// <summary>Queues <paramref name="runner"/> on the thread pool.</summary>
    /// <param name="runner">The mailbox run to execute.</param>
    public void Schedule(Func<Task> runner)
    {
        // Task.Run always targets TaskScheduler.Default. Task.Factory.StartNew without an
        // explicit scheduler uses TaskScheduler.Current, which would run the mailbox on
        // whatever custom scheduler (e.g. a UI scheduler) the posting code happens to be on.
        Task.Run(runner);
    }
}

/// <summary>
/// Well-known dispatcher instances.
/// </summary>
public static class Dispatchers
{
    /// <summary>Gets the shared thread-pool dispatcher.</summary>
    public static ThreadPoolDispatcher DefaultDispatcher { get; } = new ThreadPoolDispatcher();
}

/// <summary>
/// Mailbox that queues posted messages and feeds them one at a time to a registered
/// <see cref="IMessageInvoker"/>. The Idle/Busy status flag guarantees that at most one
/// processing run is scheduled or executing at any moment.
/// </summary>
public class ActorMailbox
{
    private readonly UnboundedMailboxQueue _mailbox;
    private IDispatcher _dispatcher;
    private IMessageInvoker _invoker;
    private int _status = MailboxStatus.Idle;

    /// <summary>Creates an empty, idle mailbox with no invoker registered.</summary>
    public ActorMailbox()
    {
        _mailbox = new UnboundedMailboxQueue();
    }

    /// <summary>
    /// Registers the invoker that will receive messages and binds the mailbox to
    /// <see cref="Dispatchers.DefaultDispatcher"/>.
    /// </summary>
    /// <param name="invoker">The handler for messages and failures.</param>
    public void RegisterHandlers(IMessageInvoker invoker)
    {
        _invoker = invoker;
        _dispatcher = Dispatchers.DefaultDispatcher;
    }

    /// <summary>Enqueues <paramref name="msg"/> and schedules a processing run if none is active.</summary>
    /// <param name="msg">The message to deliver to the invoker.</param>
    /// <exception cref="InvalidOperationException">
    /// <see cref="RegisterHandlers"/> has not been called with a non-null invoker.
    /// </exception>
    public void PostMessage(object msg)
    {
        // Validate before enqueueing so a rejected post leaves the queue untouched.
        EnsureRegistered();
        _mailbox.Push(msg);
        Schedule();
    }

    /// <summary>
    /// One processing run: drains a batch, then releases the Busy flag and reschedules if
    /// messages are still waiting.
    /// </summary>
    private Task RunAsync()
    {
        var done = ProcessMessages();
        if (!done)
        {
            // The mailbox is halted awaiting completion of a message task; its continuation
            // will schedule the next run, so the status deliberately stays Busy.
            return Task.FromResult(0);
        }

        Interlocked.Exchange(ref _status, MailboxStatus.Idle);

        // A message may have been pushed while the status was still Busy (that producer's
        // Schedule() call was a no-op), or the throughput limit may have been hit.
        if (_mailbox.HasMessages)
        {
            Schedule();
        }

        return Task.FromResult(0);
    }

    /// <summary>
    /// Handles up to <see cref="IDispatcher.Throughput"/> queued messages.
    /// </summary>
    /// <returns>
    /// <c>false</c> when processing halted on a message task that has not completed yet;
    /// otherwise <c>true</c>.
    /// </returns>
    private bool ProcessMessages()
    {
        object msg = null;
        try
        {
            for (var i = 0; i < _dispatcher.Throughput; i++)
            {
                if ((msg = _mailbox.Pop()) != null)
                {
                    var t = _invoker.InvokeMessageAsync(msg);
                    if (t.IsFaulted)
                    {
                        _invoker.EscalateFailure(t.Exception, msg);
                        continue;
                    }

                    if (!t.IsCompleted)
                    {
                        // The task didn't complete immediately: halt processing and schedule
                        // a new run when it completes. The continuation is pinned to the
                        // default scheduler so it never runs on an ambient custom scheduler.
                        t.ContinueWith(RescheduleOnTaskComplete, msg, TaskScheduler.Default);
                        return false;
                    }
                }
                else
                {
                    break;
                }
            }
        }
        catch (Exception e)
        {
            // A synchronous throw ends this batch; the caller reschedules if messages remain.
            _invoker.EscalateFailure(e, msg);
        }

        return true;
    }

    /// <summary>
    /// Continuation for a message task that completed asynchronously: reports a fault, if
    /// any, and schedules the next run.
    /// </summary>
    /// <param name="task">The completed message task.</param>
    /// <param name="message">The message the task was handling.</param>
    private void RescheduleOnTaskComplete(Task task, object message)
    {
        if (task.IsFaulted)
        {
            _invoker.EscalateFailure(task.Exception, message);
        }

        // The status is still Busy here, so dispatch directly instead of going through Schedule().
        _dispatcher.Schedule(RunAsync);
    }

    /// <summary>Schedules a processing run unless one is already scheduled or executing.</summary>
    /// <exception cref="InvalidOperationException">
    /// <see cref="RegisterHandlers"/> has not been called with a non-null invoker.
    /// </exception>
    protected void Schedule()
    {
        // Check before flipping the flag: failing after the flag is set would leave the
        // mailbox stuck in the Busy state with no run ever scheduled.
        EnsureRegistered();
        if (Interlocked.CompareExchange(ref _status, MailboxStatus.Busy, MailboxStatus.Idle) == MailboxStatus.Idle)
        {
            _dispatcher.Schedule(RunAsync);
        }
    }

    /// <summary>Throws when no invoker/dispatcher has been registered yet.</summary>
    private void EnsureRegistered()
    {
        if (_invoker == null || _dispatcher == null)
        {
            throw new InvalidOperationException("RegisterHandlers must be called before messages can be posted or scheduled.");
        }
    }
}
使用方法:
/// <summary>
/// Throughput benchmark: posts <see cref="maxCount"/> messages to a single mailbox and
/// prints the messages-per-second rate and the elapsed milliseconds.
/// </summary>
class Program
{
    /// <summary>Signalled by <see cref="MailboxHandler"/> once every posted message has been handled.</summary>
    public static AutoResetEvent resetEvent = new AutoResetEvent(false);

    /// <summary>Number of messages posted by the benchmark.</summary>
    public static int maxCount = 10000000;

    static void Main(string[] args)
    {
        RoomData room = new RoomData();
        Stopwatch watch = new Stopwatch();
        watch.Start();

        for (int i = 0; i < maxCount; i++)
        {
            room.Tell(i);
        }

        // Posting only enqueues; wait for the handler to report that the last message was processed.
        resetEvent.WaitOne();
        watch.Stop();

        Console.WriteLine("{0},{1}", maxCount * 1000.0 / watch.ElapsedMilliseconds, watch.ElapsedMilliseconds);
        Console.Read();
    }
}

/// <summary>
/// Simple message carrying a text payload.
/// </summary>
public class TestMessage
{
    /// <summary>Gets or sets the message text.</summary>
    public string Message { get; set; }
}

/// <summary>
/// Benchmark invoker: counts handled messages and signals <see cref="Program.resetEvent"/>
/// when <see cref="Program.maxCount"/> messages have been handled.
/// </summary>
public class MailboxHandler : IMessageInvoker
{
    // Starts at zero so the event fires on the maxCount-th message. Starting at one fired it
    // one message early, letting the stopwatch stop before the final message was handled.
    private int _handled;

    /// <summary>Counts the message and signals completion once all messages have been handled.</summary>
    /// <param name="msg">The message taken from the mailbox (its content is not inspected).</param>
    /// <returns>An already-completed task.</returns>
    public Task InvokeMessageAsync(object msg)
    {
        _handled++;
        if (_handled >= Program.maxCount)
        {
            Program.resetEvent.Set();
        }

        return Task.FromResult(0);
    }

    /// <summary>Writes the failed message, the exception message and its stack trace to the console.</summary>
    /// <param name="reason">The exception raised while handling the message.</param>
    /// <param name="message">The message being handled when the failure occurred.</param>
    public void EscalateFailure(Exception reason, object message)
    {
        Console.WriteLine("执行异常:{0},{1},{2}", message, reason.Message, reason.StackTrace);
    }
}

/// <summary>
/// Sample actor-like object that owns a mailbox and a handler and forwards messages to them.
/// </summary>
public class RoomData
{
    private ActorMailbox _mailbox;
    private MailboxHandler _handler;

    /// <summary>Creates the mailbox and registers a new <see cref="MailboxHandler"/> with it.</summary>
    public RoomData()
    {
        _mailbox = new ActorMailbox();
        _handler = new MailboxHandler();
        _mailbox.RegisterHandlers(_handler);
    }

    /// <summary>Posts <paramref name="msg"/> to this object's mailbox.</summary>
    /// <param name="msg">The message to deliver.</param>
    public void Tell(object msg)
    {
        _mailbox.PostMessage(msg);
    }
}