c++千万数据级别正确使用无锁队列,避免内存撕碎
Posted qianbo_insist
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c++千万数据级别正确使用无锁队列,避免内存撕碎相关的知识,希望对你有一定的参考价值。
第一篇文章
这里逐渐深入,先讲一下协议,很多概念必须讲清楚
七层协议
我们使用的是网络传输,千万级别的数据实际上并不多,对于队列来说,每个队列的数据块大小定义为MTU大小,如1400字节,实际上传输层定义在操作系统里面,我们无法更改,无论是linux还是windows,都需要经过内核态和用户态,假定我们有100路视频, 每路视频为2M 码流, 一分钟有多少数据呢, 2 * 1024*1024 / 8 * 60 * 100 = 1572864000 bytes = 1500 G
对于MTU 包个数来说, 是 1500G / 1400 = 1G 个队列包, 那每秒钟就是 1024 * 1024 / 60 = 17476 个队列包, 每路有 17476 / 100 = 175 个 MTU包,对于带宽来说,上传就需要200Mbit的带宽,所以这里千万数据级别,说的就是1分钟MTU的包个数。
会话层来说,如果传输层协议使用tcp, 100个上传就需要100个会话,到应用层的时候需要发送的RTP协议实际上每个包都会加上至少12个字节的包头。这些都是开销,选择RTMP协议和RTSP,SIP等等,各自都有开销,但是RTSP和SIP 传输层都可以是 RTP协议 ,也都可以选择UDP和TCP,选择UDP意味着更低的开销,TCP 意味着丢包会少。
应用方
主要应用在视频传输方面,下面画出主要的视频传输,无论使用什么协议,我们先关注在RTP协议上面,RTP协议首先为UDP,RTP over UDP协议我们主要关注在他的包数量和缓冲的内存数量上,我们这样来计算,对于2Mbit 的视频传输来说, 我们有如下算法
MTU = 1400
2M 约为 210241024 /8 /1400 = 187
也就是如果需要缓存1秒数据,至少需要187 个缓存单元,有了如上计算,我们就可以假定一个视频为2M的单元来说,我们缓存200 个单元,每个单元大小为1400字节。
数据量极大
大数据量莫过于视频,尤其是RTP协议中的包,1是多,每个RTP协议包数据量不会超过MTU,最大传输单元,
#include <iostream>
#include <mutex>
#include <queue>
enum
en_emptying = 0,
en_writing,
en_canreadings
;
struct smem
char v_status;
char v_i_flag;
char r1;
char r2;
int v_len = 0;
uint32_t v_pts = 0;
uint8_t* v_data = NULL;
;
typedef std::shared_ptr<smem> ptr_smem;
struct s_mem_pool
//qianbo :just one thread read,one thread write ,otherwise error occur
uint8_t* v_data = NULL;
uint8_t* v_end = NULL;
int v_len = 0;;
uint8_t* v_write = NULL;
uint8_t* v_read = NULL;
std::mutex v_mux;
std::atomic_int v_framenum;
std::queue<ptr_smem>v_i;
void init_mem(int memlen)
if (v_data != NULL)
if (v_len < memlen)
free(v_data);
v_data = NULL;
v_len = memlen;
if (v_data == NULL)
v_data = (uint8_t*)malloc(memlen);
v_end = v_data + memlen;
v_write = v_data;
v_read = v_data;
v_framenum = 0;
v_mux.lock();
while (!v_i.empty())
v_i.pop();
v_mux.unlock();
~s_mem_pool()
clearqueue();
if (v_data != NULL)
free(v_data);
bool push(uint8_t* data, int len, uint32_t pts, bool i_flag)
#define EX_LEN (sizeof(char)*4 +sizeof(int)+sizeof(uint32_t))
#define NEEDLEN (len)
#define WRITE_INFO \\
ptr_smem mem = std::make_shared<smem>();\\
mem->v_data = v_write;\\
mem->v_len = len;\\
mem->v_pts = pts;\\
mem->v_i_flag = i_flag; \\
mem->v_status = en_writing;\\
v_mux.lock();\\
v_i.emplace(mem);\\
v_mux.unlock();
#define WRITE_INFO2 \\
uint8_t *m = v_write; \\
*(char*)(m) = (char)en_writing; \\
m +=1; \\
*(char*)(m) = (char)i_flag;\\
m +=3; \\
*(int*)m = len; \\
m +=4; \\
*(uint32_t*)m = pts; \\
m+= 4;\\
v_write = m;
if (v_write >= v_read)
if ((v_end - v_write) > (long)(NEEDLEN))
memcpy(v_write, data, len);
WRITE_INFO
v_write += (NEEDLEN);
v_framenum++;
return true;
else // the left mem is not enough
//qInfo()<<"left mem is not enough";
if ((v_read - v_data) > (long)(NEEDLEN))
v_write = v_data;
memcpy(v_write, data, len);
WRITE_INFO
v_write += NEEDLEN;
v_framenum++;
return true; //from the head
else
std::cout << "not enough memory 0 :num" << v_framenum;
return false;
else //read>write
if ((v_read - v_write) > (long)(NEEDLEN))
memcpy(v_write, data, len);
WRITE_INFO
v_write += (NEEDLEN);
v_framenum++;
//IncNumber();
return true;
std::cout << "not enough memory 1 clear the buffer the len is " << NEEDLEN;
return false;
ptr_smem get()
ptr_smem sm = nullptr;
v_mux.lock();
if (!v_i.empty())
sm = v_i.front();
v_i.pop();
v_read = sm->v_data;
v_mux.unlock();
v_framenum--;
return sm;
int size()
return v_framenum;
void clearqueue()
v_mux.lock();
if (!v_i.empty())
v_i.pop();
v_mux.unlock();
;
去除拷贝,去除锁
还是要去除拷贝的,否则,随着数据量越来越大,拷贝消耗的cpu也就客观了,所以修改成为以下方式:
struct s_mem_pool_fix
~s_mem_pool_fix()
if (v_data != NULL)
free(v_data);
uint8_t* v_data = NULL;
uint8_t* v_end = NULL;
uint8_t* v_write = NULL;
uint8_t* v_preparewrite = NULL;
uint8_t* v_read = NULL;
void init_mem(int memlen, int totalnum)
v_data = (uint8_t*)malloc(1500 * 1000);
//v_write = v_data;
v_preparewrite = v_data;
v_end = v_data + 1500 * 1000;
uint8_t* get_mem()
v_write = v_preparewrite;
v_preparewrite += 1500;
if (v_preparewrite == v_end)
v_preparewrite = v_data;
return v_write;
;
去除锁,使用无锁队列,意味着用一个线程写,一个线程读,使用状态码来防止越界,这个将会在下一篇文章里面写,上一篇文章有着简单的测试。
main 测试
这个后面再讲了
int main()
std::cout << “Hello World!\\n”;
以上是关于c++千万数据级别正确使用无锁队列,避免内存撕碎的主要内容,如果未能解决你的问题,请参考以下文章