GB28181协议中的事件系统设计

Posted qianbo_insist

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了GB28181协议中的事件系统设计相关的知识,希望对你有一定的参考价值。

GB28181事件

    继上次的GB p2p系统设计,我们先完成其中的一个组件,叫事件处理。
    处理网络和界面以及所有定时器在GB28181服务器处理中很常见,为此,有必要设计一个事件系统,该系统设计模仿了常见的nodejs服务器的event,作为回调和异步处理,要非常方便地让程序员调用各类网络和普通排队事件。如下所示:我们把事件class 叫做c_event

c_event event1;
gb_test3 test3;
event1.func_on("test1", [](void *user) 
	cout << "this is test1" << endl;
);

event1.func_on("test2", [](void *user) 
	cout << "this is test2" << endl;
);
event1.func_on("test3", [](void *user) 
	cout << "this is test3" << endl;
	gb_test3 *test = (gb_test3 *)user;
	if(test!=NULL)
		test->print();
);

event1.Start();

    该事件系统本身具有线程循环能力,因此从基本线程类继承。又事件本身需要带入其他的bean,在java,nodejs的语言里面作为实体,因此需要有一个void *类型带入,实际上是所有类型的抽象。如果我们能够这样定义lamba函数来处理事件发生,无疑对程序员来说是比较方便的。

GB28181中的事件主要有:
1 网络事件->sip协议
如接收到catalog事件
2 网络事件->udp数据
3 普通事件,协助处理 界面通知
如云台转向
4 接口事件处理
5 音视频数据到来事件
6 解码事件
7 超时事件
8 定时器事件
对于8 ,我们另外设计,实际上,可以在一个线程设计中处理所有事件+ 定时器,稍后讲解

线程设计

所有事件作为一个thread来start,下面是处理基本线程class

#ifndef _TTHREAD_RUN_ABLE_H_
#define _TTHREAD_RUN_ABLE_H_


#include <mutex>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
using namespace std;

class c_thread

private:

	//线程
	thread _thread;
	//等待信号
	std::mutex _signal_mutex;
	std::condition_variable _cond;
protected:
	class Lock 
	private:
		std::lock_guard<std::mutex> v_lock;
	public:
		inline Lock(c_thread* par) : v_lock(par->_mutex) 
	;
	//char  _running = false;
	char _stop = true;
	//锁定运行状态
	std::mutex _mutex;
public:
	c_thread()
	
	virtual ~c_thread()
	

public:
	char * status()
		return &_stop;
	
	
	void Join()
	
		if (_thread.joinable())
			_thread.join();
	
	bool  IsStop()
	
		return _stop == 1 ? true : false;
	

	void WaitForSignal()
	
		std::unique_lock<std::mutex> ul(_signal_mutex);
		_cond.wait(ul);
	
	void Notify()
	
		_cond.notify_one();
	

	virtual int Start()
	
		if (_stop == 1)//非運行中
		
			_stop = 0;
			_thread = std::thread(std::bind(&c_thread::Run, this));
			return 0;
		
		return -1;
		
	
	virtual void Stop()
	
		_stop = 1; // true;
	

	virtual void Run() = 0;
;

可以看到Run为纯抽象函数设计,下面来实现c_event

#pragma once
#include <queue>
#include <memory>
#include <map>
#include "c_thread.h"

struct s_message

	uint64_t no = 0;
	std::string flag;
	void *user = NULL;
	//...
;


typedef std::shared_ptr<s_message>  s_message_ptr;

typedef void(*callback)(void *user);
class c_event:public c_thread

	std::queue<s_message_ptr> v_s_message_ptr_q;
	std::map<std::string, callback> v_func_map;
	void emit_now(std::string str, void *user)
	
		auto iter = v_func_map.find(str);
		if (iter != v_func_map.end())
		
			callback cb = iter->second;
			cb(user);
		
	

public:
	c_event()
	

	
	~c_event()
	
		Lock lock(this);
		while (!v_s_message_ptr_q.empty())
			v_s_message_ptr_q.pop();
	
	void emit(std::string str)
	
		
			Lock lock(this);
			s_message_ptr smptr = std::make_shared<s_message>();
			smptr->flag = str;
			v_s_message_ptr_q.push(smptr);
		
		Notify();
	
	void emit(std::string str,void *user)
	
		
			s_message_ptr smptr = std::make_shared<s_message>();
			smptr->flag = str;
			smptr->user = user;
			Lock lock(this);
			v_s_message_ptr_q.push(smptr);
		
		Notify();
	

	void func_on(std::string str, callback cb)
	
		//not support the link table 
		v_func_map[str] = cb;
	
	void Run()
	
		while (1)
		
			if (IsStop())
				break;
			WaitForSignal();
			process:
			s_message_ptr smptr = nullptr;
			
				Lock lock(this);
				if (!v_s_message_ptr_q.empty())
				
					smptr = v_s_message_ptr_q.front();
					v_s_message_ptr_q.pop();
				
			
			if (smptr != nullptr)
			
				emit_now(smptr->flag,smptr->user);
				Lock lock(this);
				if (!v_s_message_ptr_q.empty())
					goto process;
			
		
	
	void Stop()
	
		c_thread::Stop();
		Join();
	

;

可以看到Run函数作为循环,既可以等待事件发生,又可以在探测有数据的情况下,直接处理,但是,数据是被锁定的,因为外面的线程很有可能正在插入事件。

我们来调用:

#include <iostream>
#include "c_event.h"

struct s_test3

	int a = 1;
	void print()
	
		std::cout << "this is s_test1 a is "<<a << std::endl;
	

;

int main()

	c_event event1;
	s_test3 test3;
	event1.func_on("test1", [](void *user) 
		cout << "this is test1" << endl;
	);

	event1.func_on("test2", [](void *user) 
		cout << "this is test2" << endl;
	);
	event1.func_on("test3", [](void *user) 
		cout << "this is test3" << endl;
		s_test3 *test = (s_test3 *)user;
		if(test!=NULL)
			test->print();
	);

	event1.Start();


	for (int i = 0; i < 10; i++)
	
		event1.emit("test1");
		std::this_thread::sleep_for(std::chrono::milliseconds(100));
		event1.emit("test2");
	
	event1.emit("test3", &test3);
	event1.Stop();

在定义完回调后,主线程就可以发送各类事件来启用相应的程序,所有回调在事件类单线程Run()中完成,这个设计有缺陷,就是暂不支持链式反应,只支持单个反应输出。

结果

以上是关于GB28181协议中的事件系统设计的主要内容,如果未能解决你的问题,请参考以下文章

GB28181的协议详解

GB28181系统设计(脚本化流程高级技巧)

GB28181协议的用途是啥

GB28181系统设计-摄像头位置聚类快速搜索Kd-Tree算法

GB28181之国标编码

GB28181的NAT穿透