大话ion系列
Posted LiveVideoStack_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大话ion系列相关的知识,希望对你有一定的参考价值。
点击上方“LiveVideoStack”关注我们
作者 | 王朋闯
本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新。
五、offer与answer流程
1.前言
之前的文章已经介绍了前两次重协商:
客户端sdk的pub的dc已经打通,此时使用dc控制simulcast和监听audiolevel speaker。
客户端sdk的sub订阅到了房间内的流。
接下来,SDK推流是第三次协商了。
2.offer流程
当点击ion-sfu的demo里“publish”按钮的时候,就会触发ion-sdk-js的操作:
把音视频track增加到pub的pc,此时会触发onNegotiationNeeded。
首先来看一下ion-sdk-js的代码:
this.transports[Role.pub].pc.onnegotiationneeded = this.onNegotiationNeeded.bind(this);
这里把onNegotiationNeeded绑定到了pc.onnegotiationneeded,意思是当推流增加track到pc时,就会触发onNegotiationNeeded。
参考:
https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/onnegotiationneeded
接下来看一下onNegotiationNeeded,是个标准的重协商流程。
private async onNegotiationNeeded() {
if (!this.transports) {
throw Error(ERR_NO_SESSION);
}
let offer: RTCSessionDescriptionInit | undefined;
let answer: RTCSessionDescriptionInit | undefined;
try {
offer = await this.transports[Role.pub].pc.createOffer();
await this.transports[Role.pub].pc.setLocalDescription(offer);
answer = await this.signal.offer(offer);//在这里发送offer到SFU
await this.transports[Role.pub].pc.setRemoteDescription(answer);
} catch (err) {
/* tslint:disable-next-line:no-console */
console.error(err);
if (this.onerrnegotiate) this.onerrnegotiate(Role.pub, err, offer, answer);
}
}
接下来看一下SFU里的处理:
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
....
case "offer":
var negotiation Negotiation
err := json.Unmarshal(*req.Params, &negotiation)
if err != nil {
p.Logger.Error(err, "connect: error parsing offer")
replyError(err)
break
}
//调用peerLocal.Answer()
answer, err := p.Answer(negotiation.Desc)
if err != nil {
replyError(err)
break
}
// 发送answer
_ = conn.Reply(ctx, req.ID, answer)
func (p *PeerLocal) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
....//这里调用了publisher.Answer()
answer, err := p.publisher.Answer(sdp)
if err != nil {
return nil, fmt.Errorf("error creating answer: %v", err)
}
Logger.V(0).Info("PeerLocal send answer", "peer_id", p.id)
return &answer, nil
}
// 这里可以看到Publisher.Answer就是标准的协商流程
// 在前边的文章详细介绍过什么叫协商和重协商,这里不在重复了
func (p *Publisher) Answer(offer webrtc.SessionDescription) (webrtc.SessionDescription, error) {
if err := p.pc.SetRemoteDescription(offer); err != nil {
return webrtc.SessionDescription{}, err
}
for _, c := range p.candidates {
if err := p.pc.AddICECandidate(c); err != nil {
Logger.Error(err, "Add publisher ice candidate to peer err", "peer_id", p.id)
}
}
p.candidates = nil
answer, err := p.pc.CreateAnswer(nil)
if err != nil {
return webrtc.SessionDescription{}, err
}
if err := p.pc.SetLocalDescription(answer); err != nil {
return webrtc.SessionDescription{}, err
}
return answer, nil
}
3.answer流程
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
....
case "answer":
var negotiation Negotiation
err := json.Unmarshal(*req.Params, &negotiation)
if err != nil {
p.Logger.Error(err, "connect: error parsing offer")
replyError(err)
break
}
// peerLocal.SetRemoteDescription
err = p.SetRemoteDescription(negotiation.Desc)
if err != nil {
replyError(err)
}
func (p *PeerLocal) SetRemoteDescription(sdp webrtc.SessionDescription) error {
if p.subscriber == nil {
return ErrNoTransportEstablished
}
p.Lock()
defer p.Unlock()
Logger.V(0).Info("PeerLocal got answer", "peer_id", p.id)
// 这里调用subscriber.SetRemoteDescription
if err := p.subscriber.SetRemoteDescription(sdp); err != nil {
return fmt.Errorf("setting remote description: %w", err)
}
p.remoteAnswerPending = false
if p.negotiationPending {//这里两个标志位是为了防止重协商竞争冲突
p.negotiationPending = false
p.subscriber.negotiate()
}
return nil
}
//这里仅仅是调用pc.SetRemoteDescription
func (s *Subscriber) SetRemoteDescription(desc webrtc.SessionDescription) error {
if err := s.pc.SetRemoteDescription(desc); err != nil {
Logger.Error(err, "SetRemoteDescription error")
return err
}
.....
return nil
}
至此,第三次重协商完成,两端交换完sdp,接下来ice打通后会推流过来。
4.总结
pub在第一次协商后,只打通了dc,此时使用dc控制simulcast和监听audiolevel speaker,也可以定制自己的dc。
这样的好处是灵活。
sub在第二次协商后,可以订阅到房间内的其他人的流了。
pub在第三次协商时,是增加音视频track后,然后走标准重协商流程,开始推流。
六、包的收发流程
1.前言
本文从ion-sfu中的demo点击“publish”开始,讲一下如何收包转发。
前边讲到,点击“publish”,会进行第三次重协商,协商完成,客户端此时推流到SFU。
此时会触发OnTrack,这里的OnTrack和标准webrtc接口是一样的,会在流到达时自动触发。
参考:
https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/ontrack
2.收包流程
OnTrack收包流程的入口
首先贴一下老代码,OnTrack是在NewPublisher里边设置的回调。
func NewPublisher(idstring,sessionSession,cfg*WebRTCTransportConfig)(*Publisher,error){
。。。
//这里要注意cfg.Setting,里边的bufferFactory已经设置好了为自定义的c.BufferFactory.GetOrNew
//可以搜一下这个函数NewWebRTCTransportConfig,这一行“se.BufferFactory = c.BufferFactory.GetOrNew”
api:=webrtc.NewAPI(webrtc.WithMediaEngine(me),webrtc.WithSettingEngine(cfg.Setting))
pc,err:=api.NewPeerConnection(cfg.Configuration)
。。。
p:=&Publisher{
id: id,
pc: pc,
cfg: cfg,
router: newRouter(id,session,cfg),
session:session,
}
//===========OnTrack在这里
pc.OnTrack(func(track*webrtc.TrackRemote,receiver*webrtc.RTPReceiver){
。。。
//这里AddReceiver会新建WebRTCReceiver,然后AddUpTrack
//uptrack是收流的,downtrack是发流的
r,pub:=p.router.AddReceiver(receiver,track)
if pub{
//这里会把流发布到房间内,其他peer会订阅到
p.session.Publish(p.router,r)
p.mu.Lock()
publisherTrack:=PublisherTrack{track,r,true}
p.tracks=append(p.tracks,publisherTrack)
。。。
p.mu.Unlock()
if handler,ok:=p.onPublisherTrack.Load().(func(PublisherTrack));ok&&handler!=nil{
//这里如果上层业务,通过OnPublisherTrack设置了回调,就会触发
//一般只有包导入的情况下,才会这样用,比如业务不想加入房间就自动订阅,想要按需订阅
handler(publisherTrack)
}
} else {
p.mu.Lock()
p.tracks=append(p.tracks,PublisherTrack{track,r,false})
p.mu.Unlock()
}
})
//===========
自定义buffer
这里不得不介绍一下自定义buffer了,看懂了才知道包是从哪里来的。
Pion/webrtc支持自定义BufferFactory,设置好之后,pion/webrtc的组件会使用自定义buffer。
比如pion/srtp是实际收发srtp和srtcp包的类,它们也会使用自定义buffer。
首先来看一下ion-sfu是在哪里设置自定义buffer的:
func NewWebRTCTransportConfig(cConfig)WebRTCTransportConfig{
//这个SettingEngine是pion里很重要的设置类,可以控制pion/webrtc很多行为和参数,比如ice-lite等
se:=webrtc.SettingEngine{}
se.DisableMediaEngineCopy(true)
....
//这里把自定义的BufferFactory给配置进去了
//意思是pion/srtp会使用这个buffer来传包
se.BufferFactory=c.BufferFactory.GetOrNew
}
srtp和srtcp流向是这样的:
客户端---srtp--->srtp.ReadStreamSRTP------->SFU
客户端<---srtcp---srtp.ReadStreamSRTCP<------SFU
当包到达pion/srtp时,就会触发ReadStreamSRTP.init函数和ReadStreamSRTCP.init函数。
ReadStreamSRTP.init调用自定义的BufferFactory.GetOrNew函数了,new了一个buffer。
ReadStreamSRTCP.init调用自定义的BufferFactory.GetOrNew函数,new一个rtcpReader。
之后收发rtp和rtcp包,就会流经这个buffer和rtcpReader:
https://github.com/pion/srtp/blob/3c34651fa0c6de900bdc91062e7ccb5992409643/stream_srtp.go#L53
func(r*ReadStreamSRTP)init(childstreamSession,ssrcuint32)error{
sessionSRTP,ok:=child.(*SessionSRTP)
......
if r.session.bufferFactory!=nil{
//这里就是调用自定义的BufferFactory.GetOrNew函数了,new了一个buffer
r.buffer=r.session.bufferFactory(packetio.RTPBufferPacket,ssrc)
} else {
.......
}
return nil
}
//这里就把包写入了自定义buffer
func(r*ReadStreamSRTP)write(buf[]byte)(nint,errerror){
n,err=r.buffer.Write(buf)
if errors.Is(err,packetio.ErrFull){
// Silently drop data when the buffer is full.
return len(buf),nil
}
return n,err
}
为什么这么搞呢?
仔细想想,如果控制了rtp和rtcp的buffer,是不是计算twcc、nack、stats等就很方便了?在buffer写入包的同时,就可以通过设置的回调函数搞各种复杂计算。
router.AddReceiver
接下来可以看到buffer的各种回调。
func(r*router)AddReceiver(receiver*webrtc.RTPReceiver,track*webrtc.TrackRemote)(Receiver,bool){
r.Lock()
deferr.Unlock()
publish:=false
trackID:=track.ID()
//这里获取了之前init函数中,new出来的buffer和rtcpReader,开始搞事情
buff,rtcpReader:=r.bufferFactory.GetBufferPair(uint32(track.SSRC()))
//设置rtcp的回调,比如nack、twcc、rr
buff.OnFeedback(func(fb[]rtcp.Packet){
r.rtcpCh<-fb
})
if track.Kind()==webrtc.RTPCodecTypeAudio{
streamID:=track.StreamID()
//如果是音频track,设置OnAudioLevel回调
buff.OnAudioLevel(func(leveluint8){
r.session.AudioObserver().observe(streamID,level)
})
r.session.AudioObserver().addStream(streamID)
}else if track.Kind()==webrtc.RTPCodecTypeVideo{
if r.twcc==nil{
//如果是视频track,创建twcc计算器,并设置回调,当计算器生成twcc包就会回调
r.twcc=twcc.NewTransportWideCCResponder(uint32(track.SSRC()))
r.twcc.OnFeedback(func(prtcp.RawPacket){
r.rtcpCh<-[]rtcp.Packet{&p}
})
}
//设置buffer的twcc回调,buffer收到包后调用,塞入twcc计算器
//twcc计算生成rtcp包,再回调OnFeedback发送给客户端
buff.OnTransportWideCC(func(snuint16,timeNSint64,markerbool){
r.twcc.Push(sn,timeNS,marker)
})
}
if r.config.WithStats{
r.stats[uint32(track.SSRC())]=stats.NewStream(buff)
}
//设置rtcpReader.OnPacket
rtcpReader.OnPacket(func(bytes[]byte){
//收到SDES、SR包做些处理
})
recv,ok:=r.receivers[trackID]
if!ok{
//创建WebRTCReceiver并设置回调
recv=NewWebRTCReceiver(receiver,track,r.id)
r.receivers[trackID]=recv
recv.SetRTCPCh(r.rtcpCh)
recv.OnCloseHandler(func(){
。。。。
})
publish=true
}
//把track buffer塞入recv
recv.AddUpTrack(track,buff,r.config.Simulcast.BestQualityFirst)
//初始化buff
buff.Bind(receiver.GetParameters(),buffer.Options{
MaxBitRate:r.config.MaxBandwidth,
})
。。。
return recv,publish
}
这里很重要,WebRTCReceiver是真实负责收发包的,可以看到AddUpTrack已经把buffer塞进去了。
接下来看一下AddUpTrack是如何工作的:
func(w*WebRTCReceiver)AddUpTrack(track*webrtc.TrackRemote,buff*buffer.Buffer,bestQualityFirstbool){
if w.closed.get(){
return
}
//根据RID来区分layer
varlayerint
switchtrack.RID(){//如果没开simulcast,为""
casefullResolution:
layer=2
casehalfResolution:
layer=1
default:
layer=0//如果没开simulcast,为0
}
w.Lock()
//设置空域层layer的track
w.upTracks[layer]=track
//设置空域层layer的buff
w.buffers[layer]=buff
w.available[layer].set(true)
//设置空域层layer的downtrack,这里的[]*DownTrack数组,订阅该layer的downtrack存在这里
w.downTracks[layer].Store(make([]*DownTrack,0,10))
w.pendingTracks[layer]=make([]*DownTrack,0,10)
w.Unlock()
//闭包函数,按最佳质量订阅,切到f层
subBestQuality:=func(targetLayerint){
for l:=0;l<targetLayer;l++{
dts:=w.downTracks[l].Load()
if dts==nil{
continue
}
for_,dt:=rangedts.([]*DownTrack){
_=dt.SwitchSpatialLayer(int32(targetLayer),false)
}
}
}
//闭包函数,按最差质量订阅,切到q层
subLowestQuality:=func(targetLayerint){
for l:=2;l!=targetLayer;l--{
dts:=w.downTracks[l].Load()
if dts==nil{
continue
}
for_,dt:=rangedts.([]*DownTrack){
_=dt.SwitchSpatialLayer(int32(targetLayer),false)
}
}
}
//是否开启大小流
if w.isSimulcast{
//如果配置最佳质量,则等到f层到来时,订阅它
if bestQualityFirst&&(!w.available[2].get()||layer==2){
subBestQuality(layer)
//如果配置最差质量,则等到q层到来时,订阅它
}else if!bestQualityFirst&&(!w.available[0].get()||layer==0){
subLowestQuality(layer)
}
}
//启动读写流程
go w.writeRTP(layer)
}
真正的收发包流程来了:
func(w*WebRTCReceiver)writeRTP(layerint){
defer func(){//这里设置自动清理函数
w.closeOnce.Do(func(){
w.closed.set(true)
w.closeTracks()
})
}()
//创建一个PLI包,后边要用
pli:=[]rtcp.Packet{
&rtcp.PictureLossIndication{SenderSSRC:rand.Uint32(),MediaSSRC:w.SSRC(layer)},
}
for {
//这里可以看到,真正读包是从buffer里读出来的,正是前边讲到的自定义buffer
pkt,err:=w.buffers[layer].ReadExtended()
if err==io.EOF{
return
}
//如果开启大小流
if w.isSimulcast{
。。。//这里跳过,以后再讲
}
for_,dt:=rangew.downTracks[layer].Load().([]*DownTrack){
//下行track写入rtp包,这样订阅者可以收到流了
if err=dt.WriteRTP(pkt,layer);err!=nil{
if err==io.EOF&&err==io.ErrClosedPipe{
w.Lock()
w.deleteDownTrack(layer,dt.id)
w.Unlock()
}
log.Error().Err(err).Str("id",dt.id).Msg("Error writing to down track")
}
}
}
}
3.发包流程
SessionLocal.Publish
func(s*SessionLocal)Publish(routerRouter,rReceiver){
for_,p:=ranges.Peers(){
// Don't sub to self
if router.ID()==p.ID()||p.Subscriber()==nil{
continue
}
//表示根据r的信息创建downtrack,并增加到p.Subscriber()和r中
if err:=router.AddDownTracks(p.Subscriber(),r);err!=nil{
Logger.Error(err,"Error subscribing transport to Router")
continue
}
}
}
router.AddDownTracks
func(r*router)AddDownTracks(s*Subscriber,recvReceiver)error{
。。。
//如果recv不为空,表示根据recv的信息创建downtrack,并增加到s和recv中
if recv!=nil{
if_,err:=r.AddDownTrack(s,recv);err!=nil{
return err
}
s.negotiate()
return nil
}
//如果recv为空,表示遍历房间中所有的receivers,并增加到s和recv中
if len(r.receivers)>0{
for_,rcv:=ranger.receivers{
if_,err:=r.AddDownTrack(s,rcv);err!=nil{
return err
}
}
s.negotiate()
}
return nil
}
router.AddDownTrack
根据recv的信息创建downtrack,并增加到sub和recv中。
func(r*router)AddDownTrack(sub*Subscriber,recvReceiver)(*DownTrack,error){
for_,dt:=rangesub.GetDownTracks(recv.StreamID()){//避免重复添加
if dt.ID()==recv.TrackID(){
return dt,nil
}
}
codec:=recv.Codec()
if err:=sub.me.RegisterCodec(codec,recv.Kind());err!=nil{
return nil,err
}
//创建downtrack,downtrack用来给客户端下发流
downTrack,err:=NewDownTrack(webrtc.RTPCodecCapability{
MimeType: codec.MimeType,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
SDPFmtpLine: codec.SDPFmtpLine,
RTCPFeedback:[]webrtc.RTCPFeedback{{"goog-remb",""},{"nack",""},{"nack","pli"}},
},recv,r.bufferFactory,sub.id,r.config.MaxPacketTrack)
if err!=nil{
return nil,err
}
//把downtrack增加到pc中
if downTrack.transceiver,err=sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit{
Direction:webrtc.RTPTransceiverDirectionSendonly,
});err!=nil{
return nil,err
}
// 设置关闭回调,关闭时pc自动删除track
downTrack.OnCloseHandler(func(){
if sub.pc.ConnectionState()!=webrtc.PeerConnectionStateClosed{
if err:=sub.pc.RemoveTrack(downTrack.transceiver.Sender());err!=nil{
if err==webrtc.ErrConnectionClosed{
return
}
Logger.Error(err,"Error closing down track")
}else{//如果删除成功,再从sub中删除,然后重协商
sub.RemoveDownTrack(recv.StreamID(),downTrack)
sub.negotiate()
}
}
})
//设置OnBind回调,DownTrack.Bind()里会调用这个;PC协商完成时,DownTrack.Bind()会触发
downTrack.OnBind(func(){
gosub.sendStreamDownTracksReports(recv.StreamID())
})
//增加downTrack到sub中,sub只是用来管理downtracks和生成SenderReport等
sub.AddDownTrack(recv.StreamID(),downTrack)
//增加downTrack到WebRTCReceiver中,实际收发包是WebRTCReceiver来控制,在writeRTP中
recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)
returndownTrack,nil
}
这样下行track也增加好了,之前的writeRTP可以正常工作了。
4.总结
收发包逻辑打通步骤:
SDK推流---->OnTrack---->router.AddReceiver(设置Buffer和上行Track)------>SessionLocal.Publish(设置下行Track)---->收发包逻辑打通
收发包流程图简单总结:
srtp.write--->buffer.write--->buffer.ReadExtended--->downtrack.writeRTP
收包流程堆栈:
github.com/pion/ion-sfu/pkg/buffer.(*Buffer).Write (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/buffer/buffer.go:187)
github.com/pion/srtp/v2.(*ReadStreamSRTP).write (/Volumes/vm/workspace/go/pkg/mod/github.com/pion/srtp/v2@v2.0.5/stream_srtp.go:64)
github.com/pion/srtp/v2.(*SessionSRTP).decrypt (/Volumes/vm/workspace/go/pkg/mod/github.com/pion/srtp/v2@v2.0.5/session_srtp.go:166)
发包流程堆栈:
github.com/pion/ion-sfu/pkg/buffer.(*Buffer).ReadExtended (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/buffer/buffer.go:236)
github.com/pion/ion-sfu/pkg/sfu.(*WebRTCReceiver).writeRTP (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/sfu/receiver.go:345)
作者简介:
王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。
特别说明:
本文发布于知乎,已获得作者授权转载。
扫描图中二维码或点击阅读原文
了解大会更多信息
喜欢我们的内容就点个“在看”吧!
以上是关于大话ion系列的主要内容,如果未能解决你的问题,请参考以下文章