1. 综述
1.1 直播原理
使用 obs 向 nginx 推送一个直播流,该直播流经 nginx-rtmp 的 ngx_rtmp_live_module 模块转发给 application live 应用,
然后使用 vlc 连接 live,播放该直播流。
1.2 nginx.conf
# 创建的子进程数
worker_processes 1;
error_log stderr debug;
daemon off;
master_process off;
events {
worker_connections 1024;
}
rtmp {
server {
listen 1935; # rtmp传输端口
chunk_size 4096; # 数据传输块大小
application live { # 直播配置
live on;
}
# obs 将流推到该 push 应用,push 应用又将该流发布到 live 应用
application push {
live on;
push rtmp://192.168.1.82:1935/live; # 推流到上面的直播应用
}
}
}
1.3 obs 推流设置
-
点击 "+" 选择一个媒体源,确定,然后设置该媒体源,如下图:
-
点击 "设置" 选择 "流",设置推流地址,如下图,确定后即可进行推流:
1.4 使用 vlc 播放直播流
2. 源码分析:application push
首先开始分析从 obs 推送 rtmp 流到 nginx 服务器的整个流程。
2.1 监听连接
nginx 启动后,就会一直在 ngx_process_events 函数中的 epoll_eait 处休眠,监听客户端的连接:
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
...
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);
/* nginx 最初运行时,timer 为 -1,即一直等待客户端连接 */
events = epoll_wait(ep, event_list, (int) nevents, timer);
...
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
/* 获取被监听的读事件 */
rev = c->read;
/* 获取 epoll_wait 返回的事件标志 */
revents = event_list[i].events;
...
/* 若是监听的事件可读,首次监听即表示有新连接到来 */
if ((revents & EPOLLIN) && rev->active) {
...
rev->ready = 1;
/* 若是开启了负载均衡,则先将该事件添加到 ngx_posted_accept_events
* 延迟队列中 */
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
/* 否则,直接调用该读事件的回调函数,若是新连接则
* 调用的是 ngx_event_accept 函数 */
rev->handler(rev);
}
}
...
}
return NGX_OK;
}
ngx_event_accept 函数中主要也就是接受客户端的连接,并调用该监听端口对应的回调函数:
void
ngx_event_accept(ngx_event_t *ev)
{
...
do {
...
s = accept(lc->fd, &sa.sockaddr, &socklen);
...
/* 调用该监听端口对应的回调函数,对于 rtmp 模块,则固定为 ngx_rtmp_init_connection */
ls->handler(c);
...
} while (ev->available);
}
在 ngx_rtmp_init_connection 函数中先经过一系列的初始化后,开始接收与客户端进行 rtmp 的 handshake 过程。
下面从 hanshake 到 hanshake 成功后接收到第一个 rtmp 包之间仅以图片说明,就不再分析源码了。
2.2 handshake
2.2.1 hs_stage: SERVER_RECV_CHALLENGE(1)
该 hanshake 阶段即为等待接收客户端发送的 C0 和 C1 阶段。
receive: Handshake C0+C1 图(1)
接收到客户端发送的 C0 和 C1 后,服务器进入 NGX_RTMP_HANDSHAKE_SERVER_SEND_CHALLENGE(2)阶段,即为
发送S0 和 S1 阶段。
2.2.2 hs_stage: SERVER_SEND_CHALLENGE(2) 和 SERVER_SEND_RESPONSE(3)
该 SERVER_SEND_CHALLENGE 阶段即为等待接收客户端发送的 S0 和 S1 阶段。但是实际上,服务器在发送完 S0 和
S1 后,进入到 SERVER_SEND_RESPONSE(3) 阶段后又立刻发送 S2,因此,在抓到的包如下:
send: Handshake S0+S1+S2 图(2)
2.2.3 hs_stage: SERVER_RECV_RESPONSE(4)
该阶段为等待接收客户端发送的 C2 阶段。
receive:Handshake C2 图(3)
至此,服务器和客户端的 rtmp handshake 过程完整,开始正常的信息交互阶段。
如下代码,接收到 C2 后,服务器即进入循环处理客户端的请求阶段:ngx_rtmp_cycle
static void
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
{
ngx_rtmp_free_handshake_buffers(s);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"handshake: done");
if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
NULL, NULL) != NGX_OK)
{
ngx_rtmp_finalize_session(s);
return;
}
ngx_rtmp_cycle(s);
}
ngx_rtmp_cycle 函数中,重新设置了当前 rtmp 连接的读、写事件的回调函数,当监听到客户端发送的数据时,将调用
ngx_rtmp_recv 函数进行处理。
void
ngx_rtmp_cycle(ngx_rtmp_session_t *s)
{
ngx_connection_t *c;
c = s->connection;
c->read->handler = ngx_rtmp_recv;
c->write->handler = ngx_rtmp_send;
s->ping_evt.data = c;
s->ping_evt.log = c->log;
s->ping_evt.handler = ngx_rtmp_ping;
ngx_rtmp_reset_ping(s);
ngx_rtmp_recv(c->read);
}
在 ngx_rtmp_recv 函数中,会循环接收客户端发来的 rtmp 包数据,接收到完整的一个 rtmp message 后,会根据该消息
的 rtmp message type,调用相应的函数进行处理,如,若为 20,即为 amf0 类型的命令消息,就会调用
ngx_rtmp_amf_message_handler 函数进行处理。
2.3 connect(\'push\')
hanshake 成功后,接收到客户端发来的第一个 rtmp 包为连接 nginx.conf 中 rtmp{} 下的 application push{}
应用,如下图:
receive: connect(\'push\') 图(4)
从该图可知,该消息类型为 20,即为 AMF0 Command,因此会调用 ngx_rtmp_amf_message_handler 对该消息进行解析,
然后对其中的命令 connect 调用预先设置好的 ngx_rtmp_cmd_connect_init 回调函数。在 ngx_rtmp_cmd_connect_init
函数中,继续解析该 connect 余下的消息后,开始 ngx_rtmp_connect 构件的 connect 函数链表,该链表中存放着各个
rtmp 模块对该 connect 命令所要做的操作(注:仅有部分 rtmp 模块会对该 connect 命令设置有回调函数,并且就算
设置了回调函数,也需要在配置文件中启用相应的模块才会真正执行该模块对 connect 的处理)。因此,对于 connect
命令,这里仅会真正处理 ngx_rtmp_cmd_module 模块设置 ngx_rtmp_cmd_connect 回调函数。
2.3.1 ngx_rtmp_cmd_connect
static ngx_int_t
ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"rtmp cmd: connect");
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_core_app_conf_t **cacfp;
ngx_uint_t n;
ngx_rtmp_header_t h;
u_char *p;
static double trans;
static double capabilities = NGX_RTMP_CAPABILITIES;
static double object_encoding = 0;
/* 以下内容为服务器将要对客户端的 connect 命令返回的 amf 类型的响应 */
static ngx_rtmp_amf_elt_t out_obj[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("fmsVer"),
NGX_RTMP_FMS_VERSION, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("capabilities"),
&capabilities, 0 },
};
static ngx_rtmp_amf_elt_t out_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
"status", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
"NetConnection.Connect.Success", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
"Connection succeeded.", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("objectEncoding"),
&object_encoding, 0 }
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"_result", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_obj, sizeof(out_obj) },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_inf, sizeof(out_inf) },
};
if (s->connected) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"connect: duplicate connection");
return NGX_ERROR;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
trans = v->trans;
/* fill session parameters */
s->connected = 1;
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
#define NGX_RTMP_SET_STRPAR(name) \\
s->name.len = ngx_strlen(v->name); \\
s->name.data = ngx_palloc(s->connection->pool, s->name.len); \\
ngx_memcpy(s->name.data, v->name, s->name.len)
NGX_RTMP_SET_STRPAR(app);
NGX_RTMP_SET_STRPAR(args);
NGX_RTMP_SET_STRPAR(flashver);
NGX_RTMP_SET_STRPAR(swf_url);
NGX_RTMP_SET_STRPAR(tc_url);
NGX_RTMP_SET_STRPAR(page_url);
#undef NGX_RTMP_SET_STRPAR
p = ngx_strlchr(s->app.data, s->app.data + s->app.len, \'?\');
if (p) {
s->app.len = (p - s->app.data);
}
s->acodecs = (uint32_t) v->acodecs;
s->vcodecs = (uint32_t) v->vcodecs;
/* 找到客户端 connect 的应用配置 */
/* find application & set app_conf */
cacfp = cscf->applications.elts;
for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
if ((*cacfp)->name.len == s->app.len &&
ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
{
/* found app! */
s->app_conf = (*cacfp)->app_conf;
break;
}
}
if (s->app_conf == NULL) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"connect: application not found: \'%V\'", &s->app);
return NGX_ERROR;
}
object_encoding = v->object_encoding;
/* 发送应答窗口大小:ack_size 给客户端,该消息是用来通知对方应答窗口的大小,
* 发送方在发送了等于窗口大小的数据之后,等的爱接收对方的应答消息(在接收
* 到应答消息之前停止发送数据)。接收当必须发送应答消息,在会话开始时,在
* 会话开始时,会从上一次发送应答之后接收到了等于窗口大小的数据 */
return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||
/* 发送 设置流带宽消息。发送此消息来说明对方的出口带宽限制,接收方以此来限制
* 自己的出口带宽,即限制未被应答的消息数据大小。接收到此消息的一方,如果
* 窗口大小与上一次发送的不一致,应该回复应答窗口大小的消息 */
ngx_rtmp_send_bandwidth(s, cscf->ack_window,
NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||
/* 发送 设置块消息消息,用来通知对方新的最大的块大小。 */
ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||
ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
!= NGX_OK ? NGX_ERROR : NGX_OK;
}
send: ack_size 图(5)
send: peer bandwidth 图(6)
send:chunk_size 图(7)
send:_result(\'NetConnection.Connect.Success\') 图(8)
2.4 releaseStream(\'test\')
服务器响应客户端 connect 命令消息后,客户端接着发送 releaseStream 命令消息给服务器,但是 nginx-rtmp 中没有
任何一个 rtmp 模块对该命令设置有回调函数,因此,不进行处理,接着等待接收下一个消息。
receive: releaseStream(\'test\') 图(9)
2.5 createStream(\'\')
接着服务器接收到客户端发来的 createStream 命令消息。
receive: createStream(\'\') 图(10)
从以前的分析可知,此时,会调用 ngx_rtmp_cmd_create_stream_init 函数。
2.5.1 ngx_rtmp_cmd_create_stream_init
static ngx_int_t
ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_create_stream_t v;
static ngx_rtmp_amf_elt_t in_elts[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.trans, sizeof(v.trans) },
};
/* 解析该 createStream 命令消息,获取 v.trans 值,从图(10) 可知,为 4 */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");
return ngx_rtmp_create_stream(s, &v);
}
接着,从该函数中开始调用 ngx_rtmp_create_stream 构建的函数链表。这里调用到的是 ngx_rtmp_cmd_create_stream
函数。
2.5.2 ngx_rtmp_cmd_create_stream
static ngx_int_t
ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: create stream");
/* support one message stream per connection */
static double stream;
static double trans;
ngx_rtmp_header_t h;
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"_result", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&stream, sizeof(stream) },
};
trans = v->trans;
stream = NGX_RTMP_MSID;
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ?
NGX_DONE : NGX_ERROR;
}
该函数主要是发送服务器对 createStream 的响应。
send: _result()
2.6 publish(\'test\')
接着,客户端发送 publish 给服务器,用来发布一个有名字的流到服务器,其他客户端可以使用此流名来播放流,接收
发布的音频,视频,以及其他数据消息。
receive:publish(\'test\') 图(11)
从图中可知,publish type 为 \'live\',即服务器不会保存客户端发布的流到文件中。
2.6.1 ngx_rtmp_cmd_publish_init
static ngx_int_t
ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_publish_t v;
static ngx_rtmp_amf_elt_t in_elts[] = {
/* transaction is always 0 */
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.name, sizeof(v.name) },
{ NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.type, sizeof(v.type) },
};
ngx_memzero(&v, sizeof(v));
/* 从 publish 命令消息中获取 in_elts 中指定的值 */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_rtmp_cmd_fill_args(v.name, v.args);
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"publish: name=\'%s\' args=\'%s\' type=%s silent=%d",
v.name, v.args, v.type, v.silent);
return ngx_rtmp_publish(s, &v);
}
接着,该函数开始调用 ngx_rtmp_publish 构建的函数链表。从 nginx-rtmp 的源码和 nginx.conf 的配置可知,主要调用
ngx_rtmp_relay_publish 和 ngx_rtmp_live_publish 两个函数。
由 rtmp 模块的排序,首先调用 ngx_rtmp_relay_publish。
2.6.2 ngx_rtmp_relay_publish
static ngx_int_t
ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_target_t *target, **t;
ngx_str_t name;
size_t n;
ngx_rtmp_relay_ctx_t *ctx;
if (s->auto_pushed) {
goto next;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx && s->relay) {
goto next;
}
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (racf == NULL || racf->pushes.nelts == 0) {
goto next;
}
/* v->name 中保存的是从客户端发送的 publish 命令消息中提取出的要发布的流名称 */
name.len = ngx_strlen(v->name);
name.data = v->name;
/* 从 pushes 数组中取出首元素,遍历该数组 */
t = racf->pushes.elts;
for (n = 0; n < racf->pushes.nelts; ++n, ++t) {
target = *t;
/* 配置文件中是否指定了要推流的名称,若是,则检测指定的流名字与当前接收到的publish 流名
* 是否一致 */
if (target->name.len && (name.len != target->name.len ||
ngx_memcmp(name.data, target->name.data, name.len)))
{
continue;
}
if (ngx_rtmp_relay_push(s, &name, target) == NGX_OK) {
continue;
}
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"relay: push failed name=\'%V\' app=\'%V\' "
"playpath=\'%V\' url=\'%V\'",
&name, &target->app, &target->play_path,
&target->url.url);
if (!ctx->push_evt.timer_set) {
ngx_add_timer(&ctx->push_evt, racf->push_reconnect);
}
}
next:
return next_publish(s, v);
}
2.6.3 ngx_rtmp_relay_push
ngx_int_t
ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"relay: create push name=\'%V\' app=\'%V\' playpath=\'%V\' url=\'%V\'",
name, &target->app, &target->play_path, &target->url.url);
return ngx_rtmp_relay_create(s, name, target,
ngx_rtmp_relay_create_local_ctx,
ngx_rtmp_relay_create_remote_ctx);
}
2.6.4 ngx_rtmp_relay_create
static ngx_int_t
ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target,
ngx_rtmp_relay_create_ctx_pt create_publish_ctx,
ngx_rtmp_relay_create_ctx_pt create_play_ctx)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *publish_ctx, *play_ctx, **cctx;
ngx_uint_t hash;
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (racf == NULL) {
return NGX_ERROR;
}
/* 该函数主要是创建一个新的连接,连接推流url中指定的地址,即将该地址作为上游服务器的地址,
* 向该上游服务器发起连接 */
play_ctx = create_play_ctx(s, name, target);
if (play_ctx == NULL) {
return NGX_ERROR;
}
hash = ngx_hash_key(name->data, name->len);
cctx = &racf->ctx[hash % racf->nbuckets];
for (; *cctx; cctx = &(*cctx)->next) {
if ((*cctx)->name.len == name->len
&& !ngx_memcmp(name->data, (*cctx)->name.data,
name->len))
{
break;
}
}
if (*cctx) {
play_ctx->publish = (*cctx)->publish;
play_ctx->next = (*cctx)->play;
(*cctx)->play = play_ctx;
return NGX_OK;
}
/* 创建一个本地 ngx_rtmp_relay_ctx_t */
publish_ctx = create_publish_ctx(s, name, target);
if (publish_ctx == NULL) {
ngx_rtmp_finalize_session(play_ctx->session);
return NGX_ERROR;
}
publish_ctx->publish = publish_ctx;
publish_ctx->play = play_ctx;
play_ctx->publish = publish_ctx;
*cctx = publish_ctx;
return NGX_OK;
}
2.6.4.1 ngx_rtmp_relay_create_remote_ctx
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
ngx_rtmp_relay_target_t *target)
{
ngx_rtmp_conf_ctx_t cctx;
cctx.app_conf = s->app_conf;
cctx.srv_conf = s->srv_conf;
cctx.main_conf = s->main_conf;
return ngx_rtmp_relay_create_connection(&cctx, name, target);
}
2.6.4.2 ngx_rtmp_relay_create_connection
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
ngx_rtmp_relay_target_t *target)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *rctx;
ngx_rtmp_addr_conf_t *addr_conf;
ngx_rtmp_conf_ctx_t *addr_ctx;
ngx_rtmp_session_t *rs;
ngx_peer_connection_t *pc;
ngx_connection_t *c;
ngx_addr_t *addr;
ngx_pool_t *pool;
ngx_int_t rc;
ngx_str_t v, *uri;
u_char *first, *last, *p;
racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
"relay: create remote context");
pool = NULL;
/* 分配一个内存池 */
pool = ngx_create_pool(4096, racf->log);
if (pool == NULL) {
return NULL;
}
/* 从内存池中为 ngx_rtmp_relay_ctx_t 结构体分配内存 */
rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
if (rctx == NULL) {
goto clear;
}
/* 将发布的流名拷贝到新建的 ngx_rtmp_relay_ctx_t 中的 name 成员 */
if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) {
goto clear;
}
/* 将配置文件中配置的 push 推流地址,即 url 拷贝到新建的 ngx_rtmp_relay_ctx_t
* 结构体的 url 成员中 */
if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) {
goto clear;
}
/* target->tag 指向 ngx_rtmp_relay_module 结构体的首地址 */
rctx->tag = target->tag;
/* target->data 指向当前 data 所属的 ngx_rtmp_relay_ctx_t 结构体的首地址 */
rctx->data = target->data;
#define NGX_RTMP_RELAY_STR_COPY(to, from) \\
if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) { \\
goto clear; \\
}
/* 将以下 target 中的值拷贝到新建的 ngx_rtmp_relay_ctx_t 结构体的相应成员中 */
NGX_RTMP_RELAY_STR_COPY(app, app);
NGX_RTMP_RELAY_STR_COPY(tc_url, tc_url);
NGX_RTMP_RELAY_STR_COPY(page_url, page_url);
NGX_RTMP_RELAY_STR_COPY(swf_url, swf_url);
NGX_RTMP_RELAY_STR_COPY(flash_ver, flash_ver);
NGX_RTMP_RELAY_STR_COPY(play_path, play_path);
rctx->live = target->live;
rctx->start = target->start;
rctx->stop = target->stop;
#undef NGX_RTMP_RELAY_STR_COPY
/* 若 app 的值未知 */
if (rctx->app.len == 0 || rctx->play_path.len == 0) {
/* 这里是从推流地址中提取出 app 的值,下面分析以 "push rtmp:192.168.1.82:1935/live;"
* 为例,则提出的 live 将赋给 rctx->app */
/* parse uri */
uri = &target->url.uri;
first = uri->data;
last = uri->data + uri->len;
if (first != last && *first == \'/\') {
++first;
}
if (first != last) {
/* deduce app */
p = ngx_strlchr(first, last, \'/\');
if (p == NULL) {
p = last;
}
if (rctx->app.len == 0 && first != p) {
/* 这里 v.data 指向 "live" */
v.data = first;
v.len = p - first;
/* 将 "live" 赋给 rctx->app */
if (ngx_rtmp_relay_copy_str(pool, &rctx->app, &v) != NGX_OK) {
goto clear;
}
}
/* deduce play_path */
if (p != last) {
++p;
}
/* 若播放路径为 NULL 且 p 不等于 last(注,这里 p 不等于 last 意味着
* "push rtmp:192.168.1.82:1935/live;" 的 "live" 字符串后面还有数据,
* 但是,这里没有)*/
if (rctx->play_path.len == 0 && p != last) {
v.data = p;
v.len = last - p;
if (ngx_rtmp_relay_copy_str(pool, &rctx->play_path, &v)
!= NGX_OK)
{
goto clear;
}
}
}
}
/* 从内存池中为主动连接结构体 ngx_peer_connection_t 分配内存 */
pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
if (pc == NULL) {
goto clear;
}
if (target->url.naddrs == 0) {
ngx_log_error(NGX_LOG_ERR, racf->log, 0,
"relay: no address");
goto clear;
}
/* get address */
/* 获取 推流地址 url 中指明的服务器地址(即推流的目标地址)
* 如"push rtmp:192.168.1.82:1935/live;" 中的 "192.168.1.82:1935" */
addr = &target->url.addrs[target->counter % target->url.naddrs];
target->counter++;
/* copy log to keep shared log unchanged */
rctx->log = *racf->log;
pc->log = &rctx->log;
/* 当使用长连接与上游服务器通信时,可通过该方法由连接池中获取一个新连接 */
pc->get = ngx_rtmp_relay_get_peer;
/* 当使用长连接与上游服务器通信时,通过该方法将使用完毕的连接释放给连接池 */
pc->free = ngx_rtmp_relay_free_peer;
/* 远端服务器的名称,这里其实就是 "192.168.1.82:1935" 该串字符串 */
pc->name = &addr->name;
pc->socklen = addr->socklen;
pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen);
if (pc->sockaddr == NULL) {
goto clear;
}
/* 将 addr->sockaddr 中保存的远端服务器的地址信息拷贝到 pc->sockaddr 中 */
ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen);
/* 开始连接上游服务器 */
rc = ngx_event_connect_peer(pc);
/* 由 ngx_event_connect_peer 源码可知,因为 socket 套接字被设置为非阻塞,
* 因为首次 connect 必定失败,因此该函数返回 NGX_AGAIN */
if (rc != NGX_OK && rc != NGX_AGAIN ) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
"relay: connection failed");
goto clear;
}
c = pc->connection;
c->pool = pool;
/* 推流 URL */
c->addr_text = rctx->url;
addr_conf = ngx_pcalloc(pool, sizeof(ngx_rtmp_addr_conf_t));
if (addr_conf == NULL) {
goto clear;
}
addr_ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_conf_ctx_t));
if (addr_ctx == NULL) {
goto clear;
}
addr_conf->ctx = addr_ctx;
addr_ctx->main_conf = cctx->main_conf;
addr_ctx->srv_conf = cctx->srv_conf;
ngx_str_set(&addr_conf->addr_text, "ngx-relay");
/* 为该主动连接初始化一个会话 */
rs = ngx_rtmp_init_session(c, addr_conf);
if (rs == NULL) {
/* no need to destroy pool */
return NULL;
}
rs->app_conf = cctx->app_conf;
/* 置该标志位为 1 */
rs->relay = 1;
rctx->session = rs;
ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
ngx_str_set(&rs->flashver, "ngx-local-relay");
#if (NGX_STAT_STUB)
(void) ngx_atomic_fetch_add(ngx_stat_active, 1);
#endif
/* 此时作为客户端,开始向上游服务器发说送 hanshake 包,即 C0 + C1 */
ngx_rtmp_client_handshake(rs, 1);
return rctx;
clear:
if (pool) {
ngx_destroy_pool(pool);
}
return NULL;
}
2.6.4.3 ngx_event_connect_peer
ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
int rc, type;
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
in_port_t port;
#endif
ngx_int_t event;
ngx_err_t err;
ngx_uint_t level;
ngx_socket_t s;
ngx_event_t *rev, *wev;
ngx_connection_t *c;
/* 该 get 方法其实没有做任何处理 */
rc = pc->get(pc, pc->data);
if (rc != NGX_OK) {
return rc;
}
type = (pc->type ? pc->type : SOCK_STREAM);
/* 创建一个 socket 套接字 */
s = ngx_socket(pc->sockaddr->sa_family, type, 0);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d",
(type == SOCK_STREAM) ? "stream" : "dgram", s);
if (s == (ngx_socket_t) -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_socket_n " failed");
return NGX_ERROR;
}
/* 从连接池中获取一个空闲连接 */
c = ngx_get_connection(s, pc->log);
if (c == NULL) {
if (ngx_close_socket(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_close_socket_n "failed");
}
return NGX_ERROR;
}
/* 当前 socket 的类型,是 STREAM 还是 DGRAM,这里为 STREAM */
c->type = type;
/* 若设置了接收缓冲区的大小,从上面知没有设置 */
if (pc->rcvbuf) {
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
(const void *) &pc->rcvbuf, sizeof(int)) == -1)
{
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
"setsockopt(SO_RCVBUF) failed");
goto failed;
}
}
/* 将该 socket 套接字设置为非阻塞 */
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_nonblocking_n " failed");
goto failed;
}
/* local 保存的是本地地址信息,则上面可知,没有设置 */
if (pc->local) {
#if (NGX_HAVE_TRANSPARENT_PROXY)
if (pc->transparent) {
if (ngx_event_connect_set_transparent(pc, s) != NGX_OK) {
goto failed;
}
}
#endif
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
port = ngx_inet_get_port(pc->local->sockaddr);
#endif
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT)
if (pc->sockaddr->sa_family != AF_UNIX && port == 0) {
static int bind_address_no_port = 1;
if (bind_address_no_port) {
if (setsockopt(s, IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT,
(const void *) &bind_address_no_port,
sizeof(int)) == -1)
{
err = ngx_socket_errno;
if (err != NGX_EOPNOTSUPP && err != NGX_ENOPROTOOPT) {
ngx_log_error(NGX_LOG_ALERT, pc->log, err,
"setsockopt(IP_BIND_ADDRESS_NO_PORT) "
"failed, ignored");
} else {
bind_address_no_port = 0;
}
}
}
}
#endif
#if (NGX_LINUX)
if (pc->type == SOCK_DGRAM && port != 0) {
int reuse_addr = 1;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuse_addr, sizeof(int))
== -1)
{
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
"setsockopt(SO_REUSEADDR) failed");
goto failed;
}
}
#endif
if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
"bind(%V) failed", &pc->local->name);
goto failed;
}
}
if (type == SOCK_STREAM) {
/* 设置当前连接的 IO 回调函数 */
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
/* 使用 sendfile */
c->sendfile = 1;
if (pc->sockaddr->sa_family == AF_UNIX) {
c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
/* Solaris\'s sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
c->sendfile = 0;
#endif
}
} else { /* type == SOCK_DGRAM */
c->recv = ngx_udp_recv;
c->send = ngx_send;
c->send_chain = ngx_udp_send_chain;
}
c->log_error = pc->log_error;
/* 设置当前主动连接读写事件的回调函数 */
rev = c->read;
wev = c->write;
rev->log = pc->log;
wev->log = pc->log;
pc->connection = c;
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
/* 将该主动连接的读写事件添加到 epoll 等事件监控机制中 */
if (ngx_add_conn) {
if (ngx_add_conn(c) == NGX_ERROR) {
goto failed;
}
}
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"connect to %V, fd:%d #%uA", pc->name, s, c->number);
/* 连接该上游服务器,因为该 socket 套接字被设置为非阻塞,因此首次connect返回 -1,即失败 */
rc = connect(s, pc->sockaddr, pc->socklen);
if (rc == -1) {
err = ngx_socket_errno;
if (err != NGX_EINPROGRESS
#if (NGX_WIN32)
/* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
&& err != NGX_EAGAIN
#endif
)
{
if (err == NGX_ECONNREFUSED
#if (NGX_LINUX)
/*
* Linux returns EAGAIN instead of ECONNREFUSED
* for unix sockets if listen queue is full
*/
|| err == NGX_EAGAIN
#endif
|| err == NGX_ECONNRESET
|| err == NGX_ENETDOWN
|| err == NGX_ENETUNREACH
|| err == NGX_EHOSTDOWN
|| err == NGX_EHOSTUNREACH)
{
level = NGX_LOG_ERR;
} else {
level = NGX_LOG_CRIT;
}
ngx_log_error(level, c->log, err, "connect() to %V failed",
pc->name);
ngx_close_connection(c);
pc->connection = NULL;
return NGX_DECLINED;
}
}
/* 因此,从这里返回 NGX_AGAIN */
if (ngx_add_conn) {
if (rc == -1) {
/* NGX_EINPROGRESS */
return NGX_AGAIN;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
wev->ready = 1;
return NGX_OK;
}
if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno,
"connect(): %d", rc);
if (ngx_blocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_blocking_n " failed");
goto failed;
}
/*
* FreeBSD\'s aio allows to post an operation on non-connected socket.
* NT does not support it.
*
* TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT
*/
rev->ready = 1;
wev->ready = 1;
return NGX_OK;
}
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
/* kqueue */
event = NGX_CLEAR_EVENT;
} else {
/* select, poll, /dev/poll */
event = NGX_LEVEL_EVENT;
}
if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
goto failed;
}
if (rc == -1) {
/* NGX_EINPROGRESS */
if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
goto failed;
}
return NGX_AGAIN;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
wev->ready = 1;
return NGX_OK;
failed:
ngx_close_connection(c);
pc->connection = NULL;
return NGX_ERROR;
}
2.6.4.4 ngx_rtmp_client_handshake
void
ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async)
{
ngx_connection_t *c;
c = s->connection;
/* 设置当前连接读写事件的回调函数 */
c->read->handler = ngx_rtmp_handshake_recv;
c->write->handler = ngx_rtmp_handshake_send;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"handshake: start client handshake");
/* 为该将要进行的 hanshake 过程分配数据缓存,用于存储接收/响应的 hanshake 包 */
s->hs_buf = ngx_rtmp_alloc_handshake_buffer(s);
/* 设置当前 hanshake 阶段,即为 client send: C0 + C1 */
s->hs_stage = NGX_RTMP_HANDSHAKE_CLIENT_SEND_CHALLENGE;
/* 构建 C0 + C1 的 数据包 */
if (ngx_rtmp_handshake_create_challenge(s,
ngx_rtmp_client_version,
&ngx_rtmp_client_partial_key) != NGX_OK)
{
ngx_rtmp_finalize_session(s);
return;
}
/* 有前面的调用传入的参数可知,该值为 1,即为异步,因此这里暂时不向上游服务器发送 handshake,
* 而是将其写事件添加到定时器和 epoll 中,等待下次循环监控到该写事件可写时才发送 C0 + C1 */
if (async) {
/* 将该写事件添加到定时器中,超时时间为 s->timeout */
ngx_add_timer(c->write, s->timeout);
/* 将该写事件添加到 epoll 等事件监控机制中 */
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_rtmp_finalize_session(s);
}
return;
}
ngx_rtmp_handshake_send(c->write);
}
2.6.4.5 ngx_rtmp_relay_create_local_ctx
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
ngx_rtmp_relay_target_t *target)
{
ngx_rtmp_relay_ctx_t *ctx;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"relay: create local context");
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
if (ctx == NULL) {
return NULL;
}
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module);
}
ctx->session = s;
ctx->push_evt.data = s;
ctx->push_evt.log = s->connection->log;
/* 设置该 push_evt 事件的回调函数 */
ctx->push_evt.handler = ngx_rtmp_relay_push_reconnect;
if (ctx->publish) {
return NULL;
}
if (ngx_rtmp_relay_copy_str(s->connection->pool, &ctx->name, name)
!= NGX_OK)
{
return NULL;
}
return ctx;
}
从 ngx_rtmp_relay_create_local_ctx 函数返回后,就一直返回到 ngx_rtmp_relay_publish 函数中,接着执行 next_publish 的下
一个函数。这里为 ngx_rtmp_live_publish。
2.6.5 ngx_rtmp_live_publish
static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_ctx_t *ctx;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL || !lacf->live) {
goto next;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: publish: name=\'%s\' type=\'%s\'",
v->name, v->type);
/* join stream as publisher */
/* 构建一个 ngx_rtmp_live_ctx_t 结构体作为发布者 */
ngx_rtmp_live_join(s, v->name, 1);
/* 这里获取到的就是上面构建的 ngx_rtmp_live_ctx_t 结构体 */
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL || !ctx->publishing) {
goto next;
}
ctx->silent = v->silent;
if (!ctx->silent) {
/* 对之前客户端发送的 publish 返回一个响应 */
ngx_rtmp_send_status(s, "NetStream.Publish.Start",
"status", "Start publishing");
}
next:
return next_publish(s, v);
}
send: onStatus(\'NetStream.Publish.Start\') 图(12)
之后又回到 epoll_wait 处,等待监听的事件触发。接下来的分析先看 nginx 的一段打印:
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59761
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0004 d:088F6950
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705070
ngx_send.c:ngx_unix_send:37 send: fd:9 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:3 ev:00002001
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 7
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 9: 60000:958705071
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:5 ev:0001 d:088F67E8
ngx_event_accept.c:ngx_event_accept:58 accept on 0.0.0.0:1935, ready: 0
ngx_alloc.c:ngx_memalign:66 posix_memalign: 08930870:4096 @16
ngx_event_accept.c:ngx_event_accept:293 *3 accept: 192.168.1.82:39334 fd:10
ngx_rtmp_init.c:ngx_rtmp_init_connection:124 *3 client connected \'192.168.1.82\'
ngx_rtmp_handler.c:ngx_rtmp_set_chunk_size:823 setting chunk_size=128
ngx_alloc.c:ngx_memalign:66 posix_memalign: 089318A0:4096 @16
ngx_rtmp_limit_module.c:ngx_rtmp_limit_connect:87 rtmp limit: connect
ngx_rtmp_handshake.c:ngx_rtmp_handshake:589 handshake: start server handshake
ngx_rtmp_handshake.c:ngx_rtmp_alloc_handshake_buffer:208 handshake: allocating buffer
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 1
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:10 ev:0001 d:088F69C8
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 10: 958705071
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:10 op:2 ev:00000000
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 2
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=14.13.0.12 epoch=958645070
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=638
ngx_send.c:ngx_unix_send:37 send: fd:10 1537 of 1537
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 3
ngx_send.c:ngx_unix_send:37 send: fd:10 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 4
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 -1 of 1536
ngx_recv.c:ngx_unix_recv:150 recv() not ready (11: Resource temporarily unavailable)
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 0
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0001 d:088F6950
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705071
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:2 ev:00000000
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 8
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=13.10.14.13 epoch=958645071
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=557
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 9
ngx_send.c:ngx_unix_send:37 send: fd:9 1536 of 1536
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 10
ngx_rtmp_handshake.c:ngx_rtmp_handshake_done:362 handshake: done
ngx_rtmp_relay_module.c:ngx_rtmp_relay_handshake_done:1319 rtmp relay module: handhshake done
首先 fd = 9 为连接上游服务器(192.168.1.82:1935) 时创建的作为客户端的 STREAM 类型的 socket 套接字,而 fd = 5 为 nginx
启动时创建的 STREAM 类型的 socket 监听套接字。因此,从打印中可以看出,上面的打印是这么一个流程:
- epoll 监听的 fd 为 9 的套接字可写,因此调用该套接字上写事件的回调函数,从之前的源码可知,为
ngx_rtmp_handshake_send 函数,该函数将已经准备好的 C0 和 C1 通过该写事件对应的 send 函数,即
ngx_unix_send 函数发送给上游服务器(192.168.1.82:1935);发送完后进入 CLIENT_RECV_CHALLENGE(7) 阶段,
该阶段为等待接收服务器 S0 和 S1 的阶段; - epool 监控到服务器 fd:5 有数据可读,且为新连接,因此调用 ngx_event_accept 接收该客户端(192.168.1.82:39334)的
连接,接受连接后服务器使用 fd:10 与客户端进行交互,接着服务器开始进入 handshake 阶段; - 下面就开始了服务器 (192.168.1.82:1935, fd = 10) 和 客户端(192.168.1.82:39334, fd = 9) 的 hanshake 过程,就不再详
述,和之前分析的 hanshake 一样。
客户端发送 C2 后,会进入 NGX_RTMP_HANDSHAKE_CLIENT_DONE(10) 阶段,接着会调用该函数 ngx_rtmp_handshake_done:
static void
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
{
ngx_rtmp_free_handshake_buffers(s);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"handshake: done");
if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
NULL, NULL) != NGX_OK)
{
ngx_rtmp_finalize_session(s);
return;
}
ngx_rtmp_cycle(s);
}
该函数接着会调用到 ngx_rtmp_relay_handshake_done 函数:
static ngx_int_t
ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: handhshake done");
ngx_rtmp_relay_ctx_t *ctx;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL || !s->relay) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: return");
return NGX_OK;
}
/* 主要是向服务器发送 connect 连接命令 */
return ngx_rtmp_relay_send_connect(s);
}
2.7 客户端(fd = 9)发送:connect
客户端(192.168.1.82:39334, fd = 9) hanshake 成功后会向服务器发送 connec 连接命令。
2.7.1 ngx_rtmp_relay_send_connect
static ngx_int_t
ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: send connect");
static double trans = NGX_RTMP_RELAY_CONNECT_TRANS;
static double acodecs = 3575;
static double vcodecs = 252;
static ngx_rtmp_amf_elt_t out_cmd[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("app"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_STRING,
ngx_string("tcUrl"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_STRING,
ngx_string("pageUrl"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_STRING,
ngx_string("swfUrl"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_STRING,
ngx_string("flashVer"),
NULL, 0 }, /* <-- fill */
{ NGX_RTMP_AMF_NUMBER,
ngx_string("audioCodecs"),
&acodecs, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("videoCodecs"),
&vcodecs, 0 }
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"connect", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_cmd, sizeof(out_cmd) }
};
ngx_rtmp_core_app_conf_t *cacf;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_relay_ctx_t *ctx;
ngx_rtmp_header_t h;
size_t len, url_len;
u_char *p, *url_end;
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (cacf == NULL || ctx == NULL) {
return NGX_ERROR;
}
/* app */
if (ctx->app.len) {
out_cmd[0].data = ctx->app.data;
out_cmd[0].len = ctx->app.len;
} else {
out_cmd[0].data = cacf->name.data;
out_cmd[0].len = cacf->name.len;
}
/* tcUrl */
if (ctx->tc_url.len) {
out_cmd[1].data = ctx->tc_url.data;
out_cmd[1].len = ctx->tc_url.len;
} else {
len = sizeof("rtmp://") - 1 + ctx->url.len +
sizeof("/") - 1 + ctx->app.len;
p = ngx_palloc(s->connection->pool, len);
if (p == NULL) {
return NGX_ERROR;
}
out_cmd[1].data = p;
p = ngx_cpymem(p, "rtmp://", sizeof("rtmp://") - 1);
url_len = ctx->url.len;
url_end = ngx_strlchr(ctx->url.data, ctx->url.data + ctx->url.len, \'/\');
if (url_end) {
url_len = (size_t) (url_end - ctx->url.data);
}
p = ngx_cpymem(p, ctx->url.data, url_len);
*p++ = \'/\';
p = ngx_cpymem(p, ctx->app.data, ctx->app.len);
out_cmd[1].len = p - (u_char *)out_cmd[1].data;
}
/* pageUrl */
out_cmd[2].data = ctx->page_url.data;
out_cmd[2].len = ctx->page_url.len;
/* swfUrl */
out_cmd[3].data = ctx->swf_url.data;
out_cmd[3].len = ctx->swf_url.len;
/* flashVer */
if (ctx->flash_ver.len) {
out_cmd[4].data = ctx->flash_ver.data;
out_cmd[4].len = ctx->flash_ver.len;
} else {
out_cmd[4].data = NGX_RTMP_RELAY_FLASHVER;
out_cmd[4].len = sizeof(NGX_RTMP_RELAY_FLASHVER) - 1;
}
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
return ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK
|| ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK
|| ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
? NGX_ERROR
: NGX_OK;
}
发送完这几个 RTMP 包,后,又回到 epoll_wait 中进行监听。
下面的分析区分一个服务器,两个客户端:
- 服务器:192.168.1.82:1935
- 客户端:obs 推流
- 客户端:192.168.1.82:xxxx
2.8 服务器 接收 客户端 obs: amf_meta(18)
此时,监听到 obs 客户端发送的类型为 amf_meta(18) 的 rtmp 消息。
receive: @setDataFrame(meta_data 18) 图(13)
对于 "@setDataFrame",仅有 ngx_rtmp_codec_module 模块对其设置了会调函数,为 ngx_rtmp_codec_meta_data 函数:
2.8.1 ngx_rtmp_codec_meta_data
static ngx_int_t
ngx_rtmp_codec_meta_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_codec_app_conf_t *cacf;
ngx_rtmp_codec_ctx_t *ctx;
ngx_uint_t skip;
static struct {
double width;
double height;
double duration;
double frame_rate;
double video_data_rate;
double video_codec_id_n;
u_char video_codec_id_s[32];
double audio_data_rate;
double audio_codec_id_n;
u_char audio_codec_id_s[32];
u_char profile[32];
u_char level[32];
} v;
static ngx_rtmp_amf_elt_t in_video_codec_id[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.video_codec_id_n, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.video_codec_id_s, sizeof(v.video_codec_id_s) },
};
static ngx_rtmp_amf_elt_t in_audio_codec_id[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&v.audio_codec_id_n, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.audio_codec_id_s, sizeof(v.audio_codec_id_s) },
};
static ngx_rtmp_amf_elt_t in_inf[] = {
{ NGX_RTMP_AMF_NUMBER,
ngx_string("width"),
&v.width, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("height"),
&v.height, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("duration"),
&v.duration, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("framerate"),
&v.frame_rate, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("fps"),
&v.frame_rate, 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("videodatarate"),
&v.video_data_rate, 0 },
{ NGX_RTMP_AMF_VARIANT,
ngx_string("videocodecid"),
in_video_codec_id, sizeof(in_video_codec_id) },
{ NGX_RTMP_AMF_NUMBER,
ngx_string("audiodatarate"),
&v.audio_data_rate, 0 },
{ NGX_RTMP_AMF_VARIANT,
ngx_string("audiocodecid"),
in_audio_codec_id, sizeof(in_audio_codec_id) },
{ NGX_RTMP_AMF_STRING,
ngx_string("profile"),
&v.profile, sizeof(v.profile) },
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
&v.level, sizeof(v.level) },
};
static ngx_rtmp_amf_elt_t in_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
in_inf, sizeof(in_inf) },
};
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_codec_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
}
ngx_memzero(&v, sizeof(v));
/* use -1 as a sign of unchanged data;
* 0 is a valid value for uncompressed audio */
v.audio_codec_id_n = -1;
/* FFmpeg sends a string in front of actal metadata; ignore it */
skip = !(in->buf->last > in->buf->pos
&& *in->buf->pos == NGX_RTMP_AMF_STRING);
if (ngx_rtmp_receive_amf(s, in, in_elts + skip,
sizeof(in_elts) / sizeof(in_elts[0]) - skip))
{
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"codec: error parsing data frame");
return NGX_OK;
}
ctx->width = (ngx_uint_t) v.width;
ctx->height = (ngx_uint_t) v.height;
ctx->duration = (ngx_uint_t) v.duration;
ctx->frame_rate = (ngx_uint_t) v.frame_rate;
ctx->video_data_rate = (ngx_uint_t) v.video_data_rate;
ctx->video_codec_id = (ngx_uint_t) v.video_codec_id_n;
ctx->audio_data_rate = (ngx_uint_t) v.audio_data_rate;
ctx->audio_codec_id = (v.audio_codec_id_n == -1
? 0 : v.audio_codec_id_n == 0
? NGX_RTMP_AUDIO_UNCOMPRESSED : (ngx_uint_t) v.audio_codec_id_n);
ngx_memcpy(ctx->profile, v.profile, sizeof(v.profile));
ngx_memcpy(ctx->level, v.level, sizeof(v.level));
ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"codec: data frame: "
"width=%ui height=%ui duration=%ui frame_rate=%ui "
"video=%s (%ui) audio=%s (%ui)",
ctx->width, ctx->he以上是关于Nginx-rtmp直播之业务流程分析的主要内容,如果未能解决你的问题,请参考以下文章
搭建rtmp直播流服务之4:videojs和ckPlayer开源播放器二次开发(播放rtmphls直播流及普通视频)
Nginx-rtmp之 ngx_rtmp_send.c 文件分析
NGINX-RTMP直播服务器搭建-OBS录制推流-VLC视频流播放
HLS NGINX-RTMP [错误] 1281#0:* 58 hls:强制片段拆分:10.002 秒
Gavin老师Transformer直播课感悟 - Rasa对话机器人项目实战之教育领域Education Bot项目架构运行测试流程分析及Rasa interactive实验分析(六十)