C++ Actor并发模型框架 Actor Framework (CAF)

Posted BBinChina

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++ Actor并发模型框架 Actor Framework (CAF)相关的知识,希望对你有一定的参考价值。

什么是ACTOR 并发模型

Actor 是一种处理并发计算的基本模型,主要概念是通过OO 面向对象思想,将计算单元独立成一个对象,在模型中成为Actor,各个独立计算单元之间的通信通过mailbox(邮箱),每个Actor维护自己的mailbox,ma
ilbox存储的是需要消费的消息。这里扩散一下知识,在更早期出现的ESB(enterprise service bus)企业服务总线模型是将服务接到消息总线上,服务获取总线上投递给他的消息。
这种方式存在耦合,以及需要一个个服务进行确认,直到消费者进行确认消费,好处是实现简便,接入现在的消息中间件就行。
以下为Actor模型的组件图:
在这里插入图片描述
Actor的思想早在1973年就提出,最早在erlang中实现,应用于电信系统,可以想象下,每个通话内容质量互不影响,每分每秒有多少人在通话。

在Erlang中,每段代码都运行在进程中,进程是Erlang中对Actor的称呼,意味着它的状态不会影响其他进程。系统中会有一个supervisor,实际上它只是另一个进程。被监控的进程挂掉了,supervisor会被通知并对此进行处理,因此也就能创建一个具有自愈功能的系统。如果一个Actor到达异常状态并且崩溃,无论如何,supervisor都可以做出反应并尝试把它变成一致状态,最常见的方式就是根据初始状态重启Actor。

Actor模型不关乎任何语言,只在于思想。
在Java领域的框架有 Scala实现的 Akka、C++ 实现的有:CAF,我自己实现的actor系统是借鉴于skynet.

今天主要介绍CAF

CAF c++版的 Actor 模型开发框架

CAF的源码仓库
https://github.com/actor-framework/actor-framework

c++ actor framework 并没有被boost收入,而是自己独立发展。

先简要介绍我自己的实现版本:

  1. 场景:

    actor数量固定(强场景),比如整个处理系统的计算单元独立分工,类微服务(进程内)

  2. 实现方式:

    a、每个actor自己维护消息队列
    b、actor间通过依赖反转实现消息投递
    c、actor系统(主线程),监听到有需要线程的actor之后,从线程池分配线程给actor进行处理,每个acotr随机处理消息数量,防止消息过多从而线程独占。

CAF中的Actor

class TestActor : public event_based_actor {
public:
  TestActor(actor_config& cfg) : event_based_actor(cfg) {
    // nop
  }

  behavior make_behavior() override {
    return {
      [](int x, int y) {
        return x + y;
      },
      [](double x, double y) {
        return x + y;
      }
    };
  }
};

重写了make_behavior的类就是一个类类型的actor。
可以在make_behavior中实现我们的计算逻辑。

actor间通信

caf框架出了进程内actor通信外,还支持进程间通信,这就比我只需在进程内通信的需求大得多了, 先讲进程内通信方式

进程内


behavior adder() {
	return {
  	[](int x, int y) {
    	return x + y;
  	},
  	[](double x, double y) {
    	return x + y;
  	}
 	};
}

int main() {
	//
 	actor_system_config cfg;
 	actor_system sys{cfg};
 	// Create (spawn) our actor.
 	auto a = sys.spawn(adder);
 	// Send it a message.
 	scoped_actor self{sys};
 	self->send(a, 40, 2);
 	// Block and wait for reply.
 	self->receive(
  	[](int result) {
    	cout << result << endl; // prints “42”
  	}
 	); 
}

1、我把actor_system看成actor运行的加载平台,在我的实现中,actor_system主要记录每个actor,通过actor 名,可以获取到对应的actor并投递消息给到actor。
2、这里的spawn 也可以理解是将我们的actor adder挂到sys中。
3、使用scoped_actor 生成一个局部actor self,self向a 发送消息后,actor的receive实现是阻塞执行的,通常我们使用actor模型的目的提高并发能力,那非阻塞操作才是王道对吧,别急,caf可不是这么low的,而且还一口气送你两种非阻塞实现方式:
一、返回一个behavior,实际上还是交给actor system去执行

auto a = sys.spawn(adder);
sys.spawn(
 [=](event_based_actor* self) -> behavior {
  self->send(a, 40, 2);
  return {
    [=](int result) {
      cout << result << endl;
      self->quit();
    }
  };
 }
);

二、then执行类似 js的调用链方式

auto a = sys.spawn(adder);
sys.spawn(
 [=](event_based_actor* self) {
  self->request(a, seconds(1), 40, 2).then(
    [=](int result) {
      cout << result << endl;
      self->quit();
    }
  };
 }
);

进程间

进程内的actor信息维护在内存中,那么进程间的actor信息常见的方式是注册中心(微服务思想本身来说,每个服务就是一个actor),caf是怎么做的呢?
看源码学习的最佳方式是先把demo跑起来,再根据功能翻看源码,因为开发的内容始终是离不开场景的。先来看serve 跟 client的代码

serve

int main(int argc, char** argv) {
 	// Defaults.
 	auto host = "localhost"s;
 	auto port = uint16_t{42000};
 	auto server = false;
 	actor_system sys{...}; // Parse command line and setup actor system.
 	auto& middleman = sys.middleman();
 	actor a;
 	a = sys.spawn(adder);
 	auto bound = middleman.publish(a, port);
 	if (bound == 0)
  		return 1;
}

client

int main(int argc, char** argv) {
 	// Defaults.
 	auto host = "localhost"s;
 	auto port = uint16_t{42000};
 	auto server = false;
 	actor_system sys{...}; // Parse command line and setup actor system.
 	auto& middleman = sys.middleman();
 	actor a;
 	auto r = middleman.remote_actor(host, port);
 	if (!r)
  		return 1;
 	a = *r;

  	sys.spawn(
 		[=](event_based_actor* self) {
    		self->request(a, seconds(1), 40, 2).then(
      			[=](int result) {
        			cout << result << endl;
        			self->quit();
      			}
    		)};
  	);
}

可以看到跟我们的进程内通信方式,只多了个middleman,这里的middleman无非相当于sidecar,帮我们跟网络间的caf节点建立长连接。

/// Manages brokers and network backends.
class CAF_IO_EXPORT middleman : public actor_system::networking_module

  /// Tries to publish `whom` at `port` and returns either an
  /// `error` or the bound port.
  /// @param whom Actor that should be published at `port`.
  /// @param port Unused TCP port.
  /// @param in The IP address to listen to or `INADDR_ANY` if `in == nullptr`.
  /// @param reuse Create socket using `SO_REUSEADDR`.
  /// @returns The actual port the OS uses after `bind()`. If `port == 0`
  ///          the OS chooses a random high-level port.
  template <class Handle>
  expected<uint16_t> publish(Handle&& whom, uint16_t port,
                             const char* in = nullptr, bool reuse = false) {
    detail::type_list<typename std::decay<Handle>::type> tk;
    return publish(actor_cast<strong_actor_ptr>(std::forward<Handle>(whom)),
                   system().message_types(tk), port, in, reuse);
  }

middleman即我们的网络端,而publish是将我们actor最为tcp端的处理者。

/// Establish a new connection to the actor at `host` on given `port`.
  /// @param host Valid hostname or IP address.
  /// @param port TCP port.
  /// @returns An `actor` to the proxy instance representing
  ///          a remote actor or an `error`.
  template <class ActorHandle = actor>
  expected<ActorHandle> remote_actor(std::string host, uint16_t port) {
    detail::type_list<ActorHandle> tk;
    auto x = remote_actor(system().message_types(tk), std::move(host), port);
    if (!x)
      return x.error();
    CAF_ASSERT(x && *x);
    return actor_cast<ActorHandle>(std::move(*x));
  }

通过remote_actor 返回的 r是远端actor在本地进程内的代理actor,可回归到进程内actor通信本身,兜兜转转又回到进程内的通信方式了,接下来介绍caf的线程模型,基本自研时便可使用其线程模型实现自己的actor框架了。

CAF线程模型

caf actor_system初始化时会根据核数创建对应的worker,每个worker一个线程和一个队列,创建的actor会放到队列中,线程去消费队列,每个线程先消费自己的队列,当队列为空时会做work steal去消费其他线程的队列的actor。

skynet的线程模型
在这里插入图片描述
来源purecpp社区

以上是关于C++ Actor并发模型框架 Actor Framework (CAF)的主要内容,如果未能解决你的问题,请参考以下文章

Orleans框架------基于Actor模型生成分布式Id

Lite Actor:方舟Actor并发模型的轻量级优化

Scala并发框架Akka原理详解

C# 实现 Actor并发模型 (案例版)

一 Akka学习 - actor

Actor模型及原理