Actor Mailbox

Posted Jeece

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Actor Mailbox 相关的知识,希望对你有一定的参考价值。

ActorMailbox:

/// <summary>
/// Integer states for the scheduling flag held by <c>ActorMailbox</c>.
/// Plain <c>int</c> constants are used because the flag is read and written through
/// <c>Interlocked</c> operations on an <c>int</c> field.
/// </summary>
internal static class MailboxStatus
    {
        /// <summary>No run is scheduled; this is the initial value of the mailbox status flag.</summary>
        public const int Idle = 0;
        /// <summary>A run has been scheduled or is in progress.</summary>
        public const int Busy = 1;
    }

    /// <summary>
    /// FIFO message buffer backed by a <see cref="ConcurrentQueue{T}"/>; no capacity limit is enforced here.
    /// </summary>
    public class UnboundedMailboxQueue
    {
        private readonly ConcurrentQueue<object> _messages = new ConcurrentQueue<object>();

        /// <summary>Appends <paramref name="message"/> to the tail of the queue.</summary>
        /// <param name="message">The message to enqueue.</param>
        public void Push(object message) => _messages.Enqueue(message);

        /// <summary>Removes and returns the message at the head of the queue.</summary>
        /// <returns>The dequeued message, or <c>null</c> when the queue is empty.</returns>
        public object Pop()
        {
            object head;
            if (_messages.TryDequeue(out head))
            {
                return head;
            }

            return null;
        }

        /// <summary>Gets a value indicating whether at least one message is currently queued.</summary>
        public bool HasMessages => !_messages.IsEmpty;
    }

    /// <summary>
    /// Receiver of mailbox messages and of the failures raised while handling them.
    /// </summary>
    public interface IMessageInvoker
    {
        /// <summary>Handles one message taken from the mailbox.</summary>
        /// <param name="msg">The message to handle.</param>
        /// <returns>
        /// A task for the handling of the message. <c>ActorMailbox</c> inspects its completion and
        /// fault state to decide whether to keep processing, halt, or escalate a failure.
        /// </returns>
        Task InvokeMessageAsync(object msg);
        /// <summary>Reports a failure that occurred while handling <paramref name="message"/>.</summary>
        /// <param name="reason">The exception thrown by, or stored in the task of, the handler.</param>
        /// <param name="message">The message that was being handled when the failure occurred.</param>
        void EscalateFailure(Exception reason, object message);
    }

    /// <summary>
    /// Abstraction over where and how mailbox runs are executed.
    /// </summary>
    public interface IDispatcher
    {
        /// <summary>
        /// Gets the upper bound on the number of messages <c>ActorMailbox</c> processes in a single run.
        /// </summary>
        int Throughput { get; }
        /// <summary>Queues <paramref name="runner"/> for execution.</summary>
        /// <param name="runner">The asynchronous delegate to execute.</param>
        void Schedule(Func<Task> runner);
    }

    /// <summary>
    /// <see cref="IDispatcher"/> that executes runners on the .NET thread pool.
    /// </summary>
    public sealed class ThreadPoolDispatcher : IDispatcher
    {
        /// <summary>Creates a dispatcher with a default <see cref="Throughput"/> of 300.</summary>
        public ThreadPoolDispatcher()
        {
            Throughput = 300;
        }

        /// <summary>Queues <paramref name="runner"/> on the thread pool (fire-and-forget).</summary>
        /// <param name="runner">The asynchronous delegate to execute.</param>
        /// <remarks>
        /// <c>Task.Run</c> is used instead of <c>Task.Factory.StartNew</c>: it always targets the default
        /// (thread-pool) scheduler rather than whatever <c>TaskScheduler.Current</c> happens to be at the
        /// call site, and it unwraps the <c>Func&lt;Task&gt;</c> instead of producing a <c>Task&lt;Task&gt;</c>.
        /// </remarks>
        public void Schedule(Func<Task> runner) => Task.Run(runner);

        /// <summary>Gets or sets the maximum number of messages to process per mailbox run.</summary>
        public int Throughput { get; set; }
    }

    /// <summary>
    /// Provides the shared dispatcher instance used by mailboxes.
    /// </summary>
    public static class Dispatchers
    {
        private static readonly ThreadPoolDispatcher SharedThreadPoolDispatcher = new ThreadPoolDispatcher();

        /// <summary>Gets the single shared <see cref="ThreadPoolDispatcher"/>.</summary>
        public static ThreadPoolDispatcher DefaultDispatcher => SharedThreadPoolDispatcher;
    }

    /// <summary>
    /// Buffers messages for one <see cref="IMessageInvoker"/> and processes them in batches on an
    /// <see cref="IDispatcher"/>. An Idle/Busy status flag, flipped with <see cref="Interlocked"/>
    /// operations, ensures that posting a message schedules a run only when none is already pending.
    /// </summary>
    /// <remarks>
    /// A <c>null</c> message cannot be distinguished from an empty queue (the queue's <c>Pop</c> returns
    /// <c>null</c> when empty), so a posted <c>null</c> ends the current batch and is not delivered.
    /// </remarks>
    public class ActorMailbox
    {
        private readonly UnboundedMailboxQueue _mailbox;
        private IDispatcher _dispatcher;
        private IMessageInvoker _invoker;

        private int _status = MailboxStatus.Idle;

        /// <summary>Creates a mailbox with an empty unbounded queue and no registered handler.</summary>
        public ActorMailbox()
        {
            _mailbox = new UnboundedMailboxQueue();
        }

        /// <summary>
        /// Registers the invoker that receives messages and failures, and binds the mailbox to
        /// <see cref="Dispatchers.DefaultDispatcher"/>.
        /// </summary>
        /// <param name="invoker">The message handler; must not be <c>null</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="invoker"/> is <c>null</c>.</exception>
        public void RegisterHandlers(IMessageInvoker invoker)
        {
            if (invoker == null)
            {
                throw new ArgumentNullException(nameof(invoker));
            }

            _invoker = invoker;
            _dispatcher = Dispatchers.DefaultDispatcher;
        }

        /// <summary>Enqueues <paramref name="msg"/> and schedules a processing run if none is pending.</summary>
        /// <param name="msg">The message to deliver.</param>
        /// <exception cref="InvalidOperationException">
        /// <see cref="RegisterHandlers"/> has not been called yet.
        /// </exception>
        public void PostMessage(object msg)
        {
            // Fail before touching any state: otherwise Schedule() would flip the status to Busy and then
            // hit a null dispatcher, leaving the mailbox permanently stuck in the Busy state.
            if (_invoker == null || _dispatcher == null)
            {
                throw new InvalidOperationException("RegisterHandlers must be called before PostMessage.");
            }

            _mailbox.Push(msg);
            Schedule();
        }

        /// <summary>
        /// Executes one processing run: handles a batch of messages, then releases the Busy flag and
        /// reschedules if more messages are waiting.
        /// </summary>
        /// <returns>An already-completed task.</returns>
        private Task RunAsync()
        {
            var done = ProcessMessages();

            if (!done)
            {
                // Mailbox is halted, awaiting completion of a message task; the continuation attached in
                // ProcessMessages reschedules the mailbox. The status deliberately stays Busy meanwhile.
                return Task.FromResult(0);
            }

            Interlocked.Exchange(ref _status, MailboxStatus.Idle);

            // A PostMessage that arrived while the status was Busy did not schedule a run, so re-check
            // the queue after going Idle to avoid stranding those messages.
            if (_mailbox.HasMessages)
            {
                Schedule();
            }
            return Task.FromResult(0);
        }

        /// <summary>
        /// Handles up to <see cref="IDispatcher.Throughput"/> messages from the queue.
        /// </summary>
        /// <returns>
        /// <c>false</c> when processing halted on a message task that has not completed yet (a continuation
        /// will reschedule the mailbox); <c>true</c> when the batch ended because the queue was empty, the
        /// throughput limit was reached, or an exception was escalated.
        /// </returns>
        private bool ProcessMessages()
        {
            object msg = null;
            try
            {
                for (var i = 0; i < _dispatcher.Throughput; i++)
                {
                    msg = _mailbox.Pop();
                    if (msg == null)
                    {
                        break;
                    }

                    var t = _invoker.InvokeMessageAsync(msg);

                    // Check completion BEFORE the fault state. Checking IsFaulted first leaves a window in
                    // which the task can fault between the two checks and then be treated as a successful
                    // synchronous completion, silently losing the failure.
                    if (!t.IsCompleted)
                    {
                        // If the task didn't complete immediately, halt processing and reschedule a new run
                        // when the task completes; faults are escalated by the continuation.
                        t.ContinueWith(RescheduleOnTaskComplete, msg);
                        return false;
                    }

                    if (t.IsFaulted)
                    {
                        _invoker.EscalateFailure(t.Exception, msg);
                    }
                }
            }
            catch (Exception e)
            {
                // The invoker threw synchronously; report it against the message being handled.
                _invoker.EscalateFailure(e, msg);
            }
            return true;
        }

        /// <summary>
        /// Continuation for a message task that did not complete synchronously: escalates its fault, if any,
        /// then schedules the next run directly (the status is still Busy from the halted run).
        /// </summary>
        /// <param name="task">The completed message task.</param>
        /// <param name="message">The message the task was handling.</param>
        private void RescheduleOnTaskComplete(Task task, object message)
        {
            if (task.IsFaulted)
            {
                _invoker.EscalateFailure(task.Exception, message);
            }
            _dispatcher.Schedule(RunAsync);
        }

        /// <summary>
        /// Schedules a run on the dispatcher if the status can be atomically moved from Idle to Busy;
        /// does nothing when a run is already pending.
        /// </summary>
        protected void Schedule()
        {
            if (Interlocked.CompareExchange(ref _status, MailboxStatus.Busy, MailboxStatus.Idle) == MailboxStatus.Idle)
            {
                _dispatcher.Schedule(RunAsync);
            }
        }

    }

  使用方法:

/// <summary>
/// Throughput benchmark: posts <see cref="maxCount"/> messages to a <c>RoomData</c> mailbox, waits for
/// <see cref="resetEvent"/> to be signalled, then prints the rate and the elapsed time.
/// </summary>
class Program
    {
        /// <summary>Signalled by <c>MailboxHandler</c> once it has counted enough messages.</summary>
        public static AutoResetEvent resetEvent = new AutoResetEvent(false);

        /// <summary>Number of messages posted by the benchmark.</summary>
        public static int maxCount = 10000000;

        /// <summary>Runs the benchmark and writes "messages-per-second,elapsed-milliseconds" to the console.</summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            RoomData room = new RoomData();
            Stopwatch watch = Stopwatch.StartNew();
            for (int i = 0; i < maxCount; i++)
            {
                room.Tell(i);
            }

            // Posting is asynchronous; block until the handler reports that processing has finished.
            resetEvent.WaitOne();
            watch.Stop();
            Console.WriteLine("{0},{1}", maxCount * 1000.0 / watch.ElapsedMilliseconds, watch.ElapsedMilliseconds);

            // Keep the console window open until a key is pressed.
            Console.Read();
        }
    }

    /// <summary>
    /// Simple message type carrying a text payload.
    /// NOTE(review): in the visible source this type is referenced only from a commented-out line in
    /// <c>MailboxHandler.InvokeMessageAsync</c> — confirm whether it is still needed.
    /// </summary>
    public class TestMessage
    {
        /// <summary>Gets or sets the message text.</summary>
        public string Message { get; set; }
    }

    /// <summary>
    /// Benchmark <see cref="IMessageInvoker"/>: counts handled messages and signals
    /// <c>Program.resetEvent</c> once <c>Program.maxCount</c> messages have been handled.
    /// </summary>
    public class MailboxHandler : IMessageInvoker
    {
        // Number of messages handled so far. Starts at 0 so the event fires on the maxCount-th message;
        // starting at 1 signalled one message early, ending the benchmark before the last message ran.
        // NOTE(review): not synchronized — presumably relies on the mailbox running one batch at a time.
        private int _handled;

        /// <summary>Counts the message and signals completion when the expected total is reached.</summary>
        /// <param name="msg">The message being handled (its content is not inspected).</param>
        /// <returns>An already-completed task.</returns>
        public Task InvokeMessageAsync(object msg)
        {
            _handled++;
            if (_handled >= Program.maxCount)
            {
                Program.resetEvent.Set();
            }
            return Task.FromResult(0);
        }

        /// <summary>Writes the failing message, the exception message and its stack trace to the console.</summary>
        /// <param name="reason">The exception that was raised.</param>
        /// <param name="message">The message that was being handled.</param>
        public void EscalateFailure(Exception reason, object message)
        {
            Console.WriteLine("执行异常:{0},{1},{2}", message, reason.Message, reason.StackTrace);
        }
    }

    /// <summary>
    /// Actor-style wrapper that owns an <c>ActorMailbox</c> wired to its own <c>MailboxHandler</c>.
    /// </summary>
    public class RoomData
    {
        private readonly ActorMailbox _mailbox = new ActorMailbox();
        private readonly MailboxHandler _handler = new MailboxHandler();

        /// <summary>Creates the room and registers its handler with the mailbox.</summary>
        public RoomData()
        {
            _mailbox.RegisterHandlers(_handler);
        }

        /// <summary>Posts <paramref name="msg"/> to the room's mailbox.</summary>
        /// <param name="msg">The message to deliver.</param>
        public void Tell(object msg) => _mailbox.PostMessage(msg);
    }

  

以上是关于 Actor Mailbox 的主要内容,如果未能解决你的问题,请参考以下文章:

Flink源码1-Flink 的集群和Jobmanager启动

Actor模型及原理

Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

Scala并发框架Akka原理详解

Akka框架使用注意点

Swift新async/await并发中利用Task防止指定代码片段执行的数据竞争(Data Race)问题