一种流水线方式处理远程视频的方案
Posted qianbo_insist
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一种流水线方式处理远程视频的方案相关的知识,希望对你有一定的参考价值。
目的
目的是为了流水处理网络视频,所有处理都为线程,每个线程单元都从TThreadRunable 继承下来,以下为TThreadRunable 基础类。从读,解码,处理算法,压缩,到发送,几个部分合成一整个流程,其中算法处理比较耗时,最后要显示到web浏览器里面,所有最后处理算法以后又直接通过websocket送到浏览器显示。远程视频这里启用的是rtsp视频。
#ifndef _TTHREAD_RUN_ABLE_H_
#define _TTHREAD_RUN_ABLE_H_
#include <mutex>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
using namespace std;
class TThreadRunable
private:
//线程
thread _thread;
//等待信号
std::mutex _signal_mutex;
std::condition_variable _cond;
protected:
//char _running = false;
char _stop = true;
//锁定运行状态
std::mutex _mutex;
public:
TThreadRunable()
virtual ~TThreadRunable()
public:
char * status()
return &_stop;
void Join()
if (_thread.joinable())
_thread.join();
bool IsStop()
return _stop == 1 ? true : false;
void WaitForSignal()
std::unique_lock<std::mutex> ul(_signal_mutex);
_cond.wait(ul);
void Notify()
_cond.notify_one();
virtual int Start()
if (_stop == 0)
return -1;
_stop = 0;
_thread = std::thread(std::bind(&TThreadRunable::Run, this));
return 0;
virtual void Stop()
_stop = 1; // true;
virtual void Run() = 0;
;
#endif
定义流程中第一个读
注意其中的SetNextProcess 函数。
#pragma once
#include <stdio.h>
#include <iostream>
#include "TThreadRunable.h"
#include "config.h"
#include <functional>
#include "TQueue.h"
#include <opencv2/opencv.hpp>
typedef enum open_type
_type_usb,
_type_rtsp,
_type_rtsp_live555
open_type;
class TProcessRead :public TThreadRunable
cv::VideoCapture cap;
open_type _type;
int _show = 0;
vconfig * _videoconfig = NULL;
func_cb_send _cs = NULL;
string _name;
public:
void Set(open_type type, string name,vconfig * obj)
_name = name;
_type = type;
_videoconfig = obj;
void SetNextProcess(func_cb_send cs)
_cs = cs;
Group_Mat* ReadFrame()
Group_Mat * gm = new Group_Mat();
if (gm == NULL)
cout << "not enough memory error" << endl;
return NULL;
cap >> gm->_bgrMat;
if (gm->_bgrMat.empty())
delete gm;
cout << "error read frame" << endl;
return NULL;
cvtColor(gm->_bgrMat, gm->_grayMat, cv::COLOR_BGR2GRAY);
gm->name = _name;
return gm;
int USB()
cap.open(0, cv::CAP_DSHOW);
cap.set(cv::CAP_PROP_FRAME_WIDTH, _videoconfig->width);
cap.set(cv::CAP_PROP_FRAME_HEIGHT, _videoconfig->height);
//cap.set(cv::CAP_PROP_FPS, _videoconfig->fps); cap.open(0);
cv::CAP_PROP_CHANNEL
//VideoCapture cap(0);
//cap.set(CAP_PROP_FRAME_WIDTH, 1920);
//cap.set(CAP_PROP_FRAME_HEIGHT, 1080);
//cv::CAP_CROSSBAR_INPIN_TYPE
//cap.set(cv::CAP_CROSSBAR_INPIN_TYPE , 6);
//Mat img;
//namedWindow("test", WINDOW_NORMAL);
//resizeWindow("test", 960, 640);
if (!cap.isOpened())
cout << "can not open the usb" << endl;
return -1;
//cap.set()
double w = cap.get(cv::CAP_PROP_FRAME_WIDTH);
double h = cap.get(cv::CAP_PROP_FRAME_HEIGHT);
double fps = cap.get(cv::CAP_PROP_FPS);
if (fps == 0)
fps = 10;
int delay = 1000 / fps;
while (1)
if (IsStop())
break;
Group_Mat * gm = ReadFrame();
if (gm == NULL)
break;
if (_cs != NULL)
_cs(gm);
/*if (_show == 1)
imshow("show", gm->_bgrMat);
*/
cv::waitKey(delay);
this->Stop();
return 0;
;
int Rtsp()
if (_videoconfig == NULL)
return -1;
string url = _videoconfig->url;
if (cap.open(url) == false)
cout << "error:rtsp url :" << endl;
cout << url << " can't be open" << endl;
return -1;
double w = cap.get(cv::CAP_PROP_FRAME_WIDTH);
double h = cap.get(cv::CAP_PROP_FRAME_HEIGHT);
double fps = cap.get(cv::CAP_PROP_FPS);
if (fps == 0)
fps = 20;
int delay = 1000 / fps;
while (1)
if (IsStop())
break;
Group_Mat * gm = ReadFrame();
if (gm == NULL)
break;
if (_cs != NULL)
_cs(gm);
/*if (_show == 1)
imshow("show", gm->_bgrMat);
*/
cv::waitKey(delay);
this->Stop();
return 0;
void Run()
if (_type == _type_usb)
USB();
else if (_type == _type_rtsp)
Rtsp();
else if (_type == _type_rtsp_live555)
;
先来看看我们的example.cpp 怎么进行流程的,
TProcessRead
TProcessRtsp
TProcessPdec
TProcessPbgr
TProcessSend
TProcessSpre
每个类都是从TThreadRunable继承,每个类都为线程单元类,接下去为每个对象赋值他们的回调函数
func_cb_send read_2_pdec = std::bind(&TProcessPdec::callback_recv, &process_pdec, _1);
func_cb_send read_2_pbgr = std::bind(&TProcessPbgr::callback_recv, &process_pbgr, _1);
func_cb_send read_2_send = std::bind(&TProcessSend::callback_recv, &process_send, _1);
func_cb_send read_2_serv = std::bind(&TProcessSpre::callback_recv, &process_serv, _1);
使用SetNextProcess 函数来设置下一个流程处理。
int main()
Init();
TReadConfig config;
if (config.ReadConfig() != 0)
return -1;
//计算返回的数据
//TProcessffmp process_ffmp;
TProcessRead process_read;
TProcessRtsp process_rtsp;
TProcessPdec process_pdec;
TProcessPbgr process_pbgr;
TProcessSend process_send;
TProcessSpre process_serv;
func_cb_send read_2_pdec = std::bind(&TProcessPdec::callback_recv, &process_pdec, _1);
func_cb_send read_2_pbgr = std::bind(&TProcessPbgr::callback_recv, &process_pbgr, _1);
func_cb_send read_2_send = std::bind(&TProcessSend::callback_recv, &process_send, _1);
func_cb_send read_2_serv = std::bind(&TProcessSpre::callback_recv, &process_serv, _1);
//config start
if (config.use_usb == 1)
string name;
vconfig *obj = config.GetUsbFirst(name);
if(obj!=NULL)
process_read.Set(_type_usb,name,obj);
else if (config.use_rtsp == 1)
string name;
vconfig * obj = config.GetRtspFirst(name);
if (obj != NULL)
//process_ffmp.Set(obj->url.c_str(), obj->tcp, name.c_str());
process_rtsp.Set(obj->url.c_str(), obj->tcp, name.c_str());
process_pdec.Set(name.c_str());
process_send.Set(NULL, ws_callback);
//config over
if (config.use_usb == 1)
process_read.SetNextProcess(read_2_pbgr);
else if (config.use_rtsp == 1)
process_rtsp.SetNextProcess(read_2_pdec);
process_pdec.SetNextProcess(read_2_pbgr);
//else if (config.user_file == 1)
//
// process_ffmp.SetNextProcess(read_2_pbgr);
//
process_pbgr.SetNextProcess(read_2_send);
process_send.SetNextProcess(read_2_serv);
uint16_t port =(uint16_t)config.wsserver_port;
if (port == 0)
port = 9090;
//在9090端口上等待数据
cout << "ws server list at " << port << endl;
process_serv.Start(port);
process_send.Start();
cout << "process algorithm start"<<endl;
process_pbgr.Start();
cout << "process read start" << endl;
process_pdec.Start();
cout << "process pdec start" << endl;
if (config.use_usb == 1)
process_read.Start();
process_read.Join();
else if(config.use_rtsp == 1)
process_rtsp.Start();
process_rtsp.Join();
return 0;
算法处理单元
以下为线程处理单元的run线程处理函数
void TProcessPbgr::Run()
while (1)
if (IsStop())
break;
WaitForSignal();
Group_Mat * data = _group.Pop();
while (data != NULL)
if (IsStop())
break;
if (_cs != NULL)
param_data * d = new param_data();
//传递名字
d->name = data->name;
//准备以jpg发送出去
d->type = _enumjpg;
d->rgb = data->_bgrMat;
_algorithm.articulation(data->_grayMat, d->articulate);
_algorithm.colorException(data->_bgrMat, d->c_cast, d->c_da, d->c_db);
_algorithm.brightnessException(data->_grayMat, d->b_cast, d->b_da);
//msk = _algorithm.DectectorMsk(data->_grayMat,40,200);
_cs(d);
delete data;
data = _group.Pop();
总结
这种方式比较好理解,改进也有很多方式,线程是宝贵的资源,该节省的地方还是要节省,每一路都使用很多线程是比较耗费的。后面会进行改进,除了普通算法,为AI处理推理也提供了一种思路
以上是关于一种流水线方式处理远程视频的方案的主要内容,如果未能解决你的问题,请参考以下文章