rtmp直播地址怎么打开(Nginx-rtmp直播之业务流程分析)

1.1 直播原理 总结有福利内行人都在看总结

使用 obsnginx 推送一个直播流,该直播流经 nginx-rtmp 的 ngx_rtmp_live_module 模块转发给 application live 应用,
然后使用 vlc 连接 live,播放该直播流。

1.2 nginx.conf

# 创建的子进程数 worker_processes 1; error_log stderr debug; daemon off; master_process off; events { worker_connections 1024; } rtmp { server { listen 1935; # rtmp传输端口 chunk_size 4096; # 数据传输块大小 application live { # 直播配置 live on; } # obs 将流推到该 push 应用,push 应用又将该流发布到 live 应用 application push { live on; push rtmp://192.168.1.82:1935/live; # 推流到上面的直播应用 } } } 1.3 obs 推流设置

  1. 点击 "+" 选择一个媒体源,确定,然后设置该媒体源,如下图:


点击 "设置" 选择 "流",设置推流地址,如下图,确定后即可进行推流:


1.4 使用 vlc 播放直播流


2. 源码分析:application push

首先开始分析从 obs 推送 rtmp 流到 nginx 服务器的整个流程。

2.1 监听连接

nginx 启动后,就会一直在 ngx_process_events 函数中的 epoll_eait 处休眠,监听客户端的连接:

static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { ... ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); /* nginx 最初运行时,timer 为 -1,即一直等待客户端连接 */ events = epoll_wait(ep, event_list, (int) nevents, timer); ... for (i = 0; i < events; i++) { c = event_list[i].data.ptr; instance = (uintptr_t) c & 1; c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1); /* 获取被监听的读事件 */ rev = c->read; /* 获取 epoll_wait 返回的事件标志 */ revents = event_list[i].events; ... /* 若是监听的事件可读,首次监听即表示有新连接到来 */ if ((revents & EPOLLIN) && rev->active) { ... rev->ready = 1; /* 若是开启了负载均衡,则先将该事件添加到 ngx_posted_accept_events * 延迟队列中 */ if (flags & NGX_POST_EVENTS) { queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; ngx_post_event(rev, queue); } else { /* 否则,直接调用该读事件的回调函数,若是新连接则 * 调用的是 ngx_event_accept 函数 */ rev->handler(rev); } } ... } return NGX_OK; }

ngx_event_accept 函数中主要也就是接受客户端的连接,并调用该监听端口对应的回调函数:

void ngx_event_accept(ngx_event_t *ev) { ... do { ... s = accept(lc->fd, &sa.sockaddr, &socklen); ... /* 调用该监听端口对应的回调函数,对于 rtmp 模块,则固定为 ngx_rtmp_init_connection */ ls->handler(c); ... } while (ev->available); }

在 ngx_rtmp_init_connection 函数中先经过一系列的初始化后,开始接收与客户端进行 rtmp 的 handshake 过程。

下面从 hanshake 到 hanshake 成功后接收到第一个 rtmp 包之间仅以图片说明,就不再分析源码了。

2.2 handshake2.2.1 hs_stage: SERVER_recv_CHALLENGE(1)

该 hanshake 阶段即为等待接收客户端发送的 C0 和 C1 阶段。

receive: Handshake C0+C1 图(1)


接收到客户端发送的 C0 和 C1 后,服务器进入 NGX_RTMP_HANDSHAKE_SERVER_SEND_CHALLENGE(2)阶段,即为
发送S0 和 S1 阶段。

2.2.2 hs_stage: SERVER_SEND_CHALLENGE(2) 和 SERVER_SEND_RESPONSE(3)

该 SERVER_SEND_CHALLENGE 阶段即为等待接收客户端发送的 S0 和 S1 阶段。但是实际上,服务器在发送完 S0 和
S1 后,进入到 SERVER_SEND_RESPONSE(3) 阶段后又立刻发送 S2,因此,在抓到的包如下:

send: Handshake S0+S1+S2 图(2)


2.2.3 hs_stage: SERVER_RECV_RESPONSE(4)

该阶段为等待接收客户端发送的 C2 阶段。

receive:Handshake C2 图(3)


至此,服务器和客户端的 rtmp handshake 过程完整,开始正常的信息交互阶段。

如下代码,接收到 C2 后,服务器即进入循环处理客户端的请求阶段:ngx_rtmp_cycle

static void ngx_rtmp_handshake_done(ngx_rtmp_session_t *s) { ngx_rtmp_free_handshake_buffers(s); ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "handshake: done"); if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE, NULL, NULL) != NGX_OK) { ngx_rtmp_finalize_session(s); return; } ngx_rtmp_cycle(s); }

ngx_rtmp_cycle 函数中,重新设置了当前 rtmp 连接的读、写事件的回调函数,当监听到客户端发送的数据时,将调用
ngx_rtmp_recv 函数进行处理。

void ngx_rtmp_cycle(ngx_rtmp_session_t *s) { ngx_connection_t *c; c = s->connection; c->read->handler = ngx_rtmp_recv; c->write->handler = ngx_rtmp_send; s->ping_evt.data = c; s->ping_evt.log = c->log; s->ping_evt.handler = ngx_rtmp_ping; ngx_rtmp_reset_ping(s); ngx_rtmp_recv(c->read); }

在 ngx_rtmp_recv 函数中,会循环接收客户端发来的 rtmp 包数据,接收到完整的一个 rtmp message 后,会根据该消息
的 rtmp message type,调用相应的函数进行处理,如,若为 20,即为 amf0 类型的命令消息,就会调用
ngx_rtmp_amf_message_handler 函数进行处理。

2.3 connect('push')

hanshake 成功后,接收到客户端发来的第一个 rtmp 包为连接 nginx.conf 中 rtmp{} 下的 application push{}
应用,如下图:

receive: connect('push') 图(4)


从该图可知,该消息类型为 20,即为 AMF0 Command,因此会调用 ngx_rtmp_amf_message_handler 对该消息进行解析,
然后对其中的命令 connect 调用预先设置好的 ngx_rtmp_cmd_connect_init 回调函数。在 ngx_rtmp_cmd_connect_init
函数中,继续解析该 connect 余下的消息后,开始 ngx_rtmp_connect 构件的 connect 函数链表,该链表中存放着各个
rtmp 模块对该 connect 命令所要做的操作(注:仅有部分 rtmp 模块会对该 connect 命令设置有回调函数,并且就算
设置了回调函数,也需要在配置文件中启用相应的模块才会真正执行该模块对 connect 的处理)。因此,对于 connect
命令,这里仅会真正处理 ngx_rtmp_cmd_module 模块设置 ngx_rtmp_cmd_connect 回调函数。

2.3.1 ngx_rtmp_cmd_connect

static ngx_int_t ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: connect"); ngx_rtmp_core_srv_conf_t *cscf; ngx_rtmp_core_app_conf_t **cacfp; ngx_uint_t n; ngx_rtmp_header_t h; u_char *p; static double trans; static double capabilities = NGX_RTMP_CAPABILITIES; static double object_encoding = 0; /* 以下内容为服务器将要对客户端的 connect 命令返回的 amf 类型的响应 */ static ngx_rtmp_amf_elt_t out_obj[] = { { NGX_RTMP_AMF_STRING, ngx_string("fmsVer"), NGX_RTMP_FMS_VERSION, 0 }, { NGX_RTMP_AMF_NUMBER, ngx_string("capabilities"), &capabilities, 0 }, }; static ngx_rtmp_amf_elt_t out_inf[] = { { NGX_RTMP_AMF_STRING, ngx_string("level"), "status", 0 }, { NGX_RTMP_AMF_STRING, ngx_string("code"), "NetConnection.Connect.Success", 0 }, { NGX_RTMP_AMF_STRING, ngx_string("description"), "Connection succeeded.", 0 }, { NGX_RTMP_AMF_NUMBER, ngx_string("objectEncoding"), &object_encoding, 0 } }; static ngx_rtmp_amf_elt_t out_elts[] = { { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 }, { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 }, { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_obj, sizeof(out_obj) }, { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_inf, sizeof(out_inf) }, }; if (s->connected) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "connect: duplicate connection"); return NGX_ERROR; } cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); trans = v->trans; /* fill session parameters */ s->connected = 1; ngx_memzero(&h, sizeof(h)); h.csid = NGX_RTMP_CSID_AMF_INI; h.type = NGX_RTMP_MSG_AMF_CMD; #define NGX_RTMP_SET_STRPAR(name) s->name.len = ngx_strlen(v->name); s->name.data = ngx_palloc(s->connection->pool, s->name.len); ngx_memcpy(s->name.data, v->name, s->name.len) NGX_RTMP_SET_STRPAR(app); NGX_RTMP_SET_STRPAR(args); NGX_RTMP_SET_STRPAR(flashver); NGX_RTMP_SET_STRPAR(swf_url); NGX_RTMP_SET_STRPAR(tc_url); NGX_RTMP_SET_STRPAR(page_url); #undef NGX_RTMP_SET_STRPAR p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?'); if (p) { s->app.len = (p - s->app.data); } s->acodecs = (uint32_t) v->acodecs; s->vcodecs = (uint32_t) v->vcodecs; /* 找到客户端 connect 的应用配置 */ /* find application & set app_conf */ cacfp = cscf->applications.elts; for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) { if ((*cacfp)->name.len == s->app.len && ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0) { /* found app! */ s->app_conf = (*cacfp)->app_conf; break; } } if (s->app_conf == NULL) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "connect: application not found: '%V'", &s->app); return NGX_ERROR; } object_encoding = v->object_encoding; /* 发送应答窗口大小:ack_size 给客户端,该消息是用来通知对方应答窗口的大小, * 发送方在发送了等于窗口大小的数据之后,等的爱接收对方的应答消息(在接收 * 到应答消息之前停止发送数据)。接收当必须发送应答消息,在会话开始时,在 * 会话开始时,会从上一次发送应答之后接收到了等于窗口大小的数据 */ return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK || /* 发送 设置流带宽消息。发送此消息来说明对方的出口带宽限制,接收方以此来限制 * 自己的出口带宽,即限制未被应答的消息数据大小。接收到此消息的一方,如果 * 窗口大小与上一次发送的不一致,应该回复应答窗口大小的消息 */ ngx_rtmp_send_bandwidth(s, cscf->ack_window, NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK || /* 发送 设置块消息消息,用来通知对方新的最大的块大小。 */ ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK || ngx_rtmp_send_amf(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK ? NGX_ERROR : NGX_OK; } send: ack_size 图(5)








2.4 releaseStream('test')

服务器响应客户端 connect 命令消息后,客户端接着发送 releaseStream 命令消息给服务器,但是 nginx-rtmp 中没有
任何一个 rtmp 模块对该命令设置有回调函数,因此,不进行处理,接着等待接收下一个消息。

receive: releaseStream('test') 图(9)



rtmp直播地址获取


2.5 createStream('')

接着服务器接收到客户端发来的 createStream 命令消息。

receive: createStream('') 图(10)


从以前的分析可知,此时,会调用 ngx_rtmp_cmd_create_stream_init 函数。

2.5.1 ngx_rtmp_cmd_create_stream_init

static ngx_int_t ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { static ngx_rtmp_create_stream_t v; static ngx_rtmp_amf_elt_t in_elts[] = { { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, sizeof(v.trans) }, }; /* 解析该 createStream 命令消息,获取 v.trans 值,从图(10) 可知,为 4 */ if (ngx_rtmp_receive_amf(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream"); return ngx_rtmp_create_stream(s, &v); }

接着,从该函数中开始调用 ngx_rtmp_create_stream 构建的函数链表。这里调用到的是 ngx_rtmp_cmd_create_stream
函数。

2.5.2 ngx_rtmp_cmd_create_stream

static ngx_int_t ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: create stream"); /* support one message stream per connection */ static double stream; static double trans; ngx_rtmp_header_t h; static ngx_rtmp_amf_elt_t out_elts[] = { { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 }, { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 }, { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 }, { NGX_RTMP_AMF_NUMBER, ngx_null_string, &stream, sizeof(stream) }, }; trans = v->trans; stream = NGX_RTMP_MSID; ngx_memzero(&h, sizeof(h)); h.csid = NGX_RTMP_CSID_AMF_INI; h.type = NGX_RTMP_MSG_AMF_CMD; return ngx_rtmp_send_amf(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ? NGX_DONE : NGX_ERROR; }

该函数主要是发送服务器对 createStream 的响应。

send: _result()




2.6 publish('test')

接着,客户端发送 publish 给服务器,用来发布一个有名字的流到服务器,其他客户端可以使用此流名来播放流,接收
发布的音频,视频,以及其他数据消息。

receive:publish('test') 图(11)




从图中可知,publish type 为 'live',即服务器不会保存客户端发布的流到文件中。

2.6.1 ngx_rtmp_cmd_publish_init

static ngx_int_t ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { static ngx_rtmp_publish_t v; static ngx_rtmp_amf_elt_t in_elts[] = { /* transaction is always 0 */ { NGX_RTMP_AMF_NUMBER, ngx_null_string, NULL, 0 }, { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 }, { NGX_RTMP_AMF_STRING, ngx_null_string, &v.name, sizeof(v.name) }, { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING, ngx_null_string, &v.type, sizeof(v.type) }, }; ngx_memzero(&v, sizeof(v)); /* 从 publish 命令消息中获取 in_elts 中指定的值 */ if (ngx_rtmp_receive_amf(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } ngx_rtmp_cmd_fill_args(v.name, v.args); ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "publish: name='%s' args='%s' type=%s silent=%d", v.name, v.args, v.type, v.silent); return ngx_rtmp_publish(s, &v); }

接着,该函数开始调用 ngx_rtmp_publish 构建的函数链表。从 nginx-rtmp 的源码和 nginx.conf 的配置可知,主要调用
ngx_rtmp_relay_publish 和 ngx_rtmp_live_publish 两个函数。

由 rtmp 模块的排序,首先调用 ngx_rtmp_relay_publish。

2.6.2 ngx_rtmp_relay_publish

static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) { ngx_rtmp_relay_app_conf_t *racf; ngx_rtmp_relay_target_t *target, **t; ngx_str_t name; size_t n; ngx_rtmp_relay_ctx_t *ctx; if (s->auto_pushed) { goto next; } ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); if (ctx && s->relay) { goto next; } racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); if (racf == NULL || racf->pushes.nelts == 0) { goto next; } /* v->name 中保存的是从客户端发送的 publish 命令消息中提取出的要发布的流名称 */ name.len = ngx_strlen(v->name); name.data = v->name; /* 从 pushes 数组中取出首元素,遍历该数组 */ t = racf->pushes.elts; for (n = 0; n < racf->pushes.nelts; ++n, ++t) { target = *t; /* 配置文件中是否指定了要推流的名称,若是,则检测指定的流名字与当前接收到的publish 流名 * 是否一致 */ if (target->name.len && (name.len != target->name.len || ngx_memcmp(name.data, target->name.data, name.len))) { continue; } if (ngx_rtmp_relay_push(s, &name, target) == NGX_OK) { continue; } ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, "relay: push failed name='%V' app='%V' " "playpath='%V' url='%V'", &name, &target->app, &target->play_path, &target->url.url); if (!ctx->push_evt.timer_set) { ngx_add_timer(&ctx->push_evt, racf->push_reconnect); } } next: return next_publish(s, v); } 2.6.3 ngx_rtmp_relay_push

ngx_int_t ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_rtmp_relay_target_t *target) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "relay: create push name='%V' app='%V' playpath='%V' url='%V'", name, &target->app, &target->play_path, &target->url.url); return ngx_rtmp_relay_create(s, name, target, ngx_rtmp_relay_create_local_ctx, ngx_rtmp_relay_create_remote_ctx); } 2.6.4 ngx_rtmp_relay_create

static ngx_int_t ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_rtmp_relay_target_t *target, ngx_rtmp_relay_create_ctx_pt create_publish_ctx, ngx_rtmp_relay_create_ctx_pt create_play_ctx) { ngx_rtmp_relay_app_conf_t *racf; ngx_rtmp_relay_ctx_t *publish_ctx, *play_ctx, **cctx; ngx_uint_t hash; racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); if (racf == NULL) { return NGX_ERROR; } /* 该函数主要是创建一个新的连接,连接推流url中指定的地址,即将该地址作为上游服务器的地址, * 向该上游服务器发起连接 */ play_ctx = create_play_ctx(s, name, target); if (play_ctx == NULL) { return NGX_ERROR; } hash = ngx_hash_key(name->data, name->len); cctx = &racf->ctx[hash % racf->nbuckets]; for (; *cctx; cctx = &(*cctx)->next) { if ((*cctx)->name.len == name->len && !ngx_memcmp(name->data, (*cctx)->name.data, name->len)) { break; } } if (*cctx) { play_ctx->publish = (*cctx)->publish; play_ctx->next = (*cctx)->play; (*cctx)->play = play_ctx; return NGX_OK; } /* 创建一个本地 ngx_rtmp_relay_ctx_t */ publish_ctx = create_publish_ctx(s, name, target); if (publish_ctx == NULL) { ngx_rtmp_finalize_session(play_ctx->session); return NGX_ERROR; } publish_ctx->publish = publish_ctx; publish_ctx->play = play_ctx; play_ctx->publish = publish_ctx; *cctx = publish_ctx; return NGX_OK; } 2.6.4.1 ngx_rtmp_relay_create_remote_ctx

static ngx_rtmp_relay_ctx_t * ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, ngx_rtmp_relay_target_t *target) { ngx_rtmp_conf_ctx_t cctx; cctx.app_conf = s->app_conf; cctx.srv_conf = s->srv_conf; cctx.main_conf = s->main_conf; return ngx_rtmp_relay_create_connection(&cctx, name, target); } 2.6.4.2 ngx_rtmp_relay_create_connection

static ngx_rtmp_relay_ctx_t * ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name, ngx_rtmp_relay_target_t *target) { ngx_rtmp_relay_app_conf_t *racf; ngx_rtmp_relay_ctx_t *rctx; ngx_rtmp_addr_conf_t *addr_conf; ngx_rtmp_conf_ctx_t *addr_ctx; ngx_rtmp_session_t *rs; ngx_peer_connection_t *pc; ngx_connection_t *c; ngx_addr_t *addr; ngx_pool_t *pool; ngx_int_t rc; ngx_str_t v, *uri; u_char *first, *last, *p; racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module); ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0, "relay: create remote context"); pool = NULL; /* 分配一个内存池 */ pool = ngx_create_pool(4096, racf->log); if (pool == NULL) { return NULL; } /* 从内存池中为 ngx_rtmp_relay_ctx_t 结构体分配内存 */ rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t)); if (rctx == NULL) { goto clear; } /* 将发布的流名拷贝到新建的 ngx_rtmp_relay_ctx_t 中的 name 成员 */ if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) { goto clear; } /* 将配置文件中配置的 push 推流地址,即 url 拷贝到新建的 ngx_rtmp_relay_ctx_t * 结构体的 url 成员中 */ if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) { goto clear; } /* target->tag 指向 ngx_rtmp_relay_module 结构体的首地址 */ rctx->tag = target->tag; /* target->data 指向当前 data 所属的 ngx_rtmp_relay_ctx_t 结构体的首地址 */ rctx->data = target->data; #define NGX_RTMP_RELAY_STR_COPY(to, from) if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) { goto clear; } /* 将以下 target 中的值拷贝到新建的 ngx_rtmp_relay_ctx_t 结构体的相应成员中 */ NGX_RTMP_RELAY_STR_COPY(app, app); NGX_RTMP_RELAY_STR_COPY(tc_url, tc_url); NGX_RTMP_RELAY_STR_COPY(page_url, page_url); NGX_RTMP_RELAY_STR_COPY(swf_url, swf_url); NGX_RTMP_RELAY_STR_COPY(flash_ver, flash_ver); NGX_RTMP_RELAY_STR_COPY(play_path, play_path); rctx->live = target->live; rctx->start = target->start; rctx->stop = target->stop; #undef NGX_RTMP_RELAY_STR_COPY /* 若 app 的值未知 */ if (rctx->app.len == 0 || rctx->play_path.len == 0) { /* 这里是从推流地址中提取出 app 的值,下面分析以 "push rtmp:192.168.1.82:1935/live;" * 为例,则提出的 live 将赋给 rctx->app */ /* parse uri */ uri = &target->url.uri; first = uri->data; last = uri->data + uri->len; if (first != last && *first == '/') { ++first; } if (first != last) { /* deduce app */ p = ngx_strlchr(first, last, '/'); if (p == NULL) { p = last; } if (rctx->app.len == 0 && first != p) { /* 这里 v.data 指向 "live" */ v.data = first; v.len = p - first; /* 将 "live" 赋给 rctx->app */ if (ngx_rtmp_relay_copy_str(pool, &rctx->app, &v) != NGX_OK) { goto clear; } } /* deduce play_path */ if (p != last) { ++p; } /* 若播放路径为 NULL 且 p 不等于 last(注,这里 p 不等于 last 意味着 * "push rtmp:192.168.1.82:1935/live;" 的 "live" 字符串后面还有数据, * 但是,这里没有)*/ if (rctx->play_path.len == 0 && p != last) { v.data = p; v.len = last - p; if (ngx_rtmp_relay_copy_str(pool, &rctx->play_path, &v) != NGX_OK) { goto clear; } } } } /* 从内存池中为主动连接结构体 ngx_peer_connection_t 分配内存 */ pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t)); if (pc == NULL) { goto clear; } if (target->url.naddrs == 0) { ngx_log_error(NGX_LOG_ERR, racf->log, 0, "relay: no address"); goto clear; } /* get address */ /* 获取 推流地址 url 中指明的服务器地址(即推流的目标地址) * 如"push rtmp:192.168.1.82:1935/live;" 中的 "192.168.1.82:1935" */ addr = &target->url.addrs[target->counter % target->url.naddrs]; target->counter++; /* copy log to keep shared log unchanged */ rctx->log = *racf->log; pc->log = &rctx->log; /* 当使用长连接与上游服务器通信时,可通过该方法由连接池中获取一个新连接 */ pc->get = ngx_rtmp_relay_get_peer; /* 当使用长连接与上游服务器通信时,通过该方法将使用完毕的连接释放给连接池 */ pc->free = ngx_rtmp_relay_free_peer; /* 远端服务器的名称,这里其实就是 "192.168.1.82:1935" 该串字符串 */ pc->name = &addr->name; pc->socklen = addr->socklen; pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen); if (pc->sockaddr == NULL) { goto clear; } /* 将 addr->sockaddr 中保存的远端服务器的地址信息拷贝到 pc->sockaddr 中 */ ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen); /* 开始连接上游服务器 */ rc = ngx_event_connect_peer(pc); /* 由 ngx_event_connect_peer 源码可知,因为 socket 套接字被设置为非阻塞, * 因为首次 connect 必定失败,因此该函数返回 NGX_AGAIN */ if (rc != NGX_OK && rc != NGX_AGAIN ) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0, "relay: connection failed"); goto clear; } c = pc->connection; c->pool = pool; /* 推流 URL */ c->addr_text = rctx->url; addr_conf = ngx_pcalloc(pool, sizeof(ngx_rtmp_addr_conf_t)); if (addr_conf == NULL) { goto clear; } addr_ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_conf_ctx_t)); if (addr_ctx == NULL) { goto clear; } addr_conf->ctx = addr_ctx; addr_ctx->main_conf = cctx->main_conf; addr_ctx->srv_conf = cctx->srv_conf; ngx_str_set(&addr_conf->addr_text, "ngx-relay"); /* 为该主动连接初始化一个会话 */ rs = ngx_rtmp_init_session(c, addr_conf); if (rs == NULL) { /* no need to destroy pool */ return NULL; } rs->app_conf = cctx->app_conf; /* 置该标志位为 1 */ rs->relay = 1; rctx->session = rs; ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module); ngx_str_set(&rs->flashver, "ngx-local-relay"); #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_active, 1); #endif /* 此时作为客户端,开始向上游服务器发说送 hanshake 包,即 C0 + C1 */ ngx_rtmp_client_handshake(rs, 1); return rctx; clear: if (pool) { ngx_destroy_pool(pool); } return NULL; } 2.6.4.3 ngx_event_connect_peer

ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc) { int rc, type; #if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX) in_port_t port; #endif ngx_int_t event; ngx_err_t err; ngx_uint_t level; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_connection_t *c; /* 该 get 方法其实没有做任何处理 */ rc = pc->get(pc, pc->data); if (rc != NGX_OK) { return rc; } type = (pc->type ? pc->type : SOCK_STREAM); /* 创建一个 socket 套接字 */ s = ngx_socket(pc->sockaddr->sa_family, type, 0); ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d", (type == SOCK_STREAM) ? "stream" : "dgram", s); if (s == (ngx_socket_t) -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_socket_n " failed"); return NGX_ERROR; } /* 从连接池中获取一个空闲连接 */ c = ngx_get_connection(s, pc->log); if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_close_socket_n "failed"); } return NGX_ERROR; } /* 当前 socket 的类型,是 STREAM 还是 DGRAM,这里为 STREAM */ c->type = type; /* 若设置了接收缓冲区的大小,从上面知没有设置 */ if (pc->rcvbuf) { if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const void *) &pc->rcvbuf, sizeof(int)) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, "setsockopt(SO_RCVBUF) failed"); goto failed; } } /* 将该 socket 套接字设置为非阻塞 */ if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_nonblocking_n " failed"); goto failed; } /* local 保存的是本地地址信息,则上面可知,没有设置 */ if (pc->local) { #if (NGX_HAVE_TRANSPARENT_PROXY) if (pc->transparent) { if (ngx_event_connect_set_transparent(pc, s) != NGX_OK) { goto failed; } } #endif #if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX) port = ngx_inet_get_port(pc->local->sockaddr); #endif #if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT) if (pc->sockaddr->sa_family != AF_UNIX && port == 0) { static int bind_address_no_port = 1; if (bind_address_no_port) { if (setsockopt(s, IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT, (const void *) &bind_address_no_port, sizeof(int)) == -1) { err = ngx_socket_errno; if (err != NGX_EOPNOTSUPP && err != NGX_ENOPROTOOPT) { ngx_log_error(NGX_LOG_ALERT, pc->log, err, "setsockopt(IP_BIND_ADDRESS_NO_PORT) " "failed, ignored"); } else { bind_address_no_port = 0; } } } } #endif #if (NGX_LINUX) if (pc->type == SOCK_DGRAM && port != 0) { int reuse_addr = 1; if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const void *) &reuse_addr, sizeof(int)) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, "setsockopt(SO_REUSEADDR) failed"); goto failed; } } #endif if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) { ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno, "bind(%V) failed", &pc->local->name); goto failed; } } if (type == SOCK_STREAM) { /* 设置当前连接的 IO 回调函数 */ c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; /* 使用 sendfile */ c->sendfile = 1; if (pc->sockaddr->sa_family == AF_UNIX) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; #if (NGX_SOLARIS) /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ c->sendfile = 0; #endif } } else { /* type == SOCK_DGRAM */ c->recv = ngx_udp_recv; c->send = ngx_send; c->send_chain = ngx_udp_send_chain; } c->log_error = pc->log_error; /* 设置当前主动连接读写事件的回调函数 */ rev = c->read; wev = c->write; rev->log = pc->log; wev->log = pc->log; pc->connection = c; c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); /* 将该主动连接的读写事件添加到 epoll 等事件监控机制中 */ if (ngx_add_conn) { if (ngx_add_conn(c) == NGX_ERROR) { goto failed; } } ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connect to %V, fd:%d #%uA", pc->name, s, c->number); /* 连接该上游服务器,因为该 socket 套接字被设置为非阻塞,因此首次connect返回 -1,即失败 */ rc = connect(s, pc->sockaddr, pc->socklen); if (rc == -1) { err = ngx_socket_errno; if (err != NGX_EINPROGRESS #if (NGX_WIN32) /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */ && err != NGX_EAGAIN #endif ) { if (err == NGX_ECONNREFUSED #if (NGX_LINUX) /* * Linux returns EAGAIN instead of ECONNREFUSED * for unix sockets if listen queue is full */ || err == NGX_EAGAIN #endif || err == NGX_ECONNRESET || err == NGX_ENETDOWN || err == NGX_ENETUNREACH || err == NGX_EHOSTDOWN || err == NGX_EHOSTUNREACH) { level = NGX_LOG_ERR; } else { level = NGX_LOG_CRIT; } ngx_log_error(level, c->log, err, "connect() to %V failed", pc->name); ngx_close_connection(c); pc->connection = NULL; return NGX_DECLINED; } } /* 因此,从这里返回 NGX_AGAIN */ if (ngx_add_conn) { if (rc == -1) { /* NGX_EINPROGRESS */ return NGX_AGAIN; } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected"); wev->ready = 1; return NGX_OK; } if (ngx_event_flags & NGX_USE_IOCP_EVENT) { ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno, "connect(): %d", rc); if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_blocking_n " failed"); goto failed; } /* * FreeBSD's aio allows to post an operation on non-connected socket. * NT does not support it. * * TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT */ rev->ready = 1; wev->ready = 1; return NGX_OK; } if (ngx_event_flags & NGX_USE_CLEAR_EVENT) { /* kqueue */ event = NGX_CLEAR_EVENT; } else { /* select, poll, /dev/poll */ event = NGX_LEVEL_EVENT; } if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) { goto failed; } if (rc == -1) { /* NGX_EINPROGRESS */ if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) { goto failed; } return NGX_AGAIN; } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected"); wev->ready = 1; return NGX_OK; failed: ngx_close_connection(c); pc->connection = NULL; return NGX_ERROR; } 2.6.4.4 ngx_rtmp_client_handshake

void ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async) { ngx_connection_t *c; c = s->connection; /* 设置当前连接读写事件的回调函数 */ c->read->handler = ngx_rtmp_handshake_recv; c->write->handler = ngx_rtmp_handshake_send; ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "handshake: start client handshake"); /* 为该将要进行的 hanshake 过程分配数据缓存,用于存储接收/响应的 hanshake 包 */ s->hs_buf = ngx_rtmp_alloc_handshake_buffer(s); /* 设置当前 hanshake 阶段,即为 client send: C0 + C1 */ s->hs_stage = NGX_RTMP_HANDSHAKE_CLIENT_SEND_CHALLENGE; /* 构建 C0 + C1 的 数据包 */ if (ngx_rtmp_handshake_create_challenge(s, ngx_rtmp_client_version, &ngx_rtmp_client_partial_key) != NGX_OK) { ngx_rtmp_finalize_session(s); return; } /* 有前面的调用传入的参数可知,该值为 1,即为异步,因此这里暂时不向上游服务器发送 handshake, * 而是将其写事件添加到定时器和 epoll 中,等待下次循环监控到该写事件可写时才发送 C0 + C1 */ if (async) { /* 将该写事件添加到定时器中,超时时间为 s->timeout */ ngx_add_timer(c->write, s->timeout); /* 将该写事件添加到 epoll 等事件监控机制中 */ if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_rtmp_finalize_session(s); } return; } ngx_rtmp_handshake_send(c->write); } 2.6.4.5 ngx_rtmp_relay_create_local_ctx总结:

更多视频教程文档资料免费领取后台私信【资料】自行获取。


您可以还会对下面的文章感兴趣

使用微信扫描二维码后

点击右上角发送给好友