peer-connection - ICE

Posted bloglearning


For how a WebRTC peer-to-peer session is established, see: https://blog.csdn.net/zhuiyuanqingya/article/details/84108763

Local candidate gathering


Gathering of local IceCandidates begins in PeerConnection::SetLocalDescription, which starts the gathering:

  // MaybeStartGathering needs to be called after posting
  // MSG_SET_SESSIONDESCRIPTION_SUCCESS, so that we don't signal any candidates
  // before signaling that SetLocalDescription completed.
  transport_controller_->MaybeStartGathering();

MaybeStartGathering is implemented as follows:

void JsepTransportController::MaybeStartGathering() {
  if (!network_thread_->IsCurrent()) {
    network_thread_->Invoke<void>(RTC_FROM_HERE,
                                  [&] { MaybeStartGathering(); });
    return;
  }

  for (auto& dtls : GetDtlsTransports()) {
    dtls->ice_transport()->MaybeStartGathering();
  }
}

The gathering function is run on network_thread_, and gathering is then started for each DtlsTransport, which leads to P2PTransportChannel::MaybeStartGathering():

    if (pooled_session) {
      AddAllocatorSession(std::move(pooled_session));
      PortAllocatorSession* raw_pooled_session =
          allocator_sessions_.back().get();
      // Process the pooled session's existing candidates/ports, if they exist.
      OnCandidatesReady(raw_pooled_session,
                        raw_pooled_session->ReadyCandidates());
      for (PortInterface* port : allocator_sessions_.back()->ReadyPorts()) {
        OnPortReady(raw_pooled_session, port);
      }
      if (allocator_sessions_.back()->CandidatesAllocationDone()) {
        OnCandidatesAllocationDone(raw_pooled_session);
      }
    } else {
      AddAllocatorSession(allocator_->CreateSession(
          transport_name(), component(), ice_parameters_.ufrag,
          ice_parameters_.pwd));
      allocator_sessions_.back()->StartGettingPorts();
    }

Port gathering then starts in BasicPortAllocatorSession::StartGettingPorts():

void BasicPortAllocatorSession::StartGettingPorts() {
  RTC_DCHECK_RUN_ON(network_thread_);
  state_ = SessionState::GATHERING;
  if (!socket_factory_) {
    owned_socket_factory_.reset(
        new rtc::BasicPacketSocketFactory(network_thread_));
    socket_factory_ = owned_socket_factory_.get();
  }

  network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_START);

  RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy "
                   << turn_port_prune_policy_;
}

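The Post call here puts the message on the message queue (MessageQueue::Post). Because this is passed as the handler, BasicPortAllocatorSession::OnMessage receives and handles the messages, in this order: MSG_CONFIG_START -> MSG_CONFIG_READY -> MSG_ALLOCATE -> MSG_SEQUENCEOBJECTS_CREATED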

void BasicPortAllocatorSession::OnMessage(rtc::Message* message) {
  switch (message->message_id) {
    case MSG_CONFIG_START:
      GetPortConfigurations();
      break;
    case MSG_CONFIG_READY:
      OnConfigReady(static_cast<PortConfiguration*>(message->pdata));
      break;
    case MSG_ALLOCATE:
      OnAllocate();
      break;
    case MSG_SEQUENCEOBJECTS_CREATED:
      OnAllocationSequenceObjectsCreated();
      break;
    case MSG_CONFIG_STOP:
      OnConfigStop();
      break;
    default:
      RTC_NOTREACHED();
  }
}
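
The message chain can be illustrated with a minimal, self-contained sketch. It is not the WebRTC implementation: ToySession, RunUntilIdle and the trace strings are hypothetical, and the assumption that each handler posts the next message is a simplification of the order listed above.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Message ids named after the ones in the text; the values are arbitrary.
enum ToyMessageId {
  MSG_CONFIG_START,
  MSG_CONFIG_READY,
  MSG_ALLOCATE,
  MSG_SEQUENCEOBJECTS_CREATED,
};

// Hypothetical stand-in for a session that owns a FIFO message queue.
class ToySession {
 public:
  void Post(ToyMessageId id) { queue_.push_back(id); }

  // Drains the queue in FIFO order, the way a thread's message loop would.
  void RunUntilIdle() {
    while (!queue_.empty()) {
      ToyMessageId id = queue_.front();
      queue_.pop_front();
      OnMessage(id);
    }
  }

  const std::vector<std::string>& trace() const { return trace_; }

 private:
  // Each handler records its name and (assumption) posts the next message.
  void OnMessage(ToyMessageId id) {
    switch (id) {
      case MSG_CONFIG_START:
        trace_.push_back("GetPortConfigurations");
        Post(MSG_CONFIG_READY);
        break;
      case MSG_CONFIG_READY:
        trace_.push_back("OnConfigReady");
        Post(MSG_ALLOCATE);
        break;
      case MSG_ALLOCATE:
        trace_.push_back("OnAllocate");
        Post(MSG_SEQUENCEOBJECTS_CREATED);
        break;
      case MSG_SEQUENCEOBJECTS_CREATED:
        trace_.push_back("OnAllocationSequenceObjectsCreated");
        break;
    }
  }

  std::deque<ToyMessageId> queue_;
  std::vector<std::string> trace_;
};

// Starts the chain the way StartGettingPorts does: by posting the first message.
inline std::vector<std::string> RunToySession() {
  ToySession session;
  session.Post(MSG_CONFIG_START);
  session.RunUntilIdle();
  return session.trace();
}
```

Calling RunToySession() returns the handler names in the order they ran, which makes the asynchronous hand-off between steps easy to see.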

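OnAllocate starts port allocation: it iterates over all network devices (Network objects), creates an AllocationSequence object for each, and calls its Init and Start functions to allocate ports. Init creates udp_socket_ (via CreateUdpSocket); when bundle is enabled, the STUN port probing shares one socket with the ordinary UDP port.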

void AllocationSequence::Start() {
  state_ = kRunning;
  session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);
  // Take a snapshot of the best IP, so that when DisableEquivalentPhases is
  // called next time, we enable all phases if the best IP has since changed.
  previous_best_ip_ = network_->GetBestIP();
}

After the message is posted, execution enters the AllocationSequence object's own OnMessage:

void AllocationSequence::OnMessage(rtc::Message* msg) {
  RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());
  RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE);

  const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};

  // Perform all of the phases in the current step.
  RTC_LOG(LS_INFO) << network_->ToString()
                   << ": Allocation Phase=" << PHASE_NAMES[phase_];

  switch (phase_) {
    case PHASE_UDP:
      CreateUDPPorts();
      CreateStunPorts();
      break;

    case PHASE_RELAY:
      CreateRelayPorts();
      break;

    case PHASE_TCP:
      CreateTCPPorts();
      state_ = kCompleted;
      break;

    default:
      RTC_NOTREACHED();
  }

  if (state() == kRunning) {
    ++phase_;
    session_->network_thread()->PostDelayed(RTC_FROM_HERE,
                                            session_->allocator()->step_delay(),
                                            this, MSG_ALLOCATION_PHASE);
  } else {
    // If all phases in AllocationSequence are completed, no allocation
    // steps needed further. Canceling  pending signal.
    session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
    SignalPortAllocationComplete(this);
  }
}

The phase_ member of AllocationSequence is initialized to 0 when the object is created, which equals PHASE_UDP, so PHASE_UDP is handled first. When it finishes, session_->network_thread()->PostDelayed schedules the next phase, PHASE_RELAY.
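
The phase progression can be sketched as a small state machine. This is a toy model under stated assumptions: ToyAllocationSequence, Step and RunToySequence are hypothetical names, and each Step() call stands in for one delivery of MSG_ALLOCATION_PHASE (the real code waits step_delay() between deliveries).

```cpp
#include <cassert>
#include <string>
#include <vector>

enum ToyPhase { PHASE_UDP = 0, PHASE_RELAY, PHASE_TCP, kNumPhases };
enum ToyState { kInit, kRunning, kStopped, kCompleted };

class ToyAllocationSequence {
 public:
  void Start() { state_ = kRunning; }

  // Handles one phase. Returns true if another step should be scheduled.
  bool Step() {
    static const char* const kPhaseNames[kNumPhases] = {"Udp", "Relay", "Tcp"};
    if (state_ != kRunning) return false;
    log_.push_back(kPhaseNames[phase_]);
    if (phase_ == PHASE_TCP) {
      state_ = kCompleted;  // last phase: nothing more to schedule
      return false;
    }
    ++phase_;
    return true;
  }

  ToyState state() const { return state_; }
  const std::vector<std::string>& log() const { return log_; }

 private:
  ToyState state_ = kInit;
  int phase_ = PHASE_UDP;  // starts at 0, i.e. PHASE_UDP
  std::vector<std::string> log_;
};

// Runs every phase back to back and returns the finished sequence.
inline ToyAllocationSequence RunToySequence() {
  ToyAllocationSequence seq;
  seq.Start();
  while (seq.Step()) {
  }
  return seq;
}
```

A sequence that was never started stays in kInit and Step() does nothing, which mirrors the state() == kRunning check in the listing above.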

The UDP phase gathers two types of candidate: host and srflx. The host candidate is gathered first, then srflx; when PORTALLOCATOR_ENABLE_SHARED_SOCKET is set, both share one socket.

void AllocationSequence::CreateUDPPorts() {
  if (IsFlagSet(PORTALLOCATOR_DISABLE_UDP)) {
    RTC_LOG(LS_VERBOSE) << "AllocationSequence: UDP ports disabled, skipping.";
    return;
  }

  // TODO(mallinath) - Remove UDPPort creating socket after shared socket
  // is enabled completely.
  std::unique_ptr<UDPPort> port;
  bool emit_local_candidate_for_anyaddress =
      !IsFlagSet(PORTALLOCATOR_DISABLE_DEFAULT_LOCAL_CANDIDATE);
  if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && udp_socket_) {
    port = UDPPort::Create(
        session_->network_thread(), session_->socket_factory(), network_,
        udp_socket_.get(), session_->username(), session_->password(),
        session_->allocator()->origin(), emit_local_candidate_for_anyaddress,
        session_->allocator()->stun_candidate_keepalive_interval());
  } else {
    port = UDPPort::Create(
        session_->network_thread(), session_->socket_factory(), network_,
        session_->allocator()->min_port(), session_->allocator()->max_port(),
        session_->username(), session_->password(),
        session_->allocator()->origin(), emit_local_candidate_for_anyaddress,
        session_->allocator()->stun_candidate_keepalive_interval());
  }

  if (port) {
    // If shared socket is enabled, STUN candidate will be allocated by the
    // UDPPort.
    if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
      udp_port_ = port.get();
      port->SignalDestroyed.connect(this, &AllocationSequence::OnPortDestroyed);

      // If STUN is not disabled, setting stun server address to port.
      if (!IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) {
        if (config_ && !config_->StunServers().empty()) {
          RTC_LOG(LS_INFO)
              << "AllocationSequence: UDPPort will be handling the "
                 "STUN candidate generation.";
          port->set_server_addresses(config_->StunServers());
        }
      }
    }

    session_->AddAllocatedPort(port.release(), this, true);
  }
}

Next comes BasicPortAllocatorSession::AddAllocatedPort, which connects the signals and slots and stores the port in the list:

void BasicPortAllocatorSession::AddAllocatedPort(Port* port,
                                                 AllocationSequence* seq,
                                                 bool prepare_address) {
  RTC_DCHECK_RUN_ON(network_thread_);
  if (!port)
    return;

  RTC_LOG(LS_INFO) << "Adding allocated port for " << content_name();
  port->set_content_name(content_name());
  port->set_component(component());
  port->set_generation(generation());
  if (allocator_->proxy().type != rtc::PROXY_NONE)
    port->set_proxy(allocator_->user_agent(), allocator_->proxy());
  port->set_send_retransmit_count_attribute(
      (flags() & PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0);
  // Store the port in the list.
  PortData data(port, seq);
  ports_.push_back(data);
  // Connect the signals and slots.
  port->SignalCandidateReady.connect(
      this, &BasicPortAllocatorSession::OnCandidateReady);
  port->SignalCandidateError.connect(
      this, &BasicPortAllocatorSession::OnCandidateError);
  port->SignalPortComplete.connect(this,
                                   &BasicPortAllocatorSession::OnPortComplete);
  port->SignalDestroyed.connect(this,
                                &BasicPortAllocatorSession::OnPortDestroyed);
  port->SignalPortError.connect(this, &BasicPortAllocatorSession::OnPortError);
  RTC_LOG(LS_INFO) << port->ToString() << ": Added port to allocator";

  if (prepare_address)
    port->PrepareAddress();
}
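
The connect-then-fire pattern used here can be shown with a minimal sketch. WebRTC uses its sigslot library for this; ToySignal below is a hypothetical stand-in built on std::function, and ToyPort, ToyPortSession and the candidate string are made up for illustration.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal signal: stores slots and calls each one on emit.
template <typename... Args>
class ToySignal {
 public:
  void connect(std::function<void(Args...)> slot) {
    slots_.push_back(std::move(slot));
  }
  void operator()(Args... args) const {
    for (const auto& slot : slots_) slot(args...);
  }

 private:
  std::vector<std::function<void(Args...)>> slots_;
};

struct ToyPort {
  ToySignal<const std::string&> SignalCandidateReady;
  // Pretends an address became ready and announces one candidate.
  void PrepareAddress() { SignalCandidateReady("host 192.0.2.1:50000"); }
};

class ToyPortSession {
 public:
  // Stores the port, connects its signal, then optionally starts it.
  void AddAllocatedPort(ToyPort* port, bool prepare_address) {
    ports_.push_back(port);
    port->SignalCandidateReady.connect(
        [this](const std::string& c) { OnCandidateReady(c); });
    if (prepare_address) port->PrepareAddress();
  }
  const std::vector<std::string>& candidates() const { return candidates_; }

 private:
  void OnCandidateReady(const std::string& c) { candidates_.push_back(c); }

  std::vector<ToyPort*> ports_;
  std::vector<std::string> candidates_;
};

inline std::vector<std::string> GatherWithToyPort(bool prepare_address) {
  ToyPort port;
  ToyPortSession session;
  session.AddAllocatedPort(&port, prepare_address);
  return session.candidates();
}
```

The ordering matters: the slot is connected before PrepareAddress runs, so the candidate it emits is not lost.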

Next comes UDPPort::PrepareAddress, which leads to UDPPort::OnLocalAddressReady:

void UDPPort::OnLocalAddressReady(rtc::AsyncPacketSocket* socket,
                                  const rtc::SocketAddress& address) {
  // When adapter enumeration is disabled and binding to the any address, the
  // default local address will be issued as a candidate instead if
  // |emit_local_for_anyaddress| is true. This is to allow connectivity for
  // applications which absolutely requires a HOST candidate.
  rtc::SocketAddress addr = address;

  // If MaybeSetDefaultLocalAddress fails, we keep the "any" IP so that at
  // least the port is listening.
  MaybeSetDefaultLocalAddress(&addr);

  AddAddress(addr, addr, rtc::SocketAddress(), UDP_PROTOCOL_NAME, "", "",
             LOCAL_PORT_TYPE, ICE_TYPE_PREFERENCE_HOST, 0, "", false);
  MaybePrepareStunCandidate();
}

This calls Port::AddAddress -> Port::FinishAddingAddress, which fires SignalCandidateReady and transfers control to the callback:

void Port::FinishAddingAddress(const Candidate& c, bool is_final) {
  candidates_.push_back(c);
  SignalCandidateReady(this, c);

  PostAddAddress(is_final);
}
void BasicPortAllocatorSession::OnCandidateReady(Port* port,
                                                 const Candidate& c) {
    ......
// If the current port is not pruned yet, SignalPortReady.
    if (!data->pruned()) {
      RTC_LOG(LS_INFO) << port->ToString() << ": Port ready.";
      SignalPortReady(this, port);
      port->KeepAliveUntilPruned();
    }
  }

  if (data->ready() && CheckCandidateFilter(c)) {
    std::vector<Candidate> candidates;
    candidates.push_back(allocator_->SanitizeCandidate(c));
    SignalCandidatesReady(this, candidates);
  } else {
    RTC_LOG(LS_INFO) << "Discarding candidate because it doesn't match filter.";
  }
........
}

Signal/slot flow:

BasicPortAllocatorSession::OnCandidateReady
                     ↓SignalCandidatesReady
P2PTransportChannel::OnCandidatesReady
                     ↓SignalCandidateGathered
JsepTransportController::OnTransportCandidateGathered_n
                     ↓SignalIceCandidatesGathered
PeerConnection::OnTransportControllerCandidatesGathered
                     ↓
PeerConnection::OnIceCandidate

SignalPortReady
P2PTransportChannel::OnPortReady
                     ↓
P2PTransportChannel::CreateConnection
                     ↓
P2PTransportChannel::SortConnectionsAndUpdateState
                     ↓
void PeerConnection::OnIceCandidate(
    std::unique_ptr<IceCandidateInterface> candidate) {
  if (IsClosed()) {
    return;
  }
  ReportIceCandidateCollected(candidate->candidate());
  // The client application is then notified that an ICE candidate was gathered.
  Observer()->OnIceCandidate(candidate.get());
}

When the socket is shared, AllocationSequence::CreateStunPorts returns immediately, because the logic that sends the STUN Binding request has already run during AllocationSequence::CreateUDPPorts.

Sending the STUN Binding request:

UDPPort::OnLocalAddressReady(CreateStunPorts)
            ↓
UDPPort::MaybePrepareStunCandidate
            ↓
UDPPort::SendStunBindingRequests
            ↓
UDPPort::SendStunBindingRequest
            ↓
StunRequestManager::Send
            ↓
StunRequestManager::SendDelayed
            ↓MSG_STUN_SEND
StunRequest::OnMessage
            ↓SignalSendPacket
UDPPort::OnSendPacket
            ↓
AsyncUDPSocket::SendTo
            ↓
PhysicalSocket::SendTo
            ↓

Receiving the STUN Binding response:

PhysicalSocketServer::Wait
            ↓
SocketDispatcher::OnEvent
            ↓
AsyncUDPSocket::OnReadEvent
            ↓SignalReadPacket
AllocationSequence::OnReadPacket
            ↓
UDPPort::HandleIncomingPacket
            ↓
UDPPort::OnReadPacket
            ↓
StunRequestManager::CheckResponse
            ↓
StunBindingRequest::OnResponse
            ↓
UDPPort::OnStunBindingRequestSucceeded
            ↓
Port::AddAddress
            ↓
Port::FinishAddingAddress
            ↓
BasicPortAllocatorSession::OnCandidateReady
            ↓SignalCandidatesReady

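The StunRequest class defines and wraps a STUN request. The base class implements request timeout management and retransmission; logic specific to each request type is implemented by subclasses such as StunBindingRequest and TurnAllocateRequest.

StunRequestManager implements the matching of responses to requests: the manager keeps all requests in a hash map from transaction id to request. When a response arrives, the transaction id locates the corresponding request, whose callback can then be run.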
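
The matching logic can be sketched as follows. This is a simplified illustration, not the real StunRequestManager: ToyStunRequestManager, its string transaction ids, and the callback signature are assumptions, and timeouts and retransmission are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

class ToyStunRequestManager {
 public:
  using Callback = std::function<void(const std::string& mapped_address)>;

  // Remembers the request under its transaction id.
  void Send(const std::string& transaction_id, Callback on_response) {
    pending_[transaction_id] = std::move(on_response);
  }

  // Looks up the request by transaction id and runs its callback.
  // Returns false when no pending request matches the response.
  bool CheckResponse(const std::string& transaction_id,
                     const std::string& mapped_address) {
    auto it = pending_.find(transaction_id);
    if (it == pending_.end()) return false;
    Callback callback = std::move(it->second);
    pending_.erase(it);  // a request is answered at most once
    callback(mapped_address);
    return true;
  }

  std::size_t pending_count() const { return pending_.size(); }

 private:
  std::unordered_map<std::string, Callback> pending_;
};

// Sends one binding request and feeds in one stray and one matching response.
inline std::string ResolveToyBinding() {
  ToyStunRequestManager manager;
  std::string srflx;
  manager.Send("txn-1", [&srflx](const std::string& addr) { srflx = addr; });
  manager.CheckResponse("txn-unknown", "198.51.100.9:1");  // no match, ignored
  manager.CheckResponse("txn-1", "203.0.113.7:61000");
  return srflx;
}
```

Erasing the entry before invoking the callback keeps a duplicate response from triggering the same request twice.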

Candidate gathering state:

  • When P2PTransportChannel is created, gathering_state_ is kIceGatheringNew;

  • When P2PTransportChannel::MaybeStartGathering starts gathering candidates, it switches to kIceGatheringGathering if it is not already in the Gathering state;

  • When the OnCandidatesAllocationDone callback is received, it switches to kIceGatheringComplete;
    • This callback fires when BasicPortAllocatorSession::CandidatesAllocationDone returns true;
    • That means AllocationSequences have been created, none of them is in the kRunning state, and no port is in the STATE_INPROGRESS state;
    • An AllocationSequence is in kInit when created, switches to kRunning in AllocationSequence::Start, and switches to kCompleted after the TCP phase ends; if AllocationSequence::Stop is called, it switches to kStopped;
    • A port is in STATE_INPROGRESS from creation. When it is pruned or hits an error, it switches to STATE_PRUNED or STATE_ERROR respectively. TurnPort and TcpPort switch to STATE_COMPLETE when they call Port::AddAddress after gathering a candidate; RelayPort (GTURN) and UdpPort also switch to STATE_COMPLETE after gathering a candidate; StunPort switches to STATE_COMPLETE once it has finished gathering candidates (that is, completed binding requests to all STUN servers).

BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated also runs the check for whether allocation is done.

AllocationSequence::OnMessage                     Port::FinishAddingAddress
            ↓SignalPortAllocationComplete                   ↓
OnPortAllocationComplete                      Port::PostAddAddress(bool is_final)
            ↓                                            ↓SignalPortComplete
            ↓                        BasicPortAllocatorSession::OnPortComplete
            ↓                            ↓
BasicPortAllocatorSession::MaybeSignalCandidatesAllocationDone
            ↓
BasicPortAllocatorSession::CandidatesAllocationDone
            ↓true
SignalCandidatesAllocationDone
            ↓
OnCandidatesAllocationDone

Setting remote candidates

PeerConnection::SetRemoteDescription    UpdateStats(kStatsOutputLevelStandard)
            ↓
PeerConnection::UseCandidatesInSessionDescription
            ↓
PeerConnection::UseCandidate
            ↓SetIceConnectionState(PeerConnectionInterface::kIceConnectionChecking)
JsepTransportController::AddRemoteCandidates
            ↓
JsepTransport::AddRemoteCandidates
            ↓
P2PTransportChannel::AddRemoteCandidate
            ↓
P2PTransportChannel::FinishAddingRemoteCandidate
            ↓
P2PTransportChannel::CreateConnections
            ↓
P2PTransportChannel::CreateConnection
            ↓
UDPPort::CreateConnection
            ↓
Port::AddOrReplaceConnection   SignalConnectionCreated (this signal has no slot connected, so it is unused)
            ↓
P2PTransportChannel::AddConnection
            ↓
P2PTransportChannel::RememberRemoteCandidate (maintains the remote_candidates_ list)
P2PTransportChannel::FinishAddingRemoteCandidate
            ↓
P2PTransportChannel::SortConnectionsAndUpdateState
            ↓
P2PTransportChannel::MaybeStartPinging
            ↓
P2PTransportChannel::CheckAndPing    FindNextPingableConnection
            ↓
P2PTransportChannel::PingConnection
            ↓
Connection::Ping
            ↓
StunRequestManager::Send  IceCandidatePairState::IN_PROGRESS
            ↓
StunRequestManager::SendDelayed
            ↓
           ... similar to the STUN Binding response flow
            ↓
StunRequestManager::CheckResponse
            ↓
ConnectionRequest::OnResponse
            ↓
Connection::OnConnectionRequestResponse
            ↓
Connection::ReceivedPingResponse
            ↓
Connection::set_write_state STATE_WRITABLE
            ↓SignalStateChange
P2PTransportChannel::OnConnectionStateChange
            ↓
RequestSortAndStateUpdate
            ↓
P2PTransportChannel::SortConnectionsAndUpdateState
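
The tail of this flow (ping, response, writable, state-change callback) can be modeled with a toy sketch. Everything here is hypothetical: ToyConnection and ToyChannel are not the WebRTC classes, and the selection rule (keep the first connection that becomes writable) is a deliberate simplification of the sorting that SortConnectionsAndUpdateState performs.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

enum ToyWriteState { STATE_WRITE_INIT, STATE_WRITABLE };

class ToyConnection {
 public:
  explicit ToyConnection(std::string remote) : remote_(std::move(remote)) {}

  void set_on_state_change(std::function<void(ToyConnection*)> cb) {
    on_state_change_ = std::move(cb);
  }

  // Sends a check; the toy version only counts it as outstanding.
  void Ping() { ++pings_outstanding_; }

  // A response to an outstanding ping makes the connection writable.
  void ReceivedPingResponse() {
    if (pings_outstanding_ == 0) return;  // nothing was in flight
    --pings_outstanding_;
    if (write_state_ != STATE_WRITABLE) {
      write_state_ = STATE_WRITABLE;
      if (on_state_change_) on_state_change_(this);
    }
  }

  ToyWriteState write_state() const { return write_state_; }
  const std::string& remote() const { return remote_; }

 private:
  std::string remote_;
  int pings_outstanding_ = 0;
  ToyWriteState write_state_ = STATE_WRITE_INIT;
  std::function<void(ToyConnection*)> on_state_change_;
};

class ToyChannel {
 public:
  ToyConnection* AddConnection(const std::string& remote) {
    connections_.push_back(std::make_unique<ToyConnection>(remote));
    ToyConnection* c = connections_.back().get();
    c->set_on_state_change(
        [this](ToyConnection* changed) { OnConnectionStateChange(changed); });
    return c;
  }
  const ToyConnection* selected() const { return selected_; }

 private:
  // Simplification: keep the first connection that becomes writable.
  void OnConnectionStateChange(ToyConnection* c) {
    if (!selected_ && c->write_state() == STATE_WRITABLE) selected_ = c;
  }

  std::vector<std::unique_ptr<ToyConnection>> connections_;
  ToyConnection* selected_ = nullptr;
};

inline std::string SelectFirstWritable() {
  ToyChannel channel;
  ToyConnection* a = channel.AddConnection("198.51.100.1:4000");
  ToyConnection* b = channel.AddConnection("198.51.100.2:4000");
  a->Ping();
  b->Ping();
  b->ReceivedPingResponse();  // b answers first
  a->ReceivedPingResponse();
  return channel.selected() ? channel.selected()->remote() : std::string();
}
```

In this model a response without an outstanding ping is ignored, so a connection cannot become writable before a check has been sent on it.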


P2PTransportChannel::SwitchSelectedConnection
                    ↓ sig slot (SignalReadyToSend)
DtlsTransport::OnReadyToSend
                    ↓ sig slot (SignalReadyToSend)
RtpTransport::OnReadyToSend

AddConnection hooks up sending and receiving for the port:

void P2PTransportChannel::AddConnection(Connection* connection) {
  RTC_DCHECK_RUN_ON(network_thread_);
  connections_.push_back(connection);
  unpinged_connections_.insert(connection);
  connection->set_remote_ice_mode(remote_ice_mode_);
  connection->set_receiving_timeout(config_.receiving_timeout);
  connection->set_unwritable_timeout(config_.ice_unwritable_timeout);
  connection->set_unwritable_min_checks(config_.ice_unwritable_min_checks);
  connection->set_inactive_timeout(config_.ice_inactive_timeout);
  connection->SignalReadPacket.connect(this,
                                       &P2PTransportChannel::OnReadPacket);
  connection->SignalReadyToSend.connect(this,
                                        &P2PTransportChannel::OnReadyToSend);
  connection->SignalStateChange.connect(
      this, &P2PTransportChannel::OnConnectionStateChange);
  connection->SignalDestroyed.connect(
      this, &P2PTransportChannel::OnConnectionDestroyed);
  connection->SignalNominated.connect(this, &P2PTransportChannel::OnNominated);

  had_connection_ = true;

  connection->set_ice_event_log(&ice_event_log_);
  LogCandidatePairConfig(connection,
                         webrtc::IceCandidatePairConfigType::kAdded);
}
