linux c 环形缓冲区
Posted fulianzhou
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了linux c 环形缓冲区相关的知识,希望对你有一定的参考价值。
mmap()函数介绍:
头文件:
#include <sys/mman.h>
函数原型:
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
参数:
addr:映射区的开始地址。为NULL时由系统决定映射区的起始地址
length:映射长度。不足一页按一页处理。
prot:内存的保护标志。
PROT_READ //页内容可以被读取
PROT_WRITE //页可以被写入
PROT_NONE //页不可访问
flags:映射对象的类型。映射选项和映射页是否可以共享
MAP_FIXED //使用指定的映射起始地址,如果由addr和length参数指定的内存区重叠于现存的映射空间,重叠部分将会被丢弃。如果指定的起始地址不可用,操作将会失败。并且起始地址必须落在页的边界上。
MAP_SHARED //与其它所有映射这个对象的进程共享映射空间。对共享区的写入,相当于输出到文件。直到msync()或者munmap()被调用,文件实际上不会被更新。
MAP_PRIVATE //建立一个写入时拷贝的私有映射。内存区域的写入不会影响到原文件。这个标志和以上标志是互斥的,只能使用其中一个。
MAP_ANONYMOUS //匿名映射,映射区不与任何文件关联。
fd:文件描述符。为-1时需要设置MAP_ANONYMOUS ,表明进行的是匿名映射
offset:被映射内容的起点。
例如:需要一个8k的环形缓冲区。
1.创建一个8k的临时文件
// 创建临时文件
int fd = mkstemp("/tmp/ring-buffer-XXXXXX");
unlink(path);
// 修改文件大小为8k
ftruncate(fd, 8 * 1024);
2.mmap一个16k的匿名内存
char* base_addr = (char*)mmap(NULL, 16 * 1024, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
3.对匿名内存的起始位置与8k位置进行映射
char* address1 = mmap(base_addr, 8*1024, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
if(address1 != base_addr)
{
return -1;
}
char* address2 = mmap(base_addr + 8*1024, 8*1024, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
if(address2 != base_addr + 8*1024)
{
return -1;
}
映射完成后:
此时,访问base_addr的内容相当于访问address2的内容。
*(base_addr + 8K + offset) == *(base_addr + offset)
服务端实现:
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <errno.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <pthread.h>
#include <atomic>
#include <sys/mman.h>
struct RingBuffer
{
void* address;
int size;
std::atomic<int> pos_w;
std::atomic<int> pos_r;
};
std::atomic<int> seq;
RingBuffer ringbuffer;
int InitRingBuffer()
{
char path[] = "/tmp/ring-buffer-XXXXXX";
void* address = NULL;
int fd = mkstemp(path);
if(fd < 0)
{
printf("mkstemp() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
return -1;
}
if(unlink(path) != 0)
{
printf("unlink() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
return -1;
}
ringbuffer.size = 8192;
ringbuffer.pos_w = 0;
ringbuffer.pos_r = 0;
if(ftruncate(fd, ringbuffer.size) != 0)
{
printf("ftruncate() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
return -1;
}
ringbuffer.address = mmap(NULL, ringbuffer.size * 2, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
if(ringbuffer.address == MAP_FAILED)
{
printf("mmap() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
return -1;
}
printf("ringbuffer.address:%p\\n", ringbuffer.address);
address = mmap(ringbuffer.address, ringbuffer.size, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
if(address != ringbuffer.address)
{
printf("mmap() error, address != ringbuffer.address, errno=%d, errmsg=%s\\n", errno, strerror(errno));
return -1;
}
printf("address:%p\\n", address);
address = mmap(ringbuffer.address + ringbuffer.size, ringbuffer.size, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
printf("address:%p\\n", address);
if(address != ringbuffer.address + ringbuffer.size)
{
printf("mmap() error, address != ringbuffer.address + ringbuffer.size, errno=%d, errmsg=%s\\n", errno, strerror(errno));
return -1;
}
if(close(fd) != 0)
{
printf("close() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
}
return 0;
}
int Code(unsigned char* buff, char* sendbuff, int len)
{
int offset = 0;
*(int*)(buff + offset) = 0xAAAA;
offset += sizeof(int);
*(int*)(buff + offset) = len;
offset += sizeof(int);
strncpy((char*)(buff + offset), sendbuff, len);
offset += len;
printf("CODE<======[0xAAAA, %d, %s]\\n", len, sendbuff);
return offset;
}
int Decode(void* buff, int maxlen)
{
int offset = 0;
int head = *(int*)(buff + offset);
offset += sizeof(int);
if(head != 0xAAAA)
{
printf("Decode failed, head invalid! head=%x\\n", head);
return 0;
}
int len = *(int*)(buff + offset);
offset += sizeof(int);
if(len > maxlen - offset)
{
printf("Decode failed, len invalid! len=%d\\n", len);
return 0;
}
char cont[1024] = { 0 };
strncpy(cont, (char*)buff + offset, len);
offset += len;
cont[len] = 0;
printf("DECODE====>[0x%X, %d, %s]\\n", head, len, cont);
return offset;
}
void genmsg(char* buff, int maxlen)
{
const char* charset = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ~!@#$%^&*()_+ ;',.?";
const int charsetlen = strlen(charset);
int idx = 0;
for(int i = 0; i < maxlen;i++)
{
idx = rand()%charsetlen;
buff[i] = charset[idx];
}
}
void* SendTrd(void* args)
{
int sockfd = *(int*)args;
int epollfd = epoll_create1(EPOLL_CLOEXEC);
if(epollfd <= 0)
{
printf("epoll_create() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
close(sockfd);
return NULL;
}
struct epoll_event ev;
ev.events = EPOLLOUT | EPOLLET;
ev.data.fd = sockfd;
struct epoll_event events[128] = { 0 };
if(0 != epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev))
{
printf("epoll_crl add server sockfd failed! errno=%d, errmsg=%s\\n", errno, strerror(errno));
close(sockfd);
return NULL;
}
int run = 1;
int ev_num = 0;
unsigned char sendbuff[1024] = { 0 };
int sendlen = 0;
while(run)
{
ev_num = epoll_wait(epollfd, events, sizeof(events)/sizeof(events[0]), 1);
if(ev_num == 0)
{
continue;
}
else if(ev_num == -1)
{
printf("epoll_wait() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
break;
}
for(int i = 0; i < ev_num; i++)
{
if(events[i].events & EPOLLERR ||
events[i].events & EPOLLHUP
)
{
printf("epoll event error, sockfd=%d, errno=%d, errmsg=%s\\n", events[i].data.fd, errno, strerror(errno));
return NULL;
}
if(events[i].events & EPOLLOUT)
{
usleep(1000);
int connfd = events[i].data.fd;
if(connfd < 0)
{
printf("errno socket, connfd = %d\\n", connfd);
continue;
}
int nsend = 0;
while(nsend < sendlen)
{
int ret = send(sockfd, sendbuff + nsend, sendlen - nsend, 0);
if(ret ==-1)
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
continue;
}
printf("send() failed, nread=%d, errno=%d, errmsg=%s\\n", nsend, errno, strerror(errno));
continue;
}
else if(ret == 0)
{
printf("connection closed!\\n");
break;
}
nsend += ret;
}
printf("send %d bytes.\\n", nsend);
char msg[480] = { 0 };
char buff[512] = { 0 };
int maxlen = 10 + rand()%(sizeof(msg) - 11);
genmsg(msg, maxlen);
snprintf(buff, sizeof(buff) - 1, "[%d,%s]", seq++, msg);
sendlen = Code(sendbuff, buff, strlen(buff));
ev.events = EPOLLOUT | EPOLLET;
if(0 != epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev))
{
printf("SendThrd, epoll_ctl() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
}
}
}
}
printf("Send Thread Quit!\\n");
return NULL;
}
void* ReadTrd(void* args)
{
int sockfd = *(int*)args;
int epollfd = epoll_create1(EPOLL_CLOEXEC);
if(epollfd <= 0)
{
printf("epoll_create() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
close(sockfd);
return NULL;
}
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = sockfd;
struct epoll_event events[128] = { 0 };
if(0 != epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev))
{
printf("epoll_crl add server sockfd failed! errno=%d, errmsg=%s\\n", errno, strerror(errno));
close(sockfd);
return NULL;
}
int run = 1;
int ev_num = 0;
while(run)
{
ev_num = epoll_wait(epollfd, events, sizeof(events)/sizeof(events[0]), 1);
if(ev_num == 0)
{
continue;
}
else if(ev_num == -1)
{
printf("epoll_wait() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
break;
}
for(int i = 0; i < ev_num; i++)
{
if(events[i].events & EPOLLERR ||
events[i].events & EPOLLHUP
)
{
printf("epoll event error, sockfd=%d, errno=%d, errmsg=%s\\n", events[i].data.fd, errno, strerror(errno));
return NULL;
}
if(events[i].events & EPOLLIN)
{
int connfd = events[i].data.fd;
if(connfd < 0)
{
printf("errno socket, connfd = %d\\n", connfd);
continue;
}
unsigned char readbuff[1024] = { 0 };
while(1)
{
int read_size = 0;
if(ringbuffer.pos_w == ringbuffer.pos_r)
{
read_size = ringbuffer.size - 1;
}
else if(ringbuffer.pos_w > ringbuffer.pos_r)
{
read_size 以上是关于linux c 环形缓冲区的主要内容,如果未能解决你的问题,请参考以下文章