linux c 环形缓冲区

Posted fulianzhou

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了linux c 环形缓冲区相关的知识,希望对你有一定的参考价值。

mmap()函数介绍:
头文件:
#include <sys/mman.h>
函数原型:
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
参数:
addr:映射区的开始地址。为NULL时由系统决定映射区的起始地址
length:映射长度。不足一页按一页处理。
prot:内存的保护标志。
PROT_READ //页内容可以被读取
PROT_WRITE //页可以被写入
PROT_NONE //页不可访问
flags:映射对象的类型。映射选项和映射页是否可以共享
MAP_FIXED //使用指定的映射起始地址,如果由addr和length参数指定的内存区重叠于现存的映射空间,重叠部分将会被丢弃。如果指定的起始地址不可用,操作将会失败。并且起始地址必须落在页的边界上。
MAP_SHARED //与其它所有映射这个对象的进程共享映射空间。对共享区的写入,相当于输出到文件。直到msync()或者munmap()被调用,文件实际上不会被更新。
MAP_PRIVATE //建立一个写入时拷贝的私有映射。内存区域的写入不会影响到原文件。这个标志和以上标志是互斥的,只能使用其中一个。
MAP_ANONYMOUS //匿名映射,映射区不与任何文件关联。
fd:文件描述符。为-1时需要设置MAP_ANONYMOUS ,表明进行的是匿名映射
offset:被映射内容的起点。

例如:需要一个8k的环形缓冲区。
1.创建一个8k的临时文件

	// 创建临时文件
    int fd = mkstemp("/tmp/ring-buffer-XXXXXX");
    
    unlink(path);
    // 修改文件大小为8k
    ftruncate(fd, 8 * 1024);

2.mmap一个16k的匿名内存

    char* base_addr = (char*)mmap(NULL, 16 * 1024, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);

3.对匿名内存的起始位置与8k位置进行映射

    char* address1 = mmap(base_addr, 8*1024, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
    if(address1 != base_addr)
    {
        return -1;
    }
    
    char* address2 = mmap(base_addr  + 8*1024, 8*1024, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
    if(address2 != base_addr  + 8*1024)
    {
        return -1;
    }

映射完成后:
此时,访问base_addr的内容相当于访问address2的内容。
*(base_addr + 8K + offset) == *(base_addr + offset)

服务端实现:

#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <errno.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <pthread.h>
#include <atomic>
#include <sys/mman.h>

struct RingBuffer
{
    void* address;
    int size;
    std::atomic<int> pos_w;
    std::atomic<int> pos_r;
};

std::atomic<int> seq;

RingBuffer ringbuffer;

int InitRingBuffer()
{
    char path[] = "/tmp/ring-buffer-XXXXXX";
    void* address = NULL;
    int fd = mkstemp(path);
    if(fd < 0)
    {
        printf("mkstemp() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
        return -1;
    }

    if(unlink(path) != 0)
    {
        printf("unlink() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
        return -1;
    }

    ringbuffer.size = 8192;
    ringbuffer.pos_w = 0;
    ringbuffer.pos_r = 0;

    if(ftruncate(fd, ringbuffer.size) != 0)
    {
        printf("ftruncate() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
        return -1;
    }

    ringbuffer.address = mmap(NULL, ringbuffer.size * 2, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    if(ringbuffer.address == MAP_FAILED)
    {
        printf("mmap() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
        return -1;
    }
    printf("ringbuffer.address:%p\\n", ringbuffer.address);

    address = mmap(ringbuffer.address, ringbuffer.size, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
    if(address != ringbuffer.address)
    {
        printf("mmap() error, address != ringbuffer.address, errno=%d, errmsg=%s\\n", errno, strerror(errno));
        return -1;
    }
    printf("address:%p\\n", address);

    address = mmap(ringbuffer.address + ringbuffer.size, ringbuffer.size, PROT_READ | PROT_WRITE, MAP_FIXED | MAP_SHARED, fd, 0);
    printf("address:%p\\n", address);
    if(address != ringbuffer.address + ringbuffer.size)
    {
        printf("mmap() error, address != ringbuffer.address + ringbuffer.size, errno=%d, errmsg=%s\\n", errno, strerror(errno));
        return -1;
    }

    if(close(fd) != 0)
    {
        printf("close() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
    }
    return 0;
}



int Code(unsigned char* buff, char* sendbuff, int len)
{
    int offset = 0;

    *(int*)(buff + offset) = 0xAAAA;
    offset += sizeof(int);

    *(int*)(buff + offset) = len;
    offset += sizeof(int);

    strncpy((char*)(buff + offset), sendbuff, len);
    offset += len;

    printf("CODE<======[0xAAAA, %d, %s]\\n", len, sendbuff);
    return offset;
}

int Decode(void* buff, int maxlen)
{
    int offset = 0;

    int head = *(int*)(buff + offset);
    offset += sizeof(int);

    if(head != 0xAAAA)
    {
        printf("Decode failed, head invalid! head=%x\\n", head);
        return 0;
    }

    int len = *(int*)(buff + offset);
    offset += sizeof(int);
    if(len > maxlen - offset)
    {
        printf("Decode failed, len invalid! len=%d\\n", len);
        return 0;
    }

    char cont[1024] = { 0 };

    strncpy(cont, (char*)buff + offset, len);
    offset += len;
    cont[len] = 0;

    printf("DECODE====>[0x%X, %d, %s]\\n", head, len, cont);

    return offset;
}

void genmsg(char* buff, int maxlen)
{
    const char* charset = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ~!@#$%^&*()_+ ;',.?";
    const int charsetlen = strlen(charset);
    int idx = 0;

    for(int i = 0; i < maxlen;i++)
    {
        idx = rand()%charsetlen;
        buff[i] = charset[idx];
    }
}

void* SendTrd(void* args)
{
    int sockfd = *(int*)args;

    int epollfd = epoll_create1(EPOLL_CLOEXEC);
    if(epollfd <= 0)
    {
        printf("epoll_create() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
        close(sockfd);
        return NULL;
    }
    struct epoll_event ev;
    ev.events = EPOLLOUT | EPOLLET;
    ev.data.fd = sockfd;

    struct epoll_event events[128] = { 0 };


    if(0 != epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev))
    {
        printf("epoll_crl add server sockfd failed! errno=%d, errmsg=%s\\n", errno, strerror(errno));
        close(sockfd);
        return NULL;
    }


    int run = 1;
    int ev_num = 0;
    unsigned char sendbuff[1024] = { 0 };
    int sendlen = 0;
    while(run)
    {
        ev_num = epoll_wait(epollfd, events, sizeof(events)/sizeof(events[0]), 1);
        if(ev_num == 0)
        {
            continue;
        }
        else if(ev_num == -1)
        {
            printf("epoll_wait() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
            break;
        }

        for(int i = 0; i < ev_num; i++)
        {
            if(events[i].events & EPOLLERR ||
               events[i].events & EPOLLHUP
              )
            {
                printf("epoll event error, sockfd=%d, errno=%d, errmsg=%s\\n", events[i].data.fd, errno, strerror(errno));
                return NULL;
            }

            if(events[i].events & EPOLLOUT)
            {
                usleep(1000);
                int connfd = events[i].data.fd;
                if(connfd < 0)
                {
                    printf("errno socket, connfd = %d\\n", connfd);
                    continue;
                }

                int nsend = 0;
                while(nsend < sendlen)
                {
                    int ret = send(sockfd, sendbuff + nsend, sendlen - nsend, 0);
                    if(ret ==-1)
                    {
                        if(errno == EAGAIN || errno == EWOULDBLOCK)
                        {
                            continue;
                        }

                        printf("send() failed, nread=%d, errno=%d, errmsg=%s\\n", nsend, errno, strerror(errno));
                        continue;
                    }
                    else if(ret == 0)
                    {
                        printf("connection closed!\\n");
                        break;
                    }

                    nsend += ret;
                }

                printf("send %d bytes.\\n", nsend);
                char msg[480] = { 0 };
                char buff[512] = { 0 };
                int maxlen = 10 + rand()%(sizeof(msg) - 11);
                genmsg(msg, maxlen);
                snprintf(buff, sizeof(buff) - 1, "[%d,%s]", seq++, msg);

                sendlen = Code(sendbuff, buff, strlen(buff));

                ev.events = EPOLLOUT | EPOLLET;
                if(0 != epoll_ctl(epollfd, EPOLL_CTL_MOD, sockfd, &ev))
                {
                    printf("SendThrd, epoll_ctl() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
                }
            }
        }
    }

    printf("Send Thread Quit!\\n");
    return NULL;
}
void* ReadTrd(void* args)
{
    int sockfd = *(int*)args;

    int epollfd = epoll_create1(EPOLL_CLOEXEC);
    if(epollfd <= 0)
    {
        printf("epoll_create() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
        close(sockfd);
        return NULL;
    }
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = sockfd;

    struct epoll_event events[128] = { 0 };


    if(0 != epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev))
    {
        printf("epoll_crl add server sockfd failed! errno=%d, errmsg=%s\\n", errno, strerror(errno));
        close(sockfd);
        return NULL;
    }


    int run = 1;
    int ev_num = 0;
    while(run)
    {
        ev_num = epoll_wait(epollfd, events, sizeof(events)/sizeof(events[0]), 1);
        if(ev_num == 0)
        {
            continue;
        }
        else if(ev_num == -1)
        {
            printf("epoll_wait() failed, errno=%d, errmsg=%s\\n", errno, strerror(errno));
            break;
        }

        for(int i = 0; i < ev_num; i++)
        {
            if(events[i].events & EPOLLERR ||
               events[i].events & EPOLLHUP
              )
            {
                printf("epoll event error, sockfd=%d, errno=%d, errmsg=%s\\n", events[i].data.fd, errno, strerror(errno));
                return NULL;
            }

            if(events[i].events & EPOLLIN)
            {
                int connfd = events[i].data.fd;
                if(connfd < 0)
                {
                    printf("errno socket, connfd = %d\\n", connfd);
                    continue;
                }

                unsigned char readbuff[1024] = { 0 };

                while(1)
                {
                    int read_size = 0;
                    if(ringbuffer.pos_w == ringbuffer.pos_r)
                    {
                        read_size = ringbuffer.size - 1;
                    }
                    else if(ringbuffer.pos_w > ringbuffer.pos_r)
                    {
                        read_size 以上是关于linux c 环形缓冲区的主要内容,如果未能解决你的问题,请参考以下文章

linux c 环形缓冲区

linux内核之Kfifo环形队列

环形缓冲区: ringbuf.c

dmesg命令

环形缓冲区

c_cpp 环形缓冲区