源码分析 Node 的 Cluster 模块
Posted Nodejs技术栈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析 Node 的 Cluster 模块相关的知识,希望对你有一定的参考价值。
作者 | uerwtoy
来源 | yq.aliyun.com/articles/717323
前段时间,公司的洋彬哥老哥遇到一个问题,大概就是本机有个node的http服务器,但是每次请求这个服务器的端口返回的数据都报错,一看返回的数据根本不是http的报文格式,然后经过一番排查发现是另外一个服务器同时监听了http服务器的这个端口。
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
// Workers can share any TCP connection
// In this case it is an HTTP server
http.createServer((req, res) => {
res.writeHead(200);
res.end('hello world\n');
}).listen(8000);
console.log(`Worker ${process.pid} started`);
}
在使用cluster的在几个子进程同时监听了8000端口后,查看了一下只有主进程监听了这个端口,其他都没有。这个时候,我猜测node还是使用在父进程中创建sever的io但是这个父进程应该就是通过Unix域套接字的cmsg_data将父进程中收到客户端套接字描述符传递给子进程然后让子进程来处理具体的数据与逻辑,但是node到底是如何通过在子进程中createServer并且listen但是只在父进程中真的监听了该端口来实现这个逻辑的呢?这个问题引起了我的好奇,让我不得不到源码中一探究竟。
从net模块出发
按理说,这个问题我们应该直接通过cluster模块来分析,但是很明显,在加载http模块的时候并不会像cluster模块启动时一样通过去判断NODE_ENV来加载不同的模块,但是从上面的分析,我可以得出子进程中的createServer执行了跟父进程不同的操作,所以只能说明http模块中通过isMaster这样的判断来进行了不同的操作,不过http.js和_http_server.js中都没有这个判断,但是通过对createServer向上的查找我在net.js的listenInCluster中找到了isMaster的判断,listenInCluster会在createServer后的server.listen(8000)中调用,所以我们可以看下他的关键逻辑。
if (cluster === null) cluster = require('cluster');
if (cluster.isMaster || exclusive) {
//父进程中,通过_listen2方法就能开始正常的监听了
server._listen2(address, port, addressType, backlog, fd);
return;
}
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags: 0
};
// 子进程通过获取父进程的server句柄
// 并通过listenOnMasterHandle监听它
cluster._getServer(server, serverQuery, listenOnMasterHandle);
从这段代码中我们可以看出,如果是在父进程中,直接通过_listen2的逻辑就能开始正常的监听了,但是在子进程中,会通过cluster._getServer的方式获取父进程的句柄,并通过回调函数listenOnMasterHandle监听它。看到这里我其实比较疑惑,因为在我对于网络编程的学习中,只听说过传递描述符的,这个传递server的句柄实在是太新鲜了,于是赶紧继续深入研究了起来。
深入cluster的代码
首先,来看一下_gerServer的方法的代码。
const message = util._extend({
act: 'queryServer',
index: indexes[indexesKey],
data: null
}, options);
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
这个方法通过send像主进程发送一个包,因为在send函数中有这样一句代码:
message = util._extend({ cmd: 'NODE_CLUSTER' }, message);
通过Node的文档,我们可以知道这种cmd带了Node字符串的包,父进程会通过internalMessage事件来响应,所以我们可以从internal/cluster/master.js中看到找到,对应于act: 'queryServer'的处理函数queryServer的代码。
...
var constructor = RoundRobinHandle;
...
handle = new constructor(key, message.address,message.port,message.addressType,message.fd,message.flags);
...
//queryServer实际是通过RoundRobinHandle的add方法
//
handle.add(worker, (errno, reply, handle) => {
//根据子进程传来的act组装返回的对象
reply = util._extend({
errno: errno,
key: key,
ack: message.seq,//子进程用来确认是哪个命令的返回结果
data: handles[key].data
}, reply);
if (errno)
delete handles[key]; // Gives other workers a chance to retry.
send(worker, reply, handle);
});
这里创建了一个RoundRobinHandle实例,在该实例的构造函数中通过代码:
this.server = net.createServer(assert.fail);
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0)
this.server.listen(port, address);
else
this.server.listen(address); // UNIX socket path.
this.server.once('listening', () => {
this.handle = this.server._handle;
//新连接到达时分发这个handle
//distribute函数是给子进程分派任务的重要函数
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
在父进程中生成了一个server,并且通过注册listen的方法将有新的客户端连接到达时执行的onconnection改成了使用自身的this.distribute函数,这个函数我们先记下因为他是后来父进程给子进程派发任务的重要函数。说回getServer的代码,这里通过RoundRobinHandle实例的add方法:
//在server开始监听端口后,通过done函数中handle.add
//传入的匿名函数给子进程的getServer命令以返回
const done = () => {
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
// TODO(bnoordhuis) Check err.
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
this.handoff(worker); // In case there are connections pending.
};
// Still busy binding.
this.server.once('listening', done);
会给子进程的getServer以回复。从这里我们可以看到在给子进程的回复中handle一直都是null。那这个所谓的去取得父进程的server是怎么取得的呢?这个地方让我困惑了一下,不过后来看子进程的代码我就明白了,实际上根本不存在什么取得父进程server的句柄,这个地方的注释迷惑了阅读者,从之前子进程的回调中我们可以看到,返回的handle决定子进程是用shared方式(udp)还是Round-robin的方式(tcp)来处理父进程派下来的任务。从这个回调函数我们就可以看出,子进程是没有任何获取句柄的操作的,那它是如何处理的呢?我们通过该例子中的rr方法可以看到:
const handle = { close, listen, ref: noop, unref: noop };
if (message.sockname) {
handle.getsockname = getsockname; // TCP handles only.
}
handles[key] = handle;
cb(0, handle);
这个函数中生成了一个自带listen和close方法的对象,并传递给了函数listenOnMasterHandle,虽然这个名字写的是在父进程的server句柄上监听,实际上我们这个例子中是子进程自建了一个handle,但是如果是udp的情况下这个函数名字还确实就是这么回事,原因在于SO_REUSEADDR选项,里面有这样一个解释:
SO_REUSEADDR允许完全相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。
server._listen2(address, port, addressType, backlog, fd);
做了一个假的监听操作,实际上因为_handle的存在这里只会为之前_handle赋值一个onconnection函数,这个函数的触发则跟父进程中通过真实的客户端连接触发的时机不同,而是通过
process.on('internalMessage', (message, handle) {
if (message.act === 'newconn')
onconnection(message, handle);
else if (message.act === 'disconnect')
_disconnect.call(worker, true);
}
中注册的internalMessage事件中的对父进程传入的act为newconn的包触发。而父进程中就通过我们刚刚说到的改写了server对象的onconnection函数的distribute函数,这个函数中会调用一个叫handoff的函数,通过代码:
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
handle.close();
else
this.distribute(0, handle); // Worker is shutting down. Send to another.
this.handoff(worker);
});
其中send到子进程的handle就是新连接客户端的句柄,Node中父子进程之间的通信最后是通过src/stream_base.cc中的StreamBase::WriteString函数实现的,从这段代码我们可以看出:
...
//当进程间通信时
uv_handle_t* send_handle = nullptr;
if (!send_handle_obj.IsEmpty()) {
HandleWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
send_handle = wrap->GetHandle();
// Reference LibuvStreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
CHECK_EQ(false, req_wrap->persistent().IsEmpty());
req_wrap_obj->Set(env->handle_string(), send_handle_obj);
}
err = DoWrite(
req_wrap,
&buf,
1,
//将父进程获取的客户端句柄传递子进程
reinterpret_cast<uv_stream_t*>(send_handle));
可以看到,在调用此方式时,如果传入了一个客户端的句柄则通过Dowrite方法最后通过辅助数据cmsg_data将客户端句柄的套接字fd传送到子进程中进行处理。看到这里我不禁恍然大悟,原来还是走的是我熟悉的那套网络编程的逻辑啊。
总结
通过上面的一轮分析,我们可以总结出以下两个结论:
-
创建TCP服务器时会在父进程中创建一个server并监听目标端口,新连接到达Accept这个client后,再通过ipc的高级方法将新连接的句柄(也就是这个socket的文件描述符)通过轮询的方式分配到一个子进程中,然后在这个子进程中通过read和write处理新连接的数据和请求,所以只有主进程会监听目标ip和端口。
-
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知我们,我们会立即删除并表示歉意。谢谢!
作者 | uerwtoy
来源 | yq.aliyun.com/articles/717323
以上是关于源码分析 Node 的 Cluster 模块的主要内容,如果未能解决你的问题,请参考以下文章
《Elasticsearch 源码解析与优化实战》第14章:Cluster模块分析
《Elasticsearch 源码解析与优化实战》第14章:Cluster模块分析