大话ion系列
Posted LiveVideoStack_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大话ion系列相关的知识,希望对你有一定的参考价值。
点击上方“LiveVideoStack”关注我们
作者 | 王朋闯
本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新。
七、Simulcast流程
1. Simulcast概念
先介绍WebRTC的一个概念——Simulcast(联播,俗称大小流):
推流端===f/h/q==>SFU--f--->收流端A
|---q--->收流端B
|---h--->收流端C
上行一般是三路流,按分辨率和码率,一般分为fhq(大中小)三层
下行可以分给不同的用户不同的流,比如网不好时分发个小流q,网变好了再切回大流f
三层的streamId、trackId是一样的,但是rid和ssrc是不同的,rid一般是f、h、q
对应的SDP部分
.........
a=rid:f send
a=rid:h send
a=rid:q send
a=simulcast:send f;h;q
2.收发流程
看本章之前,最好看一下前一章,熟悉一下收发流程,本文只重点介绍其中的Simulcast部分。
收发包逻辑打通步骤:
SDK推流---->OnTrack---->router.AddReceiver(设置Buffer和上行Track)------>SessionLocal.Publish(设置下行Track)---->收发包逻辑打通
3.Simulcast上行流程
非Simulcast情况,OnTrack一般会触发两次:一个audioTrack+一个videoTrack。
Simulcast下,OnTrack一般会触发四次:一个audioTrack+三个videoTrack(rid分别为fhq)。
这个流程会触发四次:
OnTrack--->router.AddReceiver--->WebRTCReceiver.AddUpTrack
三个videoTrack,共用同一个WebRTCReceiver。
type WebRTCReceiver struct
。。。
receiver *webrtc.RTPReceiver
codec webrtc.RTPCodecParameters
rtcpCh chan []rtcp.Packet
buffers [3]*buffer.Buffer//需要三个buffer
upTracks [3]*webrtc.TrackRemote//三个TrackRemote
。。。
pendingTracks [3][]*DownTrack//三个层,每层来订阅的downtrack
。。。
接下来看一下AddUpTrack是如何工作的:
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote,buff *buffer.Buffer, bestQualityFirst bool)
if w.closed.get()
return
//根据RID来区分layer
var layer int
switch track.RID() //如果没开simulcast,为""
case fullResolution:
layer = 2
case halfResolution:
layer = 1
default:
layer = 0//如果没开simulcast,为0
w.Lock()
//设置空域层layer的track
w.upTracks[layer] = track
//设置空域层layer的buff
w.buffers[layer] = buff
w.available[layer].set(true)
//设置空域层layer的downtrack
w.downTracks[layer].Store(make([]*DownTrack,0, 10))
w.pendingTracks[layer] = make([]*DownTrack,0, 10)
w.Unlock()
//闭包函数,按最佳质量订阅,切到f层
subBestQuality := func(targetLayerint)
for l := 0; l <targetLayer; l++
dts :=w.downTracks[l].Load()
if dts == nil
continue
for _, dt :=range dts.([]*DownTrack)
_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
//闭包函数,按最差质量订阅,切到q层
subLowestQuality := func(targetLayerint)
for l := 2; l !=targetLayer; l--
dts :=w.downTracks[l].Load()
if dts == nil
continue
for _, dt :=range dts.([]*DownTrack)
_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
//是否开启大小流
if w.isSimulcast
//如果配置最佳质量,则等到f层到来时,订阅它
if bestQualityFirst &&(!w.available[2].get() || layer == 2)
subBestQuality(layer)
//如果配置最差质量,则等到q层到来时,订阅它
else if!bestQualityFirst && (!w.available[0].get() ||layer == 0)
subLowestQuality(layer)
//启动读写流程
go w.writeRTP(layer)
真正的收发包流程来了:
func (w *WebRTCReceiver) writeRTP(layer int)
defer func() //这里设置自动清理函数
w.closeOnce.Do(func()
w.closed.set(true)
w.closeTracks()
)
()
//创建一个PLI包,后边要用
pli := []rtcp.Packet
&rtcp.PictureLossIndicationSenderSSRC:rand.Uint32(), MediaSSRC: w.SSRC(layer),
for
//这里可以看到,真正读包是从buffer里读出来的,正是前边讲到的自定义buffer
pkt, err :=w.buffers[layer].ReadExtended()
if err ==io.EOF
return
//如果开启大小流
if w.isSimulcast
//一开始是pending状态
ifw.pending[layer].get()
//如果收到的包是关键帧
ifpkt.KeyFrame
w.Lock()
//如果有切换中的layer,那就切一下
for idx,dt := range w.pendingTracks[layer]
w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)
w.storeDownTrack(layer, dt)
dt.SwitchSpatialLayerDone(int32(layer))
w.pendingTracks[layer][idx] = nil
w.pendingTracks[layer] = w.pendingTracks[layer][:0]
w.pending[layer].set(false)
w.Unlock()
else
//如果是非关键字,说明需要发送PLI
w.SendRTCP(pli)
//这里是不是有疑问,[]*downTracks是SessionLocal.Publish里塞过来的,后边会介绍:)
for _, dt := rangew.downTracks[layer].Load().([]*DownTrack)
//下行track写入rtp包
if err = dt.WriteRTP(pkt, layer);err != nil
if err ==io.EOF && err == io.ErrClosedPipe
w.Lock()
w.deleteDownTrack(layer, dt.id)
w.Unlock()
log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")
至此一个简单的Simulcast收发模型:
SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK
| |....
| |--->downTracks[0][N].WriteRTP
|
|---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP
| |....
| |---->downTracks[0][N].WriteRTP
|
|------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP
| |....
| |----->downTracks[1][N].WriteRTP
|
|------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP
|....
|------>downTracks[2][N].WriteRTP
上面省略了SDK--->ReadStreamSRTP.buffer.Write,这个buffer和WebRTCReceiver.buffer是同一个。
订阅端SDK的切大小流操作,其实就是在0-2来回挂载downTrack而已。
4.Simulcast下行流程
读者前边的疑问,downTracks是哪里塞过来的?流程在这里:
OnTrack--->SessionLocal.Publish--->router.AddDownTracks--->router.AddDownTrack--->WebRTCReceiver.AddDownTrack--->WebRTCReceiver.storeDownTrack
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver)
//Simulcast一般会触发OnTrack四次,一个audio,三个video
//由于三个video的trackId一样,共用一个WebRTCReceiver
r, pub := p.router.AddReceiver(receiver,track)
if pub //这里video到来的第一次pub才为true
//这里把receiver发布到router里,其他peer的downtrack会挂载到receiver下
p.session.Publish(p.router, r)
这里为了方便,再贴一下整个流程的代码,比较繁琐,可以跳过。
SessionLocal.Publish
func (s *SessionLocal) Publish(router Router,r Receiver)
for _, p := ranges.Peers()
// Don't sub toself
if router.ID() == p.ID() || p.Subscriber() == nil
continue
//表示根据r的信息创建downtrack,并增加到p.Subscriber()和r中
if err :=router.AddDownTracks(p.Subscriber(), r); err !=nil
Logger.Error(err, "Errorsubscribing transport to Router")
continue
router.AddDownTracks
func (r *router) AddDownTracks(s *Subscriber,recv Receiver) error
。。。
//如果recv不为空,表示根据recv的信息创建downtrack,并增加到s和recv中
if recv != nil
if _, err :=r.AddDownTrack(s, recv); err != nil
return err
s.negotiate()
return nil
//如果recv为空,表示遍历房间中所有的receivers,并增加到s和recv中
if len(r.receivers)> 0
for _, rcv := ranger.receivers
if _, err :=r.AddDownTrack(s, rcv); err != nil
return err
s.negotiate()
return nil
router.AddDownTrack
根据recv的信息创建downtrack,并增加到sub和recv中。
func (r *router) AddDownTrack(sub *Subscriber,recv Receiver) (*DownTrack, error)
for _, dt := rangesub.GetDownTracks(recv.StreamID()) //避免重复添加
if dt.ID() ==recv.TrackID()
return dt, nil
codec := recv.Codec()
if err := sub.me.RegisterCodec(codec, recv.Kind()); err !=nil
return nil,err
//创建downtrack,downtrack用来给客户端下发流
downTrack, err := NewDownTrack(webrtc.RTPCodecCapability
MimeType: codec.MimeType,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
SDPFmtpLine: codec.SDPFmtpLine,
RTCPFeedback:[]webrtc.RTCPFeedback"goog-remb", "", "nack", "", "nack", "pli",
, recv, r.bufferFactory,sub.id, r.config.MaxPacketTrack)
if err != nil
return nil,err
//把downtrack增加到pc中
if downTrack.transceiver,err = sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit
Direction:webrtc.RTPTransceiverDirectionSendonly,
); err != nil
return nil,err
// 设置关闭回调,关闭时pc自动删除track
downTrack.OnCloseHandler(func()
if sub.pc.ConnectionState() !=webrtc.PeerConnectionStateClosed
if err :=sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err !=nil
if err ==webrtc.ErrConnectionClosed
return
Logger.Error(err, "Errorclosing down track")
else //如果删除成功,再从sub中删除,然后重协商
sub.RemoveDownTrack(recv.StreamID(), downTrack)
sub.negotiate()
)
//设置OnBind回调,DownTrack.Bind()里会调用这个;PC协商完成时,DownTrack.Bind()会触发
downTrack.OnBind(func()
go sub.sendStreamDownTracksReports(recv.StreamID())
)
//增加downTrack到sub中,sub只是用来管理downtracks和生成SenderReport等
sub.AddDownTrack(recv.StreamID(), downTrack)
//增加downTrack到WebRTCReceiver中,实际收发包是WebRTCReceiver来控制,在writeRTP中
recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)
return downTrack, nil
5.Simulcast切换流程
第一种,自动切换。
上边的subBestQuality,会在f层receiver到来时,自动订阅f层。
第二种,手动切换。
通过信令或datachannel控制来切换。
先来讲一下datachannel信令通道,在main里创建了一个内置dc,处理函数为datachannel.SubscriberAPI。
func main()
nsfu := sfu.NewSFU(conf.Config)
dc :=nsfu.NewDatachannel(sfu.APIChannelLabel)
dc.Use(datachannel.SubscriberAPI)
s :=server.NewWrapperedGRPCWebServer(options, nsfu)
if err := s.Serve(); err != nil
logger.Error(err,"failed to serve")
os.Exit(1)
select
客户端发过来的切大小流指令会进入此函数。
funcSubscriberAPI(nextsfu.MessageProcessor) sfu.MessageProcessor
return sfu.ProcessFunc(func(ctxcontext.Context, args sfu.ProcessArgs)
srm := &setRemoteMedia
if err :=json.Unmarshal(args.Message.Data, srm); err != nil
return
// Publisherchanging active layers
if srm.Layers !=nil && len(srm.Layers) > 0
。。。//当前sdk逻辑不会进入这里
else
//按流ID查找downTracks
downTracks :=args.Peer.Subscriber().GetDownTracks(srm.StreamID)
for _, dt :=range downTracks
switch dt.Kind()
casewebrtc.RTPCodecTypeAudio:
dt.Mute(!srm.Audio)//音频是否需要mute/unmute
casewebrtc.RTPCodecTypeVideo:
switchsrm.Video //视频是否需要切大小流/mute
casehighValue:
//这里把d.reSync.set设置为true了,writeSimulcastRTP里会自动发PLI
dt.Mute(false)
dt.SwitchSpatialLayer(2, true)
casemediumValue:
dt.Mute(false)
dt.SwitchSpatialLayer(1, true)
caselowValue:
dt.Mute(false)
dt.SwitchSpatialLayer(0, true)
casemutedValue:
dt.Mute(true)
switchsrm.Framerate //当前sdk逻辑也不会进入这里,srm.Framerate=""
next.Process(ctx, args)
)
DownTrack.SwitchSpatialLayer
func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error
if d.trackType ==SimulcastDownTrack
// Don't switchuntil previous switch is done or canceled
csl := atomic.LoadInt32(&d.currentSpatialLayer)
//如果当前运行layer不是正在切的layer,或当前layer是要切的
//换句话说,如果当前layer没切完成,或者当前layer和要切的一样,那就返回错误
if csl !=atomic.LoadInt32(&d.targetSpatialLayer) || csl ==targetLayer
returnErrSpatialLayerBusy
//切换layer
if err :=d.receiver.SwitchDownTrack(d, int(targetLayer));err == nil
atomic.StoreInt32(&d.targetSpatialLayer,targetLayer)
if setAsMax
atomic.StoreInt32(&d.maxSpatialLayer,targetLayer)
return nil
returnErrSpatialNotSupported
WebRTCReceiver.SwitchDownTrack
func (w *WebRTCReceiver) SwitchDownTrack(track *DownTrack,layer int) error
if w.closed.get()
returnerrNoReceiverFound
//切换就是把track放入pending
if w.available[layer].get()
w.Lock()
w.pending[layer].set(true)
w.pendingTracks[layer] = append(w.pendingTracks[layer],track)
w.Unlock()
return nil
return errNoReceiverFound
然后在writeRTP里切换:
func (w *WebRTCReceiver) writeRTP(layer int)
....
for
pkt, err :=w.buffers[layer].ReadExtended()
if err ==io.EOF
return
//如果是大小流
if w.isSimulcast
//如果正在切换,pending[layer]get()为true
ifw.pending[layer].get()
// 如果是关键帧,才会切换,好在前边Mute流程里发送了PLI,这里应该很快来一个关键帧
ifpkt.KeyFrame
w.Lock()
//=========这里切换
for idx, dt:= range w.pendingTracks[layer]
//删除原来的
w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)
//存储新的dt,以后writeRTP会写入新的dt
w.storeDownTrack(layer, dt)
//设置切换完成
dt.SwitchSpatialLayerDone(int32(layer))
//pending中此dt置空
w.pendingTracks[layer][idx] = nil
//清空pendingTracks此layer
w.pendingTracks[layer] = w.pendingTracks[layer][:0]
//标志位置为false
w.pending[layer].set(false)
w.Unlock()
else
// 如果不是关键帧,再次发送PLI
w.SendRTCP(pli)
for _, dt := rangew.downTracks[layer].Load().([]*DownTrack)
if err = dt.WriteRTP(pkt, layer);err != nil
if err ==io.EOF && err == io.ErrClosedPipe
w.Lock()
w.deleteDownTrack(layer, dt.id)
w.Unlock()
log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")
6. 总结
Simulcast在ion-sfu中,默认是通过datachannel来操作切换的。
首先,切换是操作pendingTracks:
SubscriberAPI---》dt.SwitchSpatialLayer-->WebRTCReceiver.SwitchDownTrack--->写入pendingTracks
然后,在WebRTCReceiver.writeRTP里进行实质切换:
WebRTCReceiver.writeRTP--->读取pendingTracks---》更换downTracks--》storeDownTrack--》OK
之后,写包就会写入新track。至此一个简单的Simulcast收发模型就建成了:
SDK---SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK
| |....
| |--->downTracks[0][N].WriteRTP
|
|---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP
| |....
| |---->downTracks[0][N].WriteRTP
|
|------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP
| |....
| |----->downTracks[1][N].WriteRTP
|
|------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP
|....
|------>downTracks[2][N].WriteRTP
作者简介:
王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。
特别说明:
本文发布于知乎,已获得作者授权转载。
讲师招募
LiveVideoStackCon 2022 音视频技术大会 上海站,正在面向社会公开招募讲师,无论你所处的公司大小,title高低,老鸟还是菜鸟,只要你的内容对技术人有帮助,其他都是次要的。欢迎通过 speaker@livevideostack.com 提交个人资料及议题描述,我们将会在24小时内给予反馈。
喜欢我们的内容就点个“在看”吧!
以上是关于大话ion系列的主要内容,如果未能解决你的问题,请参考以下文章