Nginx-rtmp之 ngx_rtmp_send.c 文件分析

Posted 季末的天堂

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nginx-rtmp之 ngx_rtmp_send.c 文件分析相关的知识,希望对你有一定的参考价值。

1. 简述

1.1 RTMP 消息类型

/* RTMP message types */
#define NGX_RTMP_MSG_CHUNK_SIZE         1
#define NGX_RTMP_MSG_ABORT              2
#define NGX_RTMP_MSG_ACK                3
#define NGX_RTMP_MSG_USER               4
#define NGX_RTMP_MSG_ACK_SIZE           5
#define NGX_RTMP_MSG_BANDWIDTH          6
#define NGX_RTMP_MSG_EDGE               7
#define NGX_RTMP_MSG_AUDIO              8
#define NGX_RTMP_MSG_VIDEO              9
#define NGX_RTMP_MSG_AMF3_META          15
#define NGX_RTMP_MSG_AMF3_SHARED        16
#define NGX_RTMP_MSG_AMF3_CMD           17
#define NGX_RTMP_MSG_AMF_META           18
#define NGX_RTMP_MSG_AMF_SHARED         19
#define NGX_RTMP_MSG_AMF_CMD            20
#define NGX_RTMP_MSG_AGGREGATE          22
#define NGX_RTMP_MSG_MAX                22

1.2 RMTP control message types

#define NGX_RTMP_USER_STREAM_BEGIN      0
#define NGX_RTMP_USER_STREAM_EOF        1
#define NGX_RTMP_USER_STREAM_DRY        2
#define NGX_RTMP_USER_SET_BUFLEN        3
#define NGX_RTMP_USER_RECORDED          4
#define NGX_RTMP_USER_PING_REQUEST      6
#define NGX_RTMP_USER_PING_RESPONSE     7
#define NGX_RTMP_USER_UNKNOWN           8
#define NGX_RTMP_USER_BUFFER_END        31

2. 源码分析

2.1 ngx_rtmp_send_ack_size:发送 ack_size 包

ngx_int_t ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size)
{
    return ngx_rtmp_send_shared_packet(s,
           ngx_rtmp_create_ack_size(s, ack_size));
}
send ack_size == 5000000

2.2 ngx_rtmp_create_ack_size

ngx_chain_t *ngx_rtmp_create_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size)
{
    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "create: ack_size=%uD", ack_size);

    {
        NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK_SIZE);

        NGX_RTMP_USER_OUT4(ack_size);

        NGX_RTMP_USER_END(s);
    }
}

2.2.1 NGX_RTMP_USER_START

#define NGX_RTMP_USER_START(s, tp)                                          \\
    ngx_rtmp_header_t               __h;                                    \\
    ngx_chain_t                    *__l;                                    \\
    ngx_buf_t                      *__b;                                    \\
    ngx_rtmp_core_srv_conf_t       *__cscf;                                 \\
                                                                            \\
    __cscf = ngx_rtmp_get_module_srv_conf(                                  \\
            s, ngx_rtmp_core_module);                                       \\
    /* 初始化 RTMP 消息的头部 */
    memset(&__h, 0, sizeof(__h));                                           \\
    __h.type = tp;                                                          \\
    /* csid 为 2 表明该chunk是控制信息和一些命令信息 */
    __h.csid = 2;                                                           \\
    __l = ngx_rtmp_alloc_shared_buf(__cscf);                                \\
    if (__l == NULL) {                                                      \\
        return NULL;                                                        \\
    }                                                                       \\
    __b = __l->buf;

该宏主要是初始化 RTMP 消息头,并为 ngx_chain_t 结构体指针分配内存,用来存储将要发送的数据。

ngx_rtmp_alloc_shared_buf: 分配一个 shared 的 ngx_chain_t 结构体内存

ngx_chain_t *ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf)
{
    u_char                     *p;
    ngx_chain_t                *out;
    ngx_buf_t                  *b;
    size_t                      size;

    /* 若 free 有空闲的 ngx_chain_t,则直接从 free 指向的链表头中取 */
    if (cscf->free) {
        out = cscf->free;
        cscf->free = out->next;

    } else {

        /* 计算一个实际的 rtmp 块的最大值 */
        size = cscf->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;

        /* 从内存池中分配一块连续的内存 */
        p = ngx_pcalloc(cscf->pool, NGX_RTMP_REFCOUNT_BYTES
                + sizeof(ngx_chain_t)
                + sizeof(ngx_buf_t)
                + size);
        if (p == NULL) {
            return NULL;
        }

        /* 这块内存的开始 4 bytes 是用于保存这块 shared 内存的引用计数值的 */
        p += NGX_RTMP_REFCOUNT_BYTES;
        out = (ngx_chain_t *)p;

        p += sizeof(ngx_chain_t);
        out->buf = (ngx_buf_t *)p;

        p += sizeof(ngx_buf_t);
        out->buf->start = p;
        out->buf->end = p + size;
    }

    out->next = NULL;
    b = out->buf;
    /* 由于分配好内存后,下一步操作是直接向这块内存写入 rtmp 的 chunk data,
     * 即跳过了 rtmp 的头部,暂时先写入实际数据,因此,这里需要为 rtmp 的头部
     * 预留足够的内存,这里为NGX_RTMP_MAX_CHUNK_HEADER */
    b->pos = b->last = b->start + NGX_RTMP_MAX_CHUNK_HEADER;
    b->memory = 1;

    /* 这里引用计数最初置为 1 */
    /* buffer has refcount =1 when created! */
    ngx_rtmp_ref_set(out, 1);

    return out;
}

2.2.2 NGX_RTMP_USER_OUT4

#define NGX_RTMP_USER_OUT4(v)                                               \\
    *(__b->last++) = ((u_char*)&v)[3];                                      \\
    *(__b->last++) = ((u_char*)&v)[2];                                      \\
    *(__b->last++) = ((u_char*)&v)[1];                                      \\
    *(__b->last++) = ((u_char*)&v)[0];

该宏主要是将 v 的值转换为大端字节序放置到通过 NGX_RTMP_USER_START 宏分配好缓存的内存中。

2.2.3 NGX_RTMP_USER_END

#define NGX_RTMP_USER_END(s)                                                \\
    ngx_rtmp_prepare_message(s, &__h, NULL, __l);                           \\
    return __l;

2.2.4 ngx_rtmp_prepare_message

void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_rtmp_header_t *lh, ngx_chain_t *out)
{
    ngx_chain_t                *l;
    u_char                     *p, *pp;
    ngx_int_t                   hsize, thsize, nbufs;
    uint32_t                    mlen, timestamp, ext_timestamp;
    static uint8_t              hdrsize[] = { 12, 8, 4, 1 };
    u_char                      th[7];
    ngx_rtmp_core_srv_conf_t   *cscf;
    uint8_t                     fmt;
    ngx_connection_t           *c;

    c = s->connection;
    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    /* 检测 csid 是否太大 */
    if (h->csid >= (uint32_t)cscf->max_streams) {
        ngx_log_error(NGX_LOG_INFO, c->log, 0,
                "RTMP out chunk stream too big: %D >= %D",
                h->csid, cscf->max_streams);
        ngx_rtmp_finalize_session(s);
        return;
    }

    /* 检测输出缓存的总大小 */
    /* detect packet size */
    mlen = 0;
    nbufs = 0;
    for(l = out; l; l = l->next) {
        mlen += (l->buf->last - l->buf->pos);
        ++nbufs;
    }

    /* 若当前发送的是 RTMP 消息的第一个 chunk 时,fmt 必须为 0 */
    fmt = 0;
    
    /* 下面的判断是检测是否分块,若分块了,则头部中相应的字段就有变化了 */
    
    /* 若这两个 chunk:lh 和 h 是在同一个消息的流中,即 msid 相同 */
    if (lh && lh->csid && h->msid == lh->msid) 
    {
        /* fmt 加 1,此时 Message Header 需 7 bytes */
        ++fmt;
        /* 若不仅在同一个流中,且 chunk 的长度和消息类型都相同 */
        if (h->type == lh->type && mlen && mlen == lh->mlen) {
            /*  fmt 再加 1,此时表示 Message Header 仅需 3 bytes,即 timestamp delta  */
            ++fmt;
            if (h->timestamp == lh->timestamp) {
                /* fmt 再加 1,此时表示这个 chunk 和上一个完全相同的,
                 * 即 不需要 Message Header 了,为 0 byte */
                ++fmt;
            }
        }
        /* 这是计算当前 chunk 和上一个 chunk 的时间戳差值,即 timestamp delta  */
        timestamp = h->timestamp - lh->timestamp;
    } 
    else 
    {
        /* 这里表示这是第一个 chunk 或者是不需要分块的 */
        timestamp = h->timestamp;
    }

    /*if (lh) {
        *lh = *h;
        lh->mlen = mlen;
    }*/

    /* 根据 fmt 得出 rtmp 消息头的实际大小,这里默认 rtmp 的 Basic Header 大小为 1 byte */
    hsize = hdrsize[fmt];

    ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "RTMP prep %s (%d) fmt=%d csid=%uD timestamp=%uD "
            "mlen=%uD msid=%uD nbufs=%d",
            ngx_rtmp_message_type(h->type), (int)h->type, (int)fmt,
            h->csid, timestamp, mlen, h->msid, nbufs);

    /* 这里只有当 timestamp 大于 3 字节能表示的最大数值时,才会有 扩展时间戳 */
    ext_timestamp = 0;
    if (timestamp >= 0x00ffffff) {
        /* 若需要用到扩展时间戳,则 ext_timestamp 表示真正的时间戳,而 timestamp 字段全置为 1 */
        ext_timestamp = timestamp;
        timestamp = 0x00ffffff;
        /* 扩展时间戳占 4 bytes */
        hsize += 4;
    }

    /* 若 csid 大于 64,则表示 Basic Header 需要 2 个字节,此时 csid 范围 [64, 319] */
    if (h->csid >= 64) {
        ++hsize;
        /* 若 csid 大于 320,则表示 Basic Header 需要 3 个字节,此时 csid 范围 [64, 65599] */
        if (h->csid >= 320) {
            ++hsize;
        }
    }

    /* 将 out->buf->pos 指针指向 rtmp 消息的头部起始处 */
    /* fill initial header */
    out->buf->pos -= hsize;
    p = out->buf->pos;

    /* basic header */
    *p = (fmt << 6);
    if (h->csid >= 2 && h->csid <= 63) {
        *p++ |= (((uint8_t)h->csid) & 0x3f);
    } else if (h->csid >= 64 && h->csid < 320) {
        ++p;
        *p++ = (uint8_t)(h->csid - 64);
    } else {
        *p++ |= 1;
        *p++ = (uint8_t)(h->csid - 64);
        *p++ = (uint8_t)((h->csid - 64) >> 8);
    }

    /* create fmt3 header for successive fragments */
    thsize = p - out->buf->pos;
    ngx_memcpy(th, out->buf->pos, thsize);
    /* 提取出 fmt 值 */
    th[0] |= 0xc0;

    /* message header */
    if (fmt <= 2) {
        /* 此时 timestamp 存在*/
        pp = (u_char*)&timestamp;
        *p++ = pp[2];
        *p++ = pp[1];
        *p++ = pp[0];
        if (fmt <= 1) {
            /* 此时 message type id 和 message length 存在 */
            pp = (u_char*)&mlen;
            *p++ = pp[2];
            *p++ = pp[1];
            *p++ = pp[0];
            *p++ = h->type;
            if (fmt == 0) {
                /* 此时 message stream id 存在 */
                pp = (u_char*)&h->msid;
                *p++ = pp[0];
                *p++ = pp[1];
                *p++ = pp[2];
                *p++ = pp[3];
            }
        }
    }
    /* 若 fmt 大于 2,则说明没有 message header */

    /* extended header */
    if (ext_timestamp) {
        pp = (u_char*)&ext_timestamp;
        *p++ = pp[3];
        *p++ = pp[2];
        *p++ = pp[1];
        *p++ = pp[0];

        /* This CONTRADICTS the standard
         * but that\'s the way flash client
         * wants data to be encoded;
         * ffmpeg complains */
        if (cscf->play_time_fix) {
            ngx_memcpy(&th[thsize], p - 4, 4);
            thsize += 4;
        }
    }

    /* append headers to successive fragments */
    for(out = out->next; out; out = out->next) {
        out->buf->pos -= thsize;
        ngx_memcpy(out->buf->pos, th, thsize);
    }
}

2.3 ngx_rtmp_send_shared_packet

static ngx_int_t ngx_rtmp_send_shared_packet(ngx_rtmp_session_t *s, ngx_chain_t *cl)
{
    ngx_rtmp_core_srv_conf_t       *cscf;
    ngx_int_t                       rc;

    if (cl == NULL) {
        return NGX_ERROR;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    /* cl 中已经包含一个完整的将要发送给客户端的 rtmp 包 */
    rc = ngx_rtmp_send_message(s, cl, 0);

    /* 将 cl 的引用计数减 1,当为 0 时,则将其插入到 cscf->free 链表头中 */
    ngx_rtmp_free_shared_chain(cscf, cl);

    return rc;
}

2.3.1 ngx_rtmp_send_message

ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out,
        ngx_uint_t priority)
{
    ngx_uint_t                      nmsg;

    /* 计算要发送的消息数 */
    nmsg = (s->out_last - s->out_pos) % s->out_queue + 1;

    if (priority > 3) {
        priority = 3;
    }

    /* drop packet? 丢包
     * Note we always leave 1 slot free */
    if (nmsg + priority * s->out_queue / 4 >= s->out_queue) {
        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                "RTMP drop message bufs=%ui, priority=%ui",
                nmsg, priority);
        return NGX_AGAIN;
    }

    s->out[s->out_last++] = out;
    s->out_last %= s->out_queue;

    /* out 的引用计数加 1 */
    ngx_rtmp_acquire_shared_chain(out);

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "RTMP send nmsg=%ui, priority=%ui #%ui",
            nmsg, priority, s->out_last);

    if (priority && s->out_buffer && nmsg < s->out_cork) {
        return NGX_OK;
    }

    /* 若当前连接的 write 事件还没有活跃时,发送该 rtmp 包 */
    if (!s->connection->write->active) {
        ngx_rtmp_send(s->connection->write);
        /*return ngx_add_event(s->connection->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT);*/
    }

    return NGX_OK;
}

2.3.2 ngx_rtmp_send

static void ngx_rtmp_send(ngx_event_t *wev)
{
    ngx_connection_t           *c;
    ngx_rtmp_session_t         *s;
    ngx_int_t                   n;
    ngx_rtmp_core_srv_conf_t   *cscf;

    c = wev->data;
    s = c->data;

    if (c->destroyed) {
        return;
    }

    if (wev->timedout) {
        ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
                "client timed out");
        c->timedout = 1;
        ngx_rtmp_finalize_session(s);
        return;
    }

    if (wev->timer_set) {
        ngx_del_timer(wev);
    }

    if (s->out_chain == NULL && s->out_pos != s->out_last) {
        /* 将保存着将要发送的 rtmp 包赋给 s->out_chain */
        s->out_chain = s->out[s->out_pos];
        s->out_bpos = s->out_chain->buf->pos;
    }

    while (s->out_chain) {
        /* 调用 ngx_unix_send 回调函数发送 */
        n = c->send(c, s->out_bpos, s->out_chain->buf->last - s->out_bpos);

        if (n == NGX_AGAIN || n == 0) {
            ngx_add_timer(c->write, s->timeout);
            if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
                ngx_rtmp_finalize_session(s);
            }
            return;
        }

        if (n < 0) {
            ngx_rtmp_finalize_session(s);
            return;
        }

        s->out_bytes += n;
        s->ping_reset = 1;
        ngx_rtmp_update_bandwidth(&ngx_rtmp_bw_out, n);
        s->out_bpos += n;
        /* 若已经完全发送 */
        if (s->out_bpos == s->out_chain->buf->last) {
            /* 指向下一个 ngx_chian_t */
            s->out_chain = s->out_chain->next;
            /* 若不存在 */
            if (s->out_chain == NULL) {
                cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
                /* 则释放之前已经发送完成的 ngx_chain_t */
                ngx_rtmp_free_shared_chain(cscf, s->out[s->out_pos]);
                ++s->out_pos;
                s->out_pos %= s->out_queue;
                /* 若相等,则表明所有消息的都发送了 */
                if (s->out_pos == s->out_last) {
                    break;
                }
                s->out_chain = s->out[s->out_pos];
            }
            s->out_bpos = s->out_chain->buf->pos;
        }
    }

    /* 若当前 写事件 是活跃的,则将其从 epoll 等事件监控机制中删除 */
    if (wev->active) {
        ngx_del_event(wev, NGX_WRITE_EVENT, 0);
    }

    /* 将 posted_dry_events 延迟队列上的事件都移除,并执行 */
    ngx_event_process_posted((ngx_cycle_t *) ngx_cycle, &s->posted_dry_events);
}

2.3.3 ngx_rtmp_free_shared_chain

void ngx_rtmp_free_shared_chain(ngx_rtmp_core_srv_conf_t *cscf, ngx_chain_t *in)
{
    ngx_chain_t        *cl;

    /* 引用计数减 1 */
    if (ngx_rtmp_ref_put(in)) {
        return;
    }

    /* 将 cl 插入到 cscf->free 链表头中 */
    for (cl = in; ; cl = cl->next) {
        if (cl->next == NULL) {
            cl->next = cscf->free;
            cscf->free = in;
            return;
        }
    }
}

2.4 ngx_rtmp_send_bandwidth

ngx_int_t ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size,
                        uint8_t limit_type)
{
    return ngx_rtmp_send_shared_packet(s,
           ngx_rtmp_create_bandwidth(s, ack_size, limit_type));
}

2.4.1 ngx_rtmp_create_bandwidth

ngx_chain_t *ngx_rtmp_create_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size,
                          uint8_t limit_type)
{
    ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "create: bandwidth ack_size=%uD limit=%d",
                   ack_size, (int)limit_type);

    {
        /* 创建发送缓存并初始化 rtmp 头部一些信心,如 message type、csid 等 */
        NGX_RTMP_USER_START(s, NGX_RTMP_MSG_BANDWIDTH);

        /* 填充将要发送的实际数据 */
        NGX_RTMP_USER_OUT4(ack_size);
        NGX_RTMP_USER_OUT1(limit_type);

        /* 主要是构造 rtmp 头,形成完整的 rtmp 包 */
        NGX_RTMP_USER_END(s);
    }
}

2.4.2 NGX_RTMP_USER_OUT1

#define NGX_RTMP_USER_OUT1(v)                                               \\
    *(__b->last++) = ((u_char*)&v)[0];
send bandwidth

2.5 ngx_rtmp_send_chunk_size

ngx_int_t ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
{
    return ngx_rtmp_send_shared_packet(s,
           ngx_rtmp_create_chunk_size(s, chunk_size));
}

2.5.1 ngx_rtmp_create_chunk_size

ngx_chain_t *ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
{
    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "chunk_size=%uD", chunk_size);

    {
        NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE);

        NGX_RTMP_USER_OUT4(chunk_size);

        NGX_RTMP_USER_END(s);
    }
}
send chunk_size

2.6 ngx_rtmp_send_amf

ngx_int_t ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                  ngx_rtmp_amf_elt_t *elts, size_t nelts)
{
    return ngx_rtmp_send_shared_packet(s,
           ngx_rtmp_create_amf(s, h, elts, nelts));
}

2.6.1 ngx_rtmp_create_amf

ngx_chain_t *ngx_rtmp_create_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                    ngx_rtmp_amf_elt_t *elts, size_t nelts)
{
    ngx_chain_t                *first;
    ngx_int_t                   rc;
    ngx_rtmp_core_srv_conf_t   *cscf;

    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "create: amf nelts=%ui", nelts);

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    first = NULL;

    /* 先将 amf 的数据写入到  */
    rc = ngx_rtmp_append_amf(s, &first, NULL, elts, nelts);

    if (rc != NGX_OK && first) {
        ngx_rtmp_free_shared_chain(cscf, first);
        first = NULL;
    }

    if (first) {
        /* 最后将其封装成 rtmp 包 */
        ngx_rtmp_prepare_message(s, h, NULL, first);
    }

    return first;
}

2.6.2 ngx_rtmp_append_amf

/* AMF sender */

/* NOTE: this function does not free shared bufs on error */
ngx_int_t ngx_rtmp_append_amf(ngx_rtmp_session_t *s,
                    ngx_chain_t **first, ngx_chain_t **last,
                    ngx_rtmp_amf_elt_t *elts, size_t nelts)
{
    ngx_rtmp_amf_ctx_t          act;
    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_int_t                   rc;

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    memset(&act, 0, sizeof(act));
    act.arg = cscf;
    act.alloc = ngx_rtmp_alloc_amf_buf;
    act.log = s->connection->log;

    if (first) {
        act.first = *first;
    }

    if (last) {
        act.link = *last;
    }

    /* 将 elts 中的 amf 数据写入到 act 中 */
    rc = ngx_rtmp_amf_write(&act, elts, nelts);

    if (first) {
        *first = act.first;
    }

    if (last) {
        *last = act.link;
    }

    return rc;
}
send amf

2.7 ngx_rtmp_send_stream_begin

ngx_int_t
ngx_rtmp_send_stream_begin(ngx_rtmp_session_t *s, uint32_t msid)
{
    return ngx_rtmp_send_shared_packet(s,
           ngx_rtmp_create_stream_begin(s, msid));
}

2.7.1 ngx_rtmp_create_stream_begin

/* User control messages */

ngx_chain_t *
ngx_rtmp_create_stream_begin(ngx_rtmp_session_t *s, uint32_t msid)
{
    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "create: stream_begin msid=%uD", msid);

    {
        NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_BEGIN);

        NGX_RTMP_USER_OUT4(msid);

        NGX_RTMP_USER_END(s);
    }
}

2.7.2 NGX_RTMP_UCTL_START

#define NGX_RTMP_UCTL_START(s, type, utype)                                 \\
    NGX_RTMP_USER_START(s, type);                                           \\
    *(__b->last++) = (u_char)((utype) >> 8);                                \\
    *(__b->last++) = (u_char)(utype);
send stream begin

2.8 ngx_rtmp_send_status

ngx_int_t
ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc)
{
    return ngx_rtmp_send_shared_packet(s,
           ngx_rtmp_create_status(s, code, level, desc));
}

2.8.1 ngx_rtmp_create_status

ngx_chain_t *
ngx_rtmp_create_status(ngx_rtmp_session_t *s, char *code, char* level,
                       char *desc)
{
    ngx_rtmp_header_t               h;
    static double                   trans;

    static ngx_rtmp_amf_elt_t       out_inf[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          NULL, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("code"),
          NULL, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("description"),
          NULL, 0 },
    };

    static ngx_rtmp_amf_elt_t       out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "onStatus", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_inf,
          sizeof(out_inf) },
    };

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "create: status code=\'%s\' level=\'%s\' desc=\'%s\'",
                   code, level, desc);

    out_inf[0].data = level;
    out_inf[1].data = code;
    out_inf[2].data = desc;

    memset(&h, 0, sizeof(h));

    h.type = NGX_RTMP_MSG_AMF_CMD;
    h.csid = NGX_RTMP_CSID_AMF;
    h.msid = NGX_RTMP_MSID;

    return ngx_rtmp_create_amf(s, &h, out_elts,
                               sizeof(out_elts) / sizeof(out_elts[0]));
}
send onStatus

2.9 ngx_rtmp_send_recorded

ngx_int_t
ngx_rtmp_send_recorded(ngx_rtmp_session_t *s, uint32_t msid)
{
    return ngx_rtmp_send_shared_packet(s,
           ngx_rtmp_create_recorded(s, msid));
}

2.9.1 ngx_rtmp_create_recorded

ngx_chain_t *
ngx_rtmp_create_recorded(ngx_rtmp_session_t *s, uint32_t msid)
{
    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "create: recorded msid=%uD", msid);

    {
        NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_RECORDED);

        NGX_RTMP_USER_OUT4(msid);

        NGX_RTMP_USER_END(s);
    }
}
send record

2.10 ngx_rtmp_send_sample_access

ngx_int_t
ngx_rtmp_send_sample_access(ngx_rtmp_session_t *s)
{
    return ngx_rtmp_send_shared_packet(s,
           ngx_rtmp_create_sample_access(s));
}

2.10.1 ngx_rtmp_create_sample_access

ngx_chain_t *
ngx_rtmp_create_sample_access(ngx_rtmp_session_t *s)
{
    ngx_rtmp_header_t               h;

    static int                      access = 1;

    static ngx_rtmp_amf_elt_t       access_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "|RtmpSampleAccess", 0 },

        { NGX_RTMP_AMF_BOOLEAN,
          ngx_null_string,
          &access, 0 },

        { NGX_RTMP_AMF_BOOLEAN,
          ngx_null_string,
          &access, 0 },
    };

    memset(&h, 0, sizeof(h));

    h.type = NGX_RTMP_MSG_AMF_META;
    h.csid = NGX_RTMP_CSID_AMF;
    h.msid = NGX_RTMP_MSID;

    return ngx_rtmp_create_amf(s, &h, access_elts,
                               sizeof(access_elts) / sizeof(access_elts[0]));
}
send sample access

2.11 ngx_rtmp_send_stream_eof

ngx_int_t
ngx_rtmp_send_stream_eof(ngx_rtmp_session_t *s, uint32_t msid)
{
    return ngx_rtmp_send_shared_packet(s,
           ngx_rtmp_create_stream_eof(s, msid));
}

2.11.1 ngx_rtmp_create_stream_eof

ngx_chain_t *
ngx_rtmp_create_stream_eof(ngx_rtmp_session_t *s, uint32_t msid)
{
    ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "create: stream_end msid=%uD", msid);

    {
        NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_EOF);

        NGX_RTMP_USER_OUT4(msid);

        NGX_RTMP_USER_END(s);
    }
}
send stream eof

以上是关于Nginx-rtmp之 ngx_rtmp_send.c 文件分析的主要内容,如果未能解决你的问题,请参考以下文章

Nginx-rtmp之配置项的管理

Nginx-rtmp之监听端口的管理

Nginx-rtmp之 AMF0 的处理

Nginx-rtmp点播之业务流程分析

搭建rtmp直播流服务之4:videojs和ckPlayer开源播放器二次开发(播放rtmphls直播流及普通视频)

windows下搭建nginx-rtmp服务器