大话ion系列

Posted LiveVideoStack_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大话ion系列相关的知识,希望对你有一定的参考价值。

点击上方“LiveVideoStack”关注我们

作者 | 王朋闯

本文为王朋闯老师创作的系列ion文章,LiveVideoStack已获得授权发布,未来将持续更新。

一、为什么用ion-sfu

1.简介

ion-sfu作为ion分布式架构里的核心模块,SFU是选择转发单元的简称,可以分发WebRTC的媒体流。ion-sfu从pion/ion拆分出来,经过社区打磨,是目前GO方案中最成熟且使用最广的SFU。

https://github.com/pion/ion

已经有多家开始商用了,这点国外公司比较快,比如:100ms、Screenleap和Tandem等。

100ms:https://www.100ms.live/

Screenleap:https://www.screenleap.com/

Tandem:https://tandem.chat/

2.ion-sfu优点

  • 纯GO,开发效率高,且能帮你绕过很多坑

  • 单进程多协程模型:

    - 可以利用多核

    - 大大降低级联/单端口复杂度(其他SFU,可能存在本机不同worker间relay的问题;监听单端口时,存在worker间抢包的问题)

  • 高并发,曾在谷歌云4核压测到单房间50方会议 (大概2500路流-0.5Mbps)

  • 功能全面:

    - 双PeerConnection+多Track设计,有良好的浏览器兼容性,节省系统资源

    - 支持多对多音视频通信

    - 支持大小流Simulcast

    - 支持屏幕分享Screenshare

    - 支持发言方自动检测Audio-Level-Detect

    - 支持定制DataChannel

    - 支持节点间Relay

    - 支持单端口,大大降低部署难度

    - 完善的抗弱网机制,抗丢包40%左右,支持TWCC/REMB+PLI/FIR+Nack+SR/RR等

  • 配套SDK完善,JS/Flutter/GO等


3.使用方式

ion-sfu使用方式有两种:

  • 作为服务使用,比如编译带grpc或jsonrpc信令的ion-sfu,然后再做一个自己的信令服务(推荐ion分布式套装),远程调用即可。

  • 作为包使用,import导入,然后做二次开发。此时抛弃了cmd下边的信令层,只需导入pkg/sfu下边的包即可,然后自行定制信令层,可以在sfu、session、peer层面,通过继承接口定制自己的业务,比较复杂。

import (
      sfu "github.com/pion/ion-sfu/pkg/sfu"
)

二、架构与模块

上面给一个简单架构图,很多细节表示不出来,需要看代码。

1.简介

得益于GO,ion-sfu整体代码精简,拥有极高的开发效率。结合现有SDK使用,可以避免很多坑:ion-sdk-js等。

ion-sfu基于pion/webrtc,所以代码风格偏标准webrtc,比如:PeerConnection。因为是使用了标准API,熟悉了之后很容易看懂其他工程,比如:ion-sdk-go/js/flutter。

这样从前到后,整体门槛都降低了。

2.工程组织

这里给出主要模块列表:

├── Makefile //用来编译二进制和grpc文件
├── bin //编译好的二进制目录
├── cmd
│   └── signal //包含三个主文件 grpc、jsonrpc、allrpc
├── config.toml //配置文件
├── examples //网页示例目录
├── pkg
    ├── buffer //buffer包,用于缓存包
    ├── logger //日志
    ├── middlewares //中间件,主要是支持自定义datachannel
    ├── relay //中继
    ├── sfu //sfu主模块,包含router、session、peer等
    ├── stats //状态统计
    └── twcc //transport-cc


3.信令层

信令代码和主程序在一起,在cmd/signal/下边。

  • 支持jsonrpc,主要处理逻辑在:

func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
  • 支持grpc,主要处理逻辑在:

func (s *SFUServer) Signal(stream pb.SFU_SignalServer) error {

而allrpc,是jsonrpc和grpc的合体封装,运行时会进入上面两个函数。

信令很简单:

  • join:加入一个session。

  • description:发起offer或回复answer,用于协商和重协商。

  • trickle:发送trickle candidate。

另外,出于简单考虑,一些信令和事件,直接走datachannel了,比如:大小流切换、声音检测、自定义信令等。


4.媒体层

媒体层的主要模块:

├── audioobserver.go //声音检测
├── datachannel.go //dc中间件的封装
├── downtrack.go //下行track
├── helpers.go //工具函数集
├── mediaengine.go //SDP相关codec、rtp参数设置
├── peer.go //peer封装,一个peer包含一个publisher和一个subscriber,双pc设计
├── publisher.go //publisher,封装上行pc
├── receiver.go //subscriber,封装下行pc
├── router.go //router,包含pc、session、一组receivers
├── sequencer.go //记录包的信息:序列号sn、偏移、时间戳ts等
├── session.go //会话,包含多个peer、dc
├── sfu.go //分发单元,包含多个session、dc
├── simulcast.go //大小流配置
├── subscriber.go //subscriber,封装下行pc、DownTrack、dc
└── turn.go //内置turn server

相比以前版本,增加了一些interface,主要是为了作为包使用时,封装自己的类。

三、主函数与信令流程

1.主函数

这里拿jsonrpc来分析,其他rpc流程上是一样的。

func main() {
      if !parse() {
            showHelp()
            os.Exit(-1)
      }




      //创建SFU和DC,这里的DC用于Simulcast和AudioLevel
      s := sfu.NewSFU(conf)
      dc := s.NewDatachannel(sfu.APIChannelLabel)
      dc.Use(datachannel.SubscriberAPI)




      //接下来是标准websocket服务器启动的流程
      upgrader := websocket.Upgrader{
            CheckOrigin: func(r *http.Request) bool {
                  return true
            },
            ReadBufferSize:  1024,
            WriteBufferSize: 1024,
      }




      //这里jsonrpc基于websocket,websocket从标准http upgrade过来的
      http.Handle("/ws", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            c, err := upgrader.Upgrade(w, r, nil)
            if err != nil {
                  panic(err)
            }
            defer c.Close()




            //这里创建了JSONSignal,每次真实请求到来时会新建一个Peer,进入Handle函数处理
            p := server.NewJSONSignal(sfu.NewPeer(s), logger)
            defer p.Close()




            jc := jsonrpc2.NewConn(r.Context(), websocketjsonrpc2.NewObjectStream(c), p)
            <-< span="">jc.DisconnectNotify()
      }))




      go startMetrics(metricsAddr)




      var err error
      if key != "" && cert != "" {
            logger.Info("Started listening", "addr", "https://"+addr)
            err = http.ListenAndServeTLS(addr, cert, key, nil)
      } else {
            logger.Info("Started listening", "addr", "http://"+addr)
            err = http.ListenAndServe(addr, nil)
      }
      if err != nil {
            panic(err)
      }
}

2.协商&重协商

协商(negotiate):

WebRTC对外的类是PeerConnection,简称PC,通过信令服务交换SDP给PC进行操作。协商就是指双方通过信令交换SDP,通过PC的一些接口,达到协商双方的媒体格式、传输地址端口等信息,从而实现推流和播放的目的。

一次协商完整流程:

本端CreateOffer-》本端SetLocalDescription(offer)-》本端发送offer-》对端SetRemoteDescription(offer)-》对端CreateAnswer-》SetLocalDescription(answer)-》对端对端返回answer-》本端SetRemoteDescription(answer)

重协商(renegotiate):

就是指再次协商。

为什么要重协商?

因为客户端和服务器的track都是变化的,重协商是通知对端的必要手段,比如:客户端发起屏幕分享,服务器有人进出房间等。

重协商的原则:

谁变化谁发起(offer)。

3.信令流程


首先,客户端ws连接成功:

服务端会建立一个Peer,可以参考上边代码。

然后,客户端发起第一次协商:

客户端pc.CreateOffer(一个只包含dc的offer)-》pc.SetLocalDescription(offer),然后把offer放入Join信令,发送给服务端,然后服务器协商【pc.SetRemoteDescription(offer)-》pc.CreateAnswer-》pc.SetLocalDescription(answer)】,返回answer给客户端,至此完成数据通道(datachannel)建立。首先打通dc,是为了方便audio-level/simulcast通道的建立,此时也可以创建自定dc做定制业务。

接下来,服务端发起第二次协商:

服务端pc.CreateOffer,SetLocalDescription,发送offer,此时offer会携带上此房间内的所有track信息,客户端收到后会CreateAnswer,SetLocalDescription,把answer返回来,然后服务端pc.SetRemoteDescription(answer),此时客户端可以收到服务器此房间内的所有流了。

最后,客户端publish发流时会发起第三次协商:

同第一次流程一样,不同的是同时携带了音视频的track,本次协商完成后,服务器可以收到客户端的流了,收到之后会对同房间内的其他客户端发起重协商。

往后只要客户端或服务器track有变化,都会再次发起重协商。

4.代码分析

JsonRPC所有的信令都会进入Handle函数。为了简化流程,可以暂时不看Trickle和OnIceCandidate函数,这个是开启trickle-ICE时才会有。

// Handle incoming RPC call events like join, answer, offer and trickle
// JSONSignal是继承了LocalPeer,所以会继承一些属性和回调:OnOffer等。
// 可以在浏览器端ws网络工具里查看具体信令内容
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
      replyError := func(err error) {
            _ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{
                  Code:    500,
                  Message: fmt.Sprintf("%s", err),
            })
      }




      switch req.Method {
      case "join":// 首先客户端会发join信令过来
            var join Join
            err := json.Unmarshal(*req.Params, &join)
            if err != nil {
                  p.Logger.Error(err, "connect: error parsing offer")
                  replyError(err)
                  break
            }




            //设置OnOffer,即SFU发起offer时(重协商),会使用这个回调,比如重协商时,因为有很多客户端peer同时连到SFU,每个Peer的Track增删时,SFU需要向其他Peer重协商来告诉Track的变更
            p.OnOffer = func(offer *webrtc.SessionDescription) {
                  if err := conn.Notify(ctx, "offer", offer); err != nil {
                        p.Logger.Error(err, "error sending offer")
                  }




            }




            //设置OnIceCandidate,即SFU在ICE流程获取到新候选时,会回调这个函数,告诉客户端新增了啥候选
            p.OnIceCandidate = func(candidate *webrtc.ICECandidateInit, target int) {
                  if err := conn.Notify(ctx, "trickle", Trickle{
                        Candidate: *candidate,
                        Target:    target,
                  }); err != nil {
                        p.Logger.Error(err, "error sending ice candidate")
                  }
            }
            //加入某个会话(房间)
            err = p.Join(join.SID, join.UID, join.Config)
            if err != nil {
                  replyError(err)
                  break
            }




            //根据offer回复answer
            answer, err := p.Answer(join.Offer)
            if err != nil {
                  replyError(err)
                  break
            }




            _ = conn.Reply(ctx, req.ID, answer)




      //如果是客户端发offer,回复answer,此时为客户端发起重协商
      case "offer":
            var negotiation Negotiation
            err := json.Unmarshal(*req.Params, &negotiation)
            if err != nil {
                  p.Logger.Error(err, "connect: error parsing offer")
                  replyError(err)
                  break
            }




            answer, err := p.Answer(negotiation.Desc)
            if err != nil {
                  replyError(err)
                  break
            }
            _ = conn.Reply(ctx, req.ID, answer)




      //如果是客户端发answer,设置SetRemoteDescription即可
      case "answer":
            var negotiation Negotiation
            err := json.Unmarshal(*req.Params, &negotiation)
            if err != nil {
                  p.Logger.Error(err, "connect: error parsing offer")
                  replyError(err)
                  break
            }




            err = p.SetRemoteDescription(negotiation.Desc)
            if err != nil {
                  replyError(err)
            }




      //如果是客户端发送Trickle-ICE的候选过来,设置即可
      case "trickle":
            var trickle Trickle
            err := json.Unmarshal(*req.Params, &trickle)
            if err != nil {
                  p.Logger.Error(err, "connect: error parsing candidate")
                  replyError(err)
                  break
            }




            err = p.Trickle(trickle.Candidate, trickle.Target)
            if err != nil {
                  replyError(err)
            }
      }
}

注意OnOffer是服务器重协商的回调,即房间内某客户端track有变化,服务器会回调此函数通知其他客户端。

总结一句话,客户端《---》SFU的核心逻辑就是不断重协商,谁变化谁发起offer


作者简介:

王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。

特别说明:

本文发布于知乎,已获得作者授权转载。


扫描图中二维码或点击阅读原文

了解大会更多信息

喜欢我们的内容就点个“在看”吧!

以上是关于大话ion系列的主要内容,如果未能解决你的问题,请参考以下文章

大话ion系列

大话ion系列

大话ion系列

大话设计模式-抽象工厂模式

大话数据结构读书笔记系列线性表

JUC系列01之大话并发