大话ion系列

Posted LiveVideoStack_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大话ion系列相关的知识,希望对你有一定的参考价值。

点击上方“LiveVideoStack”关注我们

作者 | 王朋闯

本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新。

大话ion系列(一)

大话ion系列(二)

五、offer与answer流程

1.前言

之前的文章已经介绍了前两次重协商:

  • 客户端sdk的pub的dc已经打通,此时使用dc控制simulcast和监听audiolevel speaker。

  • 客户端sdk的sub订阅到了房间内的流。

接下来,SDK推流是第三次协商了。

2.offer流程

当点击ion-sfu的demo里“publish”按钮的时候,就会触发ion-sdk-js的操作:

把音视频track增加到pub的pc,此时会触发onNegotiationNeeded。

首先来看一下ion-sdk-js的代码:

this.transports[Role.pub].pc.onnegotiationneeded = this.onNegotiationNeeded.bind(this);

这里把onNegotiationNeeded绑定到了pc.onnegotiationneeded,意思是当推流增加track到pc时,就会触发onNegotiationNeeded。

参考:

https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/onnegotiationneeded

接下来看一下onNegotiationNeeded,是个标准的重协商流程。

private async onNegotiationNeeded() {
    if (!this.transports) {
      throw Error(ERR_NO_SESSION);
    }


    let offer: RTCSessionDescriptionInit | undefined;
    let answer: RTCSessionDescriptionInit | undefined;
    try {
      offer = await this.transports[Role.pub].pc.createOffer();
      await this.transports[Role.pub].pc.setLocalDescription(offer);
      answer = await this.signal.offer(offer);//在这里发送offer到SFU
      await this.transports[Role.pub].pc.setRemoteDescription(answer);
    } catch (err) {
      /* tslint:disable-next-line:no-console */
      console.error(err);
      if (this.onerrnegotiate) this.onerrnegotiate(Role.pub, err, offer, answer);
    }
  }

接下来看一下SFU里的处理:

func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
....
case "offer":
        var negotiation Negotiation
        err := json.Unmarshal(*req.Params, &negotiation)
        if err != nil {
            p.Logger.Error(err, "connect: error parsing offer")
            replyError(err)
            break
        }
        //调用peerLocal.Answer()
        answer, err := p.Answer(negotiation.Desc)
        if err != nil {
            replyError(err)
            break
        }
        // 发送answer
        _ = conn.Reply(ctx, req.ID, answer)
func (p *PeerLocal) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
....//这里调用了publisher.Answer()
    answer, err := p.publisher.Answer(sdp)
    if err != nil {
        return nil, fmt.Errorf("error creating answer: %v", err)
    }
 
    Logger.V(0).Info("PeerLocal send answer", "peer_id", p.id)
 
    return &answer, nil
}
// 这里可以看到Publisher.Answer就是标准的协商流程
// 在前边的文章详细介绍过什么叫协商和重协商,这里不在重复了
func (p *Publisher) Answer(offer webrtc.SessionDescription) (webrtc.SessionDescription, error) {
    if err := p.pc.SetRemoteDescription(offer); err != nil {
        return webrtc.SessionDescription{}, err
    }
 
    for _, c := range p.candidates {
        if err := p.pc.AddICECandidate(c); err != nil {
            Logger.Error(err, "Add publisher ice candidate to peer err", "peer_id", p.id)
        }
    }
    p.candidates = nil
 
    answer, err := p.pc.CreateAnswer(nil)
    if err != nil {
        return webrtc.SessionDescription{}, err
    }
    if err := p.pc.SetLocalDescription(answer); err != nil {
        return webrtc.SessionDescription{}, err
    }
    return answer, nil
}

3.answer流程

func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
....
    case "answer":
        var negotiation Negotiation
        err := json.Unmarshal(*req.Params, &negotiation)
        if err != nil {
            p.Logger.Error(err, "connect: error parsing offer")
            replyError(err)
            break
        }
        // peerLocal.SetRemoteDescription
        err = p.SetRemoteDescription(negotiation.Desc)
        if err != nil {
            replyError(err)
        }
 
 
func (p *PeerLocal) SetRemoteDescription(sdp webrtc.SessionDescription) error {
    if p.subscriber == nil {
        return ErrNoTransportEstablished
    }
    p.Lock()
    defer p.Unlock()
 
    Logger.V(0).Info("PeerLocal got answer", "peer_id", p.id)
  // 这里调用subscriber.SetRemoteDescription
    if err := p.subscriber.SetRemoteDescription(sdp); err != nil {
        return fmt.Errorf("setting remote description: %w", err)
    }
 
    p.remoteAnswerPending = false
 
    if p.negotiationPending {//这里两个标志位是为了防止重协商竞争冲突
        p.negotiationPending = false
        p.subscriber.negotiate()
    }
 
    return nil
}
 
  //这里仅仅是调用pc.SetRemoteDescription
func (s *Subscriber) SetRemoteDescription(desc webrtc.SessionDescription) error {
    if err := s.pc.SetRemoteDescription(desc); err != nil {
        Logger.Error(err, "SetRemoteDescription error")
        return err
    }
.....
 
    return nil
}

至此,第三次重协商完成,两端交换完sdp,接下来ice打通后会推流过来。


4.总结

pub在第一次协商后,只打通了dc,此时使用dc控制simulcast和监听audiolevel speaker,也可以定制自己的dc。

这样的好处是灵活。

sub在第二次协商后,可以订阅到房间内的其他人的流了。

pub在第三次协商时,是增加音视频track后,然后走标准重协商流程,开始推流。

六、包的收发流程

1.前言

本文从ion-sfu中的demo点击“publish”开始,讲一下如何收包转发。

前边讲到,点击“publish”,会进行第三次重协商,协商完成,客户端此时推流到SFU。

此时会触发OnTrack,这里的OnTrack和标准webrtc接口是一样的,会在流到达时自动触发。

参考:

https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/ontrack

2.收包流程

OnTrack收包流程的入口

首先贴一下老代码,OnTrack是在NewPublisher里边设置的回调。

func NewPublisher(idstring,sessionSession,cfg*WebRTCTransportConfig)(*Publisher,error){
。。。
  //这里要注意cfg.Setting,里边的bufferFactory已经设置好了为自定义的c.BufferFactory.GetOrNew
  //可以搜一下这个函数NewWebRTCTransportConfig,这一行“se.BufferFactory = c.BufferFactory.GetOrNew”
    api:=webrtc.NewAPI(webrtc.WithMediaEngine(me),webrtc.WithSettingEngine(cfg.Setting))
    pc,err:=api.NewPeerConnection(cfg.Configuration)
。。。
 
    p:=&Publisher{
        id:      id,
        pc:      pc,
        cfg:     cfg,
        router:  newRouter(id,session,cfg),
        session:session,
    }
//===========OnTrack在这里
    pc.OnTrack(func(track*webrtc.TrackRemote,receiver*webrtc.RTPReceiver){
。。。
    //这里AddReceiver会新建WebRTCReceiver,然后AddUpTrack
    //uptrack是收流的,downtrack是发流的
        r,pub:=p.router.AddReceiver(receiver,track)
 
        if pub{
      //这里会把流发布到房间内,其他peer会订阅到
            p.session.Publish(p.router,r)
 
            p.mu.Lock()
            publisherTrack:=PublisherTrack{track,r,true}
            p.tracks=append(p.tracks,publisherTrack)
。。。
            p.mu.Unlock()
            if handler,ok:=p.onPublisherTrack.Load().(func(PublisherTrack));ok&&handler!=nil{
        //这里如果上层业务,通过OnPublisherTrack设置了回调,就会触发
        //一般只有包导入的情况下,才会这样用,比如业务不想加入房间就自动订阅,想要按需订阅
                handler(publisherTrack)
            }
        } else {
            p.mu.Lock()
            p.tracks=append(p.tracks,PublisherTrack{track,r,false})
            p.mu.Unlock()
        }
    })
  //===========


自定义buffer

这里不得不介绍一下自定义buffer了,看懂了才知道包是从哪里来的。

Pion/webrtc支持自定义BufferFactory,设置好之后,pion/webrtc的组件会使用自定义buffer。

比如pion/srtp是实际收发srtp和srtcp包的类,它们也会使用自定义buffer。

首先来看一下ion-sfu是在哪里设置自定义buffer的:

func NewWebRTCTransportConfig(cConfig)WebRTCTransportConfig{
  //这个SettingEngine是pion里很重要的设置类,可以控制pion/webrtc很多行为和参数,比如ice-lite等
    se:=webrtc.SettingEngine{}
    se.DisableMediaEngineCopy(true)
  ....
  //这里把自定义的BufferFactory给配置进去了
  //意思是pion/srtp会使用这个buffer来传包
    se.BufferFactory=c.BufferFactory.GetOrNew
}

srtp和srtcp流向是这样的:

客户端---srtp--->srtp.ReadStreamSRTP------->SFU
客户端<---srtcp---srtp.ReadStreamSRTCP<------SFU

当包到达pion/srtp时,就会触发ReadStreamSRTP.init函数和ReadStreamSRTCP.init函数。

  • ReadStreamSRTP.init调用自定义的BufferFactory.GetOrNew函数了,new了一个buffer。

  • ReadStreamSRTCP.init调用自定义的BufferFactory.GetOrNew函数,new一个rtcpReader。

之后收发rtp和rtcp包,就会流经这个buffer和rtcpReader:

https://github.com/pion/srtp/blob/3c34651fa0c6de900bdc91062e7ccb5992409643/stream_srtp.go#L53

func(r*ReadStreamSRTP)init(childstreamSession,ssrcuint32)error{
    sessionSRTP,ok:=child.(*SessionSRTP)
......
    if r.session.bufferFactory!=nil{
    //这里就是调用自定义的BufferFactory.GetOrNew函数了,new了一个buffer
        r.buffer=r.session.bufferFactory(packetio.RTPBufferPacket,ssrc)
    } else {
.......
    }
 
    return nil
}
//这里就把包写入了自定义buffer
func(r*ReadStreamSRTP)write(buf[]byte)(nint,errerror){
    n,err=r.buffer.Write(buf)
 
    if errors.Is(err,packetio.ErrFull){
        // Silently drop data when the buffer is full.
        return len(buf),nil
    }
 
    return n,err
}

为什么这么搞呢?

仔细想想,如果控制了rtp和rtcp的buffer,是不是计算twcc、nack、stats等就很方便了?在buffer写入包的同时,就可以通过设置的回调函数搞各种复杂计算。

router.AddReceiver

接下来可以看到buffer的各种回调。

func(r*router)AddReceiver(receiver*webrtc.RTPReceiver,track*webrtc.TrackRemote)(Receiver,bool){
    r.Lock()
    deferr.Unlock()
 
    publish:=false
    trackID:=track.ID()
 
//这里获取了之前init函数中,new出来的buffer和rtcpReader,开始搞事情
    buff,rtcpReader:=r.bufferFactory.GetBufferPair(uint32(track.SSRC()))
 
  //设置rtcp的回调,比如nack、twcc、rr
    buff.OnFeedback(func(fb[]rtcp.Packet){
        r.rtcpCh<-fb
    })
 
    if track.Kind()==webrtc.RTPCodecTypeAudio{
        streamID:=track.StreamID()
    //如果是音频track,设置OnAudioLevel回调
        buff.OnAudioLevel(func(leveluint8){
            r.session.AudioObserver().observe(streamID,level)
        })
        r.session.AudioObserver().addStream(streamID)
 
    }else if track.Kind()==webrtc.RTPCodecTypeVideo{
        if r.twcc==nil{
      //如果是视频track,创建twcc计算器,并设置回调,当计算器生成twcc包就会回调
            r.twcc=twcc.NewTransportWideCCResponder(uint32(track.SSRC()))
            r.twcc.OnFeedback(func(prtcp.RawPacket){
                r.rtcpCh<-[]rtcp.Packet{&p}
            })
        }
    //设置buffer的twcc回调,buffer收到包后调用,塞入twcc计算器
    //twcc计算生成rtcp包,再回调OnFeedback发送给客户端
        buff.OnTransportWideCC(func(snuint16,timeNSint64,markerbool){
            r.twcc.Push(sn,timeNS,marker)
        })
    }
 
    if r.config.WithStats{
        r.stats[uint32(track.SSRC())]=stats.NewStream(buff)
    }
    //设置rtcpReader.OnPacket
    rtcpReader.OnPacket(func(bytes[]byte){
    //收到SDES、SR包做些处理
    })
 
    recv,ok:=r.receivers[trackID]
    if!ok{
    //创建WebRTCReceiver并设置回调
        recv=NewWebRTCReceiver(receiver,track,r.id)
        r.receivers[trackID]=recv
        recv.SetRTCPCh(r.rtcpCh)
        recv.OnCloseHandler(func(){
    。。。。
        })
        publish=true
    }
 
    //把track buffer塞入recv
    recv.AddUpTrack(track,buff,r.config.Simulcast.BestQualityFirst)
 
    //初始化buff
    buff.Bind(receiver.GetParameters(),buffer.Options{
        MaxBitRate:r.config.MaxBandwidth,
    })
 
。。。
    return recv,publish
}

这里很重要,WebRTCReceiver是真实负责收发包的,可以看到AddUpTrack已经把buffer塞进去了。

接下来看一下AddUpTrack是如何工作的:

func(w*WebRTCReceiver)AddUpTrack(track*webrtc.TrackRemote,buff*buffer.Buffer,bestQualityFirstbool){
    if w.closed.get(){
        return
    }
 
  //根据RID来区分layer
    varlayerint
    switchtrack.RID(){//如果没开simulcast,为""
    casefullResolution:
        layer=2
    casehalfResolution:
        layer=1
    default:
        layer=0//如果没开simulcast,为0
    }
 
    w.Lock()
  //设置空域层layer的track
    w.upTracks[layer]=track
 
  //设置空域层layer的buff
    w.buffers[layer]=buff
    w.available[layer].set(true)
 
  //设置空域层layer的downtrack,这里的[]*DownTrack数组,订阅该layer的downtrack存在这里
    w.downTracks[layer].Store(make([]*DownTrack,0,10))
    w.pendingTracks[layer]=make([]*DownTrack,0,10)
    w.Unlock()
 
  //闭包函数,按最佳质量订阅,切到f层
    subBestQuality:=func(targetLayerint){
        for l:=0;l<targetLayer;l++{
            dts:=w.downTracks[l].Load()
            if dts==nil{
                continue
            }
            for_,dt:=rangedts.([]*DownTrack){
                _=dt.SwitchSpatialLayer(int32(targetLayer),false)
            }
        }
    }
 
  //闭包函数,按最差质量订阅,切到q层
    subLowestQuality:=func(targetLayerint){
        for l:=2;l!=targetLayer;l--{
            dts:=w.downTracks[l].Load()
            if dts==nil{
                continue
            }
            for_,dt:=rangedts.([]*DownTrack){
                _=dt.SwitchSpatialLayer(int32(targetLayer),false)
            }
        }
    }
 
  //是否开启大小流
    if w.isSimulcast{
    //如果配置最佳质量,则等到f层到来时,订阅它
        if bestQualityFirst&&(!w.available[2].get()||layer==2){
            subBestQuality(layer)
      //如果配置最差质量,则等到q层到来时,订阅它
        }else if!bestQualityFirst&&(!w.available[0].get()||layer==0){
            subLowestQuality(layer)
        }
    }
 
  //启动读写流程
    go w.writeRTP(layer)
}

真正的收发包流程来了:

func(w*WebRTCReceiver)writeRTP(layerint){
    defer func(){//这里设置自动清理函数
        w.closeOnce.Do(func(){
            w.closed.set(true)
            w.closeTracks()
        })
    }()
 
  //创建一个PLI包,后边要用
    pli:=[]rtcp.Packet{
        &rtcp.PictureLossIndication{SenderSSRC:rand.Uint32(),MediaSSRC:w.SSRC(layer)},
    }
 
    for {
    //这里可以看到,真正读包是从buffer里读出来的,正是前边讲到的自定义buffer
        pkt,err:=w.buffers[layer].ReadExtended()
        if err==io.EOF{
            return
        }
 
    //如果开启大小流
        if w.isSimulcast{
。。。//这里跳过,以后再讲
        }
 
        for_,dt:=rangew.downTracks[layer].Load().([]*DownTrack){
      //下行track写入rtp包,这样订阅者可以收到流了
            if err=dt.WriteRTP(pkt,layer);err!=nil{
                if err==io.EOF&&err==io.ErrClosedPipe{
                    w.Lock()
                    w.deleteDownTrack(layer,dt.id)
                    w.Unlock()
                }
                log.Error().Err(err).Str("id",dt.id).Msg("Error writing to down track")
            }
        }
    }
 
}


3.发包流程

SessionLocal.Publish

func(s*SessionLocal)Publish(routerRouter,rReceiver){
    for_,p:=ranges.Peers(){
        // Don't sub to self
        if router.ID()==p.ID()||p.Subscriber()==nil{
            continue
        }
        //表示根据r的信息创建downtrack,并增加到p.Subscriber()和r中
        if err:=router.AddDownTracks(p.Subscriber(),r);err!=nil{
            Logger.Error(err,"Error subscribing transport to Router")
            continue
        }
    }
}


router.AddDownTracks

func(r*router)AddDownTracks(s*Subscriber,recvReceiver)error{
。。。
//如果recv不为空,表示根据recv的信息创建downtrack,并增加到s和recv中
    if recv!=nil{
        if_,err:=r.AddDownTrack(s,recv);err!=nil{
            return err
        }
        s.negotiate()
        return nil
    }
//如果recv为空,表示遍历房间中所有的receivers,并增加到s和recv中
    if len(r.receivers)>0{
        for_,rcv:=ranger.receivers{
            if_,err:=r.AddDownTrack(s,rcv);err!=nil{
                return err
            }
        }
        s.negotiate()
    }
    return nil
}

router.AddDownTrack

根据recv的信息创建downtrack,并增加到sub和recv中。

func(r*router)AddDownTrack(sub*Subscriber,recvReceiver)(*DownTrack,error){
    for_,dt:=rangesub.GetDownTracks(recv.StreamID()){//避免重复添加
        if dt.ID()==recv.TrackID(){
            return dt,nil
        }
    }
 
    codec:=recv.Codec()
    if err:=sub.me.RegisterCodec(codec,recv.Kind());err!=nil{
        return nil,err
    }
    //创建downtrack,downtrack用来给客户端下发流
    downTrack,err:=NewDownTrack(webrtc.RTPCodecCapability{
        MimeType:     codec.MimeType,
        ClockRate:    codec.ClockRate,
        Channels:     codec.Channels,
        SDPFmtpLine:  codec.SDPFmtpLine,
        RTCPFeedback:[]webrtc.RTCPFeedback{{"goog-remb",""},{"nack",""},{"nack","pli"}},
    },recv,r.bufferFactory,sub.id,r.config.MaxPacketTrack)
    if err!=nil{
        return nil,err
    }
    //把downtrack增加到pc中
    if downTrack.transceiver,err=sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit{
        Direction:webrtc.RTPTransceiverDirectionSendonly,
    });err!=nil{
        return nil,err
    }
 
    // 设置关闭回调,关闭时pc自动删除track
    downTrack.OnCloseHandler(func(){
        if sub.pc.ConnectionState()!=webrtc.PeerConnectionStateClosed{
            if err:=sub.pc.RemoveTrack(downTrack.transceiver.Sender());err!=nil{
                if err==webrtc.ErrConnectionClosed{
                    return
                }
                Logger.Error(err,"Error closing down track")
            }else{//如果删除成功,再从sub中删除,然后重协商
                sub.RemoveDownTrack(recv.StreamID(),downTrack)
                sub.negotiate()
            }
        }
    })
 
  //设置OnBind回调,DownTrack.Bind()里会调用这个;PC协商完成时,DownTrack.Bind()会触发
    downTrack.OnBind(func(){
        gosub.sendStreamDownTracksReports(recv.StreamID())
    })
 
  //增加downTrack到sub中,sub只是用来管理downtracks和生成SenderReport等
    sub.AddDownTrack(recv.StreamID(),downTrack)
 
  //增加downTrack到WebRTCReceiver中,实际收发包是WebRTCReceiver来控制,在writeRTP中
    recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)
    returndownTrack,nil
}

这样下行track也增加好了,之前的writeRTP可以正常工作了。

4.总结

收发包逻辑打通步骤:

SDK推流---->OnTrack---->router.AddReceiver(设置Buffer和上行Track)------>SessionLocal.Publish(设置下行Track)---->收发包逻辑打通

收发包流程图简单总结:

srtp.write--->buffer.write--->buffer.ReadExtended--->downtrack.writeRTP

收包流程堆栈:

github.com/pion/ion-sfu/pkg/buffer.(*Buffer).Write (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/buffer/buffer.go:187)
github.com/pion/srtp/v2.(*ReadStreamSRTP).write (/Volumes/vm/workspace/go/pkg/mod/github.com/pion/srtp/v2@v2.0.5/stream_srtp.go:64)
github.com/pion/srtp/v2.(*SessionSRTP).decrypt (/Volumes/vm/workspace/go/pkg/mod/github.com/pion/srtp/v2@v2.0.5/session_srtp.go:166)

发包流程堆栈:

github.com/pion/ion-sfu/pkg/buffer.(*Buffer).ReadExtended (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/buffer/buffer.go:236)
github.com/pion/ion-sfu/pkg/sfu.(*WebRTCReceiver).writeRTP (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/sfu/receiver.go:345)

作者简介:

王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。

特别说明:

本文发布于知乎,已获得作者授权转载。


扫描图中二维码或点击阅读原文

了解大会更多信息

喜欢我们的内容就点个“在看”吧!

以上是关于大话ion系列的主要内容,如果未能解决你的问题,请参考以下文章

大话ion系列

大话ion系列

大话ion系列

大话设计模式-抽象工厂模式

大话数据结构读书笔记系列线性表

JUC系列01之大话并发