SRS: SrsRtmpConn::service_cycle Explained

Posted jimodetiantang


1. SrsRtmpConn::service_cycle

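At the start of the conn thread, the server calls connect_app to receive and parse the connect message sent by the client. It then calls service_cycle to begin serving the client's actual requests.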

/**
 * when valid and connected to vhost/app, service the client.
 */
int SrsRtmpConn::service_cycle()
{
    int ret = ERROR_SUCCESS;
    
    /* First send a Window Acknowledgement Size (5) message to the peer; see Figure 1 */
    if ((ret = rtmp->set_window_ack_size((int)(2.5 * 1000 * 1000))) != ERROR_SUCCESS) {
        srs_error("set window acknowledgement size failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("set window acknowledgement size success");
    
    /* Then send a Set Peer Bandwidth (6) message */
    if ((ret = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != ERROR_SUCCESS) {
        srs_error("set peer bandwidth failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("set peer bandwidth success");
    
    // get the ip which client connected.
    std::string local_ip = srs_get_local_ip(st_netfd_fileno(stfd));
    
    // do bandwidth test if connect to the vhost which is for bandwidth check.
    if (_srs_config->get_bw_check_enabled(req->vhost)) {
        return bandwidth->bandwidth_check(rtmp, skt, req, local_ip);
    }
    
    // do token traverse before serve it.
    // @see https://github.com/ossrs/srs/pull/239
    if (true) {
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);
        if (vhost_is_edge && edge_traverse) {
            if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {
                srs_warn("token auth failed, ret=%d", ret);
                return ret;
            }
        }
    }
    
    // set chunk size to larger.
    // set the chunk size before any larger response greater than 128,
    // to make OBS happy, @see https://github.com/ossrs/srs/issues/454
    int chunk_size = _srs_config->get_chunk_size(req->vhost);
    if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) {
        srs_error("set chunk_size=%d failed. ret=%d", chunk_size, ret);
        return ret;
    }
    srs_info("set chunk_size=%d success", chunk_size);
    
    // response the client connect ok.
    if ((ret = rtmp->response_connect_app(req, local_ip.c_str())) != ERROR_SUCCESS) {
        srs_error("response connect app failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("response connect app success");
    
    if ((ret = rtmp->on_bw_done()) != ERROR_SUCCESS) {
        srs_error("on_bw_done failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("on_bw_done success");
    
    while (!disposed) {
        ret = stream_service_cycle();
        
        // stream service must terminated with error, never success.
        // when terminated with success, it's user required to stop.
        if (ret == ERROR_SUCCESS) {
            continue;
        }
        
        // when not system control error, fatal error, return.
        if (!srs_is_system_control_error(ret)) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("stream service cycle failed. ret=%d", ret);
            }
            return ret;
        }
        
        // for republish, continue service
        if (ret == ERROR_CONTROL_REPUBLISH) {
            // set timeout to a large value, wait for encoder to republish.
            rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);
            rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);
            
            srs_trace("control message(unpublish) accept, retry stream service.");
            continue;
        }
        
        // for "some" system control error,
        // logical accept and retry stream service.
        if (ret == ERROR_CONTROL_RTMP_CLOSE) {
            // TODO: FIXME: use ping message to anti-death of socket.
            // @see: https://github.com/ossrs/srs/issues/39
            // set timeout to a larger value, for user paused.
            rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);
            rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);
            
            srs_trace("control message(close) accept, retry stream service.");
            continue;
        }
        
        // for other system control message, fatal error.
        srs_error("control message(%d) reject as error. ret=%d", ret, ret);
        return ret;
    }
    
    return ret;
}
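The retry logic at the end of service_cycle can be condensed into a small decision function. The following is only a sketch: the enum values and the helper names (SketchError, decide, is_system_control_error) are hypothetical stand-ins, not the real SRS error codes or functions, and the timeout adjustments are left out.

```cpp
#include <cassert>

// Hypothetical stand-ins for the SRS error codes. The numeric values are
// illustrative only and are not taken from the SRS headers.
enum SketchError {
    kSuccess = 0,
    kSocketTimeout = 1,
    kControlRtmpClose = 2,
    kControlRepublish = 3,
    kControlOther = 4
};

enum LoopAction { kRetry, kStop };

// Assumption for this sketch: only the kControl* codes are "system control" errors.
inline bool is_system_control_error(SketchError e) {
    return e == kControlRtmpClose || e == kControlRepublish || e == kControlOther;
}

// Mirrors the order of the checks in the while loop of service_cycle.
inline LoopAction decide(SketchError ret) {
    if (ret == kSuccess) {
        return kRetry;              // user asked to stop the stream; serve the next one
    }
    if (!is_system_control_error(ret)) {
        return kStop;               // real failure (timeout, socket error, ...)
    }
    if (ret == kControlRepublish || ret == kControlRtmpClose) {
        return kRetry;              // accepted control messages: keep the connection
    }
    return kStop;                   // any other control message is rejected
}
```

Under these assumptions, decide(kControlRepublish) yields kRetry while decide(kSocketTimeout) yields kStop, which matches the paths through the loop above.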

1.1 SrsRtmpServer::set_window_ack_size

int SrsRtmpServer::set_window_ack_size(int ack_size) 
{
    int ret = ERROR_SUCCESS;
    
    /* Build a Window Acknowledgement Size packet */
    SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket();
    /* Set the acknowledgement window size */
    pkt->ackowledgement_window_size = ack_size;
    /* Send the message to the peer; the packet is always freed */
    if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
        srs_error("send ack size message failed. ret=%d", ret);
        return ret;
    }
    srs_info("send ack size message success. ack_size=%d", ack_size);
    
    return ret;
}

send: Window Acknowledgement Size (5), Figure 1


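1.1.1 Constructing SrsSetWindowAckSizePacket

The SrsSetWindowAckSizePacket class builds the Window Acknowledgement Size (message type 5) packet. The client or the server sends this message to tell the peer that it must send an acknowledgement each time it has received a window's worth of data.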

/**
 * Window Acknowledgement Size (5)
 * The client or the server sends this message to inform the peer which
 * window size to use when sending acknowledgment.
 */
SrsSetWindowAckSizePacket::SrsSetWindowAckSizePacket()
{
    /* Acknowledgement window size */
    ackowledgement_window_size = 0;
}
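To make the effect of the window size concrete, here is a minimal sketch of what the receiving side does with it: count incoming bytes and report when an Acknowledgement is due. AckWindowTracker is a hypothetical helper written for this article, not an SRS class, and it ignores sequence number wrap-around.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: tracks received bytes and tells the caller when an
// Acknowledgement message should be sent back to the peer.
struct AckWindowTracker {
    uint32_t window;      // window size announced by the peer, in bytes
    uint64_t received;    // total bytes received so far
    uint64_t last_acked;  // value of `received` when the last ack was sent

    explicit AckWindowTracker(uint32_t w)
        : window(w), received(0), last_acked(0) {}

    // Record nbytes of input; return true when an acknowledgement is due.
    bool on_bytes_received(uint32_t nbytes) {
        received += nbytes;
        if (window == 0 || received - last_acked < window) {
            return false;
        }
        last_acked = received;
        return true;
    }
};
```

With the 2,500,000-byte window used by service_cycle, two reads of 1,000,000 bytes do not trigger an acknowledgement, and a further 500,000 bytes do.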

1.1.2 SrsProtocol::send_and_free_packet

/**
 * send the RTMP packet and always free it.
 * user must never free or use the packet after this method,
 * for it will always free the packet.
 * @param packet, the packet to send out, never be NULL.
 * @param stream_id, the stream id of packet to send over, 0 for control message.
 */
int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id) 
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = do_send_and_free_packet(packet, stream_id)) != ERROR_SUCCESS) {
        return ret;
    }
    
    // flush messages in manual queue
    if ((ret = manual_response_flush()) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
}

1.1.3 SrsProtocol::do_send_and_free_packet

int SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id) 
{
    int ret = ERROR_SUCCESS;
    
    srs_assert(packet);
    SrsAutoFree(SrsPacket, packet);
    
    int size = 0;
    char* payload = NULL;
    /* Call encode() of the base class SrsPacket to build the message payload */
    if ((ret = packet->encode(size, payload)) != ERROR_SUCCESS) {
        srs_error("encode RTMP packet to bytes oriented RTMP message failed. ret=%d", ret);
        return ret;
    }
    
    // encoder packet to payload and size.
    if (size <= 0 || payload == NULL) {
        srs_warn("packet is empty, ignore empty message.");
        return ret;
    }
    
    /* Initialize the message header */
    // to message
    SrsMessageHeader header;
    header.payload_length = size;
    header.message_type = packet->get_message_type();
    header.stream_id = stream_id;
    header.perfer_cid = packet->get_prefer_cid();
    
    /* do_simple_send mainly calls writev to send the data to the peer */
    ret = do_simple_send(&header, payload, size);
    srs_freepa(payload);
    if (ret == ERROR_SUCCESS) {
        ret = on_send_packet(&header, packet);
    }
    
    return ret;
}

1.1.4 SrsPacket::encode

/**
 * the subpacket can override this encode,
 * for example, video and audio will directly set the payload without memory copy,
 * other packet which need to serialize/encode to bytes by override the 
 * get_size and encode_packet.
 */
int SrsPacket::encode(int& psize, char*& ppayload)
{
    int ret = ERROR_SUCCESS;
    
    int size = get_size();
    char* payload = NULL;
    
    SrsStream stream;
    
    if (size > 0) {
        payload = new char[size];
        
        if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) {
            srs_error("initialize the stream failed. ret=%d", ret);
            srs_freepa(payload);
            return ret;
        }
    }
    
    /* As seen in 1.1, this dispatches to SrsSetWindowAckSizePacket::encode_packet */
    if ((ret = encode_packet(&stream)) != ERROR_SUCCESS) {
        srs_error("encode the packet failed. ret=%d", ret);
        srs_freepa(payload);
        return ret;
    }
    
    psize = size;
    ppayload = payload;
    srs_verbose("encode the packet success. size=%d", size);
    
    return ret;
}

1.1.5 SrsSetWindowAckSizePacket::encode_packet

This function encodes the Window Acknowledgement Size packet. The payload is just the 4-byte window size value.

int SrsSetWindowAckSizePacket::encode_packet(SrsStream* stream)
{
    int ret = ERROR_SUCCESS;
    
    if (!stream->require(4)) {
        ret = ERROR_RTMP_MESSAGE_ENCODE;
        srs_error("encode ack size packet failed. ret=%d", ret);
        return ret;
    }
    
    stream->write_4bytes(ackowledgement_window_size);
    
    srs_verbose("encode ack size packet "
        "success. ack_size=%d", ackowledgement_window_size);
    
    return ret;
}
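The 4-byte payload is carried in network byte order (big-endian). The sketch below shows that encoding with a standalone function. write_4bytes_be is a hypothetical name used here for illustration; it is not the SrsStream method itself.

```cpp
#include <cassert>
#include <cstdint>

// Write v into out[0..3], most significant byte first.
// Uses shifts, so the result does not depend on the host byte order.
inline void write_4bytes_be(uint32_t v, unsigned char* out) {
    out[0] = (unsigned char)((v >> 24) & 0xFF);
    out[1] = (unsigned char)((v >> 16) & 0xFF);
    out[2] = (unsigned char)((v >> 8) & 0xFF);
    out[3] = (unsigned char)(v & 0xFF);
}
```

For the window size passed by service_cycle, 2500000 is 0x002625A0, so the payload bytes are 00 26 25 A0.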

1.1.6 SrsProtocol::do_simple_send

/**
 * max rtmp header size;
 *     1byte basic header,
 *     11bytes message header,
 *     4bytes timestamp header,
 * that is, 1 + 11 + 4 = 16bytes
 */
#define SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE 16

int SrsProtocol::do_simple_send(SrsMessageHeader* mh, char* payload, int size)
{
    int ret = ERROR_SUCCESS;
    
    // we directly send out the packet,
    // use very simple algorithm, not very fast,
    // but it's ok.
    char* p = payload;
    char* end = p + size;
    char c0c3[SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE];
    while (p < end) {
        int nbh = 0;
        if (p == payload) {
            nbh = srs_chunk_header_c0(
                mh->perfer_cid, mh->timestamp, mh->payload_length, 
                mh->message_type, mh->stream_id,
                c0c3, sizeof(c0c3));
        } else {
            nbh = srs_chunk_header_c3(
                mh->perfer_cid, mh->timestamp,
                c0c3, sizeof(c0c3));
        }
        srs_assert(nbh > 0);
        
        iovec iovs[2];
        iovs[0].iov_base = c0c3;
        iovs[0].iov_len = nbh;
        
        int payload_size = srs_min(end - p, out_chunk_size);
        iovs[1].iov_base = p;
        iovs[1].iov_len = payload_size;
        p += payload_size;
        
        /* Call SrsStSocket::writev to send the data out */
        if ((ret = skt->writev(iovs, 2, NULL)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("send packet with writev failed. ret=%d", ret);
            }
            return ret;
        }
    }
    
    return ret;
}
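The loop in do_simple_send cuts the payload into pieces of at most out_chunk_size bytes, each preceded by its own chunk header. The splitting arithmetic alone can be sketched as follows; split_into_chunks is a hypothetical helper, and header bytes are not counted.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Return the payload size of each chunk when payload_size bytes are sent
// with the given chunk size. An empty result means nothing is sent.
inline std::vector<int> split_into_chunks(int payload_size, int chunk_size) {
    std::vector<int> sizes;
    if (chunk_size <= 0) {
        return sizes;  // guard against an invalid chunk size
    }
    for (int sent = 0; sent < payload_size; ) {
        int n = std::min(payload_size - sent, chunk_size);
        sizes.push_back(n);
        sent += n;
    }
    return sizes;
}
```

With the RTMP default chunk size of 128, a 300-byte payload goes out as three chunks of 128, 128 and 44 bytes, while the 4-byte Window Acknowledgement Size payload fits in a single chunk.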

1.1.7 srs_chunk_header_c0

Located in srs_kernel_utility.cpp.

/**
 * generate the c0 chunk header for msg.
 * @param cache, the cache to write header.
 * @param nb_cache, the size of cache.
 * @return the size of header. 0 if cache not enough.
 */
int srs_chunk_header_c0(
    int perfer_cid, u_int32_t timestamp, int32_t payload_length, 
    int8_t message_type, int32_t stream_id,
    char* cache, int nb_cache)
{
    // to directly set the field.
    char* pp = NULL;
    
    // generate the header.
    char* p = cache;
    
    // no header.
    if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) {
        return 0;
    }
    
    // write new chunk stream header, fmt is 0
    *p++ = 0x00 | (perfer_cid & 0x3F);
    
    // chunk message header, 11 bytes
    // timestamp, 3 bytes, big-endian
    if (timestamp < RTMP_EXTENDED_TIMESTAMP) {
        pp = (char*)&timestamp;
        *p++ = pp[2];
        *p++ = pp[1];
        *p++ = pp[0];
    } else {
        *p++ = 0xFF;
        *p++ = 0xFF;
        *p++ = 0xFF;
    }
    
    // message_length, 3bytes, big-endian
    pp = (char*)&payload_length;
    *p++ = pp[2];
    *p++ = pp[1];
    *p++ = pp[0];
    
    // message_type, 1bytes
    *p++ = message_type;
    
    // stream_id, 4bytes, little-endian
    pp = (char*)&stream_id;
    *p++ = pp[0];
    *p++ = pp[1];
    *p++ = pp[2];
    *p++ = pp[3];
    
    /* for c0
     * chunk extended timestamp header, 0 or 4 bytes, big-endian
     *
     * for c3:
     * chunk extended timestamp header, 0 or 4 bytes, big-endian
     * 
     * Extended Timestamp
     * This field is transmitted only when the normal time stamp in the
     * chunk message header is set to 0x00ffffff. If normal time stamp is
     * set to any value less than 0x00ffffff, this field MUST NOT be
     * present. This field MUST NOT be present if the timestamp field is not
     * present. Type 3 chunk MUST NOT have this field.
     * adobe changed for Type3 chunk:
     *        FMLE always sendout the extended-timestamp,
     *        must send the extended-timestamp to FMS,
     *        must send the extended-timestamp to flash-player.
     * @see: ngx_rtmp_prepare_message
     * @see: http://blog.csdn.net/win_lin/article/details/13363699
     * TODO: FIXME: extract to outer.
     */
    if (timestamp >= RTMP_EXTENDED_TIMESTAMP) {
        pp = (char*)&timestamp;
        *p++ = pp[3];
        *p++ = pp[2];
        *p++ = pp[1];
        *p++ = pp[0];
    }
    
    // always has header
    return p - cache;
}
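The listing above picks bytes out of the integers through a char pointer, which yields the intended wire order only on a little-endian host. As a comparison, here is a sketch of the same fmt 0 header layout written with shifts, so it does not depend on host byte order. write_fmt0_header and kExtendedTimestamp are hypothetical names, and the value 0xFFFFFF for the extended-timestamp threshold is an assumption based on the comment in the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed threshold at which the 3-byte timestamp field overflows.
const uint32_t kExtendedTimestamp = 0xFFFFFF;

// Write a fmt 0 chunk header into out. Returns the number of bytes written
// (12 or 16), or 0 if the buffer is smaller than the 16-byte maximum.
// The single-byte basic header used here is valid for chunk stream ids 2..63.
inline size_t write_fmt0_header(int cid, uint32_t timestamp, uint32_t payload_length,
                                uint8_t message_type, uint32_t stream_id,
                                unsigned char* out, size_t cap) {
    if (cap < 16) {
        return 0;
    }
    unsigned char* p = out;
    *p++ = (unsigned char)(cid & 0x3F);  // fmt = 0 in the top two bits

    // timestamp, 3 bytes, big-endian (saturated when extended)
    uint32_t ts = timestamp < kExtendedTimestamp ? timestamp : kExtendedTimestamp;
    *p++ = (unsigned char)((ts >> 16) & 0xFF);
    *p++ = (unsigned char)((ts >> 8) & 0xFF);
    *p++ = (unsigned char)(ts & 0xFF);

    // message length, 3 bytes, big-endian
    *p++ = (unsigned char)((payload_length >> 16) & 0xFF);
    *p++ = (unsigned char)((payload_length >> 8) & 0xFF);
    *p++ = (unsigned char)(payload_length & 0xFF);

    *p++ = message_type;  // message type, 1 byte

    // message stream id, 4 bytes, little-endian
    *p++ = (unsigned char)(stream_id & 0xFF);
    *p++ = (unsigned char)((stream_id >> 8) & 0xFF);
    *p++ = (unsigned char)((stream_id >> 16) & 0xFF);
    *p++ = (unsigned char)((stream_id >> 24) & 0xFF);

    // extended timestamp, 4 bytes, big-endian, only when needed
    if (timestamp >= kExtendedTimestamp) {
        *p++ = (unsigned char)((timestamp >> 24) & 0xFF);
        *p++ = (unsigned char)((timestamp >> 16) & 0xFF);
        *p++ = (unsigned char)((timestamp >> 8) & 0xFF);
        *p++ = (unsigned char)(timestamp & 0xFF);
    }
    return (size_t)(p - out);
}
```

For a Window Acknowledgement Size message on chunk stream 2 (timestamp 0, length 4, type 5, stream id 0), this produces the 12 bytes 02 00 00 00 00 00 04 05 00 00 00 00.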

1.1.8 SrsStSocket::writev

Located in srs_app_st.cpp.

int SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{
    int ret = ERROR_SUCCESS;
    
    ssize_t nb_write = st_writev(stfd, iov, iov_size, send_timeout);
    if (nwrite) {
        *nwrite = nb_write;
    }
    
    // On success a non-negative integer equal to nbyte is returned.
    // Otherwise, a value of -1 is returned and errno is set to indicate the error.
    if (nb_write <= 0) {
        // @see https://github.com/ossrs/srs/issues/200
        if (nb_write < 0 && errno == ETIME) {
            return ERROR_SOCKET_TIMEOUT;
        }
        
        return ERROR_SOCKET_WRITE;
    }
    
    /* Update the count of bytes sent */
    send_bytes += nb_write;
    
    return ret;
}

1.1.9 st_writev

ssize_t st_writev(_st_netfd_t *fd, const struct iovec *iov, int iov_size,
    st_utime_t timeout)
{
    ssize_t n, rv;
    size_t nleft, nbyte;
    int index, iov_cnt;
    struct iovec *tmp_iov;
    struct iovec local_iov[_LOCAL_MAXIOV];
    
    /* Calculate the total number of bytes to be sent */
    nbyte = 0;
    for (index = 0; index < iov_size; index++)
        nbyte += iov[index].iov_len;
    
    rv = (ssize_t)nbyte;
    nleft = nbyte;
    tmp_iov = (struct iovec *) iov; /* we promise not to modify iov */
    iov_cnt = iov_size;
    
    while (nleft > 0) {
        /* If only one block remains, send it directly with st_write */
        if (iov_cnt == 1) {
            if (st_write(fd, tmp_iov[0].iov_base, nleft, timeout) != (ssize_t) nleft)
                rv = -1;
            break;
        }
        /* Otherwise there are several blocks to send, so call writev */
        if ((n = writev(fd->osfd, tmp_iov, iov_cnt)) < 0) {
            if (errno == EINTR)
                continue;
            if (!_IO_NOT_READY_ERROR) {
                rv = -1;
                break;
            }
            
        /* writev succeeded */
        } else {
            /* If all the data has been sent, leave the while loop */
            if ((size_t) n == nleft)
                break;
            /* Otherwise some data is still unsent; update the bytes left */
            nleft -= n;
            /* Find the next unwritten vector */
            n = (ssize_t)(nbyte - nleft);
            for (index = 0; (size_t) n >= iov[index].iov_len; index++)
                n -= iov[index].iov_len;
            
            if (tmp_iov == iov) {
                /* Must copy iov's around */
                /* If the remaining iovec entries fit in the local_iov array,
                 * use that array directly */
                if (iov_size - index <= _LOCAL_MAXIOV) {
                    tmp_iov = local_iov;
                } else {
                    /* Otherwise allocate more memory */
                    tmp_iov = calloc(1, (iov_size - index) * sizeof(struct iovec));
                    if (tmp_iov == NULL)
                        return -1;
                }
            }
            
            /* Copy the unsent part of iov[index] into tmp_iov */
            /* Fill in the first partial read */
            tmp_iov[0].iov_base = &(((char *)iov[index].iov_base)[n]);
            tmp_iov[0].iov_len = iov[index].iov_len - n;
            index++;
            /* Copy the remaining vectors */
            for (iov_cnt = 1; index < iov_size; iov_cnt++, index++) {
                tmp_iov[iov_cnt].iov_base = iov[index].iov_base;
                tmp_iov[iov_cnt].iov_len = iov[index].iov_len;
            }
        }
        /* Wait until the socket becomes writable */
        if (st_netfd_poll(fd, POLLOUT, timeout) < 0) {
            rv = -1;
            break;
        }
    }
    
    if (tmp_iov != iov && tmp_iov != local_iov)
        free(tmp_iov);
    
    return rv;
}
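The least obvious part of st_writev is how it resumes after a partial write: it skips the vectors that were fully sent, trims the first partially sent one, and copies the rest. The following sketch isolates that step with plain standard-library types. Span and remaining_after are hypothetical names, and no socket I/O is involved.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified stand-in for struct iovec.
struct Span {
    const char* base;
    size_t len;
};

// Given the original vectors and the total number of bytes already written,
// return the vectors that still have to be sent.
inline std::vector<Span> remaining_after(const std::vector<Span>& iov, size_t written) {
    std::vector<Span> rest;
    size_t i = 0;
    // Skip every vector that has been written completely.
    while (i < iov.size() && written >= iov[i].len) {
        written -= iov[i].len;
        ++i;
    }
    if (i == iov.size()) {
        return rest;  // everything was sent
    }
    // The first remaining vector may be partially written: trim its front.
    Span first = { iov[i].base + written, iov[i].len - written };
    rest.push_back(first);
    // The remaining vectors are copied unchanged.
    for (++i; i < iov.size(); ++i) {
        rest.push_back(iov[i]);
    }
    return rest;
}
```

For the two-element case used by do_simple_send (a 12-byte header followed by a 4-byte payload), a partial write of 14 bytes leaves a single span covering the last 2 payload bytes.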

1.1.10 SrsProtocol::on_send_packet

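This function updates the protocol context after a message has been sent. For example, once a Window Acknowledgement Size message has been sent to the peer successfully, this function records the same value as the local ack window size.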

/**
 * when message sentout, update the context.
 */
int SrsProtocol::on_send_packet(SrsMessageHeader* mh, SrsPacket* packet)
{
    int ret = ERROR_SUCCESS;
    
    // ignore raw bytes oriented RTMP message.
    if (packet == NULL) {
        return ret;
    }
    
    switch (mh->message_type) {
        case RTMP_MSG_SetChunkSize: {
            SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(packet);
            out_chunk_size = pkt->chunk_size;
            srs_info("out.chunk=%d", pkt->chunk_size);
            break;
        }
        case RTMP_MSG_WindowAcknowledgementSize: {
            SrsSetWindowAckSizePacket* pkt = 
                dynamic_cast<SrsSetWindowAckSizePacket*>(packet);
            /* After sending the Window Acknowledgement Size message to the peer,
             * record the same value as the local ack window size */
            out_ack_size.window = (uint32_t)pkt->ackowledgement_window_size;
            break;
        }
        case RTMP_MSG_AMF0CommandMessage:
        case RTMP_MSG_AMF3CommandMessage: {
            if (true) {
                SrsConnectAppPacket *pkt = dynamic_cast<SrsConnectAppPacket*>(packet);
                if (pkt) {
                    requests[pkt->transaction_id] = pkt->command_name;
                    break;
                }
            }
            if (true) {
                SrsCreateStreamPacket* pkt = dynamic_cast<SrsCreateStreamPacket*>(packet);
                if (pkt) {
                    requests[pkt->transaction_id] = pkt->command_name;
                    break;
                }
            }
            if (true) {
                SrsFMLEStartPacket* pkt = dynamic_cast<SrsFMLEStartPacket*>(packet);
                if (pkt) {
                    requests[pkt->transaction_id] = pkt->command_name;
                    break;
                }
            }
            break;
        }
        case RTMP_MSG_VideoMessage:
        case RTMP_MSG_AudioMessage:
            print_debug_info();
        default:
            break;
    }
    
    return ret;
}

1.1.11 SrsProtocol::manual_response_flush

Normally there is no need to call this function, because auto response is enabled by default, i.e. auto_response_when_recv is true.

/**
 * flush for manual response when auto response is disable
 * by set_auto_response(false), we default use auto response, so donot
 * need to call this api(the protocol sdk will auto send message).
 * @see the auto_response_when_recv and manual_response_queue.
 */
int SrsProtocol::manual_response_flush()
{
    int ret = ERROR_SUCCESS;
    
    if (manual_response_queue.empty()) {
        return ret;
    }
    
    std::vector<SrsPacket*>::iterator it;
    for (it = manual_response_queue.begin(); it != manual_response_queue.end(); ) {
        SrsPacket* pkt = *it;
        
        // erase this packet, the send api always free it.
        it = manual_response_queue.erase(it);
        
        // use underlayer api to send, donot flush again.
        if ((ret = do_send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
            return ret;
        }
    }
    
    return ret;
}

1.2 SrsRtmpServer::set_peer_bandwidth

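The client or the server sends the Set Peer Bandwidth message to limit the output bandwidth of its peer. The receiver limits its output bandwidth accordingly, that is, it limits the amount of data that has been sent but not yet acknowledged. If the window size differs from the last one it sent, the receiver of this message should respond with a Window Acknowledgement Size message.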

int SrsRtmpServer::set_peer_bandwidth(int bandwidth, int type)
{
    int ret = ERROR_SUCCESS;
    
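    /* Build a Set Peer Bandwidth packet */
    SrsSetPeerBandwidthPacket* pkt = new SrsSetPeerBandwidthPacket();
    /* Output bandwidth */
    pkt->bandwidth = bandwidth;
    /* Limit type. The caller above passes 2, i.e. Dynamic: if the previous
     * limit type was Hard, treat this message as Hard; otherwise ignore it */
    pkt->type = type;
    /* Send the message */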
    if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
        srs_error("send set bandwidth message failed. ret=%d", ret);
        return ret;
    }
    srs_info("send set bandwidth message "
        "success. bandwidth=%d, type=%d", bandwidth, type);
    
    return ret;
}
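The three limit types defined by the RTMP specification (0 Hard, 1 Soft, 2 Dynamic) can be sketched from the receiver's point of view as below. PeerBandwidthState and apply_set_peer_bandwidth are hypothetical names for illustration; this is not how SRS itself handles an incoming Set Peer Bandwidth message, which the listing above does not show.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

enum LimitType { kHard = 0, kSoft = 1, kDynamic = 2 };

// Hypothetical receiver-side state for the output bandwidth limit.
struct PeerBandwidthState {
    uint32_t window;      // limit currently in effect, in bytes
    bool has_limit;       // false until the first limit is applied
    LimitType last_type;  // type of the last applied limit
};

inline void apply_set_peer_bandwidth(PeerBandwidthState& st, uint32_t window, LimitType type) {
    switch (type) {
        case kHard:
            // Hard: use the indicated window size.
            st.window = window;
            st.has_limit = true;
            st.last_type = kHard;
            break;
        case kSoft:
            // Soft: use the smaller of the new window and the limit in effect.
            st.window = st.has_limit ? std::min(st.window, window) : window;
            st.has_limit = true;
            st.last_type = kSoft;
            break;
        case kDynamic:
            // Dynamic: treated as Hard only if the previous limit was Hard,
            // otherwise the message is ignored.
            if (st.has_limit && st.last_type == kHard) {
                st.window = window;
            }
            break;
    }
}
```

Under this reading, a Dynamic message that arrives before any Hard limit leaves the state unchanged.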

send: Set Peer Bandwidth (6), Figure 2

