Libevent源码分析--- evbuffer的基本操作
Posted 子曰帅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Libevent源码分析--- evbuffer的基本操作相关的知识,希望对你有一定的参考价值。
之前几节分析了libevent底层的结构和运行机制,接下来的几节将会分析Bufferevents,Bufferevents在event的基础上加入了数据缓存逻辑,使得事件和数据结合在一起。libevent的bufferevent有六种类型,分别是:bufferevent_async,bufferevent_filter,bufferevent_openssl,bufferevent_pair,bufferevent_ratelim和bufferevent_sock。其中最常用的是bufferevent_sock类型。
evbuffer
每一个bufferevent都有两个evbuffer,分别作为读缓存和写缓存,evbuffer负责处理真正的数据,下面是evbuffer的定义:
struct evbuffer
/** The first chain in this buffer's linked list of chains. */
struct evbuffer_chain *first;
/** The last chain in this buffer's linked list of chains. */
struct evbuffer_chain *last;
/** Pointer to the next pointer pointing at the 'last_with_data' chain.
*
* To unpack:
*
* The last_with_data chain is the last chain that has any data in it.
* If all chains in the buffer are empty, it is the first chain.
* If the buffer has no chains, it is NULL.
*
* The last_with_datap pointer points at _whatever 'next' pointer_
* points at the last_with_datap chain. If the last_with_data chain
* is the first chain, or it is NULL, then the last_with_datap pointer
* is &buf->first.
*/
struct evbuffer_chain **last_with_datap;
/** Total amount of bytes stored in all chains.*/
size_t total_len;
/** Number of bytes we have added to the buffer since we last tried to
* invoke callbacks. */
size_t n_add_for_cb;
/** Number of bytes we have removed from the buffer since we last
* tried to invoke callbacks. */
size_t n_del_for_cb;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
/** A lock used to mediate access to this buffer. */
void *lock;
#endif
/** True iff we should free the lock field when we free this
* evbuffer. */
unsigned own_lock : 1;
/** True iff we should not allow changes to the front of the buffer
* (drains or prepends). */
unsigned freeze_start : 1;
/** True iff we should not allow changes to the end of the buffer
* (appends) */
unsigned freeze_end : 1;
/** True iff this evbuffer's callbacks are not invoked immediately
* upon a change in the buffer, but instead are deferred to be invoked
* from the event_base's loop. Useful for preventing enormous stack
* overflows when we have mutually recursive callbacks, and for
* serializing callbacks in a single thread. */
unsigned deferred_cbs : 1;
#ifdef WIN32
/** True iff this buffer is set up for overlapped IO. */
unsigned is_overlapped : 1;
#endif
/** Zero or more EVBUFFER_FLAG_* bits */
ev_uint32_t flags;
/** Used to implement deferred callbacks. */
struct deferred_cb_queue *cb_queue;
/** A reference count on this evbuffer. When the reference count
* reaches 0, the buffer is destroyed. Manipulated with
* evbuffer_incref and evbuffer_decref_and_unlock and
* evbuffer_free. */
int refcnt;
/** A deferred_cb handle to make all of this buffer's callbacks
* invoked from the event loop. */
struct deferred_cb deferred;
/** A doubly-linked-list of callback functions */
TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;
/** The parent bufferevent object this evbuffer belongs to.
* NULL if the evbuffer stands alone. */
struct bufferevent *parent;
;
evbuffer包含一个evbuffer_chain链表,数据都是存储在各个evbuffer_chain中。evbuffer有三个指针:first、last和last_with_datap,其中first和last分别指向evbuffer_chain链表的第一个元素和最后一个元素,last_with_datap则指向"指向最后一个有数据的chain的那个next指针"(二级指针);total_len表示数据的总长度,n_add_for_cb和n_del_for_cb记录两次回调之间数据的变化量;lock和own_lock用于锁相关;freeze_start和freeze_end分别用于标记是否禁止修改缓冲区的头部(drain、prepend)和尾部(append);deferred_cbs用于标记是否使用延迟回调;is_overlapped标记是否使用iocp;flags用于设置状态,目前可以设置的值只有0和EVBUFFER_FLAG_DRAINS_TO_FD,后者标记是否用于bufferevent_sock类型。cb_queue指向event_base中的defer_queue,deferred作为一个延迟回调会加入到defer_queue中,deferred触发后会调用所有的callbacks,最后parent变量指向evbuffer对应的bufferevent。
下面是evbuffer_chain的定义:
/** A single segment (node) in an evbuffer's linked list of chains. */
struct evbuffer_chain {
	/** points to next buffer in the chain */
	struct evbuffer_chain *next;

	/** total allocation available in the buffer field. */
	size_t buffer_len;

	/** unused space at the beginning of buffer or an offset into a
	 * file for sendfile buffers. */
	ev_misalign_t misalign;

	/** Offset into buffer + misalign at which to start writing.
	 * In other words, the total number of bytes actually stored
	 * in buffer. */
	size_t off;

	/** Set if special handling is required for this chain */
	unsigned flags;
#define EVBUFFER_MMAP		0x0001	/**< memory in buffer is mmaped */
#define EVBUFFER_SENDFILE	0x0002	/**< a chain used for sendfile */
#define EVBUFFER_REFERENCE	0x0004	/**< a chain with a mem reference */
#define EVBUFFER_IMMUTABLE	0x0008	/**< read-only chain */
	/** a chain that mustn't be reallocated or freed, or have its contents
	 * memmoved, until the chain is un-pinned. */
#define EVBUFFER_MEM_PINNED_R	0x0010
#define EVBUFFER_MEM_PINNED_W	0x0020
#define EVBUFFER_MEM_PINNED_ANY (EVBUFFER_MEM_PINNED_R|EVBUFFER_MEM_PINNED_W)
	/** a chain that should be freed, but can't be freed until it is
	 * un-pinned. */
#define EVBUFFER_DANGLING	0x0040

	/** Usually points to the read-write memory belonging to this
	 * buffer allocated as part of the evbuffer_chain allocation.
	 * For mmap, this can be a read-only buffer and
	 * EVBUFFER_IMMUTABLE will be set in flags.  For sendfile, it
	 * may point to NULL.
	 */
	unsigned char *buffer;
};
evbuffer_chain比较简单,next用于链表,buffer_len代表evbuffer_chain的总长度,misalign标记偏移,off记录当前数据的长度,flags是一个状态标记位,接下来的一组宏定义给出了flags可以设置的状态,最后的buffer指向了数据缓存。
evbuffer的相关操作非常多,初次接触可能比较乱,下面是它比较重要的几个方法:
evbuffer_chain_new & evbuffer_chain_free
static struct evbuffer_chain * evbuffer_chain_new(size_t size)
struct evbuffer_chain *chain;
size_t to_alloc;
size += EVBUFFER_CHAIN_SIZE;
/* get the next largest memory that can hold the buffer */
to_alloc = MIN_BUFFER_SIZE;
while (to_alloc < size)
to_alloc <<= 1;
/* we get everything in one chunk */
if ((chain = mm_malloc(to_alloc)) == NULL)
return (NULL);
memset(chain, 0, EVBUFFER_CHAIN_SIZE);
chain->buffer_len = to_alloc - EVBUFFER_CHAIN_SIZE;
/* this way we can manipulate the buffer to different addresses,
* which is required for mmap for example.
*/
chain->buffer = EVBUFFER_CHAIN_EXTRA(u_char, chain);
return (chain);
static inline void evbuffer_chain_free(struct evbuffer_chain *chain)
if (CHAIN_PINNED(chain))
chain->flags |= EVBUFFER_DANGLING;
return;
if (chain->flags & (EVBUFFER_MMAP|EVBUFFER_SENDFILE|
EVBUFFER_REFERENCE))
if (chain->flags & EVBUFFER_REFERENCE)
struct evbuffer_chain_reference *info =
EVBUFFER_CHAIN_EXTRA(
struct evbuffer_chain_reference,
chain);
if (info->cleanupfn)
(*info->cleanupfn)(chain->buffer,
chain->buffer_len,
info->extra);
#ifdef _EVENT_HAVE_MMAP
if (chain->flags & EVBUFFER_MMAP)
struct evbuffer_chain_fd *info =
EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd,
chain);
if (munmap(chain->buffer, chain->buffer_len) == -1)
event_warn("%s: munmap failed", __func__);
if (close(info->fd) == -1)
event_warn("%s: close(%d) failed",
__func__, info->fd);
#endif
#ifdef USE_SENDFILE
if (chain->flags & EVBUFFER_SENDFILE)
struct evbuffer_chain_fd *info =
EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd,
chain);
if (close(info->fd) == -1)
event_warn("%s: close(%d) failed",
__func__, info->fd);
#endif
mm_free(chain);
这里注意的是调用free方法时flag可能有三个状态,EVBUFFER_REFERENCE代表这是一个引用类型的evbuffer_chain,这个标记在evbuffer_add_reference中设置:
/**
 * Append caller-owned memory to an evbuffer without copying it.
 *
 * A new chain is created whose buffer pointer is `data` itself; the chain
 * is flagged EVBUFFER_REFERENCE | EVBUFFER_IMMUTABLE, and `cleanupfn` /
 * `extra` are stored in the chain's extra area.
 *
 * @param outbuf     buffer to append to.
 * @param data       memory to reference.
 * @param datlen     number of bytes at `data`.
 * @param cleanupfn  callback stored with the chain (may be NULL).
 * @param extra      opaque argument stored alongside cleanupfn.
 * @return 0 on success; -1 if the chain cannot be allocated or the end of
 *         the buffer is frozen.
 */
int
evbuffer_add_reference(struct evbuffer *outbuf,
    const void *data, size_t datlen,
    evbuffer_ref_cleanup_cb cleanupfn, void *extra)
{
	struct evbuffer_chain *chain;
	struct evbuffer_chain_reference *info;
	int result = -1;

	/* Only room for a struct evbuffer_chain_reference is requested from
	 * evbuffer_chain_new(); the real payload is the caller's `data`. */
	chain = evbuffer_chain_new(sizeof(struct evbuffer_chain_reference));
	if (!chain)
		return (-1);
	chain->flags |= EVBUFFER_REFERENCE | EVBUFFER_IMMUTABLE;
	chain->buffer = (u_char *)data;
	chain->buffer_len = datlen;
	chain->off = datlen;

	info = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_reference, chain);
	info->cleanupfn = cleanupfn;
	info->extra = extra;

	EVBUFFER_LOCK(outbuf);
	if (outbuf->freeze_end) {
		/* don't call chain_free; we do not want to actually invoke
		 * the cleanup function */
		mm_free(chain);
		goto done;
	}
	evbuffer_chain_insert(outbuf, chain);
	outbuf->n_add_for_cb += datlen;

	evbuffer_invoke_callbacks(outbuf);

	result = 0;
done:
	EVBUFFER_UNLOCK(outbuf);

	return result;
}
当使用evbuffer_add_reference时,evbuffer_chain指向一块已经存在的内存,evbuffer_chain只需要额外申请一个evbuffer_chain_reference大小的空间用来保存cleanupfn和extra参数即可。EVBUFFER_MMAP主要用于mmap操作,EVBUFFER_SENDFILE则直接用于发送文件,这两个标记后面还会详细分析。
evbuffer_add & evbuffer_remove
evbuffer作为读写缓存,它的操作基本都是成对出现的,比如下面的一组add和remove:
int evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
struct evbuffer_chain *chain, *tmp;
const unsigned char *data = data_in;
size_t remain, to_alloc;
int result = -1;
EVBUFFER_LOCK(buf);
if (buf->freeze_end)
goto done;
chain = buf->last;
/* If there are no chains allocated for this buffer, allocate one
* big enough to hold all the data. */
if (chain == NULL)
chain = evbuffer_chain_new(datlen);
if (!chain)
goto done;
evbuffer_chain_insert(buf, chain);
if ((chain->flags & EVBUFFER_IMMUTABLE) == 0)
remain = (size_t)(chain->buffer_len - chain->misalign - chain->off);
if (remain >= datlen)
/* there's enough space to hold all the data in the
* current last chain */
memcpy(chain->buffer + chain->misalign + chain->off,
data, datlen);
chain->off += datlen;
buf->total_len += datlen;
buf->n_add_for_cb += datlen;
goto out;
else if (!CHAIN_PINNED(chain) &&
evbuffer_chain_should_realign(chain, datlen))
/* we can fit the data into the misalignment */
evbuffer_chain_align(chain);
memcpy(chain->buffer + chain->off, data, datlen);
chain->off += datlen;
buf->total_len += datlen;
buf->n_add_for_cb += datlen;
goto out;
else
/* we cannot write any data to the last chain */
remain = 0;
/* we need to add another chain */
to_alloc = chain->buffer_len;
if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE/2)
to_alloc <<= 1;
if (datlen > to_alloc)
to_alloc = datlen;
tmp = evbuffer_chain_new(to_alloc);
if (tmp == NULL)
goto done;
if (remain)
memcpy(chain->buffer + chain->misalign + chain->off,
data, remain);
chain->off += remain;
buf->total_len += remain;
buf->n_add_for_cb += remain;
data += remain;
datlen -= remain;
memcpy(tmp->buffer, data, datlen);
tmp->off = datlen;
evbuffer_chain_insert(buf, tmp);
buf->n_add_for_cb += datlen;
out:
evbuffer_invoke_callbacks(buf);
result = 0;
done:
EVBUFFER_UNLOCK(buf);
return result;
/** Helper: realigns the memory in chain->buffer so that misalign is 0. */
static void
evbuffer_chain_align(struct evbuffer_chain *chain)
EVUTIL_ASSERT(!(chain->flags & EVBUFFER_IMMUTABLE));
EVUTIL_ASSERT(!(chain->flags & EVBUFFER_MEM_PINNED_ANY));
memmove(chain->buffer, chain->buffer + chain->misalign, chain->off);
chain->misalign = 0;
#define MAX_TO_COPY_IN_EXPAND 4096
#define MAX_TO_REALIGN_IN_EXPAND 2048
/** Helper: return true iff we should realign chain to fit datalen bytes of
data in it. */
static int
evbuffer_chain_should_realign(struct evbuffer_chain *chain,
size_t datlen)
return chain->buffer_len - chain->off >= datlen &&
(chain->off < chain->buffer_len / 2) &&
(chain->off <= MAX_TO_REALIGN_IN_EXPAND);
evbuffer_add比较简单,需要注意的是调用evbuffer_add时要确保last指向的就是最后一块有数据的chain。另外,evbuffer非常注重效率和空间利用率的平衡,evbuffer_chain_should_realign就是用来判断当前的chain是否值得移动数据的,只有满足条件时移动数据才有意义。evbuffer_chain_should_realign的判定条件是:移动后的空余容量足够容纳datlen长度的数据,并且当前数据长度小于总长度的1/2,同时不超过MAX_TO_REALIGN_IN_EXPAND。下面是remove函数:
/* Reads data from an event buffer and drains the bytes read.
 *
 * Copies up to datlen bytes into data_out with evbuffer_copyout() and, if
 * any bytes were copied, removes that many bytes with evbuffer_drain().
 * Returns the number of bytes copied (possibly 0), or -1 if the copy or
 * the drain fails. */
int
evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen)
{
	ev_ssize_t n;
	EVBUFFER_LOCK(buf);
	n = evbuffer_copyout(buf, data_out, datlen);
	if (n > 0) {
		if (evbuffer_drain(buf, n)<0)
			n = -1;
	}
	EVBUFFER_UNLOCK(buf);
	return (int)n;
}
ev_ssize_t
evbuffer_copyout(struct evbuffer *buf, void *data_out, size_t datlen)
/*XXX fails badly on sendfile case. */
struct evbuffer_chain *chain;
char *data = data_out;
size_t nread;
ev_ssize_t result = 0;
EVBUFFER_LOCK(buf);
chain = buf->first;
if (datlen >= buf->total_len)
datlen = buf->total_len;
if (datlen == 0)
goto done;
if (buf->freeze_start)
result = -1;
goto done;
nread = datlen;
while (datlen && datlen >= chain->off)
memcpy(data, chain->buffer + chain->misalign, chain->off);
data += chain->off;
datlen -= chain->off;
chain = chain->next;
EVUTIL_ASSERT(chain || datlen==0);
if (datlen)
EVUTIL_ASSERT(chain);
memcpy(data, chain->buffer + chain->misalign, datlen);
result = nread;
done:
EVBUFFER_UNLOCK(buf);
return result;
evbuffer_remove调用两个函数:evbuffer_copyout和evbuffer_drain,前者主要是从evbuffer中拷贝出datlen大小的数据到data_out中,后者从evbuffer中移除指定大小的数据。evbuffer_drain是evbuffer中比较常用的函数之一:
int evbuffer_drain(struct evbuffer *buf, size_t len)
struct evbuffer_chain *chain, *next;
size_t remaining, old_len;
int result = 0;
EVBUFFER_LOCK(buf);
old_len = buf->total_len;
if (old_len == 0)
goto done;
if (buf->freeze_start)
result = -1;
goto done;
if (len >= old_len && !HAS_PINNED_R(buf))
len = old_len;
for (chain = buf->first; chain != NULL; chain = next)
next = chain->next;
evbuffer_chain_free(chain);
ZERO_CHAIN(buf);
else
if (len >= old_len)
len = old_len;
buf->total_len -= len;
remaining = len;
for (chain = buf->first;
remaining >= chain->off;
chain = next)
next = chain->next;
remaining -= chain->off;
if (chain == *buf->last_with_datap)
buf->last_with_datap = &buf->first;
if (&chain->next == buf->last_with_datap)
buf->last_with_datap = &buf->first;
if (CHAIN_PINNED_R(chain))
EVUTIL_ASSERT(remaining == 0);
chain->misalign += chain->off;
chain->off = 0;
break;
else
evbuffer_chain_free(chain);
buf->first = chain;
if (chain)
chain->misalign += remaining;
chain->off -= remaining;
buf->n_del_for_cb += len;
/* Tell someone about changes in this buffer */
evbuffer_invoke_callbacks(buf);
done:
EVBUFFER_UNLOCK(buf);
return result;
EVBUFFER_MEM_PINNED_R标记主要用于iocp,使用方式也将在iocp的章节详细分析。
evbuffer_read & evbuffer_write
看完add和remove,接下来的一对是read和write,这两个函数主要用于套接字的读写:
/**
 * Read from descriptor `fd` onto the end of `buf`.
 *
 * The amount to read is bounded by get_n_bytes_readable_on_socket(),
 * EVBUFFER_MAX_READ and `howmuch`.  With USE_IOVEC_IMPL the data is read
 * into the free space of up to NUM_READ_IOVEC trailing chains in one
 * readv()/WSARecv() call; otherwise a single chain with enough room is
 * obtained and filled with read()/recv().
 *
 * @param buf      destination buffer.
 * @param fd       descriptor to read from.
 * @param howmuch  maximum number of bytes to read; a negative value means
 *                 "use the computed limit".
 * @return the number of bytes read, 0 if the read call returned 0, or -1
 *         if the end of the buffer is frozen, space cannot be reserved, or
 *         the read call fails.
 */
int
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
	struct evbuffer_chain **chainp;
	int n;
	int result;

#ifdef USE_IOVEC_IMPL
	int nvecs, i, remaining;
#else
	struct evbuffer_chain *chain;
	unsigned char *p;
#endif

	EVBUFFER_LOCK(buf);

	if (buf->freeze_end) {
		result = -1;
		goto done;
	}

	n = get_n_bytes_readable_on_socket(fd);
	if (n <= 0 || n > EVBUFFER_MAX_READ)
		n = EVBUFFER_MAX_READ;
	if (howmuch < 0 || howmuch > n)
		howmuch = n;

#ifdef USE_IOVEC_IMPL
	/* Since we can use iovecs, we're willing to use the last
	 * NUM_READ_IOVEC chains. */
	if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {
		result = -1;
		goto done;
	} else {
		IOV_TYPE vecs[NUM_READ_IOVEC];
#ifdef _EVBUFFER_IOVEC_IS_NATIVE
		nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,
		    NUM_READ_IOVEC, &chainp, 1);
#else
		/* We aren't using the native struct iovec.  Therefore,
		   we are on win32. */
		struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
		nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2,
		    &chainp, 1);

		for (i=0; i < nvecs; ++i)
			WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
#endif

#ifdef WIN32
		{
			DWORD bytesRead;
			DWORD flags=0;
			if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {
				/* The read failed. It might be a close,
				 * or it might be an error. */
				if (WSAGetLastError() == WSAECONNABORTED)
					n = 0;
				else
					n = -1;
			} else
				n = bytesRead;
		}
#else
		n = readv(fd, vecs, nvecs);
#endif
	}

#else /*!USE_IOVEC_IMPL*/
	/* If we don't have FIONREAD, we might waste some space here */
	/* XXX we _will_ waste some space here if there is any space left
	 * over on buf->last. */
	if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {
		result = -1;
		goto done;
	}

	/* We can append new data at this point */
	p = chain->buffer + chain->misalign + chain->off;

#ifndef WIN32
	n = read(fd, p, howmuch);
#else
	n = recv(fd, p, howmuch, 0);
#endif
#endif /* USE_IOVEC_IMPL */

	if (n == -1) {
		result = -1;
		goto done;
	}
	if (n == 0) {
		result = 0;
		goto done;
	}

#ifdef USE_IOVEC_IMPL
	/* Distribute the n bytes just read over the chains that were
	 * handed to the read call, in order. */
	remaining = n;
	for (i=0; i < nvecs; ++i) {
		ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp);
		if (space < remaining) {
			(*chainp)->off += space;
			remaining -= (int)space;
		} else {
			(*chainp)->off += remaining;
			buf->last_with_datap = chainp;
			break;
		}
		chainp = &(*chainp)->next;
	}
#else
	chain->off += n;
	advance_last_with_data(buf);
#endif
	buf->total_len += n;
	buf->n_add_for_cb += n;

	/* Tell someone about changes in this buffer */
	evbuffer_invoke_callbacks(buf);
	result = n;
done:
	EVBUFFER_UNLOCK(buf);
	return result;
}
evbuffer_read函数首先通过get_n_bytes_readable_on_socket获取当前最多可读入的数据量,然后根据USE_IOVEC_IMPL宏定义判定是否使用IOVEC方式读取。IOVEC方式可以在一次系统调用中读写多段数据,和evbuffer中的chain搭配使用可以减少系统调用次数,提高效率。_evbuffer_expand_fast函数扩充evbuffer的chain,保证尾部的n(或者小于n)个chain有至少datlen大小的空余空间可供写入数据。
int _evbuffer_expand_fast(struct evbuffer *buf, size_t datlen, int n)
struct evbuffer_chain *chain = buf->last, *tmp, *next;
size_t avail;
int used;
ASSERT_EVBUFFER_LOCKED(buf);
EVUTIL_ASSERT(n >= 2);
if (chain == NULL || (chain->flags & EVBUFFER_IMMUTABLE))
/* There is no last chunk, or we can't touch the last chunk.
* Just add a new chunk. */
chain = evbuffer_chain_new(datlen);
if (chain == NULL)
return (-1);
evbuffer_chain_insert(buf, chain);
return (0);
used = 0; /* number of chains we're using space in. */
avail = 0; /* how much space they have. */
/* How many bytes can we stick at the end of buffer as it is? Iterate
* over the chains at the end of the buffer, tring to see how much
* space we have in the first n. */
for (chain = *buf->last_with_datap; chain; chain = chain->next)
if (chain->off)
size_t space = (size_t) CHAIN_SPACE_LEN(chain);
EVUTIL_ASSERT(chain == *buf->last_with_datap);
if (space)
avail += space;
++used;
else
/* No data in chain; realign it. */
chain->misalign = 0;
avail += chain->buffer_len;
++used;
if (avail >= datlen)
/* There is already enough space. Just return */
return (0);
if (used == n)
break;
/* There wasn't enough space in the first n chains with space in
* them. Either add a new chain with enough space, or replace all
* empty chains with one that has enough space, depending on n. */
if (used < n)
/* The loop ran off the end of the chains before it hit n
* chains; we can add another. */
EVUTIL_ASSERT(chain == NULL);
tmp = evbuffer_chain_new(datlen - avail);
if (tmp == NULL)
return (-1);
buf->last->next = tmp;
buf->last = tmp;
/* (we would only set last_with_data if we added the first
* chain. But if the buffer had no chains, we would have
* just allocated a new chain earlier) */
return (0);
else
/* Nuke _all_ the empty chains. */
int rmv_all = 0; /* True iff we removed last_with_data. */
chain = *buf->last_with_datap;
if (!chain->off)
EVUTIL_ASSERT(chain == buf->first);
rmv_all = 1;
avail = 0;
else
avail = (size_t) CHAIN_SPACE_LEN(chain);
chain = chain->next;
for (; chain; chain = next)
next = chain->next;
EVUTIL_ASSERT(chain->off == 0);
evbuffer_chain_free(chain);
tmp = evbuffer_chain_new(datlen - avail);
if (tmp == NULL)
if (rmv_all)
ZERO_CHAIN(buf);
else
buf->last = *buf->last_with_datap;
(*buf->last_with_datap)->next = NULL;
return (-1);
if (rmv_all)
buf->first = buf->last = tmp;
buf->last_with_datap = &buf->first;
else
(*buf->last_with_datap)->next = tmp;
buf->last = tmp;
return (0);
_evbuffer_read_setup_vecs函数用于填充evbuffer_iovec结构体,之后就可以通过readv或者WSARecv读取数据了。如果不使用IOVEC方式,则需要调用evbuffer_expand_singlechain,它的作用和_evbuffer_expand_fast相似,区别在于所需的空余空间必须位于同一个chain中:
/* Expands the available space in the event buffer to at least datlen, all in
* a single chunk. Return that chunk. */
static struct evbuffer_chain *
evbuffer_expand_singlechain(struct evbuffer *buf, size_t datlen)
struct evbuffer_chain *chain, **chainp;
struct evbuffer_chain *result = NULL;
ASSERT_EVBUFFER_LOCKED(buf);
chainp = buf->last_with_datap;
/* XXX If *chainp is no longer writeable, but has enough space in its
* misalign, this might be a bad idea: we could still use *chainp, not
* (*chainp)->next. */
if (*chainp && CHAIN_SPACE_LEN(*chainp) == 0)
chainp = &(*chainp)->next;
/* 'chain' now points to the first chain with writable space (if any)
* We will either use it, realign it, replace it, or resize it. */
chain = *chainp;
if (chain == NULL ||
(chain->flags & (EVBUFFER_IMMUTABLE|EVBUFFER_MEM_PINNED_ANY)))
/* We can't use the last_with_data chain at all. Just add a
* new one that's big enough. */
goto insert_new;
/* If we can fit all the data, then we don't have to do anything */
if (CHAIN_SPACE_LEN(chain) >= datlen)
result = chain;
goto ok;
/* If the chain is completely empty, just replace it by adding a new
* empty chain. */
if (chain->off == 0)
goto insert_new;
/* If the misalignment plus the remaining space fulfills our data
* needs, we could just force an alignment to happen. Afterwards, we
* have enough space. But only do this if we're saving a lot of space
* and not moving too much data. Otherwise the space savings are
* probably offset by the time lost in copying.
*/
if (evbuffer_chain_should_realign(chain, datlen))
evbuffer_chain_align(chain);
result = chain;
goto ok;
/* At this point, we can either resize the last chunk with space in
* it, use the next chunk after it, or If we add a new chunk, we waste
* CHAIN_SPACE_LEN(chain) bytes in the former last chunk. If we
* resize, we have to copy chain->off bytes.
*/
/* Would expanding this chunk be affordable and worthwhile? */
if (CHAIN_SPACE_LEN(chain) < chain->buffer_len / 8 ||
chain->off > MAX_TO_COPY_IN_EXPAND)
/* It's not worth resizing this chain. Can the next one be
* used? */
if (chain->next && CHAIN_SPACE_LEN(chain->next) >= datlen)
/* Yes, we can just use the next chain (which should
* be empty. */
result = chain->next;
goto ok;
else
/* No; append a new chain (which will free all
* terminal empty chains.) */
goto insert_new;
else
/* Okay, we're going to try to resize this chain: Not doing so
* would waste at least 1/8 of its current allocation, and we
* can do so without having to copy more than
* MAX_TO_COPY_IN_EXPAND bytes. */
/* figure out how much space we need */
size_t length = chain->off + datlen;
struct evbuffer_chain *tmp = evbuffer_chain_new(length);
if (tmp == NULL)
goto err;
/* copy the data over that we had so far */
tmp->off = chain->off;
memcpy(tmp->buffer, chain->buffer + chain->misalign,
chain->off);
/* fix up the list */
EVUTIL_ASSERT(*chainp == chain);
result = *chainp = tmp;
if (buf->last == chain)
buf->last = tmp;
tmp->next = chain->next;
evbuffer_chain_free(chain);
goto ok;
insert_new:
result = evbuffer_chain_insert_new(buf, datlen);
if (!result)
goto err;
ok:
EVUTIL_ASSERT(result);
EVUTIL_ASSERT(CHAIN_SPACE_LEN(result) >= datlen);
err:
return result;
evbuffer_expand_singlechain会调用evbuffer_chain_should_realign来查看是否可以把last_with_data指向的chain通过移动数据来腾出空间;如果不可以,则再判断是否值得把该chain中的数据拷贝到一个更大的新chain中,判断依据是该chain的剩余空间不少于其总容量的八分之一(否则直接弃用它浪费的空间很少),并且已有数据长度不超过MAX_TO_COPY_IN_EXPAND(拷贝代价可以接受)。两个条件有一个不满足时,就改用后一个chain或者追加一个新的chain。
看完read,接下来分析write:
/**
 * Write the contents of `buffer` to descriptor `fd`.
 *
 * Thin wrapper that calls evbuffer_write_atmost() with a limit of -1 and
 * returns its result unchanged.
 */
int
evbuffer_write(struct evbuffer *buffer, evutil_socket_t fd)
{
	return evbuffer_write_atmost(buffer, fd, -1);
}
/**
 * Write up to `howmuch` bytes from the front of `buffer` to `fd`, then
 * drain the bytes that were written.
 *
 * Depending on build options and on the first chain, the data is sent with
 * evbuffer_write_sendfile(), evbuffer_write_iovec(), or by pulling it into
 * one contiguous region and calling send()/write().
 *
 * @param buffer   source buffer.
 * @param fd       descriptor to write to.
 * @param howmuch  maximum number of bytes to write; a negative value or a
 *                 value larger than the buffer means "everything".
 * @return the value returned by the underlying write call.  n keeps its
 *         initial value of -1 when the front of the buffer is frozen or
 *         when there is nothing to write.
 */
int
evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
    ev_ssize_t howmuch)
{
	int n = -1;

	EVBUFFER_LOCK(buffer);

	if (buffer->freeze_start) {
		goto done;
	}

	if (howmuch < 0 || (size_t)howmuch > buffer->total_len)
		howmuch = buffer->total_len;

	if (howmuch > 0) {
#ifdef USE_SENDFILE
		struct evbuffer_chain *chain = buffer->first;
		if (chain != NULL && (chain->flags & EVBUFFER_SENDFILE))
			n = evbuffer_write_sendfile(buffer, fd, howmuch);
		else {
#endif
#ifdef USE_IOVEC_IMPL
		n = evbuffer_write_iovec(buffer, fd, howmuch);
#elif defined(WIN32)
		/* XXX(nickm) Don't disable this code until we know if
		 * the WSARecv code above works. */
		void *p = evbuffer_pullup(buffer, howmuch);
		n = send(fd, p, howmuch, 0);
#else
		void *p = evbuffer_pullup(buffer, howmuch);
		n = write(fd, p, howmuch);
#endif
#ifdef USE_SENDFILE
		}
#endif
	}

	/* Only remove what was actually written. */
	if (n > 0)
		evbuffer_drain(buffer, n);

done:
	EVBUFFER_UNLOCK(buffer);
	return (n);
}
evbuffer_write_atmost函数有三种情况,第一种是chain->flags 有 EVBUFFER_SENDFILE标记时,调用evbuffer_write_sendfile方法:
#ifdef USE_SENDFILE
/**
 * Send the file referenced by the first chain of `buffer` to `fd` using
 * the platform's sendfile() variant.
 *
 * The file descriptor comes from the chain's evbuffer_chain_fd extra data;
 * chain->misalign is used as the file offset and chain->off as the length.
 * The caller must hold the buffer lock (asserted).
 *
 * NOTE(review): `howmuch` is not consulted by any branch below; each call
 * asks sendfile() for chain->off bytes.
 *
 * @return the number of bytes sent.  On Mac OS X / FreeBSD, -1 is returned
 *         for a non-retriable error; on Linux / Solaris a retriable error
 *         yields 0 (or the bytes already sent, on Solaris) and other
 *         errors yield sendfile()'s own return value.
 */
static inline int
evbuffer_write_sendfile(struct evbuffer *buffer, evutil_socket_t fd,
    ev_ssize_t howmuch)
{
	struct evbuffer_chain *chain = buffer->first;
	struct evbuffer_chain_fd *info =
	    EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain);
#if defined(SENDFILE_IS_MACOSX) || defined(SENDFILE_IS_FREEBSD)
	int res;
	off_t len = chain->off;
#elif defined(SENDFILE_IS_LINUX) || defined(SENDFILE_IS_SOLARIS)
	ev_ssize_t res;
	off_t offset = chain->misalign;
#endif

	ASSERT_EVBUFFER_LOCKED(buffer);

#if defined(SENDFILE_IS_MACOSX)
	res = sendfile(info->fd, fd, chain->misalign, &len, NULL, 0);
	if (res == -1 && !EVUTIL_ERR_RW_RETRIABLE(errno))
		return (-1);

	return (len);
#elif defined(SENDFILE_IS_FREEBSD)
	res = sendfile(info->fd, fd, chain->misalign, chain->off, NULL, &len, 0);
	if (res == -1 && !EVUTIL_ERR_RW_RETRIABLE(errno))
		return (-1);

	return (len);
#elif defined(SENDFILE_IS_LINUX)
	/* TODO(niels): implement splice */
	res = sendfile(fd, info->fd, &offset, chain->off);
	if (res == -1 && EVUTIL_ERR_RW_RETRIABLE(errno)) {
		/* if this is EAGAIN or EINTR return 0; otherwise, -1 */
		return (0);
	}
	return (res);
#elif defined(SENDFILE_IS_SOLARIS)
	{
		const off_t offset_orig = offset;
		res = sendfile(fd, info->fd, &offset, chain->off);
		if (res == -1 && EVUTIL_ERR_RW_RETRIABLE(errno)) {
			if (offset - offset_orig)
				return offset - offset_orig;
			/* if this is EAGAIN or EINTR and no bytes were
			 * written, return 0 */
			return (0);
		}
		return (res);
	}
#endif
}
#endif
该函数主要是在支持sendfile的系统上直接发送文件内容到套接字中,EVBUFFER_SENDFILE之前分析过。
第二种情况是调用evbuffer_write_iovec:
#ifdef USE_IOVEC_IMPL
/**
 * Write up to `howmuch` bytes from the leading chains of `buffer` to `fd`
 * with a single writev()/WSASend() call.
 *
 * At most NUM_WRITE_IOVEC chains are gathered; gathering stops early at a
 * sendfile chain.  The caller must hold the buffer lock (asserted).  This
 * function does not drain the buffer.
 *
 * @return the number of bytes written, 0 if no chain could be gathered,
 *         or -1 if howmuch is negative or the write call fails.
 */
static inline int
evbuffer_write_iovec(struct evbuffer *buffer, evutil_socket_t fd,
    ev_ssize_t howmuch)
{
	IOV_TYPE iov[NUM_WRITE_IOVEC];
	struct evbuffer_chain *chain = buffer->first;
	int n, i = 0;

	if (howmuch < 0)
		return -1;

	ASSERT_EVBUFFER_LOCKED(buffer);
	/* XXX make this top out at some maximal data length?  if the
	 * buffer has (say) 1MB in it, split over 128 chains, there's
	 * no way it all gets written in one go. */
	while (chain != NULL && i < NUM_WRITE_IOVEC && howmuch) {
#ifdef USE_SENDFILE
		/* we cannot write the file info via writev */
		if (chain->flags & EVBUFFER_SENDFILE)
			break;
#endif
		iov[i].IOV_PTR_FIELD = (void *) (chain->buffer + chain->misalign);
		if ((size_t)howmuch >= chain->off) {
			/* XXXcould be problematic when windows supports mmap*/
			iov[i++].IOV_LEN_FIELD = (IOV_LEN_TYPE)chain->off;
			howmuch -= chain->off;
		} else {
			/* XXXcould be problematic when windows supports mmap*/
			iov[i++].IOV_LEN_FIELD = (IOV_LEN_TYPE)howmuch;
			break;
		}
		chain = chain->next;
	}
	if (! i)
		return 0;
#ifdef WIN32
	{
		DWORD bytesSent;
		if (WSASend(fd, iov, i, &bytesSent, 0, NULL, NULL))
			n = -1;
		else
			n = bytesSent;
	}
#else
	n = writev(fd, iov, i);
#endif
	return (n);
}
#endif
iovec可以结合chain使用,减少系统调用。
第三种情况是只能使用send或者write发送单块的内存数据,这时需要调用evbuffer_pullup将evbuffer前面size长度的数据放置到同一个chain中:
unsigned char *evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
struct evbuffer_chain *chain, *next, *tmp, *last_with_data;
unsigned char *buffer, *result = NULL;
ev_ssize_t remaining;
int removed_last_with_data = 0;
int removed_last_with_datap = 0;
EVBUFFER_LOCK(buf);
chain = buf->first;
以上是关于Libevent源码分析--- evbuffer的基本操作的主要内容,如果未能解决你的问题,请参考以下文章
Libevent bufferevent 的 evbuffer_add
如何读取 evbuffer 并将其放入 libevent 中的字符串 (char*)