c++千万数据级别正确使用无锁队列,避免内存撕碎

Posted qianbo_insist

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c++千万数据级别正确使用无锁队列,避免内存撕碎相关的知识,希望对你有一定的参考价值。

第一篇文章
这里逐渐深入,先讲一下协议,很多概念必须讲清楚

七层协议

我们使用的是网络传输,千万级别的数据实际上并不多,对于队列来说,每个队列的数据块大小定义为MTU大小,如1400字节,实际上传输层定义在操作系统里面,我们无法更改,无论是linux还是windows,都需要经过内核态和用户态,假定我们有100路视频, 每路视频为2M 码流, 一分钟有多少数据呢, 2 * 1024*1024 / 8 * 60 * 100 = 1572864000 bytes = 1500 G
对于MTU 包个数来说, 是 1500G / 1400 = 1G 个队列包, 那每秒钟就是 1024 * 1024 / 60 = 17476 个队列包, 每路有 17476 / 100 = 175 个 MTU包,对于带宽来说,上传就需要200Mbit的带宽,所以这里千万数据级别,说的就是1分钟MTU的包个数。

会话层来说,如果传输层协议使用tcp, 100个上传就需要100个会话,到应用层的时候需要发送的RTP协议实际上每个包都会加上至少12个字节的包头。这些都是开销,选择RTMP协议和RTSP,SIP等等,各自都有开销,但是RTSP和SIP 传输层都可以是 RTP协议 ,也都可以选择UDP和TCP,选择UDP意味着更低的开销,TCP 意味着丢包会少。

应用方

主要应用在视频传输方面,下面画出主要的视频传输,无论使用什么协议,我们先关注在RTP协议上面,RTP协议首先为UDP,RTP over UDP协议我们主要关注在他的包数量和缓冲的内存数量上,我们这样来计算,对于2Mbit 的视频传输来说, 我们有如下算法
MTU = 1400
2M 约为 210241024 /8 /1400 = 187

也就是如果需要缓存1秒数据,至少需要187 个缓存单元,有了如上计算,我们就可以假定一个视频为2M的单元来说,我们缓存200 个单元,每个单元大小为1400字节。

数据量极大

大数据量莫过于视频,尤其是RTP协议中的包,1是多,每个RTP协议包数据量不会超过MTU,最大传输单元,

#include <iostream>
#include <mutex>
#include <queue>
enum

    en_emptying = 0,
    en_writing,
    en_canreadings
;
struct smem

    char v_status;
    char v_i_flag;
    char r1;
    char r2;
    int v_len = 0;
    uint32_t v_pts = 0;
    uint8_t* v_data = NULL;
;

typedef std::shared_ptr<smem> ptr_smem;

struct s_mem_pool

    //qianbo :just one thread read,one thread write ,otherwise error occur
    uint8_t* v_data = NULL;
    uint8_t* v_end = NULL;
    int v_len = 0;;
    uint8_t* v_write = NULL;
    uint8_t* v_read = NULL;

    std::mutex v_mux;
    std::atomic_int v_framenum;
    std::queue<ptr_smem>v_i;
    void init_mem(int memlen)
    
        if (v_data != NULL)
        
            if (v_len < memlen)
            
                free(v_data);
                v_data = NULL;
            
        


        v_len = memlen;
        if (v_data == NULL)
            v_data = (uint8_t*)malloc(memlen);

        v_end = v_data + memlen;
        v_write = v_data;
        v_read = v_data;
        v_framenum = 0;
        v_mux.lock();
        while (!v_i.empty())
        
            v_i.pop();
        
        v_mux.unlock();

    

    ~s_mem_pool()
    
        clearqueue();
        if (v_data != NULL)
            free(v_data);
    


    bool push(uint8_t* data, int len, uint32_t pts, bool i_flag)
    
#define EX_LEN (sizeof(char)*4 +sizeof(int)+sizeof(uint32_t))
#define NEEDLEN (len)

#define WRITE_INFO \\
        ptr_smem mem = std::make_shared<smem>();\\
        mem->v_data = v_write;\\
        mem->v_len = len;\\
        mem->v_pts = pts;\\
        mem->v_i_flag = i_flag; \\
        mem->v_status = en_writing;\\
        v_mux.lock();\\
        v_i.emplace(mem);\\
        v_mux.unlock();

#define WRITE_INFO2 \\
    uint8_t *m = v_write; \\
    *(char*)(m) = (char)en_writing; \\
    m +=1; \\
    *(char*)(m) = (char)i_flag;\\
    m +=3; \\
    *(int*)m = len; \\
    m +=4; \\
    *(uint32_t*)m = pts; \\
    m+= 4;\\
    v_write = m;


        if (v_write >= v_read)
        
            if ((v_end - v_write) > (long)(NEEDLEN))
            

                memcpy(v_write, data, len);
                WRITE_INFO
                    v_write += (NEEDLEN);
                v_framenum++;
                return true;
            
            else // the left mem is not enough
            
                //qInfo()<<"left mem is not enough";
                if ((v_read - v_data) > (long)(NEEDLEN))
                
                    v_write = v_data;
                    memcpy(v_write, data, len);
                    WRITE_INFO
                        v_write += NEEDLEN;
                    v_framenum++;
                    return true; //from the head
                
                else
                
                    std::cout << "not enough memory 0 :num" << v_framenum;
                    return false;
                
            
        
        else //read>write
        
            if ((v_read - v_write) > (long)(NEEDLEN))
            
                memcpy(v_write, data, len);
                WRITE_INFO
                    v_write += (NEEDLEN);
                v_framenum++;
                //IncNumber();
                return true;
            
            std::cout << "not enough memory 1 clear the buffer the len is " << NEEDLEN;

            return false;
        
    
    ptr_smem get()
    
        ptr_smem sm = nullptr;
        v_mux.lock();
        if (!v_i.empty())
        
            sm = v_i.front();
            v_i.pop();
            v_read = sm->v_data;
        
        v_mux.unlock();
        v_framenum--;
        return sm;
    
        int size()
        
            return v_framenum;
        
        void clearqueue()
        
            v_mux.lock();
            if (!v_i.empty())
            
                v_i.pop();
            
            v_mux.unlock();
        
    ;

去除拷贝,去除锁

还是要去除拷贝的,否则,随着数据量越来越大,拷贝消耗的cpu也就客观了,所以修改成为以下方式:

struct s_mem_pool_fix

    ~s_mem_pool_fix()
    
        if (v_data != NULL)
            free(v_data);
    
    uint8_t* v_data = NULL;
    uint8_t* v_end = NULL;
    uint8_t* v_write = NULL;
    uint8_t* v_preparewrite = NULL;
    uint8_t* v_read = NULL;
    void init_mem(int memlen, int totalnum)
    
        v_data = (uint8_t*)malloc(1500 * 1000);
        //v_write = v_data;
        v_preparewrite = v_data;
        v_end = v_data + 1500 * 1000;
    
    uint8_t* get_mem()
    
        v_write = v_preparewrite;
        v_preparewrite += 1500;
        if (v_preparewrite == v_end)
            v_preparewrite = v_data;
        return v_write;
    


;

去除锁,使用无锁队列,意味着用一个线程写,一个线程读,使用状态码来防止越界,这个将会在下一篇文章里面写,上一篇文章有着简单的测试。

main 测试

这个后面再讲了
int main()

std::cout << “Hello World!\\n”;

以上是关于c++千万数据级别正确使用无锁队列,避免内存撕碎的主要内容,如果未能解决你的问题,请参考以下文章

C++ 无锁队列与多线程崩溃

无锁队列真的比有锁队列快吗c++ linux后台开发

c++内存池无锁

c++内存池无锁

folly无锁队列正确性说明

基于共享内存的无锁消息队列设计