WebRTC进阶流媒体服务器开发Mediasoup源码分析之Mediasoup主业务流程
Posted 山上有风景
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WebRTC进阶流媒体服务器开发Mediasoup源码分析之Mediasoup主业务流程相关的知识,希望对你有一定的参考价值。
一:主业务的创建
主要场景是对房间的管理,多方进行音视频互动。
Router代表房间,Transport代表一个传输,每个用户加入房间都会创建一个对应的连接。
Producer生产者,共享的音视频流中,每个音频、视频流都会产生一个生产者
Consumer消费者,对于每个加入房间的用户,都可以消费其他用户的音视频数据
1.首先调用CreateRouter,创建房间Router,然后加入worker的管理列表中。对于每个worker都会包含多个Router
2.创建Router之后,调用CreateTransport创建Transport,返回accept告诉Router创建transport成功,返回给Router
3.Router发送connect信令给Transport,连接创建成功
4.连接创建成功之后,调用CreateProducer创建生产者(可以创建多个)
5.如果需要获取其他用户的数据,则需要去CreateConsumer创建消费者
二:主业务源码分析
见worker.cpp中OnChannelRequest方法,处理请求之后,使用Request->Accept(data);返回确认消息
inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request) { MS_TRACE(); MS_DEBUG_DEV( "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id); switch (request->methodId) { case Channel::ChannelRequest::MethodId::WORKER_CLOSE: { if (this->closed) return; MS_DEBUG_DEV("Worker close request, stopping"); Close(); break; } case Channel::ChannelRequest::MethodId::WORKER_DUMP: { json data = json::object(); FillJson(data); request->Accept(data); break; } case Channel::ChannelRequest::MethodId::WORKER_GET_RESOURCE_USAGE: { json data = json::object(); FillJsonResourceUsage(data); request->Accept(data); break; } case Channel::ChannelRequest::MethodId::WORKER_UPDATE_SETTINGS: { Settings::HandleRequest(request); break; } case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER: //创建房间 { std::string routerId; // This may throw. SetNewRouterIdFromInternal(request->internal, routerId); //根据request获取routerId(在应用层随机产生的) auto* router = new RTC::Router(routerId); //创建router this->mapRouters[routerId] = router; //存放在字典中 MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str()); request->Accept(); //回复消息,创建完成 break; } case Channel::ChannelRequest::MethodId::ROUTER_CLOSE: { // This may throw. RTC::Router* router = GetRouterFromInternal(request->internal); // Remove it from the map and delete it. this->mapRouters.erase(router->id); delete router; MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str()); request->Accept(); break; } // Any other request must be delivered to the corresponding Router. default: //其他的信令处理,比如创建transport,accept没有在这里处理,在HandleRequest中调用!!!! { // This may throw. RTC::Router* router = GetRouterFromInternal(request->internal); router->HandleRequest(request); break; } } }
(一)进入Router.cpp中----创建Transport
void Router::SetNewTransportIdFromInternal(json& internal, std::string& transportId) const { MS_TRACE(); auto jsonTransportIdIt = internal.find("transportId"); //查找transportId,在jsonRequest中的5元组中的transportId中
if (jsonTransportIdIt == internal.end() || !jsonTransportIdIt->is_string()) MS_THROW_ERROR("missing internal.transportId"); transportId.assign(jsonTransportIdIt->get<std::string>()); //赋值 if (this->mapTransports.find(transportId) != this->mapTransports.end()) MS_THROW_ERROR("a Transport with same transportId already exists"); }
void Router::HandleRequest(Channel::ChannelRequest* request) { MS_TRACE(); switch (request->methodId) { case Channel::ChannelRequest::MethodId::ROUTER_DUMP: { json data = json::object(); FillJson(data); request->Accept(data); break; } case Channel::ChannelRequest::MethodId::ROUTER_CREATE_WEBRTC_TRANSPORT: //多个transport,这里主要查看WebRtcTransport { std::string transportId; // This may throw. SetNewTransportIdFromInternal(request->internal, transportId); //从request中获取transportId,存放到变量transportId中 // This may throw. auto* webRtcTransport = new RTC::WebRtcTransport(transportId, this, request->data); //创建WebRtcTransport对象,传入transportId和request5元组中的data数据 // Insert into the map. this->mapTransports[transportId] = webRtcTransport; //放入到map中去 MS_DEBUG_DEV("WebRtcTransport created [transportId:%s]", transportId.c_str()); json data = json::object(); //声明一个JSON格式对象 webRtcTransport->FillJson(data); //开始将webRtcTransport中的响应,返回确认消息到data request->Accept(data); //将接受的数据返回给JS层 break; } case Channel::ChannelRequest::MethodId::ROUTER_CREATE_PLAIN_TRANSPORT:case Channel::ChannelRequest::MethodId::ROUTER_CREATE_PIPE_TRANSPORT:case Channel::ChannelRequest::MethodId::ROUTER_CREATE_DIRECT_TRANSPORT:case Channel::ChannelRequest::MethodId::ROUTER_CREATE_AUDIO_LEVEL_OBSERVER:case Channel::ChannelRequest::MethodId::TRANSPORT_CLOSE:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_CLOSE:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_PAUSE:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_RESUME:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_ADD_PRODUCER:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_REMOVE_PRODUCER:// Any other request must be delivered to the corresponding Transport. default: { // This may throw. RTC::Transport* transport = GetTransportFromInternal(request->internal); transport->HandleRequest(request); break; } } }
查看WebRtcTransport.cpp文件,构造函数
WebRtcTransport::WebRtcTransport(const std::string& id, RTC::Transport::Listener* listener, json& data) : RTC::Transport::Transport(id, listener, data) { //主要实现JSON数据的解析---request->data就是data bool enableUdp{ true }; auto jsonEnableUdpIt = data.find("enableUdp");bool enableTcp{ false }; auto jsonEnableTcpIt = data.find("enableTcp");bool preferUdp{ false }; auto jsonPreferUdpIt = data.find("preferUdp");bool preferTcp{ false }; auto jsonPreferTcpIt = data.find("preferTcp"); auto jsonListenIpsIt = data.find("listenIps"); std::vector<ListenIp> listenIps(jsonListenIpsIt->size()); for (size_t i{ 0 }; i < jsonListenIpsIt->size(); ++i) { auto& jsonListenIp = (*jsonListenIpsIt)[i]; auto& listenIp = listenIps[i]; if (!jsonListenIp.is_object()) MS_THROW_TYPE_ERROR("wrong listenIp (not an object)"); auto jsonIpIt = jsonListenIp.find("ip"); if (jsonIpIt == jsonListenIp.end()) MS_THROW_TYPE_ERROR("missing listenIp.ip"); else if (!jsonIpIt->is_string()) MS_THROW_TYPE_ERROR("wrong listenIp.ip (not an string"); listenIp.ip.assign(jsonIpIt->get<std::string>()); // This may throw. Utils::IP::NormalizeIp(listenIp.ip); auto jsonAnnouncedIpIt = jsonListenIp.find("announcedIp"); if (jsonAnnouncedIpIt != jsonListenIp.end()) { if (!jsonAnnouncedIpIt->is_string()) MS_THROW_TYPE_ERROR("wrong listenIp.announcedIp (not an string)"); listenIp.announcedIp.assign(jsonAnnouncedIpIt->get<std::string>()); } } try { uint16_t iceLocalPreferenceDecrement{ 0 }; if (enableUdp && enableTcp) this->iceCandidates.reserve(2 * jsonListenIpsIt->size()); else this->iceCandidates.reserve(jsonListenIpsIt->size()); for (auto& listenIp : listenIps) { if (enableUdp) { uint16_t iceLocalPreference = IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement; if (preferUdp) iceLocalPreference += 1000; uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference); // This may throw. auto* udpSocket = new RTC::UdpSocket(this, listenIp.ip); this->udpSockets[udpSocket] = listenIp.announcedIp; if (listenIp.announcedIp.empty()) this->iceCandidates.emplace_back(udpSocket, icePriority); else this->iceCandidates.emplace_back(udpSocket, icePriority, listenIp.announcedIp); } if (enableTcp) { uint16_t iceLocalPreference = IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement; if (preferTcp) iceLocalPreference += 1000; uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference); // This may throw. auto* tcpServer = new RTC::TcpServer(this, this, listenIp.ip); this->tcpServers[tcpServer] = listenIp.announcedIp; if (listenIp.announcedIp.empty()) this->iceCandidates.emplace_back(tcpServer, icePriority); else this->iceCandidates.emplace_back(tcpServer, icePriority, listenIp.announcedIp); } // Decrement initial ICE local preference for next IP. iceLocalPreferenceDecrement += 100; } // Create a ICE server. this->iceServer = new RTC::IceServer( this, Utils::Crypto::GetRandomString(16), Utils::Crypto::GetRandomString(32)); // Create a DTLS transport. this->dtlsTransport = new RTC::DtlsTransport(this); } catch (const MediaSoupError& error) { // Must delete everything since the destructor won\'t be called. } }
Mediasoup手册:https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransport
(二)进入Router.cpp中----创建connect连接
void Router::HandleRequest(Channel::ChannelRequest* request) { MS_TRACE(); switch (request->methodId) { //因为connect不属于Router,所以最后也是进入default中去!!! // Any other request must be delivered to the corresponding Transport. default: { // This may throw. RTC::Transport* transport = GetTransportFromInternal(request->internal); //从request中获取transport对象 transport->HandleRequest(request); break; } } }
进入WebRtcTransport.cpp中去,查看对应的HandleRequest方法,创建connect连接!
void WebRtcTransport::HandleRequest(Channel::ChannelRequest* request) { MS_TRACE(); switch (request->methodId) { case Channel::ChannelRequest::MethodId::TRANSPORT_CONNECT: //涉及到很多协议! { // Ensure this method is not called twice. if (this->connectCalled) MS_THROW_ERROR("connect() already called"); RTC::DtlsTransport::Fingerprint dtlsRemoteFingerprint; RTC::DtlsTransport::Role dtlsRemoteRole; auto jsonDtlsParametersIt = request->data.find("dtlsParameters"); if (jsonDtlsParametersIt == request->data.end() || !jsonDtlsParametersIt->is_object()) MS_THROW_TYPE_ERROR("missing dtlsParameters"); auto jsonFingerprintsIt = jsonDtlsParametersIt->find("fingerprints"); if (jsonFingerprintsIt == jsonDtlsParametersIt->end() || !jsonFingerprintsIt->is_array()) { MS_THROW_TYPE_ERROR("missing dtlsParameters.fingerprints"); } else if (jsonFingerprintsIt->empty()) { MS_THROW_TYPE_ERROR("empty dtlsParameters.fingerprints array"); } // NOTE: Just take the first fingerprint. for (auto& jsonFingerprint : *jsonFingerprintsIt) { if (!jsonFingerprint.is_object()) MS_THROW_TYPE_ERROR("wrong entry in dtlsParameters.fingerprints (not an object)"); auto jsonAlgorithmIt = jsonFingerprint.find("algorithm"); if (jsonAlgorithmIt == jsonFingerprint.end()) MS_THROW_TYPE_ERROR("missing fingerprint.algorithm"); else if (!jsonAlgorithmIt->is_string()) MS_THROW_TYPE_ERROR("wrong fingerprint.algorithm (not a string)"); dtlsRemoteFingerprint.algorithm = RTC::DtlsTransport::GetFingerprintAlgorithm(jsonAlgorithmIt->get<std::string>()); if (dtlsRemoteFingerprint.algorithm == RTC::DtlsTransport::FingerprintAlgorithm::NONE) { MS_THROW_TYPE_ERROR("invalid fingerprint.algorithm value"); } auto jsonValueIt = jsonFingerprint.find("value"); if (jsonValueIt == jsonFingerprint.end()) MS_THROW_TYPE_ERROR("missing fingerprint.value"); else if (!jsonValueIt->is_string()) MS_THROW_TYPE_ERROR("wrong fingerprint.value (not a string)"); dtlsRemoteFingerprint.value = jsonValueIt->get<std::string>(); // Just use the first fingerprint. break; } auto jsonRoleIt = jsonDtlsParametersIt->find("role"); if (jsonRoleIt != jsonDtlsParametersIt->end()) { if (!jsonRoleIt->is_string()) MS_THROW_TYPE_ERROR("wrong dtlsParameters.role (not a string)"); dtlsRemoteRole = RTC::DtlsTransport::StringToRole(jsonRoleIt->get<std::string>()); if (dtlsRemoteRole == RTC::DtlsTransport::Role::NONE) MS_THROW_TYPE_ERROR("invalid dtlsParameters.role value"); } else { dtlsRemoteRole = RTC::DtlsTransport::Role::AUTO; } // Set local DTLS role. switch (dtlsRemoteRole) { case RTC::DtlsTransport::Role::CLIENT: { this->dtlsRole = RTC::DtlsTransport::Role::SERVER; break; } // If the peer has role "auto" we become "client" since we are ICE controlled. case RTC::DtlsTransport::Role::SERVER: case RTC::DtlsTransport::Role::AUTO: { this->dtlsRole = RTC::DtlsTransport::Role::CLIENT; break; } case RTC::DtlsTransport::Role::NONE: { MS_THROW_TYPE_ERROR("invalid remote DTLS role"); } } this->connectCalled = true; // Pass the remote fingerprint to the DTLS transport. if (this->dtlsTransport->SetRemoteFingerprint(dtlsRemoteFingerprint)) { // If everything is fine, we may run the DTLS transport if ready. MayRunDtlsTransport(); } // Tell the caller about the selected local DTLS role. json data = json::object(); switch (this->dtlsRole) { case RTC::DtlsTransport::Role::CLIENT: data["dtlsLocalRole"] = "client"; break; case RTC::DtlsTransport::Role::SERVER: data["dtlsLocalRole"] = "server"; break; default: MS_ABORT("invalid local DTLS role"); } request->Accept(data); break; } case Channel::ChannelRequest::MethodId::TRANSPORT_RESTART_ICE: { std::string usernameFragment = Utils::Crypto::GetRandomString(16); std::string password = Utils::Crypto::GetRandomString(32); this->iceServer->SetUsernameFragment(usernameFragment); this->iceServer->SetPassword(password); MS_DEBUG_DEV( "WebRtcTransport ICE usernameFragment and password changed [id:%s]", this->id.c_str()); // Reply with the updated ICE local parameters. json data = json::object(); data["iceParameters"] = json::object(); auto jsonIceParametersIt = data.find("iceParameters"); (*jsonIceParametersIt)["usernameFragment"] = this->iceServer->GetUsernameFragment(); (*jsonIceParametersIt)["password"] = this->iceServer->GetPassword(); (*jsonIceParametersIt)["iceLite"] = true; request->Accept(data); break; } default: { // Pass it to the parent class. RTC::Transport::HandleRequest(request); } } }
(三)进入WebRtcTransport.cpp中----使用CreateProducer创建生产者(消费者)
default: //如前面的WebRtcTransport中的是我itch所示,进入default中去,调用父类方法 { // Pass it to the parent class. RTC::Transport::HandleRequest(request); }
进入Transport.cpp文件中去!,创建生产者和消费者(两个方式一致)
void Transport::HandleRequest(Channel::ChannelRequest* request) { MS_TRACE(); switch (request->methodId) { case Channel::ChannelRequest::MethodId::TRANSPORT_PRODUCE: //创建生产者 { std::string producerId; // This may throw. SetNewProducerIdFromInternal(request->internal, producerId); //获取producerId // This may throw. auto* producer = new RTC::Producer(producerId, this, request->data); //根据上面的Id创建producer生产者 // Insert the Producer into the RtpListener. // This may throw. If so, delete the Producer and throw. try { this->rtpListener.AddProducer(producer); //加入数据 } catch (const MediaSoupError& error) { delete producer; throw; } // Notify the listener. // This may throw if a Producer with same id already exists. try { this->listener->OnTransportNewProducer(this, producer); //router中加入生产者 } catch (const MediaSoupError& error) { this->rtpListener.RemoveProducer(producer); delete producer; throw; } // Insert into the map. this->mapProducers[producerId] = producer; MS_DEBUG_DEV("Producer created [producerId:%s]", producerId.c_str()); // Take the transport related RTP header extensions of the Producer and // add them to the Transport. // NOTE: Producer::GetRtpHeaderExtensionIds() returns the original // header extension ids of the Producer (and not their mapped values). const auto& producerRtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds(); if (producerRtpHeaderExtensionIds.mid != 0u) { this->recvRtpHeaderExtensionIds.mid = producerRtpHeaderExtensionIds.mid; } if (producerRtpHeaderExtensionIds.rid != 0u) { this->recvRtpHeaderExtensionIds.rid = producerRtpHeaderExtensionIds.rid; } if (producerRtpHeaderExtensionIds.rrid != 0u) { this->recvRtpHeaderExtensionIds.rrid = producerRtpHeaderExtensionIds.rrid; } if (producerRtpHeaderExtensionIds.absSendTime != 0u) { this->recvRtpHeaderExtensionIds.absSendTime = producerRtpHeaderExtensionIds.absSendTime; } if (producerRtpHeaderExtensionIds.transportWideCc01 != 0u) { this->recvRtpHeaderExtensionIds.transportWideCc01 = producerRtpHeaderExtensionIds.transportWideCc01; } // Create status response. json data = json::object(); data["type"] = RTC::RtpParameters::GetTypeString(producer->GetType()); request->Accept(data); // Check if TransportCongestionControlServer or REMB server must be // created. const auto& rtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds(); const auto& codecs = producer->GetRtpParameters().codecs; // Set TransportCongestionControlServer. if (!this->tccServer) { bool createTccServer{ false }; RTC::BweType bweType; // Use transport-cc if: // - there is transport-wide-cc-01 RTP header extension, and // - there is "transport-cc" in codecs RTCP feedback. // // clang-format off if ( rtpHeaderExtensionIds.transportWideCc01 != 0u && std::any_of( codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec) { return std::any_of( codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb) { return fb.type == "transport-cc"; }); }) ) // clang-format on { MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with transport-cc"); createTccServer = true; bweType = RTC::BweType::TRANSPORT_CC; } // Use REMB if: // - there is abs-send-time RTP header extension, and // - there is "remb" in codecs RTCP feedback. // // clang-format off else if ( rtpHeaderExtensionIds.absSendTime != 0u && std::any_of( codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec) {以上是关于WebRTC进阶流媒体服务器开发Mediasoup源码分析之Mediasoup主业务流程的主要内容,如果未能解决你的问题,请参考以下文章
今晚7点半:现代C++和Mediasoup的WebRTC集群服务实践
安卓 Mediasoup V3 基于webrtc 分支m84 的编译
安卓 Mediasoup V3 基于webrtc 分支m84 的编译
安卓mediasoup webrtc h264 编解码相关源码分析