C# 实现 Actor并发模型 (案例版)

Posted dotNET跨平台

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C# 实现 Actor并发模型 (案例版)相关的知识,希望对你有一定的参考价值。

啥是Actor模型

Actor (英语翻译 演员) 这个概念要回溯到面向对象程序设计的本身上来,更偏向于现实世界,现实世界就是由单个个体(人)与其他个体或(人)通讯(消息)组成的现实世界,所以,它的好处是显而易见的,生活就是Actor。

现有的面向对象编程模型是基于内存共享线程模型的,而Actor是基于消息模型的,Actor 之间是完全隔离的,不会共享任何变量。

基于内存,那么,内存会溢出,异常,基于消息的话,则没有这种困扰。

又由于它变相的来讲是单线程的,自己处理自己的事务,又实现了与其他业务的隔离。

Erlang语言,天生支持Actor模型,是一个好语言啊。

Actor 的优势

  1. 1. Actor不共享状态

  2. 2. 高并发,无锁(无共享状态)

  3. 3. Actor效率高(同一时刻只处理一个任务,属于单线程处理)

Actor 框架有

  1. 1. Orleans

  2. 2. Dapr

  3. 3. Akka.NET

  4. 4. Proto.Actor

Actor模型的实现

为啥要实现Actor模型,一个是为了更深入的了解它,一个是想实现一下。

可能在某些地方直接就用了。没必要搞那么复杂引用。

Actor模型的原理

按照自己的理解画了一下。

简单来讲,就是各个服务都可以投递消息到Actor实例里,Actor会从邮箱里把消息取出来,然后,消费掉。这么简单的一件事情。

Actor 代码逻辑的实现

IActor.cs

/// <summary>
/// 无锁并行编程模型(暂时用来处理串行任务,任务串行执行)
/// </summary>
public interface IActor

    /// <summary>
    /// 增加消息
    /// </summary>
    /// <returns></returns>
    bool AddMsg(object message);
    /// <summary>
    /// 启动服务
    /// </summary>
    /// <returns></returns>
    Task Start();
    /// <summary>
    /// 停止服务运行,等待毫秒数
    /// </summary>
    /// <param name="WatingTimeout"></param>
    /// <returns></returns>
    bool Stop(int WatingTimeout);

Actor.cs

/// <summary>
/// Actor抽象
/// </summary>
public abstract class Actor : IDisposable, IActor

    public Actor(string name)
    
        Name = name;
        MailBox = new BlockingCollection<object>();
    
    /// <summary>
    /// 名称
    /// </summary>
    public string Name  get; set; 
    /// <summary>
    /// 是否启用
    /// </summary>
    public bool Active  get; private set; 
    /// <summary>
    /// 是否长时间运行。长时间运行任务使用独立线程,默认true
    /// </summary>
    public bool LongRunning  get; set;  = true;
    /// <summary>
    /// 处理的消息邮箱
    /// </summary>
    public BlockingCollection<object> MailBox  get; set; 
    /// <summary>
    /// 内置任务
    /// </summary>
    private Task _task;

    public virtual Task Start()
    
        if (Active) return _task;
        Active = true;
        // 启动异步
        if (_task == null)
        
            lock (this)
            
                if (_task == null)
                
                    _task = Task.Factory.StartNew(DoActorWork, LongRunning ? TaskCreationOptions.LongRunning : TaskCreationOptions.None);
                
            
        
        return _task;
    

    public virtual bool Stop(int WatingTimeout = 100)
    
        MailBox?.CompleteAdding();
        Active = false;
        if (WatingTimeout == 0 || _task == null) return true;

        return _task.Wait(WatingTimeout);
    
    public virtual bool AddMsg(object message)
    
        // 自动开始
        if (!Active)
        
            Start();
        

        if (!Active)
        
            return false;
        
        MailBox.Add(message);
        return true;
    
    /// <summary>
    /// 循环消费消息
    /// </summary>
    private void DoActorWork()
    
        while (!MailBox.IsCompleted)
        
            try
            
                var ctx = MailBox.Take();
                var task = ProcessAsync(ctx);
                if (task != null)
                
                    task.Wait();
                
            
            catch (InvalidOperationException)  
            catch (Exception ex)
            
                Console.WriteLine($"DoActorWork Error : ex.Message");
            
        

        Active = false;
    
    /// <summary>
    /// 处理消息
    /// </summary>
    /// <returns></returns>
    public abstract Task ProcessAsync(object msg);
    public void Dispose()
    
        try
        
            Stop(100);
        
        catch (Exception)
        
        
        while (MailBox?.TryTake(out _) == true)  
        MailBox = null;
    

相关测试模型

AccumulationActor.cs

/// <summary>
/// 累加
/// </summary>
public class AccumulationActor : Actor

    private int Count = 0;
    private IActor actor;
    public AccumulationActor(IActor actor) : base(nameof(AccumulationActor))
    
        Count = 0;
        this.actor = actor;
       
    /// <summary>
    /// 处理信息
    /// </summary>
    /// <returns></returns>
    public override Task ProcessAsync(object msg)
    
        try
        
            var  msgNumber = (int)(msg);
            Count += msgNumber;
            Console.WriteLine($"处理this.Name :msg ,累积总数:Count");

            if (Count >= 100)
            
                this.actor.AddMsg(Count);
                Count = 0;
            
        
        catch (Exception e)
        
            Console.WriteLine($"业务处理异常:e.Message");
        
        return Task.CompletedTask;
    

WriteActor.cs

/// <summary>
    /// 输出
    /// </summary>
    public class WriteActor : Actor
    
        public WriteActor() : base(nameof(WriteActor))
        
        
        /// <summary>
        /// 处理信息
        /// </summary>
        /// <returns></returns>
        public override Task ProcessAsync(object msg)
        
            try
            
                Console.WriteLine($"输出 this.Name :msg");
            
            catch (Exception e)
            
                Console.WriteLine($"业务处理异常:e.Message");
            
            return Task.CompletedTask;
        
    

测试代码

static void Main(string[] args)

    Console.Title = "Actor Demo by 蓝创精英团队";

    //实现一个加法逻辑
    //a累加到100,就发送消息到 b里,让b 输出。
    var write = new WriteActor();
    var User = new AccumulationActor(write);
    for (int i = 0; i < 20; i++)
    
        User.AddMsg(i * 30);
    
    Thread.Sleep(2000);
    write.Stop();
    User.Stop();
    //释放资源
    Console.WriteLine("示例完毕!");
    Console.ReadLine();

运行结果

总结

上节实现了状态机,这节实现了Actor模型,接下来对Orleans 和 Dapr 的核心原理就了解深入一些了,那么,运用这些技术就不会显的很生涩。

代码地址

https://github.com/kesshei/ActorDemo.git

https://gitee.com/kesshei/ActorDemo.git

一键三连呦!,感谢大佬的支持,您的支持就是我的动力!

以上是关于C# 实现 Actor并发模型 (案例版)的主要内容,如果未能解决你的问题,请参考以下文章

Lite Actor:方舟Actor并发模型的轻量级优化

Akka并发编程——第七节:Actor模型

Actor模型和CSP模型的区别

C++ Actor并发模型框架 Actor Framework (CAF)

Actor初识

Actor模型