大话ion系列

Posted LiveVideoStack_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大话ion系列相关的知识,希望对你有一定的参考价值。

点击上方“LiveVideoStack”关注我们

作者 | 王朋闯

本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新。

大话ion系列(一)

大话ion系列(二)

大话ion系列(三)

七、Simulcast流程

1. Simulcast概念

先介绍WebRTC的一个概念——Simulcast(联播,俗称大小流):

推流端===f/h/q==>SFU--f--->收流端A
                 |---q--->收流端B
                 |---h--->收流端C
  • 上行一般是三路流,按分辨率和码率,一般分为fhq(大中小)三层

  • 下行可以分给不同的用户不同的流,比如网不好时分发个小流q,网变好了再切回大流f

  • 三层的streamId、trackId是一样的,但是rid和ssrc是不同的,rid一般是f、h、q

  • 对应的SDP部分

.........
a=rid:f send
a=rid:h send
a=rid:q send
a=simulcast:send f;h;q

2.收发流程

看本章之前,最好看一下前一章,熟悉一下收发流程,本文只重点介绍其中的Simulcast部分。

收发包逻辑打通步骤:

SDK推流---->OnTrack---->router.AddReceiver(设置Buffer和上行Track)------>SessionLocal.Publish(设置下行Track)---->收发包逻辑打通

3.Simulcast上行流程

非Simulcast情况,OnTrack一般会触发两次:一个audioTrack+一个videoTrack。

Simulcast下,OnTrack一般会触发四次:一个audioTrack+三个videoTrack(rid分别为fhq)。

这个流程会触发四次:

OnTrack--->router.AddReceiver--->WebRTCReceiver.AddUpTrack

三个videoTrack,共用同一个WebRTCReceiver。

type WebRTCReceiver struct 
。。。
    receiver       *webrtc.RTPReceiver
    codec          webrtc.RTPCodecParameters
    rtcpCh         chan []rtcp.Packet
    buffers        [3]*buffer.Buffer//需要三个buffer
    upTracks       [3]*webrtc.TrackRemote//三个TrackRemote
。。。
    pendingTracks  [3][]*DownTrack//三个层,每层来订阅的downtrack
。。。

接下来看一下AddUpTrack是如何工作的:

func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote,buff *buffer.Buffer, bestQualityFirst bool) 
    if w.closed.get() 
        return
    
 
  //根据RID来区分layer
    var layer int
    switch track.RID() //如果没开simulcast,为""
    case fullResolution:
        layer = 2
    case halfResolution:
        layer = 1
    default:
        layer = 0//如果没开simulcast,为0
    
 
    w.Lock()
  //设置空域层layer的track
    w.upTracks[layer] = track
 
  //设置空域层layer的buff
    w.buffers[layer] = buff
    w.available[layer].set(true)
 
  //设置空域层layer的downtrack
    w.downTracks[layer].Store(make([]*DownTrack,0, 10))
    w.pendingTracks[layer] = make([]*DownTrack,0, 10)
    w.Unlock()
 
  //闭包函数,按最佳质量订阅,切到f层
    subBestQuality := func(targetLayerint) 
        for l := 0; l <targetLayer; l++ 
            dts :=w.downTracks[l].Load()
            if dts == nil
                continue
            
            for _, dt :=range dts.([]*DownTrack) 
                _ = dt.SwitchSpatialLayer(int32(targetLayer), false)
            
        
    
 
  //闭包函数,按最差质量订阅,切到q层
    subLowestQuality := func(targetLayerint) 
        for l := 2; l !=targetLayer; l-- 
            dts :=w.downTracks[l].Load()
            if dts == nil
                continue
            
            for _, dt :=range dts.([]*DownTrack) 
                _ = dt.SwitchSpatialLayer(int32(targetLayer), false)
            
        
    
 
  //是否开启大小流
    if w.isSimulcast 
    //如果配置最佳质量,则等到f层到来时,订阅它
        if bestQualityFirst &&(!w.available[2].get() || layer == 2) 
            subBestQuality(layer)
      //如果配置最差质量,则等到q层到来时,订阅它
         else if!bestQualityFirst && (!w.available[0].get() ||layer == 0) 
            subLowestQuality(layer)
        
    
 
  //启动读写流程
    go w.writeRTP(layer)

真正的收发包流程来了:

func (w *WebRTCReceiver) writeRTP(layer int) 
    defer func() //这里设置自动清理函数
        w.closeOnce.Do(func() 
            w.closed.set(true)
            w.closeTracks()
        )
    ()
 
  //创建一个PLI包,后边要用
    pli := []rtcp.Packet
        &rtcp.PictureLossIndicationSenderSSRC:rand.Uint32(), MediaSSRC: w.SSRC(layer),
    
 
    for 
    //这里可以看到,真正读包是从buffer里读出来的,正是前边讲到的自定义buffer
        pkt, err :=w.buffers[layer].ReadExtended()
        if err ==io.EOF 
            return
        
 
    //如果开启大小流
        if w.isSimulcast 
      //一开始是pending状态
            ifw.pending[layer].get() 
        //如果收到的包是关键帧
                ifpkt.KeyFrame 
                    w.Lock()
          //如果有切换中的layer,那就切一下
                    for idx,dt := range w.pendingTracks[layer] 
                        w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)
                        w.storeDownTrack(layer, dt)
                        dt.SwitchSpatialLayerDone(int32(layer))
                       w.pendingTracks[layer][idx] = nil
                    
                   w.pendingTracks[layer] = w.pendingTracks[layer][:0]
                   w.pending[layer].set(false)
                    w.Unlock()
                 else 
          //如果是非关键字,说明需要发送PLI
                    w.SendRTCP(pli)
                
            
        
 
    //这里是不是有疑问,[]*downTracks是SessionLocal.Publish里塞过来的,后边会介绍:)
        for _, dt := rangew.downTracks[layer].Load().([]*DownTrack)
      //下行track写入rtp包
            if err = dt.WriteRTP(pkt, layer);err != nil 
                if err ==io.EOF && err == io.ErrClosedPipe 
                    w.Lock()
                    w.deleteDownTrack(layer, dt.id)
                    w.Unlock()
                
                log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")
            
        
    
 

至此一个简单的Simulcast收发模型:

SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK
       |                                                    |....
       |                                                    |--->downTracks[0][N].WriteRTP
       |
       |---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP
                    |                                      |....
                    |                                      |---->downTracks[0][N].WriteRTP
                    |
                    |------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP
                    |                                      |....
                    |                                      |----->downTracks[1][N].WriteRTP
                    |
                    |------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP
                                                           |....
                                                           |------>downTracks[2][N].WriteRTP

上面省略了SDK--->ReadStreamSRTP.buffer.Write,这个buffer和WebRTCReceiver.buffer是同一个。

订阅端SDK的切大小流操作,其实就是在0-2来回挂载downTrack而已。

4.Simulcast下行流程

读者前边的疑问,downTracks是哪里塞过来的?流程在这里:

OnTrack--->SessionLocal.Publish--->router.AddDownTracks--->router.AddDownTrack--->WebRTCReceiver.AddDownTrack--->WebRTCReceiver.storeDownTrack

pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver)
    //Simulcast一般会触发OnTrack四次,一个audio,三个video
    //由于三个video的trackId一样,共用一个WebRTCReceiver
        r, pub := p.router.AddReceiver(receiver,track)
        if pub //这里video到来的第一次pub才为true
      //这里把receiver发布到router里,其他peer的downtrack会挂载到receiver下
            p.session.Publish(p.router, r)

这里为了方便,再贴一下整个流程的代码,比较繁琐,可以跳过。

SessionLocal.Publish

func (s *SessionLocal) Publish(router Router,r Receiver) 
    for _, p := ranges.Peers() 
        // Don't sub toself
        if router.ID() == p.ID() || p.Subscriber() == nil
            continue
        
        //表示根据r的信息创建downtrack,并增加到p.Subscriber()和r中
        if err :=router.AddDownTracks(p.Subscriber(), r); err !=nil 
            Logger.Error(err, "Errorsubscribing transport to Router")
            continue
        
    

router.AddDownTracks

func (r *router) AddDownTracks(s *Subscriber,recv Receiver) error 
。。。
//如果recv不为空,表示根据recv的信息创建downtrack,并增加到s和recv中
    if recv != nil
        if _, err :=r.AddDownTrack(s, recv); err != nil 
            return err
        
        s.negotiate()
        return nil
    
//如果recv为空,表示遍历房间中所有的receivers,并增加到s和recv中
    if len(r.receivers)> 0 
        for _, rcv := ranger.receivers 
            if _, err :=r.AddDownTrack(s, rcv); err != nil 
                return err
            
        
        s.negotiate()
    
    return nil

router.AddDownTrack

根据recv的信息创建downtrack,并增加到sub和recv中。

func (r *router) AddDownTrack(sub *Subscriber,recv Receiver) (*DownTrack, error) 
    for _, dt := rangesub.GetDownTracks(recv.StreamID()) //避免重复添加
        if dt.ID() ==recv.TrackID() 
            return dt, nil
        
    
 
    codec := recv.Codec()
    if err := sub.me.RegisterCodec(codec, recv.Kind()); err !=nil 
        return nil,err
    
    //创建downtrack,downtrack用来给客户端下发流
    downTrack, err := NewDownTrack(webrtc.RTPCodecCapability
        MimeType:     codec.MimeType,
        ClockRate:    codec.ClockRate,
        Channels:     codec.Channels,
        SDPFmtpLine:  codec.SDPFmtpLine,
        RTCPFeedback:[]webrtc.RTCPFeedback"goog-remb", "", "nack", "", "nack", "pli",
    , recv, r.bufferFactory,sub.id, r.config.MaxPacketTrack)
    if err != nil
        return nil,err
    
    //把downtrack增加到pc中
    if downTrack.transceiver,err = sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit
        Direction:webrtc.RTPTransceiverDirectionSendonly,
    ); err != nil 
        return nil,err
    
 
    // 设置关闭回调,关闭时pc自动删除track
    downTrack.OnCloseHandler(func() 
        if sub.pc.ConnectionState() !=webrtc.PeerConnectionStateClosed 
            if err :=sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err !=nil 
                if err ==webrtc.ErrConnectionClosed 
                    return
                
                Logger.Error(err, "Errorclosing down track")
             else //如果删除成功,再从sub中删除,然后重协商
                sub.RemoveDownTrack(recv.StreamID(), downTrack)
                sub.negotiate()
            
        
    )
 
  //设置OnBind回调,DownTrack.Bind()里会调用这个;PC协商完成时,DownTrack.Bind()会触发
    downTrack.OnBind(func() 
        go sub.sendStreamDownTracksReports(recv.StreamID())
    )
 
  //增加downTrack到sub中,sub只是用来管理downtracks和生成SenderReport等
    sub.AddDownTrack(recv.StreamID(), downTrack)
 
  //增加downTrack到WebRTCReceiver中,实际收发包是WebRTCReceiver来控制,在writeRTP中
    recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)
    return downTrack, nil

5.Simulcast切换流程

第一种,自动切换。

上边的subBestQuality,会在f层receiver到来时,自动订阅f层。

第二种,手动切换。

通过信令或datachannel控制来切换。

先来讲一下datachannel信令通道,在main里创建了一个内置dc,处理函数为datachannel.SubscriberAPI。

func main() 
    nsfu := sfu.NewSFU(conf.Config)
    dc :=nsfu.NewDatachannel(sfu.APIChannelLabel)
   dc.Use(datachannel.SubscriberAPI)
    s :=server.NewWrapperedGRPCWebServer(options, nsfu)
    if err := s.Serve(); err != nil
        logger.Error(err,"failed to serve")
        os.Exit(1)
    
    select 

客户端发过来的切大小流指令会进入此函数。

funcSubscriberAPI(nextsfu.MessageProcessor) sfu.MessageProcessor 
    return sfu.ProcessFunc(func(ctxcontext.Context, args sfu.ProcessArgs) 
        srm := &setRemoteMedia
        if err :=json.Unmarshal(args.Message.Data, srm); err != nil 
            return
        
        // Publisherchanging active layers
        if srm.Layers !=nil && len(srm.Layers) > 0 
。。。//当前sdk逻辑不会进入这里
         else 
      //按流ID查找downTracks
            downTracks :=args.Peer.Subscriber().GetDownTracks(srm.StreamID)
            for _, dt :=range downTracks 
                switch dt.Kind() 
                casewebrtc.RTPCodecTypeAudio:
                    dt.Mute(!srm.Audio)//音频是否需要mute/unmute
                casewebrtc.RTPCodecTypeVideo:
                    switchsrm.Video //视频是否需要切大小流/mute
                    casehighValue:
            //这里把d.reSync.set设置为true了,writeSimulcastRTP里会自动发PLI
                        dt.Mute(false)
                        dt.SwitchSpatialLayer(2, true)
                    casemediumValue:
                        dt.Mute(false)
                        dt.SwitchSpatialLayer(1, true)
                    caselowValue:
                        dt.Mute(false)
                        dt.SwitchSpatialLayer(0, true)
                    casemutedValue:
                        dt.Mute(true)
                    
                    switchsrm.Framerate //当前sdk逻辑也不会进入这里,srm.Framerate=""
                    
                
 
            
        
        next.Process(ctx, args)
    )

DownTrack.SwitchSpatialLayer

func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error 
    if d.trackType ==SimulcastDownTrack 
        // Don't switchuntil previous switch is done or canceled
        csl := atomic.LoadInt32(&d.currentSpatialLayer)
 
        //如果当前运行layer不是正在切的layer,或当前layer是要切的
        //换句话说,如果当前layer没切完成,或者当前layer和要切的一样,那就返回错误
        if csl !=atomic.LoadInt32(&d.targetSpatialLayer) || csl ==targetLayer 
            returnErrSpatialLayerBusy
        
        //切换layer
        if err :=d.receiver.SwitchDownTrack(d, int(targetLayer));err == nil 
            atomic.StoreInt32(&d.targetSpatialLayer,targetLayer)
            if setAsMax 
                atomic.StoreInt32(&d.maxSpatialLayer,targetLayer)
            
        
        return nil
    
    returnErrSpatialNotSupported

WebRTCReceiver.SwitchDownTrack

func (w *WebRTCReceiver) SwitchDownTrack(track *DownTrack,layer int) error 
    if w.closed.get() 
        returnerrNoReceiverFound
    
    //切换就是把track放入pending
    if w.available[layer].get() 
        w.Lock()
        w.pending[layer].set(true)
        w.pendingTracks[layer] = append(w.pendingTracks[layer],track)
        w.Unlock()
        return nil
    
    return errNoReceiverFound

然后在writeRTP里切换:

func (w *WebRTCReceiver) writeRTP(layer int) 
....
    for 
        pkt, err :=w.buffers[layer].ReadExtended()
        if err ==io.EOF 
            return
        
 
        //如果是大小流
        if w.isSimulcast 
            //如果正在切换,pending[layer]get()为true
            ifw.pending[layer].get() 
                // 如果是关键帧,才会切换,好在前边Mute流程里发送了PLI,这里应该很快来一个关键帧
                ifpkt.KeyFrame 
                    w.Lock()
 
                    //=========这里切换
                    for idx, dt:= range w.pendingTracks[layer] 
                    //删除原来的
                        w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)
                        //存储新的dt,以后writeRTP会写入新的dt
                        w.storeDownTrack(layer, dt)
                        //设置切换完成
                        dt.SwitchSpatialLayerDone(int32(layer))
                        //pending中此dt置空
                       w.pendingTracks[layer][idx] = nil
                    
                    //清空pendingTracks此layer
                   w.pendingTracks[layer] = w.pendingTracks[layer][:0]
                    //标志位置为false
                   w.pending[layer].set(false)
                    w.Unlock()
                 else 
                    // 如果不是关键帧,再次发送PLI
                    w.SendRTCP(pli)
                
            
        
 
        for _, dt := rangew.downTracks[layer].Load().([]*DownTrack)
            if err = dt.WriteRTP(pkt, layer);err != nil 
                if err ==io.EOF && err == io.ErrClosedPipe 
                    w.Lock()
                    w.deleteDownTrack(layer, dt.id)
                    w.Unlock()
                
                log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")
            
        
    

6. 总结

Simulcast在ion-sfu中,默认是通过datachannel来操作切换的。

首先,切换是操作pendingTracks:

SubscriberAPI---》dt.SwitchSpatialLayer-->WebRTCReceiver.SwitchDownTrack--->写入pendingTracks

然后,在WebRTCReceiver.writeRTP里进行实质切换:

WebRTCReceiver.writeRTP--->读取pendingTracks---》更换downTracks--》storeDownTrack--》OK

之后,写包就会写入新track。至此一个简单的Simulcast收发模型就建成了:

SDK---SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK
       |                                              |....
       |                                              |--->downTracks[0][N].WriteRTP
       |
       |---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP
                    |                                    |....
                    |                                    |---->downTracks[0][N].WriteRTP
                    |
                    |------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP
                    |                                     |....
                    |                                     |----->downTracks[1][N].WriteRTP
                    |
                    |------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP
                                                         |....
                                                         |------>downTracks[2][N].WriteRTP

作者简介:

王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。

特别说明:

本文发布于知乎,已获得作者授权转载。


讲师招募

LiveVideoStackCon 2022 音视频技术大会 上海站,正在面向社会公开招募讲师,无论你所处的公司大小,title高低,老鸟还是菜鸟,只要你的内容对技术人有帮助,其他都是次要的。欢迎通过 speaker@livevideostack.com 提交个人资料及议题描述,我们将会在24小时内给予反馈。

喜欢我们的内容就点个“在看”吧!

以上是关于大话ion系列的主要内容,如果未能解决你的问题,请参考以下文章

大话ion系列

大话ion系列

大话ion系列

大话设计模式-抽象工厂模式

大话数据结构读书笔记系列线性表

JUC系列01之大话并发