大话ion系列

Posted LiveVideoStack_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大话ion系列相关的知识,希望对你有一定的参考价值。

点击上方“LiveVideoStack”关注我们

作者 | 王朋闯

本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新。

大话ion系列(一)

三、演进与模块说明

1.演进

ion-sfu最早从ion抽出来,经过长时间的演变和社区的打磨,功能和复杂度都在增加,主要过程有:

  • 多PC模式:

每个人推/拉每路流都是单独一个PC(PeerConnection),同一房间内PC个数是N^2,比如3个人是9个PC,4个人是16个。

实现简单,但随着人数增多,PC个数骤增,会造成资源浪费。

  • 单PC模式:

每个人推/拉多路流都是同一个PC,同一房间内PC个数是N,比如3个人是3个PC,4个人是4个PC。

上下行流都用同一个PC重协商,浏览器兼容性差,造成SFU状态竞争冲突问题等。

  • 双PC模式:

每个人推流用同一个PC,拉流用另一个PC。同一房间内PC个数是N*2。存在问题:端口占用是N*2,后来支持单端口解决了这个问题

  • 抽象接口:

很多人将ion-sfu作为包导入使用,为了方便业务定制,抽象出了很多接口。

比如一开始只有一个SFU结构体,后来抽象出SessionProvider接口组,然后SFU结构体实现这个接口,上层业务就可以继承派生SFU出自己的子类,方便做定制业务,然后再作为SessionProvider传入NewPeer。之后GetSession获取的就是派生的子类。同理也可以发现很多其他接口和对应的结构体,同样都可以基于结构体派生出自己的定制类。

  • 性能优化:

在ion-sdk-go完成后,基于它实现了ion-sfu-load-tool,压测出很多channel,goroutine滥用导致性能下降,之后ion-sfu使用回调和queue来减少了滥用,性能得到很大提升。

  • 功能强化:

  • Simulcast,大小流支持

  • Qos优化,TWCC/REMB+PLI/FIR+Nack+SR/RR等

  • 发音检测、定制dc等

2. 模块介绍

SFU

一个SFU里应包含多个会话(房间)。

// SFU represents an sfu instance
type SFU struct {//业务可以派生SFU来构建自己的类
    sync.RWMutex
    webrtc       WebRTCTransportConfig//配置
    turn         *turn.Server//内置turn
    sessions     map[string]Session//session管理
    datachannels []*Datachannel//dc管理
    withStats    bool//是否开启状态监控
}

Session

一个会话管理多个peer。

// Session represents a set of peers. Transports inside a SessionLocal


// are automatically subscribed to each other.


type Session interface {//Session是一组接口,具体的实现在SessionLocal里


    ID() string


    Publish(router Router, r Receiver)//把Receiver的流发布到router中,给Session中的每个Peer增加一个


    Subscribe(peer Peer)//把peer的Subscriber订阅到房间中其他peer


    AddPeer(peer Peer)//房间增加一个peer


    GetPeer(peerID string) Peer//获取peer


    RemovePeer(peer Peer)//删除peer


    AddRelayPeer(peerID string, signalData []byte) ([]byte, error)//增加级联peer


    AudioObserver() *AudioObserver//获取声音检测


    AddDatachannel(owner string, dc *webrtc.DataChannel)//增加dc


    GetDCMiddlewares() []*Datachannel//获取dc中间件


    GetFanOutDataChannelLabels() []string//获取扇出dc


    GetDataChannels(peerID, label string) (dcs []*webrtc.DataChannel)//获取某个peer的全部dc


    FanOutMessage(origin, label string, msg webrtc.DataChannelMessage)//扇出消息


    Peers() []Peer//获取全部peer


    RelayPeers() []*RelayPeer//获取全部级联peer


}


type SessionLocal struct {//SessionLocal是真实的管理类,当然上层业务可以派生它实现业务定制


    id             string


    mu             sync.RWMutex


    config         WebRTCTransportConfig


    peers          map[string]Peer//管理peer


    relayPeers     map[string]*RelayPeer//管理级联peer


    closed         atomicBool


    audioObs       *AudioObserver//声音检测


    fanOutDCs      []string//扇出dc管理


    datachannels   []*Datachannel//dc管理


    onCloseHandler func()//关闭时回调


}

Router

路由,表示流从哪个receiver接收,下发到哪个downtrack。

// Router defines a track rtp/rtcp Router


type Router interface {


    ID() string


    AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote) (Receiver, bool)//增加receiver


    AddDownTracks(s *Subscriber, r Receiver) error//把Receiver的downtracks加到Subscriber


    SetRTCPWriter(func([]rtcp.Packet) error)//设置rtcp写入回调


    AddDownTrack(s *Subscriber, r Receiver) (*DownTrack, error)//根据Receiver创建downtrack,加到Subscriber


    Stop()//停止


}


type router struct {//路由


    sync.RWMutex


    id            string


    twcc          *twcc.Responder//twcc响应器


    stats         map[uint32]*stats.Stream//流状态管理


    rtcpCh        chan []rtcp.Packet//rtcp通道


    stopCh        chan struct{}


    config        RouterConfig


    session       Session//会话


    receivers     map[string]Receiver//receiver管理,实际上是WebRTCReceiver


    bufferFactory *buffer.Factory//buffer管理


    writeRTCP     func([]rtcp.Packet) error


}

Peer

一个peer对应一个客户端,包含一个Publisher和Subscriber。

// PeerLocal represents a pair peer connection


type PeerLocal struct {//代表SFU端本地peer


    sync.Mutex


    id       string


    closed   atomicBool


    session  Session//会话,代表一个房间


    provider SessionProvider//接口,只有一个GetSession,业务可以派生SFU来构建自己的类,然后传给这个provider


    publisher  *Publisher//发布者


    subscriber *Subscriber//订阅者


  OnOffer                    func(*webrtc.SessionDescription)//服务端发起协商(negotiate)时调用此接口


  OnIceCandidate             func(*webrtc.ICECandidateInit, int)//服务端收集到候选(candidate)时调用


    OnICEConnectionStateChange func(webrtc.ICEConnectionState)//服务器ICE状态改变时调用


    remoteAnswerPending bool//远程是否answer中


    negotiationPending  bool//是否协商中,为了避免状态乱掉


}


type SessionProvider interface {


  //只有一个GetSession方法,业务可以派生SFU结构体来构建自己的类,SFU也包含这个GetSession方法


    GetSession(sid string) (Session, WebRTCTransportConfig)


}

Publisher

Publisher管理webrtc.TrackRemote和webrtc.RTPReceiver。

type Publisher struct {


    mu  sync.RWMutex


    id  string


    pc  *webrtc.PeerConnection//推流pc


    cfg *WebRTCTransportConfig


    router     Router//rtp/rtcp路由


    session    Session//会话


    tracks     []PublisherTrack//推流track,封装了*webrtc.TrackRemote和*webrtc.RTPReceiver


    relayed    atomicBool


    relayPeers []*relayPeer//级联peer


    candidates []webrtc.ICECandidateInit//trickle-ice候选


    onICEConnectionStateChangeHandler atomic.Value // func(webrtc.ICEConnectionState)回调


    onPublisherTrack                  atomic.Value // func(PublisherTrack)回调


    closeOnce sync.Once


}


type PublisherTrack struct {//封装了TrackRemote和Receiver


    Track    *webrtc.TrackRemote


    Receiver Receiver


    // This will be used in the future for tracks that will be relayed as clients or servers


    // This is for SVC and Simulcast where you will be able to chose if the relayed peer just


    // want a single track (for recording/ processing) or get all the tracks (for load balancing)


    clientRelay bool//标记为客户端或服务端


}

Subscriber

Subscriber管理下行PC和DownTrack。

type Subscriber struct {


    sync.RWMutex


    id string


    pc *webrtc.PeerConnection//封装下行PC,从客户端接收流


    me *webrtc.MediaEngine


    tracks     map[string][]*DownTrack//管理downtrack


    channels   map[string]*webrtc.DataChannel//管理dc


    candidates []webrtc.ICECandidateInit


    negotiate func()//协商回调


    closeOnce sync.Once


    noAutoSubscribe bool//不自动订阅,router.AddDownTracks时会直接返回,并不添加track


}

Receiver

Receiver从客户端接收rtp流,并发送rtcp。

// Receiver defines a interface for a track receivers


type Receiver interface {


    TrackID() string//track的id


    StreamID() string//stream的id,webrtc一个stream可以包含多个track


    Codec() webrtc.RTPCodecParameters//codec参数,对应sdp里的


    Kind() webrtc.RTPCodecType//kind表示video、audio


    SSRC(layer int) uint32//同步信源(SSRC)标识符


    SetTrackMeta(trackID, streamID string)//设置trackID,streamID


    AddUpTrack(track *webrtc.TrackRemote, buffer *buffer.Buffer, bestQualityFirst bool)//增加上行track、buffer,实际上读写包是从buffer里读的


    AddDownTrack(track *DownTrack, bestQualityFirst bool)//增加下行track


    SwitchDownTrack(track *DownTrack, layer int) error//切换downtrack的simulcast的空间层


    GetBitrate() [3]uint64//获取每个layer的码流


    GetMaxTemporalLayer() [3]int32//获取每个空间层最大的时域层


    RetransmitPackets(track *DownTrack, packets []packetMeta) error//收到nack时向downtrack重传包


    DeleteDownTrack(layer int, id string)//根据空间层和id删除downtrack


    OnCloseHandler(fn func())//关闭回调


    SendRTCP(p []rtcp.Packet)//发送rtcp给


    SetRTCPCh(ch chan []rtcp.Packet)


    GetSenderReportTime(layer int) (rtpTS uint32, ntpTS uint64)


}


// WebRTCReceiver receives a video track


type WebRTCReceiver struct {//WebRTCReceiver实现了Receiver的接口,是实际工作的接收器


    sync.Mutex


    closeOnce sync.Once


    peerID         string//哪个peer


    trackID        string//哪个track


    streamID       string//哪个stream


    kind           webrtc.RTPCodecType//视频or音频


    closed         atomicBool


    bandwidth      uint64//带宽


    lastPli        int64//上次pli的时间


    stream         string//没用


    receiver       *webrtc.RTPReceiver//封装实际的RTPReceiver


    codec          webrtc.RTPCodecParameters//codec参数


    rtcpCh         chan []rtcp.Packet//rtcp通道


    buffers        [3]*buffer.Buffer//三个空间层各有一个接收包的buffer


    upTracks       [3]*webrtc.TrackRemote//三个空间层各有一个实际的TrackRemote


    stats          [3]*stats.Stream//三个空间层各有一个状态统计


    available      [3]atomicBool//三个空间层各自是否可用


    downTracks     [3]atomic.Value //三个空间层各自有一组[]*DownTrack


    pending        [3]atomicBool//三个空间层各自是否pending


    pendingTracks  [3][]*DownTrack//存三个空间层各自pending的track


    nackWorker     *workerpool.WorkerPool


    isSimulcast    bool//是否开启大小流


    onCloseHandler func()//关闭回调函数


}

其实,真正收流并转发的是WebRTCReceiver,参考WebRTCReceiver.writeRTP接口。

四、Join流程

1. 简介

这里以JsonRPC为例,前边提到过,所有信令入口是Handle函数。

推荐使用vscode等IDE分析,使用command+左键点击,跳转很方便,command+shift+F可以搜索关键字。

本文分析会省略Trickle-ICE和datachannel处理部分代码,只分析核心流程,这样更容易理解。

首先,来看一下ion-sdk-js的join流程:

const offer = await this.transports[Role.pub].pc.createOffer();//sdk端的pub是用来推流


    await this.transports[Role.pub].pc.setLocalDescription(offer);


    const answer = await this.signal.join(sid, uid, offer);//这里发送信令join到SFU


    await this.transports[Role.pub].pc.setRemoteDescription(answer);


    this.transports[Role.pub].candidates.forEach((c) => this.transports![Role.pub].pc.addIceCandidate(c));


    this.transports[Role.pub].pc.onnegotiationneeded = this.onNegotiationNeeded.bind(this);

然后,用一张简图来表示PeerLocal和Publisher、Subscriber之间的关系。

这里Publisher是对应sdk的pub,用来收pub的流。

2. Join信令逻辑

PeerLocal是信令连接成功后,NewPeer创建的,可以理解为SFU端的Peer。

func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {


。。。


    switch req.Method {


    case "join":


        var join Join


        err := json.Unmarshal(*req.Params, &join)


        if err != nil {


            p.Logger.Error(err, "connect: error parsing offer")


            replyError(err)


            break


        }


        //设置OnOffer回调,SFU需要重协商时,会调用这个函数


        p.OnOffer = func(offer *webrtc.SessionDescription) {


            if err := conn.Notify(ctx, "offer", offer); err != nil {


                p.Logger.Error(err, "error sending offer")


            }






        }


    //设置OnIceCandidate,SFU收到新候选时,会回调这个函数


        p.OnIceCandidate = func(candidate *webrtc.ICECandidateInit, target int) {


            if err := conn.Notify(ctx, "trickle", Trickle{


                Candidate: *candidate,


                Target:    target,


            }); err != nil {


                p.Logger.Error(err, "error sending ice candidate")


            }


        }


        //调用PeerLocal.Join


        err = p.Join(join.SID, join.UID, join.Config)


        if err != nil {


            replyError(err)


            break


        }


        //调用PeerLocal.Answer生成answer


        answer, err := p.Answer(join.Offer)


        if err != nil {


            replyError(err)


            break


        }


        //返回answer


        _ = conn.Reply(ctx, req.ID, answer)

接下来分析一下Peer.Join和Peer.Answer。

3. PeerLocal.Join分析

最终进入的是PeerLocal.Join。

func (p *PeerLocal) Join(sid, uid string, config ...JoinConfig) error {


...


    s, cfg := p.provider.GetSession(sid)//这里实际调的是SFU.GetSession,获取或创建一个session


    p.session = s






    if !conf.NoSubscribe {


        p.subscriber, err = NewSubscriber(uid, cfg)//创建subscriber,来管理一组downtrack


        if err != nil {


            return fmt.Errorf("error creating transport: %v", err)


        }






        p.subscriber.noAutoSubscribe = conf.NoAutoSubscribe






        //设置subscriber的重协商回调函数,最终调用subscriber.Negotiate()就会触发此函数


        //调用宗旨:subcriber的一组downtrack有变化,比如增加、删除,此时需要subscriber重协商,


        //重协商的流程在前边的文章有讲过


        p.subscriber.OnNegotiationNeeded(func() {


            p.Lock()


            defer p.Unlock()






            if p.remoteAnswerPending {


                p.negotiationPending = true//是否为协商中


                return


            }






            Logger.V(1).Info("Negotiation needed", "peer_id", p.id)


            offer, err := p.subscriber.CreateOffer()//协商第一步骤CreateOffer+SetLocalDescription


            if err != nil {


                Logger.Error(err, "CreateOffer error")


                return


            }






            p.remoteAnswerPending = true


            if p.OnOffer != nil && !p.closed.get() {


                Logger.V(0).Info("Send offer", "peer_id", p.id)


                p.OnOffer(&offer)//回调信令发送offer


            }


        })






。。。


    }






    if !conf.NoPublish {


        //创建Publisher,创建上行pc以及设置pc.OnTrack,这是客户端上行流的核心驱动函数


        //熟悉webrtc的都知道OnTrack是收到了远端track流才会触发,比如音频、视频


        p.publisher, err = NewPublisher(uid, p.session, &cfg)


        if err != nil {


            return fmt.Errorf("error creating transport: %v", err)


        }


。。。






    p.session.AddPeer(p)//加入session管理






    Logger.V(0).Info("PeerLocal join SessionLocal", "peer_id", p.id, "session_id", sid)






    if !conf.NoSubscribe {


        p.session.Subscribe(p)//使p订阅session中的其他peer


    }


    return nil


}

接下来分析一下session.Subscribe的核心代码。

// 使peer订阅房间内所有其他peer的流


func (s *SessionLocal) Subscribe(peer Peer) {


。。。


    // 循环遍历房间内的peers,对每个peer的Publisher下的router进行操作:根据router的所有receiver,创建downtrack,并增加到peer.Subscriber中


    for _, p := range peers {


        err := p.Publisher().GetRouter().AddDownTracks(peer.Subscriber(), nil)


        if err != nil {


            Logger.Error(err, "Subscribing to Router err")


            continue


        }


    }


。。。


    peer.Subscriber().negotiate()//订阅好了,重协商


}

这里提一下关键函数调用链:

router.AddDownTracks—》router.AddDownTrack—》Subscriber.AddDownTrack/Receiver.AddDownTrack

func (r *router) AddDownTrack(sub *Subscriber, recv Receiver) (*DownTrack, error) {
    for _, dt := range sub.GetDownTracks(recv.StreamID()) {//避免重复添加
        if dt.ID() == recv.TrackID() {
            return dt, nil
        }
    }


    codec := recv.Codec()
    if err := sub.me.RegisterCodec(codec, recv.Kind()); err != nil {
        return nil, err
    }
    //创建downtrack,downtrack用来给客户端下发流
    downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{
        MimeType:     codec.MimeType,
        ClockRate:    codec.ClockRate,
        Channels:     codec.Channels,
        SDPFmtpLine:  codec.SDPFmtpLine,
        RTCPFeedback: []webrtc.RTCPFeedback{{"goog-remb", ""}, {"nack", ""}, {"nack", "pli"}},
    }, recv, r.bufferFactory, sub.id, r.config.MaxPacketTrack)
    if err != nil {
        return nil, err
    }
    //把downtrack增加到pc中
    if downTrack.transceiver, err = sub.pc.AddTransceiverFromTrack(downTrack, webrtc.RTPTransceiverInit{
        Direction: webrtc.RTPTransceiverDirectionSendonly,
    }); err != nil {
        return nil, err
    }


    // 设置关闭回调,关闭时pc自动删除track
    downTrack.OnCloseHandler(func() {
        if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {
            if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
                if err == webrtc.ErrConnectionClosed {
                    return
                }
                Logger.Error(err, "Error closing down track")
            } else {//如果删除成功,再从sub中删除,然后重协商
                sub.RemoveDownTrack(recv.StreamID(), downTrack)
                sub.negotiate()
            }
        }
    })


  //设置OnBind回调,DownTrack.Bind()里会调用这个;PC协商完成时,DownTrack.Bind()会触发
    downTrack.OnBind(func() {
        go sub.sendStreamDownTracksReports(recv.StreamID())
    })


  //增加downTrack到sub中,sub只是用来管理downtracks和生成SenderReport等
    sub.AddDownTrack(recv.StreamID(), downTrack)


  //增加downTrack到WebRTCReceiver中,实际收发包是WebRTCReceiver来控制,在writeRTP中
    recv.AddDownTrack(downTrack, r.config.Simulcast.BestQualityFirst)
    return downTrack, nil
}

4. PeerLocal.Answer分析

func (p *PeerLocal) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
。。。




    answer, err := p.publisher.Answer(sdp)
    if err != nil {
        return nil, fmt.Errorf("error creating answer: %v", err)
    }




    Logger.V(0).Info("PeerLocal send answer", "peer_id", p.id)




    return &answer, nil
}

5. 总结

总结一下:

  • 连接成功,创建PeerLocal,Join信令进入Handle函数

  • PeerLocal.Join

  • 获取或创建一个session

  • 给PeerLocal创建Subscriber和Publisher,并设置他们的一些回调函数

  • 使PeerLocal订阅session中的其他peer,因为此时其他peer可能正在推流:

1)router.AddDownTracks—》router.AddDownTrack—》Subscriber.AddDownTrack/Receiver.AddDownTrack

2)PeerLocal.Subscriber().negotiate();订阅好了,协商,这里是第二次重协商,完成后客户端可以收到其他人的流了

  • PeerLocal.Answer生成answer

  • 信令发送answer

  • 客户端收到answer后,完成第一次协商,此时pub的dc通道打通了(在前边的文章有一张图,作为参考)


作者简介:

王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。

特别说明:

本文发布于知乎,已获得作者授权转载。


扫描图中二维码或点击阅读原文

了解大会更多信息

喜欢我们的内容就点个“在看”吧!

以上是关于大话ion系列的主要内容,如果未能解决你的问题,请参考以下文章

大话ion系列

大话ion系列

大话ion系列

大话设计模式-抽象工厂模式

大话数据结构读书笔记系列线性表

JUC系列01之大话并发