GB28181协议中的事件系统设计
Posted qianbo_insist
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了GB28181协议中的事件系统设计相关的知识,希望对你有一定的参考价值。
GB28181事件
继上次的GB p2p系统设计,我们先完成其中的一个组件,叫事件处理。
处理网络和界面以及所有定时器在GB28181服务器处理中很常见,为此,有必要设计一个事件系统,该系统设计模仿了常见的nodejs服务器的event,作为回调和异步处理,要非常方便地让程序员调用各类网络和普通排队事件。如下所示:我们把事件class 叫做c_event
c_event event1;
gb_test3 test3;
event1.func_on("test1", [](void *user)
cout << "this is test1" << endl;
);
event1.func_on("test2", [](void *user)
cout << "this is test2" << endl;
);
event1.func_on("test3", [](void *user)
cout << "this is test3" << endl;
gb_test3 *test = (gb_test3 *)user;
if(test!=NULL)
test->print();
);
event1.Start();
该事件系统本身具有线程循环能力,因此从基本线程类继承。又事件本身需要带入其他的bean,在java,nodejs的语言里面作为实体,因此需要有一个void *类型带入,实际上是所有类型的抽象。如果我们能够这样定义lamba函数来处理事件发生,无疑对程序员来说是比较方便的。
GB28181中的事件主要有:
1 网络事件->sip协议
如接收到catalog事件
2 网络事件->udp数据
3 普通事件,协助处理 界面通知
如云台转向
4 接口事件处理
5 音视频数据到来事件
6 解码事件
7 超时事件
8 定时器事件
对于8 ,我们另外设计,实际上,可以在一个线程设计中处理所有事件+ 定时器,稍后讲解
线程设计
所有事件作为一个thread来start,下面是处理基本线程class
#ifndef _TTHREAD_RUN_ABLE_H_
#define _TTHREAD_RUN_ABLE_H_
#include <mutex>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
using namespace std;
class c_thread
private:
//线程
thread _thread;
//等待信号
std::mutex _signal_mutex;
std::condition_variable _cond;
protected:
class Lock
private:
std::lock_guard<std::mutex> v_lock;
public:
inline Lock(c_thread* par) : v_lock(par->_mutex)
;
//char _running = false;
char _stop = true;
//锁定运行状态
std::mutex _mutex;
public:
c_thread()
virtual ~c_thread()
public:
char * status()
return &_stop;
void Join()
if (_thread.joinable())
_thread.join();
bool IsStop()
return _stop == 1 ? true : false;
void WaitForSignal()
std::unique_lock<std::mutex> ul(_signal_mutex);
_cond.wait(ul);
void Notify()
_cond.notify_one();
virtual int Start()
if (_stop == 1)//非運行中
_stop = 0;
_thread = std::thread(std::bind(&c_thread::Run, this));
return 0;
return -1;
virtual void Stop()
_stop = 1; // true;
virtual void Run() = 0;
;
可以看到Run为纯抽象函数设计,下面来实现c_event
#pragma once
#include <queue>
#include <memory>
#include <map>
#include "c_thread.h"
struct s_message
uint64_t no = 0;
std::string flag;
void *user = NULL;
//...
;
typedef std::shared_ptr<s_message> s_message_ptr;
typedef void(*callback)(void *user);
class c_event:public c_thread
std::queue<s_message_ptr> v_s_message_ptr_q;
std::map<std::string, callback> v_func_map;
void emit_now(std::string str, void *user)
auto iter = v_func_map.find(str);
if (iter != v_func_map.end())
callback cb = iter->second;
cb(user);
public:
c_event()
~c_event()
Lock lock(this);
while (!v_s_message_ptr_q.empty())
v_s_message_ptr_q.pop();
void emit(std::string str)
Lock lock(this);
s_message_ptr smptr = std::make_shared<s_message>();
smptr->flag = str;
v_s_message_ptr_q.push(smptr);
Notify();
void emit(std::string str,void *user)
s_message_ptr smptr = std::make_shared<s_message>();
smptr->flag = str;
smptr->user = user;
Lock lock(this);
v_s_message_ptr_q.push(smptr);
Notify();
void func_on(std::string str, callback cb)
//not support the link table
v_func_map[str] = cb;
void Run()
while (1)
if (IsStop())
break;
WaitForSignal();
process:
s_message_ptr smptr = nullptr;
Lock lock(this);
if (!v_s_message_ptr_q.empty())
smptr = v_s_message_ptr_q.front();
v_s_message_ptr_q.pop();
if (smptr != nullptr)
emit_now(smptr->flag,smptr->user);
Lock lock(this);
if (!v_s_message_ptr_q.empty())
goto process;
void Stop()
c_thread::Stop();
Join();
;
可以看到Run函数作为循环,既可以等待事件发生,又可以在探测有数据的情况下,直接处理,但是,数据是被锁定的,因为外面的线程很有可能正在插入事件。
我们来调用:
#include <iostream>
#include "c_event.h"
struct s_test3
int a = 1;
void print()
std::cout << "this is s_test1 a is "<<a << std::endl;
;
int main()
c_event event1;
s_test3 test3;
event1.func_on("test1", [](void *user)
cout << "this is test1" << endl;
);
event1.func_on("test2", [](void *user)
cout << "this is test2" << endl;
);
event1.func_on("test3", [](void *user)
cout << "this is test3" << endl;
s_test3 *test = (s_test3 *)user;
if(test!=NULL)
test->print();
);
event1.Start();
for (int i = 0; i < 10; i++)
event1.emit("test1");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
event1.emit("test2");
event1.emit("test3", &test3);
event1.Stop();
在定义完回调后,主线程就可以发送各类事件来启用相应的程序,所有回调在事件类单线程Run()中完成,这个设计有缺陷,就是暂不支持链式反应,只支持单个反应输出。
结果
以上是关于GB28181协议中的事件系统设计的主要内容,如果未能解决你的问题,请参考以下文章