C# 实现 Actor并发模型 (案例版)
Posted dotNET跨平台
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C# 实现 Actor并发模型 (案例版)相关的知识,希望对你有一定的参考价值。
啥是Actor模型
Actor (英语翻译 演员) 这个概念要回溯到面向对象程序设计的本身上来,更偏向于现实世界,现实世界就是由单个个体(人)与其他个体或(人)通讯(消息)组成的现实世界,所以,它的好处是显而易见的,生活就是Actor。
现有的面向对象编程模型是基于内存共享线程模型的,而Actor是基于消息模型的,Actor 之间是完全隔离的,不会共享任何变量。
基于内存,那么,内存会溢出,异常,基于消息的话,则没有这种困扰。
又由于它变相的来讲是单线程的,自己处理自己的事务,又实现了与其他业务的隔离。
Erlang语言,天生支持Actor模型,是一个好语言啊。
Actor 的优势
1. Actor不共享状态
2. 高并发,无锁(无共享状态)
3. Actor效率高(同一时刻只处理一个任务,属于单线程处理)
Actor 框架有
1. Orleans
2. Dapr
3. Akka.NET
4. Proto.Actor
Actor模型的实现
为啥要实现Actor模型,一个是为了更深入的了解它,一个是想实现一下。
可能在某些地方直接就用了。没必要搞那么复杂引用。
Actor模型的原理
按照自己的理解画了一下。
简单来讲,就是各个服务都可以投递消息到Actor实例里,Actor会从邮箱里把消息取出来,然后,消费掉。这么简单的一件事情。
Actor 代码逻辑的实现
IActor.cs
/// <summary>
/// 无锁并行编程模型(暂时用来处理串行任务,任务串行执行)
/// </summary>
public interface IActor
/// <summary>
/// 增加消息
/// </summary>
/// <returns></returns>
bool AddMsg(object message);
/// <summary>
/// 启动服务
/// </summary>
/// <returns></returns>
Task Start();
/// <summary>
/// 停止服务运行,等待毫秒数
/// </summary>
/// <param name="WatingTimeout"></param>
/// <returns></returns>
bool Stop(int WatingTimeout);
Actor.cs
/// <summary>
/// Actor抽象
/// </summary>
public abstract class Actor : IDisposable, IActor
public Actor(string name)
Name = name;
MailBox = new BlockingCollection<object>();
/// <summary>
/// 名称
/// </summary>
public string Name get; set;
/// <summary>
/// 是否启用
/// </summary>
public bool Active get; private set;
/// <summary>
/// 是否长时间运行。长时间运行任务使用独立线程,默认true
/// </summary>
public bool LongRunning get; set; = true;
/// <summary>
/// 处理的消息邮箱
/// </summary>
public BlockingCollection<object> MailBox get; set;
/// <summary>
/// 内置任务
/// </summary>
private Task _task;
public virtual Task Start()
if (Active) return _task;
Active = true;
// 启动异步
if (_task == null)
lock (this)
if (_task == null)
_task = Task.Factory.StartNew(DoActorWork, LongRunning ? TaskCreationOptions.LongRunning : TaskCreationOptions.None);
return _task;
public virtual bool Stop(int WatingTimeout = 100)
MailBox?.CompleteAdding();
Active = false;
if (WatingTimeout == 0 || _task == null) return true;
return _task.Wait(WatingTimeout);
public virtual bool AddMsg(object message)
// 自动开始
if (!Active)
Start();
if (!Active)
return false;
MailBox.Add(message);
return true;
/// <summary>
/// 循环消费消息
/// </summary>
private void DoActorWork()
while (!MailBox.IsCompleted)
try
var ctx = MailBox.Take();
var task = ProcessAsync(ctx);
if (task != null)
task.Wait();
catch (InvalidOperationException)
catch (Exception ex)
Console.WriteLine($"DoActorWork Error : ex.Message");
Active = false;
/// <summary>
/// 处理消息
/// </summary>
/// <returns></returns>
public abstract Task ProcessAsync(object msg);
public void Dispose()
try
Stop(100);
catch (Exception)
while (MailBox?.TryTake(out _) == true)
MailBox = null;
相关测试模型
AccumulationActor.cs
/// <summary>
/// 累加
/// </summary>
public class AccumulationActor : Actor
private int Count = 0;
private IActor actor;
public AccumulationActor(IActor actor) : base(nameof(AccumulationActor))
Count = 0;
this.actor = actor;
/// <summary>
/// 处理信息
/// </summary>
/// <returns></returns>
public override Task ProcessAsync(object msg)
try
var msgNumber = (int)(msg);
Count += msgNumber;
Console.WriteLine($"处理this.Name :msg ,累积总数:Count");
if (Count >= 100)
this.actor.AddMsg(Count);
Count = 0;
catch (Exception e)
Console.WriteLine($"业务处理异常:e.Message");
return Task.CompletedTask;
WriteActor.cs
/// <summary>
/// 输出
/// </summary>
public class WriteActor : Actor
public WriteActor() : base(nameof(WriteActor))
/// <summary>
/// 处理信息
/// </summary>
/// <returns></returns>
public override Task ProcessAsync(object msg)
try
Console.WriteLine($"输出 this.Name :msg");
catch (Exception e)
Console.WriteLine($"业务处理异常:e.Message");
return Task.CompletedTask;
测试代码
static void Main(string[] args)
Console.Title = "Actor Demo by 蓝创精英团队";
//实现一个加法逻辑
//a累加到100,就发送消息到 b里,让b 输出。
var write = new WriteActor();
var User = new AccumulationActor(write);
for (int i = 0; i < 20; i++)
User.AddMsg(i * 30);
Thread.Sleep(2000);
write.Stop();
User.Stop();
//释放资源
Console.WriteLine("示例完毕!");
Console.ReadLine();
运行结果
总结
上节实现了状态机,这节实现了Actor模型,接下来对Orleans 和 Dapr 的核心原理就了解深入一些了,那么,运用这些技术就不会显的很生涩。
代码地址
https://github.com/kesshei/ActorDemo.git
https://gitee.com/kesshei/ActorDemo.git
阅
一键三连呦!,感谢大佬的支持,您的支持就是我的动力!
以上是关于C# 实现 Actor并发模型 (案例版)的主要内容,如果未能解决你的问题,请参考以下文章