Libevent源码分析--- bufferevent
Posted 子曰帅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Libevent源码分析--- bufferevent相关的知识,希望对你有一定的参考价值。
上一节说过,libevent提供六种bufferevent类型,后面会详细分析其中的两个:bufferevent_sock和bufferevent_async.下面是bufferevent的详细定义:
struct bufferevent
/** Event base for which this bufferevent was created. */
struct event_base *ev_base;
/** Pointer to a table of function pointers to set up how this
bufferevent behaves. */
const struct bufferevent_ops *be_ops;
/** A read event that triggers when a timeout has happened or a socket
is ready to read data. Only used by some subtypes of
bufferevent. */
struct event ev_read;
/** A write event that triggers when a timeout has happened or a socket
is ready to write data. Only used by some subtypes of
bufferevent. */
struct event ev_write;
/** An input buffer. Only the bufferevent is allowed to add data to
this buffer, though the user is allowed to drain it. */
struct evbuffer *input;
/** An input buffer. Only the bufferevent is allowed to drain data
from this buffer, though the user is allowed to add it. */
struct evbuffer *output;
struct event_watermark wm_read;
struct event_watermark wm_write;
bufferevent_data_cb readcb;
bufferevent_data_cb writecb;
/* This should be called 'eventcb', but renaming it would break
* backward compatibility */
bufferevent_event_cb errorcb;
void *cbarg;
struct timeval timeout_read;
struct timeval timeout_write;
/** Events that are currently enabled: currently EV_READ and EV_WRITE
are supported. */
short enabled;
;
其中ev_base指向bufferevent所属的event_base,ev_read、ev_write是读写事件,timeout_read和timeout_write是读写超时时间,readcb、writecb和errorcb分别是读回调、写回调和事件回调(事件回调本应叫eventcb,因为向后兼容的原因仍沿用errorcb这个名字)。input和output是evbuffer类型的读写缓存。enabled表示当前已启用的事件,目前只支持EV_READ和EV_WRITE。wm_read和wm_write是读写缓存的水位(阈值),be_ops是指向bufferevent_ops的指针,bufferevent_ops的定义如下:
/**
 * Table of operations that one bufferevent implementation provides.
 * Each bufferevent points at one of these through its be_ops member.
 */
struct bufferevent_ops {
    /** The name of the bufferevent's type. */
    const char *type;
    /** At what offset into the implementation type will we find a
        bufferevent structure?

        Example: if the type is implemented as
        struct bufferevent_x {
           int extra_data;
           struct bufferevent bev;
        }
        then mem_offset should be offsetof(struct bufferevent_x, bev)
    */
    off_t mem_offset;

    /** Enables one or more of EV_READ|EV_WRITE on a bufferevent.  Does
        not need to adjust the 'enabled' field.  Returns 0 on success, -1
        on failure.
     */
    int (*enable)(struct bufferevent *, short);

    /** Disables one or more of EV_READ|EV_WRITE on a bufferevent.  Does
        not need to adjust the 'enabled' field.  Returns 0 on success, -1
        on failure.
     */
    int (*disable)(struct bufferevent *, short);

    /** Free any storage and deallocate any extra data or structures used
        in this implementation.
     */
    void (*destruct)(struct bufferevent *);

    /** Called when the timeouts on the bufferevent have changed.*/
    int (*adj_timeouts)(struct bufferevent *);

    /** Called to flush data. */
    int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);

    /** Called to access miscellaneous fields. */
    int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
};
bufferevent_ops 的实现和eventop比较类似,都是定义了一组指针,不同的bufferevent实现可以自定义这些具体操作。比如bufferevent_sock的对应定义:
const struct bufferevent_ops bufferevent_ops_socket =
"socket",
evutil_offsetof(struct bufferevent_private, bev),
be_socket_enable,
be_socket_disable,
be_socket_destruct,
be_socket_adj_timeouts,
be_socket_flush,
be_socket_ctrl,
;
接下来分析一下bufferevent_sock的实现方式。
/**
 * Create a socket bufferevent on fd and install the user callbacks.
 *
 * The bufferevent is created through bufferevent_socket_new() with a NULL
 * event base and no options.
 *
 * @return the new bufferevent, or NULL if bufferevent_socket_new() fails.
 */
struct bufferevent *
bufferevent_new(evutil_socket_t fd,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
    struct bufferevent *bufev;

    if (!(bufev = bufferevent_socket_new(NULL, fd, 0)))
        return NULL;

    bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg);

    return bufev;
}
/**
 * Create a socket-based bufferevent on fd.
 *
 * On WIN32, if base has IOCP enabled, the request is handed to
 * bufferevent_async_new() instead.
 *
 * @param base    event base the read/write events are assigned to
 * @param fd      socket to read from and write to
 * @param options option flags passed on to bufferevent_init_common()
 * @return the new bufferevent, or NULL on allocation or init failure.
 */
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
    struct bufferevent_private *bufev_p;
    struct bufferevent *bufev;

#ifdef WIN32
    if (base && event_base_get_iocp(base))
        return bufferevent_async_new(base, fd, options);
#endif

    if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
        return NULL;

    if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
        options) < 0) {
        mm_free(bufev_p);
        return NULL;
    }
    bufev = &bufev_p->bev;
    evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    /* Get notified when the output buffer changes, so the write event
     * can be added on demand. */
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

    /* NOTE(review): per the evbuffer API the second argument selects the
     * back (0) or the front (1) of the buffer; presumably this keeps
     * users from appending to input or draining output directly. */
    evbuffer_freeze(bufev->input, 0);
    evbuffer_freeze(bufev->output, 1);

    return bufev;
}
bufferevent_socket_new方法用于创建一个bufferevent_socket类型的bufferevent。在win32环境下,如果传入的event_base启用了IOCP(即event_base_get_iocp(base)返回非空),则改为调用IOCP的对应实现,即bufferevent_async_new。否则该方法会分配一个bufferevent_private变量,下面是它的定义:
struct bufferevent_private
/** The underlying bufferevent structure. */
struct bufferevent bev;
/** Evbuffer callback to enforce watermarks on input. */
struct evbuffer_cb_entry *read_watermarks_cb;
/** If set, we should free the lock when we free the bufferevent. */
unsigned own_lock : 1;
/** Flag: set if we have deferred callbacks and a read callback is
* pending. */
unsigned readcb_pending : 1;
/** Flag: set if we have deferred callbacks and a write callback is
* pending. */
unsigned writecb_pending : 1;
/** Flag: set if we are currently busy connecting. */
unsigned connecting : 1;
/** Flag: set if a connect failed prematurely; this is a hack for
* getting around the bufferevent abstraction. */
unsigned connection_refused : 1;
/** Set to the events pending if we have deferred callbacks and
* an events callback is pending. */
short eventcb_pending;
/** If set, read is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
bufferevent_suspend_flags read_suspended;
/** If set, writing is suspended until one or more conditions are over.
* The actual value here is a bitfield of those conditions; see the
* BEV_SUSPEND_* flags above. */
bufferevent_suspend_flags write_suspended;
/** Set to the current socket errno if we have deferred callbacks and
* an events callback is pending. */
int errno_pending;
/** The DNS error code for bufferevent_socket_connect_hostname */
int dns_error;
/** Used to implement deferred callbacks */
struct deferred_cb deferred;
/** The options this bufferevent was constructed with */
enum bufferevent_options options;
/** Current reference count for this bufferevent. */
int refcnt;
/** Lock for this bufferevent. Shared by the inbuf and the outbuf.
* If NULL, locking is disabled. */
void *lock;
/** Rate-limiting information for this bufferevent */
struct bufferevent_rate_limit *rate_limiting;
;
bufferevent_private结构的第一个变量是bufferevent类型的,其实每一个bufferevent都对应一个bufferevent_private变量,只是对用户来说是透明的,用户只需要了解bufferevent即可。bufferevent_private的变量基本都是用于记录状态。
继续分析bufferevent_socket_new函数,创建bufferevent_private之后调用bufferevent_init_common函数,这个函数定义在bufferevent.c文件中,该文件中的函数都是各个bufferevent类型通用的函数。
int bufferevent_init_common(struct bufferevent_private *bufev_private,
struct event_base *base,
const struct bufferevent_ops *ops,
enum bufferevent_options options)
struct bufferevent *bufev = &bufev_private->bev;
if (!bufev->input)
if ((bufev->input = evbuffer_new()) == NULL)
return -1;
if (!bufev->output)
if ((bufev->output = evbuffer_new()) == NULL)
evbuffer_free(bufev->input);
return -1;
bufev_private->refcnt = 1;
bufev->ev_base = base;
/* Disable timeouts. */
evutil_timerclear(&bufev->timeout_read);
evutil_timerclear(&bufev->timeout_write);
bufev->be_ops = ops;
/*
* Set to EV_WRITE so that using bufferevent_write is going to
* trigger a callback. Reading needs to be explicitly enabled
* because otherwise no data will be available.
*/
bufev->enabled = EV_WRITE;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (options & BEV_OPT_THREADSAFE)
if (bufferevent_enable_locking(bufev, NULL) < 0)
/* cleanup */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
bufev->input = NULL;
bufev->output = NULL;
return -1;
#endif
if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
== BEV_OPT_UNLOCK_CALLBACKS)
event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
return -1;
if (options & BEV_OPT_DEFER_CALLBACKS)
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
event_deferred_cb_init(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_locked,
bufev_private);
bufev_private->options = options;
evbuffer_set_parent(bufev->input, bufev);
evbuffer_set_parent(bufev->output, bufev);
return 0;
bufferevent_init_common比较简单,需要注意 bufferEvent默认开启EV_WRITE,EV_READ需要手动打开,如果需要支持多线程则为bufferevent创建锁,另外在目前的版本中,UNLOCK_CALLBACKS需要DEFER_CALLBACKS作为前置条件。
继续分析bufferevent_socket_new函数:
/* Excerpt from the body of bufferevent_socket_new(): flag the output
 * buffer, bind the read and write events to fd, and register the
 * output-buffer callback. */
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
调用bufferevent_init_common之后会给bufev->output添加EVBUFFER_FLAG_DRAINS_TO_FD标记,表示output中的数据最终只会被写入文件描述符(这样evbuffer_add_file等接口就可以使用sendfile之类的优化),接下来设置读写事件回调,最后添加bufferevent_socket_outbuf_cb作为output的callback:当有数据被加入output时会调用bufferevent_socket_outbuf_cb。这样就不必一直监听写事件:bufferevent_writecb在数据全部写出之后可以移除写事件,之后由bufferevent_socket_outbuf_cb判断是否需要重新添加写事件。
初始化bufferevent之后需要调用be_socket_setfd设置网络套接字来监听读写事件:
/**
 * Rebind a socket bufferevent to a new file descriptor.
 *
 * Both events are deleted and re-assigned to fd.  If fd is non-negative,
 * the events recorded in bufev->enabled are enabled again.  The
 * bufferevent lock is held for the duration.
 */
static void
be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
    BEV_LOCK(bufev);
    EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);

    event_del(&bufev->ev_read);
    event_del(&bufev->ev_write);

    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    if (fd >= 0)
        bufferevent_enable(bufev, bufev->enabled);

    BEV_UNLOCK(bufev);
}
int bufferevent_enable(struct bufferevent *bufev, short event)
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
short impl_events = event;
int r = 0;
_bufferevent_incref_and_lock(bufev);
if (bufev_private->read_suspended)
impl_events &= ~EV_READ;
if (bufev_private->write_suspended)
impl_events &= ~EV_WRITE;
bufev->enabled |= event;
if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1;
_bufferevent_decref_and_unlock(bufev);
return r;
be_socket_setfd中设置的套接字必须是已经建立好连接的socket。如果是client端,bufferevent_sock提供了bufferevent_socket_connect用于连接套接字;但是如果是server端,bufferevent_sock本身没有提供listen相关的函数,不过libevent在listener.c中提供了evconnlistener,可以用于监听并接受连接。
int
bufferevent_socket_connect(struct bufferevent *bev,
struct sockaddr *sa, int socklen)
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
evutil_socket_t fd;
int r = 0;
int result=-1;
int ownfd = 0;
_bufferevent_incref_and_lock(bev);
if (!bufev_p)
goto done;
fd = bufferevent_getfd(bev);
if (fd < 0)
if (!sa)
goto done;
fd = socket(sa->sa_family, SOCK_STREAM, 0);
if (fd < 0)
goto done;
if (evutil_make_socket_nonblocking(fd)<0)
goto done;
ownfd = 1;
if (sa)
#ifdef WIN32
if (bufferevent_async_can_connect(bev))
bufferevent_setfd(bev, fd);
r = bufferevent_async_connect(bev, fd, sa, socklen);
if (r < 0)
goto freesock;
bufev_p->connecting = 1;
result = 0;
goto done;
else
#endif
r = evutil_socket_connect(&fd, sa, socklen);
if (r < 0)
goto freesock;
#ifdef WIN32
/* ConnectEx() isn't always around, even when IOCP is enabled.
* Here, we borrow the socket object's write handler to fall back
* on a non-blocking connect() when ConnectEx() is unavailable. */
if (BEV_IS_ASYNC(bev))
event_assign(&bev->ev_write, bev->ev_base, fd,
EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
#endif
bufferevent_setfd(bev, fd);
if (r == 0)
if (! be_socket_enable(bev, EV_WRITE))
bufev_p->connecting = 1;
result = 0;
goto done;
else if (r == 1)
/* The connect succeeded already. How very BSD of it. */
result = 0;
bufev_p->connecting = 1;
event_active(&bev->ev_write, EV_WRITE, 1);
else
/* The connect failed already. How very BSD of it. */
bufev_p->connection_refused = 1;
bufev_p->connecting = 1;
result = 0;
event_active(&bev->ev_write, EV_WRITE, 1);
goto done;
freesock:
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
if (ownfd)
evutil_closesocket(fd);
/* do something about the error? */
done:
_bufferevent_decref_and_unlock(bev);
return result;
win32的connect使用iocp实现,将在下一节iocp章节详细分析,这里只看其他平台的实现方式。evutil_socket_connect是libevent提供的工具函数,它的返回值有四种情况:负值代表出错,此时直接跳到freesock,触发BEV_EVENT_ERROR事件回调,并关闭由本函数创建的套接字;0代表正在连接,此时需要调用be_socket_enable激活写事件,当连接完成时会调用bufferevent_writecb回调;1代表已经连接成功,此时调用event_active直接手动触发bufferevent_writecb处理连接;其余情况(libevent中为2)代表连接被立即拒绝,此时设置connection_refused标记,同样调用event_active,由bufferevent_writecb处理连接失败的情况。
下面是bufferevent_writecb的具体实现:
/**
 * Write-event callback for socket bufferevents.
 *
 * It handles two things: finishing a pending connect (while the
 * 'connecting' flag is set) and flushing the output buffer to fd.  On a
 * pure timeout or a write failure it disables EV_WRITE and reports the
 * condition through the event callback.
 *
 * @param fd    the socket
 * @param event the events that fired
 * @param arg   the struct bufferevent this event belongs to
 */
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    short what = BEV_EVENT_WRITING;
    int connected = 0;
    ev_ssize_t atmost = -1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since a write has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
            bufev_p->connection_refused = 0;
            c = -1;
        }

        /* Still connecting: wait for the next write event. */
        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        if (c < 0) {
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
#ifdef WIN32
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
            _bufferevent_run_eventcb(bufev,
                BEV_EVENT_CONNECTED);
            /* The write event was only needed for the connect. */
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }

    atmost = _bufferevent_get_write_max(bufev_p);

    if (bufev_p->write_suspended)
        goto done;

    if (evbuffer_get_length(bufev->output)) {
        /* The front of output is unfrozen only around the write. */
        evbuffer_unfreeze(bufev->output, 1);
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn't indicate
               an EOF. An ECONNRESET might be more typical.
             */
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;

        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    /* Nothing left to send: stop watching for writability. */
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

 reschedule:
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

 error:
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}
bufferevent_writecb需要处理两件事,一个是连接逻辑,一个是发送数据逻辑,注意当处理连接逻辑时,如果当前没有激活EV_WRITE或者处于write_suspended状态,需要删除写监听,因为当前的写监听事件只用于处理连接。另外当处理数据逻辑时如果output的数据长度为0,同样需要删除写事件,因为output的callback在有数据之后会重新添加写事件:
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo,
void *arg)
struct bufferevent *bufev = arg;
struct bufferevent_private *bufev_p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (cbinfo->n_added &&
(bufev->enabled & EV_WRITE) &&
!event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
!bufev_p->write_suspended)
/* Somebody added data to the buffer, and we would like to
* write, and we were not writing. So, start writing. */
if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1)
/* Should we log this? */
bufferevent_writecb和bufferevent_readcb还需要调用用户自定义的读写回调:
void
_bufferevent_run_writecb(struct bufferevent *bufev)
/* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->writecb == NULL)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS)
p->writecb_pending = 1;
if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
else
bufev->writecb(bufev, bufev->cbarg);
/* Take a reference on the bufferevent and put its deferred callback on
 * the deferred-callback queue of its event base.  bevp is a
 * struct bufferevent_private pointer. */
#define SCHEDULE_DEFERRED(bevp)                                         \
    do {                                                                \
        bufferevent_incref(&(bevp)->bev);                               \
        event_deferred_cb_schedule(                                     \
            event_base_get_deferred_cb_queue((bevp)->bev.ev_base),      \
            &(bevp)->deferred);                                         \
    } while (0)
如果bufferevent的选项包含BEV_OPT_DEFER_CALLBACKS则需要使用延迟调用方式,延迟调用是在event_base的loop循环中统一处理的。如果p->deferred.queued没有被设置(说明尚未入队),则把它加入到ev_base的延迟调用队列。bufferevent_init_common中设置过deferred对应的回调:
/* Excerpt from bufferevent_init_common(): choose which deferred-callback
 * runner to install.  The else pairs with the inner if, so the locked
 * runner is used when DEFER_CALLBACKS is set without UNLOCK_CALLBACKS. */
if (options & BEV_OPT_DEFER_CALLBACKS)
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
event_deferred_cb_init(&bufev_private->deferred,
bufferevent_run_deferred_callbacks_locked,
bufev_private);
这两个回调只是使用锁的方式不同:
static void
bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg)
struct bufferevent_private *bufev_private = arg;
struct bufferevent *bufev = &bufev_private->bev;
BEV_LOCK(bufev);
if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
bufev->errorcb)
/* The "connected" happened before any reads or writes, so
send it first. */
bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
if (bufev_private->readcb_pending && bufev->readcb)
bufev_private->readcb_pending = 0;
bufev->readcb(bufev, bufev->cbarg);
if (bufev_private->writecb_pending && bufev->writecb)
bufev_private->writecb_pending = 0;
bufev->writecb(bufev, bufev->cbarg);
if (bufev_private->eventcb_pending && bufev->errorcb)
short what = bufev_private->eventcb_pending;
int err = bufev_private->errno_pending;
bufev_private->eventcb_pending = 0;
bufev_private->errno_pending = 0;
EVUTIL_SET_SOCKET_ERROR(err);
bufev->errorcb(bufev, what, bufev->cbarg);
_bufferevent_decref_and_unlock(bufev);
static void
bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
struct bufferevent_private *bufev_private = arg;
struct bufferevent *bufev = &bufev_private->bev;
BEV_LOCK(bufev);
#define UNLOCKED(stmt) \\
do BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); while(0)
if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
bufev->errorcb)
/* The "connected" happened before any reads or writes, so
send it first. */
bufferevent_event_cb errorcb = bufev->errorcb;
void *cbarg = bufev->cbarg;
bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
if (bufev_private->readcb_pending && bufev->readcb)
bufferevent_data_cb readcb = bufev->readcb;
void *cbarg = bufev->cbarg;
bufev_private->readcb_pending = 0;
UNLOCKED(readcb(bufev, cbarg));
if (bufev_private->writecb_pending && bufev->writecb)
bufferevent_data_cb writecb = bufev->writecb;
void *cbarg = bufev->cbarg;
bufev_private->writecb_pending = 0;
UNLOCKED(writecb(bufev, cbarg));
if (bufev_private->eventcb_pending && bufev->errorcb)
bufferevent_event_cb errorcb = bufev->errorcb;
void *cbarg = bufev->cbarg;
short what = bufev_private->eventcb_pending;
int err = bufev_private->errno_pending;
bufev_private->eventcb_pending = 0;
bufev_private->errno_pending = 0;
EVUTIL_SET_SOCKET_ERROR(err);
UNLOCKED(errorcb(bufev,what,cbarg));
_bufferevent_decref_and_unlock(bufev);
#undef UNLOCKED
以上就是bufferevent_sock的实现方式,下一节我们将分析IOCP的实现方式。
以上是关于Libevent源码分析--- bufferevent的主要内容,如果未能解决你的问题,请参考以下文章