❲追根究底❳Memcached中Libevent和线程池使用初探
Posted VanillaOpenResty开发
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了❲追根究底❳Memcached中Libevent和线程池使用初探相关的知识,希望对你有一定的参考价值。
来源:Nicol TAO
原文地址:http://taozj.org/2016/05/Libevent%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%BA%8C%EF%BC%89%EF%BC%9AMemcached%E4%B8%ADLibevent%E5%92%8C%E7%BA%BF%E7%A8%8B%E6%B1%A0%E4%BD%BF%E7%94%A8%E5%88%9D%E6%8E%A2/
这些天想弄一下缓存,减少程序查询数据库的压力,而这方面的王者基本就是memcached和redis了。克隆了一份memcached的源码,发现是基于Libevent+线程池的实现方式,大致看了一下感觉很有启发。正好前两天看的Libevent手册,而且相比自己写的线程池模型(https://github.com/taozhijiang/st_utils),也很好奇企业级线程模型的实现方式,就顺着memcached初始化的流程了解梳理一下了。
memcached启动时候执行memcached.c中的main函数,在加载了好长的初始化配置之后,定义并初始化event_base;
static struct event_base *main_base;
main_base = event_init();
然后通过调用memcached_thread_init,创建工作者线程
memcached_thread_init(settings.num_threads, main_base);
// unix socket
listen_conn = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1,
local_transport, main_base)))
// tcp
listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1,
transport, main_base))
这个conn_new不仅仅在这里用以侦听套接字分配资源、创建事件侦听,之后所有客户端连接的套接字也会用这个函数。这个函数最终回调的响应函数是event_handler,然后最终调用一个碉堡了名字的函数drive_machine,这个函数内部是一个复杂的有限状态机,会处理所有与套接字相关的连接、关闭、读写等操作。
listen套接字当接收到客户请求的时候,如果连接OK,并且没有超过最大连接数目,就调用dispatch_conn_new接收请求。这个函数中,会轮询选择要添加的工作线程,然后创建一个等待item,并添加到对应线程的new_conn_queue队列上去,然后向这个线程的读取队列里面写入’c’一个字节表明有一个新的请求,然后对应线程管道读事件就会被触发,执行处理回调函数。
主线ain_base进入Libevent事件循环中
/* enter the event loop */
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
}
上面我们关注的核心在于调用memcached_thread_init这个函数创建nthreads个工作者线程。
typedef struct {
pthread_t thread_id; /* unique ID of this thread */
struct event_base *base; /* libevent handle this thread uses */
struct event notify_event; /* listen event for notify pipe */
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
} LIBEVENT_THREAD;
void memcached_thread_init(int nthreads, struct event_base *main_base) {
...
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
dispatcher_thread.base = main_base;
dispatcher_thread.thread_id = pthread_self();
for (i = 0; i < nthreads; i++) {
int fds[2];
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats.reserved_fds += 5;
}
/* Create threads after we've done all the libevent setup. */
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
...
}
上面把非核心的代码剔除掉,就可以看清memcached_thread_init所做的具体工作了。
(1)为每个线程创建LIBEVENT_THREAD结构体,并把自我分发线程的信息记录在dispatcher_thread中;
(2)对每个线程的结构体LIBEVENT_THREAD初始化,然后通过pipe创建匿名管道,pipefd[0]指向读端,而pipefd[1]指向写端,然后每个线程就通过这个匿名管道同其他线程进行通信;
static void setup_thread(LIBEVENT_THREAD *me) {
...
me->base = event_init();
/* Listen for notifications from other threads */
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
event_add(&me->notify_event, 0);
me->new_conn_queue = malloc(sizeof(struct conn_queue));
cq_init(me->new_conn_queue);
me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*), NULL, NULL);
...
}
其中的setup_thread函数中,为每一个线程创建一个event_base,然后添加之前管道的读写时间侦听;同时每个线程还创建了一个等待队列,所有的新请求会添加到这个等待队列上面去。
在匿名管道的读写事件的相应函数thread_libevent_process上,会尝试读取一个字节,如果是上面写入的’c’,就表明有待处理的请求,然后就从等待队列new_conn_queue中取出一个item,然后处理。处理的方式就是确认这个连接,分配相应的资源,然后再丢到上面的那个event_handler->drive_machine的状态机中去!
(3)调用create_worker(worker_libevent, &threads[i]);实行真正创建线程操作,其内部就是一个pthread_create;
Memcached工作方式可以描述如下:
软件启动的时候,创建event_base,并且根据设置类型创建侦听的tcp/udp socket或者unix socket,然后为这些套接字创建读侦听事件,加入到event_base上,等待客户端连接;
创建工作线程池,每个工作线程创建自己的event_base;创建一个等待队列,新连接的客户请求都会挂在这个队列上;创建一个匿名管道,并为管道创建读写侦听事件;
当新的客户端连接上来有请求时候,主线程的侦听事件回调函数会被激活,条件满足后接受这个连接,然后选取一个工作线程,创建等待item挂到其队列上,然后向其管道写入一个c,对应线程管道读事件被激活,读取一个c,并从队列中取出一个请求处理;
Memcache对所有socket的处理都是event_handler->drive_machine中处理的。
可以说,memcached在线程池在等待连接和事件处理中都充分利用了Libevent的异步事件,所以效率是非常之高的。自己的那个线程池,主要是将所有的任务都放到一个链表队列中,当线程发现没有任务的时候,就会用pthread_cond_wait阻塞睡眠,当主线程发现等待的任务太多,就会用pthread_cond_signal唤醒睡眠线程(不会惊群)。总体会让人感觉,把任务事先分给各个队列,吞吐量要大一些,让任务阻塞在select、poll、epoll上,会比自己控制睡眠唤醒要高效可靠!
Vanilla社区发起⦗晨读计划⦘,每天坚持积累一点,今天的努力至少让我们比昨天更进一步。
⦗晨读计划⦘ 期待你的加入... ...
晨读计划
2016/05/25
以上是关于❲追根究底❳Memcached中Libevent和线程池使用初探的主要内容,如果未能解决你的问题,请参考以下文章
为 Windows 构建 memcached 是不是需要 libevent?
ubuntu下安装memcached和PHP的memcache扩展
启动Memcached报错:/usr/local/memcached/bin/memcached: error while loading shared libraries: libevent-2.1