WebRTC进阶流媒体服务器开发Mediasoup源码分析之Mediasoup主业务流程

Posted 山上有风景

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WebRTC进阶流媒体服务器开发Mediasoup源码分析之Mediasoup主业务流程相关的知识,希望对你有一定的参考价值。

一:主业务的创建

主要场景是对房间的管理,多方进行音视频互动。

Router代表房间,Transport代表一个传输,每个用户加入房间都会创建一个对应的连接。
Producer生产者,共享的音视频流中,每个音频、视频流都会产生一个生产者
Consumer消费者,对于每个加入房间的用户,都可以消费其他用户的音视频数据

1.首先调用CreateRouter,创建房间Router,然后加入worker的管理列表中。对于每个worker都会包含多个Router

2.创建Router之后,调用CreateTransport创建Transport,返回accept告诉Router创建transport成功,返回给Router

3.Router发送connect信令给Transport,连接创建成功

4.连接创建成功之后,调用CreateProducer创建生产者(可以创建多个)

5.如果需要获取其他用户的数据,则需要去CreateConsumer创建消费者

二:主业务源码分析

见worker.cpp中OnChannelRequest方法,处理请求之后,使用Request->Accept(data);返回确认消息

inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request)
{
    MS_TRACE();

    MS_DEBUG_DEV(
      "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id);

    switch (request->methodId)
    {
        case Channel::ChannelRequest::MethodId::WORKER_CLOSE:
        {
            if (this->closed)
                return;

            MS_DEBUG_DEV("Worker close request, stopping");

            Close();

            break;
        }

        case Channel::ChannelRequest::MethodId::WORKER_DUMP:
        {
            json data = json::object();

            FillJson(data);

            request->Accept(data);

            break;
        }

        case Channel::ChannelRequest::MethodId::WORKER_GET_RESOURCE_USAGE:
        {
            json data = json::object();

            FillJsonResourceUsage(data);

            request->Accept(data);

            break;
        }

        case Channel::ChannelRequest::MethodId::WORKER_UPDATE_SETTINGS:
        {
            Settings::HandleRequest(request);

            break;
        }

        case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER:  //创建房间
        {
            std::string routerId; 
            // This may throw.
            SetNewRouterIdFromInternal(request->internal, routerId);   //根据request获取routerId(在应用层随机产生的)
            auto* router = new RTC::Router(routerId);   //创建router
            this->mapRouters[routerId] = router;   //存放在字典中
            MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str()); 
            request->Accept();   //回复消息,创建完成
            break;
        }

        case Channel::ChannelRequest::MethodId::ROUTER_CLOSE:
        {
            // This may throw.
            RTC::Router* router = GetRouterFromInternal(request->internal);

            // Remove it from the map and delete it.
            this->mapRouters.erase(router->id);
            delete router;

            MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());

            request->Accept();

            break;
        }

        // Any other request must be delivered to the corresponding Router.
        default:  //其他的信令处理,比如创建transport,accept没有在这里处理,在HandleRequest中调用!!!!
        {
            // This may throw.
            RTC::Router* router = GetRouterFromInternal(request->internal);

            router->HandleRequest(request);

            break;
        }
    }
}

(一)进入Router.cpp中----创建Transport

    void Router::SetNewTransportIdFromInternal(json& internal, std::string& transportId) const
    {
        MS_TRACE();

        auto jsonTransportIdIt = internal.find("transportId");  //查找transportId,在jsonRequest中的5元组中的transportId中
if (jsonTransportIdIt == internal.end() || !jsonTransportIdIt->is_string()) MS_THROW_ERROR("missing internal.transportId"); transportId.assign(jsonTransportIdIt->get<std::string>());  //赋值 if (this->mapTransports.find(transportId) != this->mapTransports.end()) MS_THROW_ERROR("a Transport with same transportId already exists"); }
    void Router::HandleRequest(Channel::ChannelRequest* request)
    {
        MS_TRACE();

        switch (request->methodId)
        {
            case Channel::ChannelRequest::MethodId::ROUTER_DUMP:
            {
                json data = json::object();

                FillJson(data);

                request->Accept(data);

                break;
            }

            case Channel::ChannelRequest::MethodId::ROUTER_CREATE_WEBRTC_TRANSPORT:  //多个transport,这里主要查看WebRtcTransport
            {
                std::string transportId;

                // This may throw.
                SetNewTransportIdFromInternal(request->internal, transportId);  //从request中获取transportId,存放到变量transportId中

                // This may throw.
                auto* webRtcTransport = new RTC::WebRtcTransport(transportId, this, request->data);  //创建WebRtcTransport对象,传入transportId和request5元组中的data数// Insert into the map.
                this->mapTransports[transportId] = webRtcTransport;  //放入到map中去

                MS_DEBUG_DEV("WebRtcTransport created [transportId:%s]", transportId.c_str());

                json data = json::object();  //声明一个JSON格式对象

                webRtcTransport->FillJson(data);  //开始将webRtcTransport中的响应,返回确认消息到data

                request->Accept(data);  //将接受的数据返回给JS层

                break;
            }
            case Channel::ChannelRequest::MethodId::ROUTER_CREATE_PLAIN_TRANSPORT:case Channel::ChannelRequest::MethodId::ROUTER_CREATE_PIPE_TRANSPORT:case Channel::ChannelRequest::MethodId::ROUTER_CREATE_DIRECT_TRANSPORT:case Channel::ChannelRequest::MethodId::ROUTER_CREATE_AUDIO_LEVEL_OBSERVER:case Channel::ChannelRequest::MethodId::TRANSPORT_CLOSE:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_CLOSE:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_PAUSE:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_RESUME:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_ADD_PRODUCER:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_REMOVE_PRODUCER:// Any other request must be delivered to the corresponding Transport.
            default:
            {
                // This may throw.
                RTC::Transport* transport = GetTransportFromInternal(request->internal);

                transport->HandleRequest(request);

                break;
            }
        }
    }

查看WebRtcTransport.cpp文件,构造函数

    WebRtcTransport::WebRtcTransport(const std::string& id, RTC::Transport::Listener* listener, json& data)
      : RTC::Transport::Transport(id, listener, data)
    {
     //主要实现JSON数据的解析---request->data就是data
        bool enableUdp{ true };
        auto jsonEnableUdpIt = data.find("enableUdp");bool enableTcp{ false };
        auto jsonEnableTcpIt = data.find("enableTcp");bool preferUdp{ false };
        auto jsonPreferUdpIt = data.find("preferUdp");bool preferTcp{ false };
        auto jsonPreferTcpIt = data.find("preferTcp");
        auto jsonListenIpsIt = data.find("listenIps");
        std::vector<ListenIp> listenIps(jsonListenIpsIt->size());

        for (size_t i{ 0 }; i < jsonListenIpsIt->size(); ++i)
        {
            auto& jsonListenIp = (*jsonListenIpsIt)[i];
            auto& listenIp     = listenIps[i];

            if (!jsonListenIp.is_object())
                MS_THROW_TYPE_ERROR("wrong listenIp (not an object)");

            auto jsonIpIt = jsonListenIp.find("ip");

            if (jsonIpIt == jsonListenIp.end())
                MS_THROW_TYPE_ERROR("missing listenIp.ip");
            else if (!jsonIpIt->is_string())
                MS_THROW_TYPE_ERROR("wrong listenIp.ip (not an string");

            listenIp.ip.assign(jsonIpIt->get<std::string>());

            // This may throw.
            Utils::IP::NormalizeIp(listenIp.ip);

            auto jsonAnnouncedIpIt = jsonListenIp.find("announcedIp");

            if (jsonAnnouncedIpIt != jsonListenIp.end())
            {
                if (!jsonAnnouncedIpIt->is_string())
                    MS_THROW_TYPE_ERROR("wrong listenIp.announcedIp (not an string)");

                listenIp.announcedIp.assign(jsonAnnouncedIpIt->get<std::string>());
            }
        }

        try
        {
            uint16_t iceLocalPreferenceDecrement{ 0 };

            if (enableUdp && enableTcp)
                this->iceCandidates.reserve(2 * jsonListenIpsIt->size());
            else
                this->iceCandidates.reserve(jsonListenIpsIt->size());

            for (auto& listenIp : listenIps)
            {
                if (enableUdp)
                {
                    uint16_t iceLocalPreference =
                      IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement;

                    if (preferUdp)
                        iceLocalPreference += 1000;

                    uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference);

                    // This may throw.
                    auto* udpSocket = new RTC::UdpSocket(this, listenIp.ip);

                    this->udpSockets[udpSocket] = listenIp.announcedIp;

                    if (listenIp.announcedIp.empty())
                        this->iceCandidates.emplace_back(udpSocket, icePriority);
                    else
                        this->iceCandidates.emplace_back(udpSocket, icePriority, listenIp.announcedIp);
                }

                if (enableTcp)
                {
                    uint16_t iceLocalPreference =
                      IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement;

                    if (preferTcp)
                        iceLocalPreference += 1000;

                    uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference);

                    // This may throw.
                    auto* tcpServer = new RTC::TcpServer(this, this, listenIp.ip);

                    this->tcpServers[tcpServer] = listenIp.announcedIp;

                    if (listenIp.announcedIp.empty())
                        this->iceCandidates.emplace_back(tcpServer, icePriority);
                    else
                        this->iceCandidates.emplace_back(tcpServer, icePriority, listenIp.announcedIp);
                }

                // Decrement initial ICE local preference for next IP.
                iceLocalPreferenceDecrement += 100;
            }

            // Create a ICE server.
            this->iceServer = new RTC::IceServer(
              this, Utils::Crypto::GetRandomString(16), Utils::Crypto::GetRandomString(32));

            // Create a DTLS transport.
            this->dtlsTransport = new RTC::DtlsTransport(this);
        }
        catch (const MediaSoupError& error)
        {
            // Must delete everything since the destructor won\'t be called.
        }
    }

Mediasoup手册:https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransport

(二)进入Router.cpp中----创建connect连接

    void Router::HandleRequest(Channel::ChannelRequest* request)
    {
        MS_TRACE();

        switch (request->methodId)
        {
       //因为connect不属于Router,所以最后也是进入default中去!!!
            // Any other request must be delivered to the corresponding Transport.
            default:
            {
                // This may throw.
                RTC::Transport* transport = GetTransportFromInternal(request->internal);  //从request中获取transport对象

                transport->HandleRequest(request);

                break;
            }
        }
    }

进入WebRtcTransport.cpp中去,查看对应的HandleRequest方法,创建connect连接!

    void WebRtcTransport::HandleRequest(Channel::ChannelRequest* request)
    {
        MS_TRACE();

        switch (request->methodId)
        {
            case Channel::ChannelRequest::MethodId::TRANSPORT_CONNECT:  //涉及到很多协议!
            {
                // Ensure this method is not called twice.
                if (this->connectCalled)
                    MS_THROW_ERROR("connect() already called");

                RTC::DtlsTransport::Fingerprint dtlsRemoteFingerprint;
                RTC::DtlsTransport::Role dtlsRemoteRole;

                auto jsonDtlsParametersIt = request->data.find("dtlsParameters");

                if (jsonDtlsParametersIt == request->data.end() || !jsonDtlsParametersIt->is_object())
                    MS_THROW_TYPE_ERROR("missing dtlsParameters");

                auto jsonFingerprintsIt = jsonDtlsParametersIt->find("fingerprints");

                if (jsonFingerprintsIt == jsonDtlsParametersIt->end() || !jsonFingerprintsIt->is_array())
                {
                    MS_THROW_TYPE_ERROR("missing dtlsParameters.fingerprints");
                }
                else if (jsonFingerprintsIt->empty())
                {
                    MS_THROW_TYPE_ERROR("empty dtlsParameters.fingerprints array");
                }

                // NOTE: Just take the first fingerprint.
                for (auto& jsonFingerprint : *jsonFingerprintsIt)
                {
                    if (!jsonFingerprint.is_object())
                        MS_THROW_TYPE_ERROR("wrong entry in dtlsParameters.fingerprints (not an object)");

                    auto jsonAlgorithmIt = jsonFingerprint.find("algorithm");

                    if (jsonAlgorithmIt == jsonFingerprint.end())
                        MS_THROW_TYPE_ERROR("missing fingerprint.algorithm");
                    else if (!jsonAlgorithmIt->is_string())
                        MS_THROW_TYPE_ERROR("wrong fingerprint.algorithm (not a string)");

                    dtlsRemoteFingerprint.algorithm =
                      RTC::DtlsTransport::GetFingerprintAlgorithm(jsonAlgorithmIt->get<std::string>());

                    if (dtlsRemoteFingerprint.algorithm == RTC::DtlsTransport::FingerprintAlgorithm::NONE)
                    {
                        MS_THROW_TYPE_ERROR("invalid fingerprint.algorithm value");
                    }

                    auto jsonValueIt = jsonFingerprint.find("value");

                    if (jsonValueIt == jsonFingerprint.end())
                        MS_THROW_TYPE_ERROR("missing fingerprint.value");
                    else if (!jsonValueIt->is_string())
                        MS_THROW_TYPE_ERROR("wrong fingerprint.value (not a string)");

                    dtlsRemoteFingerprint.value = jsonValueIt->get<std::string>();

                    // Just use the first fingerprint.
                    break;
                }

                auto jsonRoleIt = jsonDtlsParametersIt->find("role");

                if (jsonRoleIt != jsonDtlsParametersIt->end())
                {
                    if (!jsonRoleIt->is_string())
                        MS_THROW_TYPE_ERROR("wrong dtlsParameters.role (not a string)");

                    dtlsRemoteRole = RTC::DtlsTransport::StringToRole(jsonRoleIt->get<std::string>());

                    if (dtlsRemoteRole == RTC::DtlsTransport::Role::NONE)
                        MS_THROW_TYPE_ERROR("invalid dtlsParameters.role value");
                }
                else
                {
                    dtlsRemoteRole = RTC::DtlsTransport::Role::AUTO;
                }

                // Set local DTLS role.
                switch (dtlsRemoteRole)
                {
                    case RTC::DtlsTransport::Role::CLIENT:
                    {
                        this->dtlsRole = RTC::DtlsTransport::Role::SERVER;

                        break;
                    }
                    // If the peer has role "auto" we become "client" since we are ICE controlled.
                    case RTC::DtlsTransport::Role::SERVER:
                    case RTC::DtlsTransport::Role::AUTO:
                    {
                        this->dtlsRole = RTC::DtlsTransport::Role::CLIENT;

                        break;
                    }
                    case RTC::DtlsTransport::Role::NONE:
                    {
                        MS_THROW_TYPE_ERROR("invalid remote DTLS role");
                    }
                }

                this->connectCalled = true;

                // Pass the remote fingerprint to the DTLS transport.
                if (this->dtlsTransport->SetRemoteFingerprint(dtlsRemoteFingerprint))
                {
                    // If everything is fine, we may run the DTLS transport if ready.
                    MayRunDtlsTransport();
                }

                // Tell the caller about the selected local DTLS role.
                json data = json::object();

                switch (this->dtlsRole)
                {
                    case RTC::DtlsTransport::Role::CLIENT:
                        data["dtlsLocalRole"] = "client";
                        break;

                    case RTC::DtlsTransport::Role::SERVER:
                        data["dtlsLocalRole"] = "server";
                        break;

                    default:
                        MS_ABORT("invalid local DTLS role");
                }

                request->Accept(data);

                break;
            }

            case Channel::ChannelRequest::MethodId::TRANSPORT_RESTART_ICE:
            {
                std::string usernameFragment = Utils::Crypto::GetRandomString(16);
                std::string password         = Utils::Crypto::GetRandomString(32);

                this->iceServer->SetUsernameFragment(usernameFragment);
                this->iceServer->SetPassword(password);

                MS_DEBUG_DEV(
                  "WebRtcTransport ICE usernameFragment and password changed [id:%s]", this->id.c_str());

                // Reply with the updated ICE local parameters.
                json data = json::object();

                data["iceParameters"]    = json::object();
                auto jsonIceParametersIt = data.find("iceParameters");

                (*jsonIceParametersIt)["usernameFragment"] = this->iceServer->GetUsernameFragment();
                (*jsonIceParametersIt)["password"]         = this->iceServer->GetPassword();
                (*jsonIceParametersIt)["iceLite"]          = true;

                request->Accept(data);

                break;
            }

            default:
            {
                // Pass it to the parent class.
                RTC::Transport::HandleRequest(request);
            }
        }
    }

(三)进入WebRtcTransport.cpp中----使用CreateProducer创建生产者(消费者)

            default:  //如前面的WebRtcTransport中的是我itch所示,进入default中去,调用父类方法
            {
                // Pass it to the parent class.
                RTC::Transport::HandleRequest(request);
            }

进入Transport.cpp文件中去!,创建生产者和消费者(两个方式一致)

    void Transport::HandleRequest(Channel::ChannelRequest* request)
    {
        MS_TRACE();

        switch (request->methodId)
        {

            case Channel::ChannelRequest::MethodId::TRANSPORT_PRODUCE:  //创建生产者
            {
                std::string producerId;

                // This may throw.
                SetNewProducerIdFromInternal(request->internal, producerId);  //获取producerId

                // This may throw.
                auto* producer = new RTC::Producer(producerId, this, request->data);  //根据上面的Id创建producer生产者

                // Insert the Producer into the RtpListener.
                // This may throw. If so, delete the Producer and throw.
                try
                {
                    this->rtpListener.AddProducer(producer);  //加入数据
                }
                catch (const MediaSoupError& error)
                {
                    delete producer;

                    throw;
                }

                // Notify the listener.
                // This may throw if a Producer with same id already exists.
                try
                {
                    this->listener->OnTransportNewProducer(this, producer);  //router中加入生产者
                }
                catch (const MediaSoupError& error)
                {
                    this->rtpListener.RemoveProducer(producer);

                    delete producer;

                    throw;
                }

                // Insert into the map.
                this->mapProducers[producerId] = producer;

                MS_DEBUG_DEV("Producer created [producerId:%s]", producerId.c_str());

                // Take the transport related RTP header extensions of the Producer and
                // add them to the Transport.
                // NOTE: Producer::GetRtpHeaderExtensionIds() returns the original
                // header extension ids of the Producer (and not their mapped values).
                const auto& producerRtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds();

                if (producerRtpHeaderExtensionIds.mid != 0u)
                {
                    this->recvRtpHeaderExtensionIds.mid = producerRtpHeaderExtensionIds.mid;
                }

                if (producerRtpHeaderExtensionIds.rid != 0u)
                {
                    this->recvRtpHeaderExtensionIds.rid = producerRtpHeaderExtensionIds.rid;
                }

                if (producerRtpHeaderExtensionIds.rrid != 0u)
                {
                    this->recvRtpHeaderExtensionIds.rrid = producerRtpHeaderExtensionIds.rrid;
                }

                if (producerRtpHeaderExtensionIds.absSendTime != 0u)
                {
                    this->recvRtpHeaderExtensionIds.absSendTime = producerRtpHeaderExtensionIds.absSendTime;
                }

                if (producerRtpHeaderExtensionIds.transportWideCc01 != 0u)
                {
                    this->recvRtpHeaderExtensionIds.transportWideCc01 =
                      producerRtpHeaderExtensionIds.transportWideCc01;
                }

                // Create status response.
                json data = json::object();

                data["type"] = RTC::RtpParameters::GetTypeString(producer->GetType());

                request->Accept(data);

                // Check if TransportCongestionControlServer or REMB server must be
                // created.
                const auto& rtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds();
                const auto& codecs                = producer->GetRtpParameters().codecs;

                // Set TransportCongestionControlServer.
                if (!this->tccServer)
                {
                    bool createTccServer{ false };
                    RTC::BweType bweType;

                    // Use transport-cc if:
                    // - there is transport-wide-cc-01 RTP header extension, and
                    // - there is "transport-cc" in codecs RTCP feedback.
                    //
                    // clang-format off
                    if (
                        rtpHeaderExtensionIds.transportWideCc01 != 0u &&
                        std::any_of(
                            codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
                            {
                                return std::any_of(
                                    codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
                                    {
                                        return fb.type == "transport-cc";
                                    });
                            })
                    )
                    // clang-format on
                    {
                        MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with transport-cc");

                        createTccServer = true;
                        bweType         = RTC::BweType::TRANSPORT_CC;
                    }
                    // Use REMB if:
                    // - there is abs-send-time RTP header extension, and
                    // - there is "remb" in codecs RTCP feedback.
                    //
                    // clang-format off
                    else if (
                        rtpHeaderExtensionIds.absSendTime != 0u &&
                        std::any_of(
                            codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
                            {
                        

以上是关于WebRTC进阶流媒体服务器开发Mediasoup源码分析之Mediasoup主业务流程的主要内容,如果未能解决你的问题,请参考以下文章

今晚7点半:现代C++和Mediasoup的WebRTC集群服务实践

安卓 Mediasoup V3 基于webrtc 分支m84 的编译

安卓 Mediasoup V3 基于webrtc 分支m84 的编译

安卓mediasoup webrtc h264 编解码相关源码分析

安卓mediasoup webrtc h264 编解码相关源码分析

安卓mediasoup webrtc h264 编解码相关源码分析