C++ Actor并发模型框架 Actor Framework (CAF)
Posted BBinChina
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++ Actor并发模型框架 Actor Framework (CAF)相关的知识,希望对你有一定的参考价值。
什么是ACTOR 并发模型
Actor 是一种处理并发计算的基本模型,主要概念是通过OO 面向对象思想,将计算单元独立成一个对象,在模型中成为Actor,各个独立计算单元之间的通信通过mailbox(邮箱),每个Actor维护自己的mailbox,ma
ilbox存储的是需要消费的消息。这里扩散一下知识,在更早期出现的ESB(enterprise service bus)企业服务总线模型是将服务接到消息总线上,服务获取总线上投递给他的消息。
这种方式存在耦合,以及需要一个个服务进行确认,直到消费者进行确认消费,好处是实现简便,接入现在的消息中间件就行。
以下为Actor模型的组件图:
Actor的思想早在1973年就提出,最早在erlang中实现,应用于电信系统,可以想象下,每个通话内容质量互不影响,每分每秒有多少人在通话。
在Erlang中,每段代码都运行在进程中,进程是Erlang中对Actor的称呼,意味着它的状态不会影响其他进程。系统中会有一个supervisor,实际上它只是另一个进程。被监控的进程挂掉了,supervisor会被通知并对此进行处理,因此也就能创建一个具有自愈功能的系统。如果一个Actor到达异常状态并且崩溃,无论如何,supervisor都可以做出反应并尝试把它变成一致状态,最常见的方式就是根据初始状态重启Actor。
Actor模型不关乎任何语言,只在于思想。
在Java领域的框架有 Scala实现的 Akka、C++ 实现的有:CAF,我自己实现的actor系统是借鉴于skynet.
今天主要介绍CAF
CAF c++版的 Actor 模型开发框架
CAF的源码仓库
https://github.com/actor-framework/actor-framework
c++ actor framework 并没有被boost收入,而是自己独立发展。
先简要介绍我自己的实现版本:
-
场景:
actor数量固定(强场景),比如整个处理系统的计算单元独立分工,类微服务(进程内)
-
实现方式:
a、每个actor自己维护消息队列
b、actor间通过依赖反转实现消息投递
c、actor系统(主线程),监听到有需要线程的actor之后,从线程池分配线程给actor进行处理,每个acotr随机处理消息数量,防止消息过多从而线程独占。
CAF中的Actor
class TestActor : public event_based_actor {
public:
TestActor(actor_config& cfg) : event_based_actor(cfg) {
// nop
}
behavior make_behavior() override {
return {
[](int x, int y) {
return x + y;
},
[](double x, double y) {
return x + y;
}
};
}
};
重写了make_behavior的类就是一个类类型的actor。
可以在make_behavior中实现我们的计算逻辑。
actor间通信
caf框架出了进程内actor通信外,还支持进程间通信,这就比我只需在进程内通信的需求大得多了, 先讲进程内通信方式
进程内
behavior adder() {
return {
[](int x, int y) {
return x + y;
},
[](double x, double y) {
return x + y;
}
};
}
int main() {
//
actor_system_config cfg;
actor_system sys{cfg};
// Create (spawn) our actor.
auto a = sys.spawn(adder);
// Send it a message.
scoped_actor self{sys};
self->send(a, 40, 2);
// Block and wait for reply.
self->receive(
[](int result) {
cout << result << endl; // prints “42”
}
);
}
1、我把actor_system看成actor运行的加载平台,在我的实现中,actor_system主要记录每个actor,通过actor 名,可以获取到对应的actor并投递消息给到actor。
2、这里的spawn 也可以理解是将我们的actor adder挂到sys中。
3、使用scoped_actor 生成一个局部actor self,self向a 发送消息后,actor的receive实现是阻塞执行的,通常我们使用actor模型的目的提高并发能力,那非阻塞操作才是王道对吧,别急,caf可不是这么low的,而且还一口气送你两种非阻塞实现方式:
一、返回一个behavior,实际上还是交给actor system去执行
auto a = sys.spawn(adder);
sys.spawn(
[=](event_based_actor* self) -> behavior {
self->send(a, 40, 2);
return {
[=](int result) {
cout << result << endl;
self->quit();
}
};
}
);
二、then执行类似 js的调用链方式
auto a = sys.spawn(adder);
sys.spawn(
[=](event_based_actor* self) {
self->request(a, seconds(1), 40, 2).then(
[=](int result) {
cout << result << endl;
self->quit();
}
};
}
);
进程间
进程内的actor信息维护在内存中,那么进程间的actor信息常见的方式是注册中心(微服务思想本身来说,每个服务就是一个actor),caf是怎么做的呢?
看源码学习的最佳方式是先把demo跑起来,再根据功能翻看源码,因为开发的内容始终是离不开场景的。先来看serve 跟 client的代码
serve
int main(int argc, char** argv) {
// Defaults.
auto host = "localhost"s;
auto port = uint16_t{42000};
auto server = false;
actor_system sys{...}; // Parse command line and setup actor system.
auto& middleman = sys.middleman();
actor a;
a = sys.spawn(adder);
auto bound = middleman.publish(a, port);
if (bound == 0)
return 1;
}
client
int main(int argc, char** argv) {
// Defaults.
auto host = "localhost"s;
auto port = uint16_t{42000};
auto server = false;
actor_system sys{...}; // Parse command line and setup actor system.
auto& middleman = sys.middleman();
actor a;
auto r = middleman.remote_actor(host, port);
if (!r)
return 1;
a = *r;
sys.spawn(
[=](event_based_actor* self) {
self->request(a, seconds(1), 40, 2).then(
[=](int result) {
cout << result << endl;
self->quit();
}
)};
);
}
可以看到跟我们的进程内通信方式,只多了个middleman,这里的middleman无非相当于sidecar,帮我们跟网络间的caf节点建立长连接。
/// Manages brokers and network backends.
class CAF_IO_EXPORT middleman : public actor_system::networking_module
/// Tries to publish `whom` at `port` and returns either an
/// `error` or the bound port.
/// @param whom Actor that should be published at `port`.
/// @param port Unused TCP port.
/// @param in The IP address to listen to or `INADDR_ANY` if `in == nullptr`.
/// @param reuse Create socket using `SO_REUSEADDR`.
/// @returns The actual port the OS uses after `bind()`. If `port == 0`
/// the OS chooses a random high-level port.
template <class Handle>
expected<uint16_t> publish(Handle&& whom, uint16_t port,
const char* in = nullptr, bool reuse = false) {
detail::type_list<typename std::decay<Handle>::type> tk;
return publish(actor_cast<strong_actor_ptr>(std::forward<Handle>(whom)),
system().message_types(tk), port, in, reuse);
}
middleman即我们的网络端,而publish是将我们actor最为tcp端的处理者。
/// Establish a new connection to the actor at `host` on given `port`.
/// @param host Valid hostname or IP address.
/// @param port TCP port.
/// @returns An `actor` to the proxy instance representing
/// a remote actor or an `error`.
template <class ActorHandle = actor>
expected<ActorHandle> remote_actor(std::string host, uint16_t port) {
detail::type_list<ActorHandle> tk;
auto x = remote_actor(system().message_types(tk), std::move(host), port);
if (!x)
return x.error();
CAF_ASSERT(x && *x);
return actor_cast<ActorHandle>(std::move(*x));
}
通过remote_actor 返回的 r是远端actor在本地进程内的代理actor,可回归到进程内actor通信本身,兜兜转转又回到进程内的通信方式了,接下来介绍caf的线程模型,基本自研时便可使用其线程模型实现自己的actor框架了。
CAF线程模型
caf actor_system初始化时会根据核数创建对应的worker,每个worker一个线程和一个队列,创建的actor会放到队列中,线程去消费队列,每个线程先消费自己的队列,当队列为空时会做work steal去消费其他线程的队列的actor。
skynet的线程模型
以上是关于C++ Actor并发模型框架 Actor Framework (CAF)的主要内容,如果未能解决你的问题,请参考以下文章