SrsPublishRecvThread、SrsRecvThread、SrsReusableThread2、SrsThread 之间的关系图
1. recv 线程函数:SrsThread::thread_fun
void *SrsThread::thread_fun(void *arg)
{
SrsThread* obj = (SrsThread*)arg;
srs_assert(obj);
/* 进入线程循环 */
obj->thread_cycle();
// for valgrind to detect.
SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
if (ctx) {
ctx->clear_cid();
}
st_thread_exit(NULL);
return NULL;
}
1.1 SrsThread::thread_cycle
/**
 * Main body of the thread, invoked once from thread_fun.
 * Generates the context id and stores it in _cid, notifies the handler that
 * the thread started, waits until can_run is set, then keeps calling the
 * handler's on_before_cycle() / cycle() / on_end_cycle() while `loop` is true.
 * When any of the three callbacks fails, the rest of that iteration is skipped
 * (goto failed) and the loop retries after the optional cycle interval.
 * On exit it marks the thread as really terminated and calls on_thread_stop().
 */
void SrsThread::thread_cycle()
{
int ret = ERROR_SUCCESS;
/* generate a context id for this (recv) thread */
_srs_context->generate_id();
srs_info("thread %s cycle start", _name);
/* store the generated context id in _cid; presumably the parent thread
 * waits for it and then sets can_run to true - confirm against start() */
_cid = _srs_context->get_id();
srs_assert(handler);
/* notify the handler; in the flow described by these notes the handler is
 * SrsReusableThread2, so this reaches SrsReusableThread2::on_thread_start */
handler->on_thread_start();
// thread is running now.
really_terminated = false;
/* sleep in 10 * 1000 us steps until can_run is set or loop is cleared;
 * presumably the parent thread gets scheduled meanwhile, sees _cid and
 * sets can_run - confirm against the code that starts the thread */
// wait for cid to ready, for parent thread to get the cid.
while (!can_run && loop) {
st_usleep(10 * 1000);
}
while (loop) {
/* per the original notes this hook does nothing in the recv-thread case */
if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d",
_name, ret);
goto failed;
}
srs_info("thread %s on before cycle success", _name);
/* run one cycle of the handler (SrsReusableThread2::cycle in this flow);
 * graceful client close and system control errors are not logged */
if ((ret = handler->cycle()) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret))
{
srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
}
goto failed;
}
srs_info("thread %s cycle success", _name);
if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d",
_name, ret);
goto failed;
}
srs_info("thread %s on end cycle success", _name);
failed:
// reached on both success and failure; stop when loop was cleared.
if (!loop) {
break;
}
// to improve performance, donot sleep when interval is zero.
// @see: https://github.com/ossrs/srs/issues/237
if (cycle_interval_us != 0) {
st_usleep(cycle_interval_us);
}
}
// really terminated now.
really_terminated = true;
handler->on_thread_stop();
srs_info("thread %s cycle finished", _name);
}
1.2 SrsReusableThread2::on_thread_start
/**
 * Thread-start hook: simply forwards the notification to the wrapped handler
 * (SrsRecvThread in the flow described by these notes).
 */
void SrsReusableThread2::on_thread_start()
{
    handler->on_thread_start();
}
该函数中接着调用 SrsRecvThread 实现的 on_thread_start 函数。
1.2.1 SrsRecvThread::on_thread_start
/**
 * Thread-start hook of the recv thread: removes the recv timeout on the RTMP
 * connection, then forwards the notification to the message handler.
 */
void SrsRecvThread::on_thread_start()
{
    // Writing multiple messages with writev improves performance a lot, but a
    // recv with timeout costs about 33% in sys calls; receiving in an isolated
    // thread without timeout recovers roughly that 33%.
    // @see https://github.com/ossrs/srs/issues/194
    // @see: https://github.com/ossrs/srs/issues/217
    rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT);

    handler->on_thread_start();
}
函数先设置 recv 的超时时间为 -1,然后接着调用 SrsPublishRecvThread 实现的 on_thread_start 函数。
1.2.2 SrsPublishRecvThread::on_thread_start
/**
 * Thread-start hook of the publish recv thread. Only does work when the
 * build defines SRS_PERF_MERGED_READ and the `mr` flag is set, in which case
 * it sizes the socket buffer and turns merged read on.
 */
void SrsPublishRecvThread::on_thread_start()
{
    // we donot set the auto response to false,
    // for the main thread never send message.

#ifdef SRS_PERF_MERGED_READ
    // merged read not enabled (per the notes: the default when the config
    // file has no mr item), nothing to set up.
    if (!mr) {
        return;
    }

    // set underlayer buffer size
    set_socket_buffer(mr_sleep);

    // enable the merge read, with this object as its handler.
    // @see https://github.com/ossrs/srs/issues/241
    rtmp->set_merge_read(true, this);
#endif
}
1.3 SrsReusableThread2::cycle
/**
 * One cycle of the reusable thread: delegates to the wrapped handler.
 * @return whatever the handler's cycle() returns.
 */
int SrsReusableThread2::cycle()
{
    return handler->cycle();
}
接着调用 SrsRecvThread 实现的 cycle 函数,该函数才开始真正接收客户端推流的数据。
2. 接收推流数据:SrsRecvThread::cycle
/**
 * Receive loop of the recv thread: reads RTMP messages from the peer and
 * hands each one to the handler until the thread is interrupted.
 * On any recv/handle error the thread interrupts itself, notifies the
 * handler through on_recv_error() and returns the error.
 * @return ERROR_SUCCESS when the loop ends because the thread was
 *         interrupted; otherwise the error from recv_message() or handle().
 */
int SrsRecvThread::cycle()
{
    int ret = ERROR_SUCCESS;

    // keep receiving client messages while the thread is not interrupted.
    while (!trd->interrupted()) {
        // ask the handler whether it can take a message now (per the notes,
        // SrsPublishRecvThread always answers true); when it cannot, sleep
        // for timeout * 1000 us and ask again.
        if (!handler->can_handle()) {
            st_usleep(timeout * 1000);
            continue;
        }

        SrsCommonMessage* msg = NULL;

        // recv and handle message
        ret = rtmp->recv_message(&msg);
        if (ret == ERROR_SUCCESS) {
            // pass the received message to the handler
            // (SrsPublishRecvThread::handle in the publish flow).
            ret = handler->handle(msg);
        }

        // on error, interrupt this recv thread and report it.
        if (ret != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret))
            {
                srs_error("thread process message failed. ret=%d", ret);
            }

            // we use no timeout to recv, should never got any error.
            trd->interrupt();

            // notice the handler got a recv error
            handler->on_recv_error(ret);

            return ret;
        }

        srs_verbose("thread loop recv message. ret=%d", ret);
    }

    return ret;
}
3. 接收推流数据:SrsRtmpServer::recv_message
/**
 * Receive one RTMP message; thin wrapper over the protocol stack.
 * @param pmsg output, set by SrsProtocol::recv_message.
 * @return the result of protocol->recv_message().
 */
int SrsRtmpServer::recv_message(SrsCommonMessage** pmsg)
{
    return protocol->recv_message(pmsg);
}
该函数接着调用 SrsProtocol 实现的 recv_message 函数。
3.1 SrsProtocol::recv_message
/**
 * Receive one complete, non-empty RTMP message.
 * Loops over recv_interlaced_message() until an entire message with a
 * positive size is available, runs the on_recv_message() hook on it and
 * returns it through pmsg.
 * @param pmsg output; set to NULL on entry and to the received message on
 *        success. On failure it stays NULL and the message is freed here.
 * @return ERROR_SUCCESS, or the error from recv_interlaced_message() /
 *         on_recv_message().
 */
int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
{
    *pmsg = NULL;

    int ret = ERROR_SUCCESS;

    while (true) {
        SrsCommonMessage* msg = NULL;

        if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("recv interlaced message failed. ret=%d", ret);
            }
            srs_freep(msg);
            return ret;
        }
        srs_verbose("entire msg received");

        // no complete message yet (only a partial chunk was read): read on.
        if (!msg) {
            srs_info("got empty message without error.");
            continue;
        }

        // drop messages that carry no payload.
        if (msg->size <= 0 || msg->header.payload_length <= 0) {
            srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).",
                msg->header.message_type, msg->header.payload_length,
                msg->header.timestamp, msg->header.stream_id);
            srs_freep(msg);
            continue;
        }

        // protocol-level hook. Per the original notes it acknowledges the
        // window when enough bytes were received, and decodes window-ack-size
        // (5), set-chunk-size (1) and user-control (4) messages to update the
        // protocol context - see on_recv_message for the details.
        if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
            srs_error("hook the received msg failed. ret=%d", ret);
            srs_freep(msg);
            return ret;
        }

        srs_verbose("got a msg, cid=%d, type=%d, size=%d, time=%"PRId64,
            msg->header.perfer_cid, msg->header.message_type, msg->header.payload_length,
            msg->header.timestamp);
        *pmsg = msg;
        break;
    }

    // the loop is only left through `break` with ret == ERROR_SUCCESS; the
    // function previously ended here without returning a value.
    return ret;
}
3.2 SrsProtocol::recv_interlaced_message
/**
 * Read one chunk from the peer and, when it completes a message, return it.
 * Steps: read the chunk basic header (fmt, cid), look up or create the chunk
 * stream for that cid, read the message header, then read the payload.
 * @param pmsg output; only assigned when an entire message was assembled.
 *        It is left untouched when the chunk was only part of a message.
 * @return ERROR_SUCCESS (both for a partial and for an entire message), or
 *         the error from reading the basic header, message header or payload.
 */
int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
{
int ret = ERROR_SUCCESS;
// chunk stream basic header.
char fmt = 0;
int cid = 0;
/* read the basic header of the chunk */
if ((ret = read_basic_header(fmt, cid)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read basic header failed. ret=%d", ret);
}
return ret;
}
srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid);
// the cid must not negative.
srs_assert(cid >= 0);
// get the cached chunk stream.
SrsChunkStream* chunk = NULL;
/* a client may split one message into several chunks, so the header info
 * and payload read so far are kept per chunk stream between calls */
// use chunk stream cache to get the chunk info.
// @see https://github.com/ossrs/srs/issues/249
if (cid < SRS_PERF_CHUNK_STREAM_CACHE) {
// chunk stream cache hit.
srs_verbose("cs-cache hit, cid=%d", cid);
// already init, use it direclty
chunk = cs_cache[cid];
srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
"message(type=%d, size=%d, time=%"PRId64", sid=%d)",
chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0),
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
} else {
// chunk stream cache miss, use map.
if (chunk_streams.find(cid) == chunk_streams.end()) {
chunk = chunk_streams[cid] = new SrsChunkStream(cid);
// set the perfer cid of chunk,
// which will copy to the message received.
chunk->header.perfer_cid = cid;
srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
} else {
chunk = chunk_streams[cid];
srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
"message(type=%d, size=%d, time=%"PRId64", sid=%d)",
chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0),
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
}
}
// chunk stream message header
if ((ret = read_message_header(chunk, fmt)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read message header failed. ret=%d", ret);
}
return ret;
}
srs_verbose("read message header success. fmt=%d, ext_time=%d, size=%d, "
"message(type=%d, size=%d, time=%"PRId64", sid=%d)",
fmt, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0),
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
// read msg payload from chunk stream.
SrsCommonMessage* msg = NULL;
if ((ret = read_message_payload(chunk, &msg)) != ERROR_SUCCESS) {
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
srs_error("read message payload failed. ret=%d", ret);
}
return ret;
}
// not got an entire RTMP message, try next chunk.
if (!msg) {
srs_verbose("get partial message success. size=%d, "
"message(type=%d, size=%d, time=%"PRId64", sid=%d)",
(msg? msg->size : (chunk->msg? chunk->msg->size : 0)),
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
/* an entire message was assembled, hand it to the caller */
*pmsg = msg;
srs_info("get entire message success. size=%d, "
"message(type=%d, size=%d, time=%"PRId64", sid=%d)",
(msg? msg->size : (chunk->msg? chunk->msg->size : 0)),
chunk->header.message_type, chunk->header.payload_length,
chunk->header.timestamp, chunk->header.stream_id);
return ret;
}
4. 处理推流消息:SrsPublishRecvThread::handle
/**
 * Handle one message received from the publisher: update the counters, let
 * the RTMP connection process it, then free it.
 * @param msg the received message; always freed here before returning.
 * @return the result of SrsRtmpConn::handle_publish_message().
 */
int SrsPublishRecvThread::handle(SrsCommonMessage* msg)
{
    // when cid changed, change it.
    if (cid != ncid) {
        _srs_context->set_id(ncid);
        cid = ncid;
    }

    // count every received message, and video messages separately.
    _nb_msgs++;
    if (msg->header.is_video()) {
        video_frames++;
    }

    // log to show the time of recv thread.
    srs_verbose("recv thread now=%"PRId64"us, got msg time=%"PRId64"ms, size=%d",
        srs_update_system_time_ms(), msg->header.timestamp, msg->size);

    // the rtmp connection will handle this message.
    int ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);

    // must always free it,
    // the source will copy it if need to use.
    srs_freep(msg);

    return ret;
}
该函数接着主要调用 SrsRtmpConn 实现的 handle_publish_message 函数。
4.1 SrsRtmpConn::handle_publish_message
/**
 * Dispatch one message received while publishing.
 * AMF0/AMF3 command messages are treated as publish events (unpublish /
 * republish); every other message goes to process_publish_message().
 * @param source the source being published to.
 * @param msg the received message; not freed here.
 * @param is_fmle whether the publisher is an FMLE style encoder.
 * @param vhost_is_edge forwarded to process_publish_message().
 * @return ERROR_SUCCESS, ERROR_CONTROL_REPUBLISH for an unpublish event, or
 *         the error of the failing step.
 */
int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg,
    bool is_fmle, bool vhost_is_edge)
{
    int ret = ERROR_SUCCESS;

    // video, audio, data message: anything that is not an AMF command.
    const bool is_command = msg->header.is_amf0_command() || msg->header.is_amf3_command();
    if (!is_command) {
        ret = process_publish_message(source, msg, vhost_is_edge);
        if (ret != ERROR_SUCCESS) {
            srs_error("fmle process publish message failed. ret=%d", ret);
        }
        return ret;
    }

    // process publish event.
    SrsPacket* pkt = NULL;
    ret = rtmp->decode_message(msg, &pkt);
    if (ret != ERROR_SUCCESS) {
        srs_error("fmle decode unpublish message failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsPacket, pkt);

    // for flash, any packet is republish.
    if (!is_fmle) {
        // flash unpublish.
        // TODO: maybe need to support republish.
        srs_trace("flash flash publish finished.");
        return ERROR_CONTROL_REPUBLISH;
    }

    // for fmle, drop others except the fmle start packet.
    SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
    if (!unpublish) {
        srs_trace("fmle ignore AMF0/AMF3 command message.");
        return ret;
    }

    ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id);
    if (ret != ERROR_SUCCESS) {
        return ret;
    }
    return ERROR_CONTROL_REPUBLISH;
}
这里暂先不分析接收到 unpublish 的情况,而对于接收到 video、audio 和 data message 等消息情况下,直接调用 SrsRtmpConn 实现的 process_publish_message 进行处理。
5. 媒体数据的处理:SrsRtmpConn::process_publish_message
/**
 * Route one published media/data message to the source.
 * Order of checks: edge proxy, audio, video, aggregate, AMF0/AMF3 data
 * (onMetaData). Messages matching none of them are ignored.
 * @param source the source that receives the message.
 * @param msg the received message; not freed here.
 * @param vhost_is_edge when true the message is proxied to the origin as is.
 * @return ERROR_SUCCESS or the error of the step that failed.
 */
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg,
    bool vhost_is_edge)
{
    int ret = ERROR_SUCCESS;

    // for edge, directly proxy message to origin.
    if (vhost_is_edge) {
        ret = source->on_edge_proxy_publish(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("edge publish proxy msg failed. ret=%d", ret);
        }
        return ret;
    }

    // process audio packet
    if (msg->header.is_audio()) {
        ret = source->on_audio(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
        }
        return ret;
    }

    // process video packet
    if (msg->header.is_video()) {
        ret = source->on_video(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
        }
        return ret;
    }

    // process aggregate packet
    if (msg->header.is_aggregate()) {
        ret = source->on_aggregate(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
        }
        return ret;
    }

    // everything below only concerns AMF0/AMF3 data, i.e. onMetaData.
    if (!msg->header.is_amf0_data() && !msg->header.is_amf3_data()) {
        return ret;
    }

    // decode the metadata packet.
    SrsPacket* pkt = NULL;
    ret = rtmp->decode_message(msg, &pkt);
    if (ret != ERROR_SUCCESS) {
        srs_error("decode onMetaData message failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsPacket, pkt);

    SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
    if (!metadata) {
        srs_info("ignore AMF0/AMF3 data message.");
        return ret;
    }

    ret = source->on_meta_data(msg, metadata);
    if (ret != ERROR_SUCCESS) {
        srs_error("source process onMetaData message failed. ret=%d", ret);
        return ret;
    }
    srs_info("process onMetaData message success.");

    return ret;
}
5.1 onMetaData
通常接收到的第一个媒体数据包为 onMetaData,抓包图如下图所示。
接收到 onMetaData 数据包后,需要调用 SrsRtmpServer 实现的 decode_message 函数对该包进行解码。
5.1.1 SrsRtmpServer::decode_message
/**
 * Decode a message into a packet; thin wrapper over the protocol stack.
 * @param msg the message to decode.
 * @param ppacket output, set by SrsProtocol::decode_message.
 * @return the result of protocol->decode_message().
 */
int SrsRtmpServer::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
    return protocol->decode_message(msg, ppacket);
}
该函数接着调用 SrsProtocol 实现的 decode_message 函数。
5.1.2 SrsProtocol::decode_message
int SrsProtocol::decode_message(SrsCommonMessage* msg, SrsPacket** packet)
{
*ppacket = NULL;
int ret = ERROR_SUCCESS;
srs_assert(msg != NULL);
srs_assert(msg->payload != NULL);
srs_assert(msg->size > 0);
SrsStream stream;
// initialize the decode stream for all message,
// it\'s ok for the initialize if fast and without memory copy.
if ((ret = stream.initialize(msg->payload, msg->size)) != ERROR_SUCCESS) {
srs_error("initialize stream failed. ret=%d", ret);
return ret;
}
srs_verbose("decode stream initialized success");
// decode the packet.
SrsPacket* packet = NULL;
if ((ret = do_decode_message(msg->header, &stream, &packet)) != ERROR_SUCCESS) {
srs_freep(packet);
return ret;
}
// set to output ppacket only when success.
*ppacket = packet;
return ret;
}
该函数将消息的负载转化为一个字节流,便于调用 SrsProtocol 实现的 do_decode_message 函数对负载数据进行解码。
5.1.3 SrsProtocol::do_decode_message
/**
 * Build and decode the concrete packet for a message (abridged listing: the
 * "..." lines stand for code elided by the article and are kept as is).
 * For AMF0/AMF3 command and data messages the command name is read first and
 * selects the packet class; the stream is then rewound so the packet decodes
 * from the start.
 * @param header the header of the message being decoded.
 * @param stream the payload bytes to decode from.
 * @param ppacket output; receives the newly created packet. It may be set
 *        even when decoding fails, so the caller frees it on error.
 * @return ERROR_SUCCESS or the decode error.
 */
int SrsProtocol::do_decode_message(SrsMessageHeader& header,
    SrsStream* stream, SrsPacket** ppacket)
{
    int ret = ERROR_SUCCESS;

    SrsPacket* packet = NULL;

    // decode specified packet type
    if (header.is_amf0_command() || header.is_amf3_command() ||
        header.is_amf0_data() || header.is_amf3_data())
    {
        srs_verbose("start to decode AMF0/AMF3 command message.");

        // skip 1bytes to decode the amf3 command.
        if (header.is_amf3_command() && stream->require(1)) {
            srs_verbose("skip 1bytes to decode AMF3 command");
            stream->skip(1);
        }

        // amf0 command message.
        // need to read the command name.
        std::string command;
        if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {
            srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str());

        // result/error packet
        if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) {
            /* skipped here; only the decoding of amf0_data is considered */
            ...
        }

        // reset to zero(amf3 to 1) to restart decode.
        stream->skip(-1 * stream->pos());
        if (header.is_amf3_command()) {
            stream->skip(1);
        }

        // decode command object.
        if (command == RTMP_AMF0_COMMAND_CONNECT)
        {
            ...
        }
        ...
        /* "@setDataFrame" or "onMetaData" */
        else if (command == SRS_CONSTS_RTMP_SET_DATAFRAME ||
                 command == SRS_CONSTS_RTMP_ON_METADATA) {
            srs_info("decode the AMF0/AMF3 data(onMetaData message).");
            *ppacket = packet = new SrsOnMetaDataPacket();
            /* decode with SrsOnMetaDataPacket::decode */
            return packet->decode(stream);
        }
        ...

        // default packet to drop message.
        srs_info("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
        *ppacket = packet = new SrsPacket();
        return ret;
    } else if (header.is_user_control_message()) {
        ...
    } else if
    ...

    return ret;
}
对于接收到的 amf0_data 类型、且命令名为 "@setDataFrame" 或 "onMetaData" 的数据,统一构造一个 SrsOnMetaDataPacket 对象,然后调用该类实现的 decode 函数进行解码;其他命令名的消息则构造默认的 SrsPacket 并被丢弃。
5.1.4 SrsOnMetaDataPacket 构造函数
/**
 * The stream metadata packet.
 * Its command name on the wire is "@setDataFrame" for FMLE and "onMetaData"
 * for other publishers; a new packet starts out named onMetaData.
 */
SrsOnMetaDataPacket::SrsOnMetaDataPacket()
{
    // default command name.
    name = SRS_CONSTS_RTMP_ON_METADATA;

    // Metadata of stream.
    // @remark, never be NULL, an AMF0 object instance.
    metadata = SrsAmf0Any::object();
}
若为 FMLE(Flash Media Live Encoder) 软件,则发送的 amf0_data 消息名为 "@setDataFrame",其他的则为 "onMetaData"。
5.1.5 SrsOnMetaDataPacket::decode
/**
 * Decode an onMetaData / @setDataFrame payload.
 * Reads the command name (skipping a leading "@setDataFrame"), then the
 * metadata value, which may be an AMF0 object or an ECMA array.
 * - object: replaces the `metadata` member with the decoded object.
 * - ECMA array: copies every property into the existing `metadata` object
 *   and frees the decoded array.
 * - any other type: the value is freed and `metadata` is left untouched.
 * @param stream the payload to decode from.
 * @return ERROR_SUCCESS or the AMF0 read error.
 */
int SrsOnMetaDataPacket::decode(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    if ((ret = srs_amf0_read_string(stream, name)) != ERROR_SUCCESS) {
        srs_error("decode metadata name failed. ret=%d", ret);
        return ret;
    }

    // ignore the @setDataFrame
    if (name == SRS_CONSTS_RTMP_SET_DATAFRAME) {
        // the name that follows ("onMetaData") is the one that is kept.
        if ((ret = srs_amf0_read_string(stream, name)) != ERROR_SUCCESS) {
            srs_error("decode metadata name failed. ret=%d", ret);
            return ret;
        }
    }

    srs_verbose("decode metadata name success. name=%s", name.c_str());

    // the metadata maybe object or ecma array
    // (in the capture referenced by these notes it is an ECMA array).
    SrsAmf0Any* any = NULL;
    if ((ret = srs_amf0_read_any(stream, &any)) != ERROR_SUCCESS) {
        srs_error("decode metadata metadata failed. ret=%d", ret);
        return ret;
    }

    srs_assert(any);
    if (any->is_object()) {
        // take the decoded object as the new metadata; `any` is not freed
        // on this path because it is returned before SrsAutoFree is armed.
        srs_freep(metadata);
        metadata = any->to_object();
        srs_info("decode metadata object success");
        return ret;
    }

    // from here on `any` is only a temporary and is freed on return.
    SrsAutoFree(SrsAmf0Any, any);

    if (any->is_ecma_array()) {
        SrsAmf0EcmaArray* arr = any->to_ecma_array();

        // if ecma array, copy to object.
        for (int i = 0; i < arr->count(); i++) {
            // copy each decoded property into the metadata object.
            metadata->set(arr->key_at(i), arr->value_at(i)->copy());
        }

        srs_info("decode metadata array success");
    }

    return ret;
}
该函数主要是解析 metadata 数据,然后将其保存在 SrsOnMetaDataPacket 类的成员 metadata 中。
5.1.6 srs_amf0_read_any
/**
 * Read one AMF0 value of any type from the stream.
 * SrsAmf0Any::discovery() creates the value object for the upcoming marker
 * (an SrsAmf0EcmaArray for the metadata case in these notes) and the
 * object's own read() then parses it.
 * @param stream the bytes to read from.
 * @param ppvalue output; the created value. Freed and reset here when
 *        parsing fails.
 * @return ERROR_SUCCESS or the discovery/parse error.
 */
int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any** ppvalue)
{
    int ret = SrsAmf0Any::discovery(stream, ppvalue);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 discovery any elem failed. ret=%d", ret);
        return ret;
    }

    srs_assert(*ppvalue);

    // let the concrete value type parse its own content.
    ret = (*ppvalue)->read(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("amf0 parse elem failed. ret=%d", ret);
        srs_freep(*ppvalue);
    }

    return ret;
}
5.1.7 SrsAmf0EcmaArray::read
/**
 * Parse an AMF0 ECMA array: marker byte, 4-byte count, then name/value
 * properties until the object-EOF marker or the end of the stream.
 * @param stream the bytes to read from.
 * @return ERROR_SUCCESS, or ERROR_RTMP_AMF0_DECODE / the nested read error.
 */
int SrsAmf0EcmaArray::read(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    // marker
    if (!stream->require(1)) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 read ecma_array marker failed. ret=%d", ret);
        return ret;
    }

    // the AMF0 type byte, which must be the ECMA array marker
    // (0x08 according to the original notes).
    char marker = stream->read_1bytes();
    if (marker != RTMP_AMF0_EcmaArray) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 check ecma_array marker failed. "
            "marker=%#x, required=%#x, ret=%d", marker, RTMP_AMF0_EcmaArray, ret);
        return ret;
    }
    srs_verbose("amf0 read ecma_array marker success");

    // count
    if (!stream->require(4)) {
        ret = ERROR_RTMP_AMF0_DECODE;
        srs_error("amf0 read ecma_array count failed. ret=%d", ret);
        return ret;
    }

    // the number of properties announced by the array.
    int32_t count = stream->read_4bytes();
    srs_verbose("amf0 read ecma_array count success. count=%d", count);

    // value
    this->_count = count;

    while (!stream->empty()) {
        // detect whether is eof: per the notes an ECMA array ends with
        // 0x00 0x00 0x09, the same as an object.
        if (srs_amf0_is_object_eof(stream)) {
            SrsAmf0ObjectEOF eof_marker;
            if ((ret = eof_marker.read(stream)) != ERROR_SUCCESS) {
                srs_error("amf0 ecma_array read eof failed. ret=%d", ret);
                return ret;
            }
            srs_info("amf0 read ecma_array EOF.");
            break;
        }

        // property-name: utf8 string
        std::string key;
        ret = srs_amf0_read_utf8(stream, key);
        if (ret != ERROR_SUCCESS) {
            srs_error("amf0 ecma_array read property name failed. ret=%d", ret);
            return ret;
        }

        // property-value: any (number, string, boolean, ...)
        SrsAmf0Any* value = NULL;
        ret = srs_amf0_read_any(stream, &value);
        if (ret != ERROR_SUCCESS) {
            srs_error("amf0 ecma_array read property_value failed. "
                "name=%s, ret=%d", key.c_str(), ret);
            return ret;
        }

        // add property: stored under its name in this array's properties
        // (per the notes, an SrsUnSortedHashtable backed by a std::vector).
        this->set(key, value);
    }

    return ret;
}
解析 metadata 数据成功后,接下来是调用 SrsSource 实现的 on_meta_data 函数对解析后的 metadata 做进一步的处理。
5.1.8 SrsSource::on_meta_data
/**
 * Process an onMetaData packet published to this source.
 * Notifies hls/dvr (when compiled in), strips the "duration" property, adds
 * the server signature properties, re-encodes the metadata, caches it in
 * cache_metadata and dispatches it to all consumers and forwarders.
 * @param msg the original message; its header is copied into the cached
 *        message and its size is used for logging.
 * @param metadata the decoded metadata packet; modified in place.
 *        NOTE(review): it is NULL-checked only in the hls/dvr sections and
 *        dereferenced unconditionally afterwards, so callers must pass a
 *        non-NULL packet.
 * @return ERROR_SUCCESS or the error of the step that failed.
 */
int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
{
    int ret = ERROR_SUCCESS;

    // hls and dvr hooks (not covered by these notes).
#ifdef SRS_AUTO_HLS
    if (metadata && (ret = hls->on_meta_data(metadata->metadata)) != ERROR_SUCCESS) {
        srs_error("hls process onMetaData message failed. ret=%d", ret);
        return ret;
    }
#endif

#ifdef SRS_AUTO_DVR
    if (metadata && (ret = dvr->on_meta_data(metadata)) != ERROR_SUCCESS) {
        srs_error("dvr process onMetaData message failed. ret=%d", ret);
        return ret;
    }
#endif

    SrsAmf0Any* prop = NULL;

    // when exists the duration, remove it to make ExoPlayer happy.
    if (metadata->metadata->get_property("duration") != NULL) {
        metadata->metadata->remove("duration");
    }

    // generate metadata info to print
    std::stringstream ss;
    if ((prop = metadata->metadata->ensure_property_number("width")) != NULL) {
        ss << ", width=" << (int)prop->to_number();
    }
    if ((prop = metadata->metadata->ensure_property_number("height")) != NULL) {
        ss << ", height=" << (int)prop->to_number();
    }
    if ((prop = metadata->metadata->ensure_property_number("videocodecid")) != NULL) {
        ss << ", vcodec=" << (int)prop->to_number();
    }
    if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) {
        ss << ", acodec=" << (int)prop->to_number();
    }
    srs_trace("got metadata%s", ss.str().c_str());

    // add server info to metadata.
    metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
    metadata->metadata->set("srs_primary", SrsAmf0Any::str(RTMP_SIG_SRS_PRIMARY));
    metadata->metadata->set("srs_authors", SrsAmf0Any::str(RTMP_SIG_SRS_AUTHROS));

    // version, for example, 1.0.0
    // add version to metadata, please donot remove it, for debug.
    metadata->metadata->set("server_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));

    // if allow atc_auto and bravo-atc detected, open atc for vhost.
    atc = _srs_config->get_atc(_req->vhost);
    if (_srs_config->get_atc_auto(_req->vhost)) {
        if ((prop = metadata->metadata->get_property("bravo_atc")) != NULL) {
            if (prop->is_string() && prop->to_str() == "true") {
                atc = true;
            }
        }
    }

    // encode the metadata to payload, using SrsPacket::encode inherited by
    // the metadata packet.
    int size = 0;
    char* payload = NULL;
    if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) {
        srs_error("encode metadata error. ret=%d", ret);
        srs_freep(payload);
        return ret;
    }
    srs_verbose("encode metadata success.");

    if (size <= 0) {
        srs_warn("ignore the invalid metadata. size=%d", size);
        return ret;
    }

    // when already got metadata, drop when reduce sequence header.
    bool drop_for_reduce = false;
    if (cache_metadata && _srs_config->get_reduce_sequence_header(_req->vhost)) {
        drop_for_reduce = true;
        srs_warn("drop for reduce sh metadata, size=%d", msg->size);
    }

    // create a shared ptr message.
    srs_freep(cache_metadata);
    cache_metadata = new SrsSharedPtrMessage();

    // dump message to shared ptr message.
    // the payload/size managed by cache_metadata, user should not free it.
    if ((ret = cache_metadata->create(&msg->header, payload, size)) != ERROR_SUCCESS) {
        srs_error("initialize the cache metadata failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("initialize shared ptr metadata success.");

    // copy to all consumer
    if (!drop_for_reduce) {
        // every consumer of this source (clients playing the stream) gets
        // its own copy. The iterator must start on the `consumers`
        // container; it previously started on a misspelled `consumer`.
        std::vector<SrsConsumer*>::iterator it;
        for (it = consumers.begin(); it != consumers.end(); ++it) {
            SrsConsumer* consumer = *it;
            if ((ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm))
                != ERROR_SUCCESS) {
                srs_error("dispatch the metadata failed. ret=%d", ret);
                return ret;
            }
        }
    }

    // copy to all forwarders
    if (true) {
        std::vector<SrsForwarder*>::iterator it;
        for (it = forwarders.begin(); it != forwarders.end(); ++it) {
            SrsForwarder* forwarder = *it;
            if ((ret = forwarder->on_meta_data(cache_metadata)) != ERROR_SUCCESS) {
                srs_error("forwarder process onMetaData message failed. ret=%d", ret);
                return ret;
            }
        }
    }

    return ret;
}
5.1.9 SrsPacket::encode
/**
 * Serialize the packet into a newly allocated byte buffer.
 * Sub packets can override this encode; for example video and audio set the
 * payload directly without memory copy, while other packets serialize to
 * bytes by overriding get_size() and encode_packet().
 * @param psize output; the size of the encoded payload (get_size()).
 * @param ppayload output; the buffer allocated with new char[size], or NULL
 *        when the size is not positive. Both outputs are only assigned on
 *        success; on failure the buffer is freed here.
 * @return ERROR_SUCCESS or the stream/encode error.
 */
int SrsPacket::encode(int& psize, char*& ppayload)
{
    int ret = ERROR_SUCCESS;

    int size = get_size();
    char* payload = NULL;

    SrsStream stream;

    if (size > 0) {
        payload = new char[size];

        // wrap the buffer so encode_packet() can write into it.
        if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) {
            srs_error("initialize the stream failed. ret=%d", ret);
            srs_freepa(payload);
            return ret;
        }
    }

    // dispatches to the sub packet's encode_packet
    // (SrsOnMetaDataPacket::encode_packet for metadata).
    if ((ret = encode_packet(&stream)) != ERROR_SUCCESS) {
        srs_error("encode the packet failed. ret=%d", ret);
        srs_freepa(payload);
        return ret;
    }

    psize = size;
    ppayload = payload;
    srs_verbose("encode the packet success. size=%d", size);

    return ret;
}
5.1.10 SrsOnMetaDataPacket::encode_packet
/**
 * Serialize the metadata packet: the command name as an AMF0 string followed
 * by the metadata object.
 * @param stream the destination to write into.
 * @return ERROR_SUCCESS or the AMF0 write error.
 */
int SrsOnMetaDataPacket::encode_packet(SrsStream* stream)
{
    int ret = srs_amf0_write_string(stream, name);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode name failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode name success.");

    // the metadata member writes itself (SrsAmf0Object::write).
    ret = metadata->write(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("encode metadata failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("encode metadata success.");

    srs_info("encode onMetaData packet success.");
    return ret;
}
5.1.11 SrsAmf0Object::write
/**
 * Serialize this AMF0 object: the object marker byte, every property as
 * utf8 name + value, then the object-EOF marker.
 * @param stream the destination to write into.
 * @return ERROR_SUCCESS, ERROR_RTMP_AMF0_ENCODE when there is no room for
 *         the marker, or the error of a nested write.
 */
int SrsAmf0Object::write(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;

    // marker
    if (!stream->require(1)) {
        ret = ERROR_RTMP_AMF0_ENCODE;
        srs_error("amf0 write object marker failed. ret=%d", ret);
        return ret;
    }

    // one byte holding the AMF0 type.
    stream->write_1bytes(RTMP_AMF0_Object);
    srs_verbose("amf0 write object marker success");

    // value
    for (int idx = 0; idx < properties->count(); idx++) {
        std::string key = this->key_at(idx);
        SrsAmf0Any* value = this->value_at(idx);

        ret = srs_amf0_write_utf8(stream, key);
        if (ret != ERROR_SUCCESS) {
            srs_error("write object property name failed. ret=%d", ret);
            return ret;
        }

        ret = srs_amf0_write_any(stream, value);
        if (ret != ERROR_SUCCESS) {
            srs_error("write object property value failed. ret=%d", ret);
            return ret;
        }

        srs_verbose("write amf0 property success. name=%s", key.c_str());
    }

    // the end marker (0x00 0x00 0x09 according to the original notes).
    ret = eof->write(stream);
    if (ret != ERROR_SUCCESS) {
        srs_error("write object eof failed. ret=%d", ret);
        return ret;
    }

    srs_verbose("write amf0 object success.");

    return ret;
}
5.1.12 SrsSharedPtrMessage 构造函数
/**
 * Shared ptr message, for audio/video/data messages that need less memory
 * copy; only for output.
 *
 * The first object is made by this constructor plus create(); use copy()
 * when a reference counted message is needed.
 */
SrsSharedPtrMessage::SrsSharedPtrMessage()
{
    // no shared payload attached until create() is called.
    ptr = NULL;
}
5.1.13 SrsSharedPtrMessage::create
/**
 * Create the shared ptr message from a header and a payload.
 * The payload buffer is attached directly, not copied.
 * @remark user should never free the payload.
 * @param pheader the header to copy to the message. NULL to ignore.
 * @param payload the payload bytes to attach.
 * @param size the number of bytes in payload.
 * @return ERROR_SUCCESS, or ERROR_SYSTEM_ASSERT_FAILED when a payload was
 *         already attached (this also trips srs_assert).
 */
int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int size)
{
    int ret = ERROR_SUCCESS;

    // a message can only be given a payload once.
    if (ptr) {
        ret = ERROR_SYSTEM_ASSERT_FAILED;
        srs_error("should not set the payload twice. ret=%d", ret);
        srs_assert(false);

        return ret;
    }

    // the holder of the shared payload.
    ptr = new SrsSharedPtrPayload();

    // direct attach the data: copy the needed header fields (for the
    // metadata message the type is amf0_data, 0x12, per the notes).
    if (pheader) {
        ptr->header.message_type = pheader->message_type;
        // the payload length is the size passed in, not the header's value.
        ptr->header.payload_length = size;
        ptr->header.perfer_cid = pheader->perfer_cid;
        this->timestamp = pheader->timestamp;
        this->stream_id = pheader->stream_id;
    }
    ptr->payload = payload;
    ptr->size = size;

    // message can access it.
    // payload: the raw bytes of the message; use SrsProtocol.decode_message
    // to get a concrete packet.
    // @remark, not all message payload can be decoded to packet, for example
    // video/audio use raw bytes.
    this->payload = ptr->payload;
    // size: current message parsed size, size <= header.payload_length,
    // for the payload maybe sent in multiple chunks.
    this->size = ptr->size;

    return ret;
}
5.1.14 SrsSharedPtrPayload 构造函数
/**
 * Start with no payload attached: NULL buffer, zero size and a reference
 * count of zero.
 */
SrsSharedPtrMessage::SrsSharedPtrPayload::SrsSharedPtrPayload()
{
    // actual shared payload.
    payload = NULL;
    // size of payload.
    size = 0;
    // the reference count
    shared_count = 0;
}
5.1.15 通知消费者:SrsConsumer::enqueue
/**
 * Enqueue a shared ptr message for this consumer.
 * A copy of the message is queued; with SRS_PERF_QUEUE_COND_WAIT a consumer
 * waiting for messages is signaled once enough data is queued.
 * @param shared_msg, directly ptr, copy it if need to save it.
 * @param atc whether atc, donot use jitter correct if true.
 * @param ag the algorithm of time jitter.
 * @return ERROR_SUCCESS or the jitter/queue error.
 */
int SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc,
    SrsRtmpJitterAlgorithm ag)
{
    int ret = ERROR_SUCCESS;

    // work on a copy, the caller keeps its own message.
    SrsSharedPtrMessage* msg = shared_msg->copy();

    // without atc, correct the timestamp with the jitter algorithm.
    if (!atc) {
        if ((ret = jitter->correct(msg, ag)) != ERROR_SUCCESS) {
            srs_freep(msg);
            return ret;
        }
    }

    if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) {
        return ret;
    }

#ifdef SRS_PERF_QUEUE_COND_WAIT
    srs_verbose("enqueue msg, time=%"PRId64", size=%d, "
        "duration=%d, waiting=%d, min_msg=%d",
        msg->timestamp, msg->size, queue->duration(), mw_waiting, mw_min_msgs);

    // fire the mw when msgs is enough.
    // mw_waiting is set while the consumer (the play client side) is blocked
    // waiting for messages to become ready.
    if (mw_waiting) {
        int duration_ms = queue->duration();
        bool match_min_msgs = queue->size() > mw_min_msgs;

        // For ATC, maybe the SH timestamp bigger than A/V packet,
        // when encoder republish or overflow.
        // @see https://github.com/ossrs/srs/pull/749
        if (atc && duration_ms < 0) {
            st_cond_signal(mw_wait);
            mw_waiting = false;
            return ret;
        }

        // when duration ok, signal to flush.
        if (match_min_msgs && duration_ms > mw_duration) {
            st_cond_signal(mw_wait);
            // clear the same flag as the branch above; it was misspelled here.
            mw_waiting = false;
            return ret;
        }
    }
#endif

    return ret;
}
5.1.16 SrsRtmpJitter::correct
/**
 * Correct the timestamp of a message according to the jitter algorithm.
 * - OFF: the message is left untouched.
 * - ZERO: timestamps are rebased on the first message seen, without
 *   enforcing monotonic increase.
 * - FULL: non-av messages get timestamp 0; av timestamps are rebuilt from
 *   positive, bounded deltas so that they increase monotonically.
 * - any other value: ignored.
 * @param msg the message whose timestamp is corrected in place.
 * @param ag the jitter algorithm to apply.
 * @return always ERROR_SUCCESS.
 */
int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag)
{
    int ret = ERROR_SUCCESS;

    // for performance issue
    if (ag != SrsRtmpJitterAlgorithmFULL) {
        // all jitter correct features is disabled, ignore.
        if (ag == SrsRtmpJitterAlgorithmOFF) {
            return ret;
        }

        // start at zero, but donot ensure monotonically increasing.
        if (ag == SrsRtmpJitterAlgorithmZERO) {
            // for the first time, last_pkt_correct_time is -1.
            if (last_pkt_correct_time == -1) {
                last_pkt_correct_time = msg->timestamp;
            }
            msg->timestamp -= last_pkt_correct_time;
            return ret;
        }

        // other algorithm, ignore.
        return ret;
    }

    // full jitter algorithm, do jitter correct.

    // set to 0 for metadata.
    if (!msg->is_av()) {
        msg->timestamp = 0;
        return ret;
    }

    /**
     * we use a very simple time jitter detect/correct algorithm:
     * 1. delta: ensure the delta is positive and valid,
     *     we set the delta to DEFAULT_FRAME_TIME_MS,
     *     if the delta of time is negative or greater than CONST_MAX_JITTER_MS.
     * 2. last_pkt_time: specifies the original packet time,
     *     is used to detect next jitter.
     * 3. last_pkt_correct_time: simply add the positive delta,
     *     and enforce the time monotonically.
     */
    int64_t time = msg->timestamp;
    int64_t delta = time - last_pkt_time;

    // if jitter detected, reset the delta.
    if (delta < CONST_MAX_JITTER_MS_NEG || delta > CONST_MAX_JITTER_MS) {
        // use default 10ms to notice the problem of stream.
        // @see https://github.com/ossrs/srs/issues/425
        delta = DEFAULT_FRAME_TIME_MS;

        srs_info("jitter detected, last_pts=%"PRId64", pts=%"PRId64", "
            "diff=%"PRId64", last_time=%"PRId64", time=%"PRId64", diff=%"PRId64"",
            last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time,
            last_pkt_correct_time + delta, delta);
    } else {
        srs_verbose("timestamp no jitter. time=%"PRId64", "
            "last_pkt=%"PRId64", correct_to=%"PRId64"",
            time, last_pkt_time, last_pkt_correct_time + delta);
    }

    // never let the corrected time go below zero.
    last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);

    msg->timestamp = last_pkt_correct_time;
    last_pkt_time = time;

    return ret;
}
若传入的第二个参数为 SrsRtmpJitterAlgorithmOFF,则禁止所有的 jitter 校正,构造 SrsSource 的时候默认初始化为 SrsRtmpJitterAlgorithmOFF。
5.1.17 SrsMessageQueue::enqueue
/**
 * enqueue the message, the timestamp always monotonically.
 * @param msg, the msg to enqueue, user never free it whatever the return code.
 * @param is_overflow, whether overflow and shrunk. NULL to ignore.
 *       it is only ever set to true here, never reset to false.
 * @return ret, which is never changed from ERROR_SUCCESS in this function.
 */
int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
{
    int ret = ERROR_SUCCESS;

    // track the time range [av_start_time, av_end_time] covered by a/v messages.
    if (msg->is_av()) {
        // -1 means no a/v message has been seen yet.
        if (av_start_time == -1) {
            av_start_time = msg->timestamp;
        }

        av_end_time = msg->timestamp;
    }

    msgs.push_back(msg);

    // check whether the queue exceeds the configured duration.
    while (av_end_time - av_start_time > queue_size_ms) {
        // notice the caller queue already overflow and shrunk.
        if (is_overflow) {
            *is_overflow = true;
        }

        // on overflow, drop the queued messages via shrink().
        shrink();
    }

    return ret;
}
5.1.18 SrsFastVector::push_back
/**
 * Append msg to the end of the vector, growing the backing array when full.
 * Only the pointer is stored; the message itself is not copied.
 * @param msg the message pointer to append.
 */
void SrsFastVector::push_back(SrsSharedPtrMessage* msg)
{
    // increase vector.
    if (count >= nb_msgs) {
        // double the capacity; fall back to 1 so a zero capacity can still grow.
        int size = (nb_msgs > 0) ? nb_msgs * 2 : 1;
        SrsSharedPtrMessage** buf = new SrsSharedPtrMessage*[size];
        for (int i = 0; i < nb_msgs; i++) {
            buf[i] = msgs[i];
        }
        srs_warn("fast vector incrase %d=>%d", nb_msgs, size);

        // use new array.
        // msgs is an array (every buffer installed here comes from new[]),
        // so it must be released with delete[], not the scalar delete.
        // NOTE(review): assumes the initial msgs buffer is also allocated
        // with new[] - confirm against the constructor.
        delete[] msgs;
        msgs = buf;
        nb_msgs = size;
    }

    // msgs is an array whose elements are SrsSharedPtrMessage* pointers.
    msgs[count++] = msg;
}
该函数直接将 msg 指针追加到 SrsFastVector 类的成员 msgs 数组末尾;若数组已满(count >= nb_msgs),则先将数组容量扩大一倍并拷贝原有指针,然后再追加。
5.1.19 SrsMessageQueue::shrink
/*
* remove a gop from the front.
* if no iframe found, clear it.
*/
void SrsMessageQueue::shrink()
{
SrsSharedPtrMessage* video_sh = NULL;
SrsSharedPtrMessage* audio_sh = NULL;
int msgs_size = (int)msgs.size();
// remove all msg
// ignore the sequence header
for (int i = 0; i < (int)msgs.size(); i++) {
SrsSharedPtrMessage* msg = msgs.at(i);
if (msg->is_video() &&
SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size))
{
srs_freep(video_sh);
video_sh = msg;
continue;
}
else if (msg->is_audio() &&
SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size))
{
srs_freep(audio_sh);
audio_sh = msg;
continue;
}
srs_freep(msg);
}
msgs.clear();
// update av_start_time
av_start_time = av_end_time;
// push_back sequence header and update timestamp
if (video_sh) {
video_sh->timestamp = av_end_time;
msgs.push_back(video_sh);
}
if (audio_sh) {
audio_sh->timestamp = av_end_time;
msgs.push_back(audio_sh);
}
if (_ignore_shrink) {
srs_info("shrink the cache queue, size=%d, removed=%d, max=%.2f",
(int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0);
} else {
srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f",
(int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0);
}
}
5.1.20 SrsMessageQueue::duration
/**
 * Get the duration of the queue.
 * @return av_end_time minus av_start_time, truncated to int.
 */
int SrsMessageQueue::duration()
{
    int span = (int)(av_end_time - av_start_time);
    return span;
}
返回当前消息队列的时长,即队列记录的结束时间与起始时间之差(av_end_time - av_start_time)。
5.1.21 st_cond_signal
/*
 * Signal the condition variable cvar in non-broadcast mode.
 * Forwards to _st_cond_signal() with broadcast = 0 and returns its result.
 */
int st_cond_signal(_st_cond_t *cvar)
{
    int rc;

    rc = _st_cond_signal(cvar, 0);
    return rc;
}
5.1.22 _st_cond_signal
/*
 * Wake thread(s) waiting on the condition variable cvar.
 *
 * Walks cvar's wait queue; each thread found in the _ST_ST_COND_WAIT state is
 * taken off the sleep queue (if it is on it), marked runnable and added to
 * the run queue.
 *
 * cvar:      the condition variable whose wait queue is scanned.
 * broadcast: zero to stop after the first waiting thread has been woken,
 *            non-zero to wake every waiting thread on the queue.
 * Returns 0 always.
 */
static int _st_cond_signal(_st_cond_t *cvar, int broadcast)
{
    _st_thread_t *thread;
    _st_clist_t *q;

    for (q = cvar->wait_q.next; q != &cvar->wait_q; q = q->next) {
        thread = _ST_THREAD_WAITQ_PTR(q);
        if (thread->state == _ST_ST_COND_WAIT) {
            /* thread->flags holds _ST_FL_* bits; _ST_ST_* values are states */
            if (thread->flags & _ST_FL_ON_SLEEPQ)
                _ST_DEL_SLEEPQ(thread);

            /* Make thread runnable */
            thread->state = _ST_ST_RUNNABLE;
            _ST_ADD_RUNQ(thread);
            if (!broadcast)
                break;
        }
    }

    return 0;
}
5.2 Audio
假设接收到的第一个音频包如下图。
对于接收到的音频包,在 SrsRtmpConn::process_publish_message 函数中直接调用 SrsSource 类实现的 on_audio 函数进行处理。
5.2.1 SrsSource::on_audio
/**
 * Entry point for an audio message received from the publisher.
 *
 * Converts shared_audio into a SrsSharedPtrMessage and then either processes
 * it directly via on_audio_imp() (mix_correct off), or pushes a copy into
 * mix_queue and processes whatever message the queue pops, which may be
 * audio or video (mix_correct on).
 *
 * @param shared_audio the received audio message; its payload is transferred
 *       to the new message, so the caller should not use it again.
 * @return ERROR_SUCCESS, or the error code from msg.create(),
 *       on_audio_imp() or on_video_imp().
 */
int SrsSource::on_audio(SrsCommonMessage* shared_audio)
{
    int ret = ERROR_SUCCESS;

    // monotonically increase detect.
    if (!mix_correct && is_monotonically_increase) {
        if (last_packet_time > 0 && shared_audio->header.timestamp < last_packet_time) {
            // clear the flag so the warning is only reported once.
            is_monotonically_increase = false;
            srs_warn("AUDIO: stream not monotonically increase, please open mix_correct.");
        }
    }

    // remember the timestamp of the received audio frame.
    last_packet_time = shared_audio->header.timestamp;

    // convert shared_audio to msg, user should not use shared_audio again.
    // the payload is transfer to msg, and set to NULL in shared_audio.
    SrsSharedPtrMessage msg;
    if ((ret = msg.create(shared_audio)) != ERROR_SUCCESS) {
        srs_error("initialize the audio failed. ret=%d", ret);
        return ret;
    }
    srs_info("Audio dts=%" PRId64 ", size=%d", msg.timestamp, msg.size);

    // directly process the audio message when mix_correct is off.
    if (!mix_correct) {
        return on_audio_imp(&msg);
    }

    // insert msg to the queue.
    mix_queue->push(msg.copy());

    // fetch someone from mix_queue.
    SrsSharedPtrMessage* m = mix_queue->pop();
    if (!m) {
        return ret;
    }

    // consume the monotonically increase message.
    if (m->is_audio()) {
        ret = on_audio_imp(m);
    } else {
        ret = on_video_imp(m);
    }
    srs_freep(m);

    return ret;
}
5.2.2 SrsSource::on_audio_imp
/**
 * Process one audio message and dispatch it to every sink of the source.
 *
 * Steps, in order:
 * 1. decide whether a repeated sequence header should be hidden from consumers.
 * 2. for a sequence header, demux it and report the audio info to statistics.
 * 3. feed hls / dvr / hds (each only when compiled in).
 * 4. enqueue to all consumers (unless dropped in step 1) and all forwarders.
 * 5. cache the sequence header (or the first audio packet when none is cached).
 * 6. for non sequence header, cache to the gop cache, and when atc is on,
 *    update the cached sequence header and metadata timestamps.
 *
 * @param msg the audio message to process; not freed here.
 * @return ERROR_SUCCESS, or the first error that is not ignored.
 */
int SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
    int ret = ERROR_SUCCESS;

    srs_info("Audio dts=%" PRId64 ", size=%d", msg->timestamp, msg->size);
    bool is_aac_sequence_header =
        SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size);
    bool is_sequence_header = is_aac_sequence_header;

    // whether consumer should drop for the duplicated sequence header.
    bool drop_for_reduce = false;
    if (is_sequence_header && cache_sh_audio &&
        _srs_config->get_reduce_sequence_header(_req->vhost)) {
        if (cache_sh_audio->size == msg->size) {
            drop_for_reduce = srs_bytes_equals(cache_sh_audio->payload,
                                               msg->payload, msg->size);
            srs_warn("drop for reduce sh audio, size=%d", msg->size);
        }
    }

    // for the aac sequence header, parse the codec details.
    // do not cache the sequence header to gop_cache, see the return below.
    if (is_aac_sequence_header) {
        // parse detail audio codec
        SrsAvcAacCodec codec;
        SrsCodecSample sample;

        // demux the received audio data.
        if ((ret = codec.audio_aac_demux(msg->payload, msg->size, &sample))
            != ERROR_SUCCESS) {
            srs_error("source codec demux audio failed. ret=%d", ret);
            return ret;
        }

        // lookup tables indexed by the flv sound_size / sound_type fields.
        static int flv_sample_sizes[] = {8, 16, 0};
        static int flv_sound_types[] = {1, 2, 0};

        // when got audio stream info.
        SrsStatistic* stat = SrsStatistic::instance();
        if ((ret = stat->on_audio_info(_req, SrsCodecAudioAAC, sample.sound_rate,
            sample.sound_type, codec.aac_object)) != ERROR_SUCCESS) {
            return ret;
        }

        srs_trace("%dB audio sh, codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), "
            "flv(%dbits, %dchannels, %dHZ)",
            msg->size, codec.audio_codec_id,
            srs_codec_aac_object2str(codec.aac_object).c_str(), codec.aac_channels,
            codec.audio_data_rate / 1000, aac_sample_rates[codec.aac_sample_rate],
            flv_sample_sizes[sample.sound_size], flv_sound_types[sample.sound_type],
            flv_sample_rates[sample.sound_rate]);
    }

#ifdef SRS_AUTO_HLS
    if ((ret = hls->on_audio(msg)) != ERROR_SUCCESS) {
        // apply the error strategy for hls.
        // @see https://github.com/ossrs/srs/issues/264
        std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost);
        if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {
            srs_warn("hls process audio message failed, ignore and disable hls. ret=%d",
                ret);

            // unpublish, ignore ret.
            hls->on_unpublish();

            // ignore.
            ret = ERROR_SUCCESS;
        } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {
            if (srs_hls_can_continue(ret, cache_sh_audio, msg)) {
                ret = ERROR_SUCCESS;
            } else {
                srs_warn("hls continue audio failed. ret=%d", ret);
                return ret;
            }
        } else {
            srs_warn("hls disconnect publisher for audio error. ret=%d", ret);
            return ret;
        }
    }
#endif

#ifdef SRS_AUTO_DVR
    if ((ret = dvr->on_audio(msg)) != ERROR_SUCCESS) {
        srs_warn("dvr process audio message failed, ignore and disable dvr. ret=%d", ret);

        // unpublish, ignore ret.
        dvr->on_unpublish();

        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif

#ifdef SRS_AUTO_HDS
    if ((ret = hds->on_audio(msg)) != ERROR_SUCCESS) {
        srs_warn("hds process audio message failed, ignore and disable hds. ret=%d", ret);

        // unpublish, ignore ret.
        hds->on_unpublish();

        // ignore.
        ret = ERROR_SUCCESS;
    }
#endif

    // copy to all consumer: put the audio message into each consumer's queue.
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsConsumer* consumer = consumers.at(i);
            if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) {
                srs_error("dispatch the audio failed. ret=%d", ret);
                return ret;
            }
        }
        srs_info("dispatch audio success.");
    }

    // copy to all forwarders.
    if (true) {
        std::vector<SrsForwarder*>::iterator it;
        for (it = forwarders.begin(); it != forwarders.end(); ++it) {
            SrsForwarder* forwarder = *it;
            if ((ret = forwarder->on_audio(msg)) != ERROR_SUCCESS) {
                srs_error("forwarder process audio message failed. ret=%d", ret);
                return ret;
            }
        }
    }

    // cache the sequence header of aac, or first packet of mp3.
    // for example, the mp3 is used for hls to write the "right" audio codec.
    // TODO: FIXME: to refine the stream info system.
    if (is_aac_sequence_header || !cache_sh_audio) {
        srs_freep(cache_sh_audio);
        cache_sh_audio = msg->copy();
    }

    // when sequence header, do not push to gop cache and adjust the timestamp.
    if (is_sequence_header) {
        return ret;
    }

    // cache the last gop packets
    if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) {
        srs_error("shrink gop cache failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("cache gop success.");

    // if atc, update the sequence header to abs time.
    if (atc) {
        if (cache_sh_audio) {
            cache_sh_audio->timestamp = msg->timestamp;
        }
        if (cache_metadata) {
            cache_metadata->timestamp = msg->timestamp;
        }
    }

    return ret;
}
调用 SrsAvcAacCodec 类实现的 audio_aac_demux 对接收到的 aac 数据进行解析之前,需要构造两个类对象:SrsAvcAacCodec 和 SrsCodecSample。
5.2.3 SrsAvcAacCodec 构造函数
/**
 * The h.264/avc and aac codec, for media stream.
 *
 * It demuxes a FLV/RTMP video/audio packet into a sample: each NALU of
 * h.264 becomes one sample unit, while the entire aac raw data is a single
 * sample unit.
 *
 * A sequence header is demuxed and saved in avc_extra_data / aac_extra_data.
 *
 * Codec info such as the audio sample rate is first decoded from the
 * FLV/RTMP header, then overridden by the codec info in the sequence header.
 *
 * The constructor resets every field to its "unset" value and allocates the
 * stream used for decoding.
 */
SrsAvcAacCodec::SrsAvcAacCodec()
{
    // ---- generic stream info ----
    width = 0;
    height = 0;
    duration = 0;
    frame_rate = 0;

    // ---- video ----
    video_codec_id = 0;
    video_data_rate = 0;
    // for sequence header, whether parse the h.264 sps.
    avc_parse_sps = true;
    // profile_idc, H.264-AVC-ISO_IEC_14496-10.pdf, page 45.
    avc_profile = SrsAvcProfileReserved;
    // level_idc, H.264-AVC-ISO_IEC_14496-10.pdf, page 45.
    avc_level = SrsAvcLevelReserved;
    // lengthSizeMinusOne, H.264-AVC-ISO_IEC_14496-15.pdf, page 16.
    NAL_unit_length = 0;
    // the avc payload format.
    payload_format = SrsAvcPayloadFormatGuess;

    // the avc extra data, the AVC sequence header, without the flv codec
    // header. @see: ffmpeg, AVCodecContext::extradata
    avc_extra_data = NULL;
    avc_extra_size = 0;

    // sps / pps, none until a sequence header is demuxed.
    sequenceParameterSetNALUnit = NULL;
    sequenceParameterSetLength = 0;
    pictureParameterSetNALUnit = NULL;
    pictureParameterSetLength = 0;

    // ---- audio ----
    audio_codec_id = 0;
    audio_data_rate = 0;
    // audioObjectType, in 1.6.2.1 AudioSpecificConfig, page 33,
    // 1.5.1.1 Audio object type definition, page 23,
    // in aac-mp4a-format-ISO_IEC_14496-3+2001.pdf.
    aac_object = SrsAacObjectTypeReserved;
    // samplingFrequencyIndex, unset means the sample rate is ignored.
    aac_sample_rate = SRS_AAC_SAMPLE_RATE_UNSET;
    // channelConfiguration
    aac_channels = 0;

    // the aac extra data, the AAC sequence header, without the flv codec
    // header. @see: ffmpeg, AVCodecContext::extradata
    aac_extra_data = NULL;
    aac_extra_size = 0;

    // ---- decoding helper ----
    stream = new SrsStream();
}
5.2.4 SrsCodecSample 构造函数
/**
 * The samples in a flv audio/video packet.
 *
 * A sample is used to analyse one video/audio packet: the h.264 NALUs are
 * split into buffers, or the aac raw data into one buffer, and the
 * video/audio specific info is decoded.
 *
 * The sample unit:
 *   - a video packet encoded in h.264 contains many NALUs, each is a sample unit.
 *   - an audio packet encoded in aac is one sample unit.
 *
 * @remark the video/audio sequence header is not a sample unit; all sequence
 *       headers are stored as extra data,
 * @see SrsAvcAacCodec.avc_extra_data and SrsAvcAacCodec.aac_extra_data
 * @remark user must clear all samples before decoding a new video/audio packet.
 */
SrsCodecSample::SrsCodecSample()
{
    // start from the cleared state.
    this->clear();
}
/**
 * Clear all samples.
 *
 * The sample units never copy the bytes, they use the pointer directly, so
 * the sample must be cleared once the video/audio packet is destroyed.
 * In a word, user must clear the sample before demuxing into it.
 * @remark demux sample use SrsAvcAacCodec.audio_aac_demux or video_avc_demux.
 */
void SrsCodecSample::clear()
{
    // common
    nb_sample_units = 0;
    is_video = false;

    // video specific
    cts = 0;
    avc_packet_type = SrsCodecVideoAVCTypeReserved;
    frame_type = SrsCodecVideoAVCFrameReserved;
    first_nalu_type = SrsAvcNaluTypeReserved;
    has_idr = false;
    has_aud = false;
    has_sps_pps = false;

    // audio specific
    acodec = SrsCodecAudioReserved1;
    aac_packet_type = SrsCodecAudioTypeReserved;
    sound_type = SrsCodecAudioSoundTypeReserved;
    sound_size = SrsCodecAudioSampleSizeReserved;
    sound_rate = SrsCodecAudioSampleRateReserved;
}
5.2.5 SrsAvcAacCodec::audio_aac_demux
/*
* demux the audio packet in aac codec.
* the packet mux in FLV/RTMP format defined in flv specification.
* demux the audio specified data(sound format, sound_size, ...) to sample.
* demux the aac specified data(aac_profile, ...) to codec from sequence header.
* demux the aac raw sample units.
*/
int SrsAvcAacCodec::audio_aac_demux(char* data, int size, SrsCodecSample* sample)
{
int ret = ERROR_SUCCESS;
sample->is_video = false;
if (!data || size <= 0) {
srs_trace("no audio present, ignore it.");
return ret;
}
if ((ret = stream->initialize(data, size)) != ERROR_SUCCESS) {
return ret;
}
// audio decode
if (!stream->require(1)) {
ret = ERROR_HLS_DECODE_ERROR;
srs_error("aac decode sound_format failed. ret=%d", ret);
return ret;
}
// @see: E.4.2 Audio Tags, video_file_format_spec_v10_1.pdf, page 76
int8_t sound_format = stream->read_1bytes();
/* 音频类型,即声道 */
int8_t sound_type = sound_format & 0x01;
/* 音频采样精度 */
int8_t sound_size = (sound_format >> 1) & 0x01;
/* 音频采样率 */
int8_t sound_rate = (sound_format >> 2) & 0x03;
/* 音频格式,对于 AAC,为 10 */
sound_format = (sound_format >> 4) & 0x0f;
audio_codec_id = sound_format;
sample->acodec = (SrsCodecAudio)audio_codec_id;
sample->sound_type = (SrsCodecAudioSoundType)sound_type;
sample->sound_rate = (SrsCodecAudioSampleRate)sound_rate;
sample->sound_size = (SrsCodecAudioSampleSize)sound_size;
// supoort h.264+mp3 for hls.
if (audio_codec_id == SrsCodecAudioMP3) {
return ERROR_HLS_TRY_MP3;
}
// only support aac
if (audio_codec_id != SrsCodecAudioAAC) {
ret = ERROR_HLS_DECODE_ERROR;
srs_error("aac only support mp3/aac codec. actual=%d, ret=%d",
audio_codec_id, ret);
return ret;
}
if (!stream->require(1)) {
ret = ERROR_HLS_DECODE_ERROR;
以上是关于SRS之接收推流线程:recv的主要内容,如果未能解决你的问题,请参考以下文章
SRS流媒体服务器搭建及拉取摄像头视频流经opencv处理后再推流至SRS
SRS4 对接海康威视GB28181协议推流 RTMPwebRTC拉流