Analysis of the event/modules/ngx_epoll_module.c source file
In this chapter we look at the implementation of the nginx epoll module.
1. The NGX_TEST_BUILD_EPOLL test build
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
#if (NGX_TEST_BUILD_EPOLL)
/* epoll declarations */
#define EPOLLIN 0x001
#define EPOLLPRI 0x002
#define EPOLLOUT 0x004
#define EPOLLRDNORM 0x040
#define EPOLLRDBAND 0x080
#define EPOLLWRNORM 0x100
#define EPOLLWRBAND 0x200
#define EPOLLMSG 0x400
#define EPOLLERR 0x008
#define EPOLLHUP 0x010
#define EPOLLRDHUP 0x2000
#define EPOLLET 0x80000000
#define EPOLLONESHOT 0x40000000
#define EPOLL_CTL_ADD 1
#define EPOLL_CTL_DEL 2
#define EPOLL_CTL_MOD 3
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events;
epoll_data_t data;
};
int epoll_create(int size);
int epoll_create(int size)
{
return -1;
}
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
{
return -1;
}
int epoll_wait(int epfd, struct epoll_event *events, int nevents, int timeout);
int epoll_wait(int epfd, struct epoll_event *events, int nevents, int timeout)
{
return -1;
}
#if (NGX_HAVE_EVENTFD)
#define SYS_eventfd 323
#endif
#if (NGX_HAVE_FILE_AIO)
#define SYS_io_setup 245
#define SYS_io_destroy 246
#define SYS_io_getevents 247
typedef u_int aio_context_t;
struct io_event {
uint64_t data; /* the data field from the iocb */
uint64_t obj; /* what iocb this event came from */
int64_t res; /* result code for this event */
int64_t res2; /* secondary result */
};
#endif
#endif /* NGX_TEST_BUILD_EPOLL */
The block above only provides stub declarations so that the epoll module can be compile-tested (for example on a system without epoll support); we will not discuss it further.
2. The ngx_epoll_conf_t structure
typedef struct {
ngx_uint_t events;
ngx_uint_t aio_requests;
} ngx_epoll_conf_t;
This structure holds the epoll-related settings from the configuration file:
- events: the maximum number of events that a single epoll_wait() call may return, i.e. the maxevents argument of
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
- aio_requests: when AIO is used together with the epoll event mechanism, the maximum number of outstanding asynchronous I/O operations per worker process. (Introduced in 1.1.4 and 1.0.7; we do not use AIO here.)
3. Static function declarations
// callback that initializes the epoll event mechanism (the module's init hook)
static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer);
// NGX_HAVE_EVENTFD is defined in our build. It implements a pipe-like notification mechanism, used mainly by the nginx thread pool: when a task finishes, the worker thread asynchronously notifies the main thread
#if (NGX_HAVE_EVENTFD)
// initialize the notify mechanism
static ngx_int_t ngx_epoll_notify_init(ngx_log_t *log);
// callback invoked when a notification is received; it is attached to notify_event
static void ngx_epoll_notify_handler(ngx_event_t *ev);
#endif
// callback invoked when the event mechanism shuts down
static void ngx_epoll_done(ngx_cycle_t *cycle);
// add an event to the epoll event mechanism
static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event,
ngx_uint_t flags);
// remove an event from the epoll event mechanism
static ngx_int_t ngx_epoll_del_event(ngx_event_t *ev, ngx_int_t event,
ngx_uint_t flags);
// add a connection to the event mechanism; the connection's read and write events are both registered
static ngx_int_t ngx_epoll_add_connection(ngx_connection_t *c);
// remove a connection's read and write events from the event mechanism
static ngx_int_t ngx_epoll_del_connection(ngx_connection_t *c,
ngx_uint_t flags);
#if (NGX_HAVE_EVENTFD)
// send a notification through notify_fd
static ngx_int_t ngx_epoll_notify(ngx_event_handler_pt handler);
#endif
// main event-processing loop of the epoll event mechanism
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
ngx_uint_t flags);
// NGX_HAVE_FILE_AIO is not defined in our build; this callback collects asynchronous I/O completion notifications
#if (NGX_HAVE_FILE_AIO)
static void ngx_epoll_eventfd_handler(ngx_event_t *ev);
#endif
// called during epoll module setup to create the ngx_epoll_conf_t configuration
static void *ngx_epoll_create_conf(ngx_cycle_t *cycle);
// called during epoll module setup to initialize the ngx_epoll_conf_t configuration
static char *ngx_epoll_init_conf(ngx_cycle_t *cycle, void *conf);
4. Related variable declarations
static int ep = -1;
static struct epoll_event *event_list;
static ngx_uint_t nevents;
#if (NGX_HAVE_EVENTFD)
static int notify_fd = -1;
static ngx_event_t notify_event;
static ngx_connection_t notify_conn;
#endif
#if (NGX_HAVE_FILE_AIO)
int ngx_eventfd = -1;
aio_context_t ngx_aio_ctx = 0;
static ngx_event_t ngx_eventfd_event;
static ngx_connection_t ngx_eventfd_conn;
#endif
static ngx_str_t epoll_name = ngx_string("epoll");
static ngx_command_t ngx_epoll_commands[] = {
{ ngx_string("epoll_events"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
0,
offsetof(ngx_epoll_conf_t, events),
NULL },
{ ngx_string("worker_aio_requests"),
NGX_EVENT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
0,
offsetof(ngx_epoll_conf_t, aio_requests),
NULL },
ngx_null_command
};
ngx_event_module_t ngx_epoll_module_ctx = {
&epoll_name,
ngx_epoll_create_conf, /* create configuration */
ngx_epoll_init_conf, /* init configuration */
{
ngx_epoll_add_event, /* add an event */
ngx_epoll_del_event, /* delete an event */
ngx_epoll_add_event, /* enable an event */
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add an connection */
ngx_epoll_del_connection, /* delete an connection */
#if (NGX_HAVE_EVENTFD)
ngx_epoll_notify, /* trigger a notify */
#else
NULL, /* trigger a notify */
#endif
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */
ngx_epoll_done, /* done the events */
}
};
ngx_module_t ngx_epoll_module = {
NGX_MODULE_V1,
&ngx_epoll_module_ctx, /* module context */
ngx_epoll_commands, /* module directives */
NGX_EVENT_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
Below is a brief description of these variables:
- ep: the epoll handle (file descriptor) returned by epoll_create()
- event_list: buffer holding the events returned by epoll_wait()
- nevents: the size of event_list
- notify_fd: the fd used to send and receive notifications
- notify_event: the notification event
- notify_conn: the ngx_connection_t object associated with the notify fd
- ngx_eventfd: the AIO-related fd monitored by epoll (NGX_HAVE_FILE_AIO is not enabled in our build)
- ngx_aio_ctx: the AIO context object (NGX_HAVE_FILE_AIO is not enabled in our build)
- ngx_eventfd_event: the event associated with AIO (NGX_HAVE_FILE_AIO is not enabled in our build)
- ngx_eventfd_conn: the ngx_connection_t object associated with AIO (NGX_HAVE_FILE_AIO is not enabled in our build)
- epoll_name: simply the name of the epoll module
- ngx_epoll_commands: the configuration directives supported by the epoll module
- ngx_epoll_module_ctx: the context object of the epoll module; its actions table is how the rest of nginx invokes epoll (see the ngx_event.h mapping quoted after this list)
- ngx_epoll_module: the epoll module object
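The actions table inside ngx_epoll_module_ctx is what the rest of nginx actually calls: ngx_epoll_init() copies it into the global ngx_event_actions, and the generic event macros dispatch through that table. For reference, the mapping in src/event/ngx_event.h looks roughly like this (reproduced from memory; check your nginx version for the exact definitions):
#define ngx_process_events   ngx_event_actions.process_events
#define ngx_done_events      ngx_event_actions.done
#define ngx_add_event        ngx_event_actions.add
#define ngx_del_event        ngx_event_actions.del
#define ngx_add_conn         ngx_event_actions.add_conn
#define ngx_del_conn         ngx_event_actions.del_conn
#define ngx_notify           ngx_event_actions.notify
So a call such as ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT) elsewhere in the code ends up in ngx_epoll_add_event() whenever the epoll module is the selected event mechanism.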
5. AIO-related operations
#if (NGX_HAVE_FILE_AIO)
/*
* We call io_setup(), io_destroy() io_submit(), and io_getevents() directly
* as syscalls instead of libaio usage, because the library header file
* supports eventfd() since 0.3.107 version only.
*/
static int
io_setup(u_int nr_reqs, aio_context_t *ctx)
{
return syscall(SYS_io_setup, nr_reqs, ctx);
}
static int
io_destroy(aio_context_t ctx)
{
return syscall(SYS_io_destroy, ctx);
}
static int
io_getevents(aio_context_t ctx, long min_nr, long nr, struct io_event *events,
struct timespec *tmo)
{
return syscall(SYS_io_getevents, ctx, min_nr, nr, events, tmo);
}
static void
ngx_epoll_aio_init(ngx_cycle_t *cycle, ngx_epoll_conf_t *epcf)
{
int n;
struct epoll_event ee;
#if (NGX_HAVE_SYS_EVENTFD_H)
ngx_eventfd = eventfd(0, 0);
#else
ngx_eventfd = syscall(SYS_eventfd, 0);
#endif
if (ngx_eventfd == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"eventfd() failed");
ngx_file_aio = 0;
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"eventfd: %d", ngx_eventfd);
n = 1;
if (ioctl(ngx_eventfd, FIONBIO, &n) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"ioctl(eventfd, FIONBIO) failed");
goto failed;
}
if (io_setup(epcf->aio_requests, &ngx_aio_ctx) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"io_setup() failed");
goto failed;
}
ngx_eventfd_event.data = &ngx_eventfd_conn;
ngx_eventfd_event.handler = ngx_epoll_eventfd_handler;
ngx_eventfd_event.log = cycle->log;
ngx_eventfd_event.active = 1;
ngx_eventfd_conn.fd = ngx_eventfd;
ngx_eventfd_conn.read = &ngx_eventfd_event;
ngx_eventfd_conn.log = cycle->log;
ee.events = EPOLLIN|EPOLLET;
ee.data.ptr = &ngx_eventfd_conn;
if (epoll_ctl(ep, EPOLL_CTL_ADD, ngx_eventfd, &ee) != -1) {
return;
}
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_ctl(EPOLL_CTL_ADD, eventfd) failed");
if (io_destroy(ngx_aio_ctx) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"io_destroy() failed");
}
failed:
if (close(ngx_eventfd) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"eventfd close() failed");
}
ngx_eventfd = -1;
ngx_aio_ctx = 0;
ngx_file_aio = 0;
}
#endif
AIO is not enabled in our build, so this code is not discussed further.
6. The ngx_epoll_init() function
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
ngx_epoll_conf_t *epcf;
epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);
if (ep == -1) {
ep = epoll_create(cycle->connection_n / 2);
if (ep == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_create() failed");
return NGX_ERROR;
}
#if (NGX_HAVE_EVENTFD)
if (ngx_epoll_notify_init(cycle->log) != NGX_OK) {
ngx_epoll_module_ctx.actions.notify = NULL;
}
#endif
#if (NGX_HAVE_FILE_AIO)
ngx_epoll_aio_init(cycle, epcf);
#endif
}
if (nevents < epcf->events) {
if (event_list) {
ngx_free(event_list);
}
event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
cycle->log);
if (event_list == NULL) {
return NGX_ERROR;
}
}
nevents = epcf->events;
ngx_io = ngx_os_io;
ngx_event_actions = ngx_epoll_module_ctx.actions;
#if (NGX_HAVE_CLEAR_EVENT)
ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
|NGX_USE_GREEDY_EVENT
|NGX_USE_EPOLL_EVENT;
return NGX_OK;
}
This function is called when the epoll event mechanism is initialized. A brief walkthrough of the implementation:
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
//1) Fetch the module configuration. Usually these directives are not set in the configuration
//file, so the defaults are used: events is 512 and aio_requests is 32
epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);
//2) Initialize epoll
if(ep == -1){
//2.1) Call epoll_create() to create the ep handle; the size argument has no effect on modern kernels, but it must be greater than 0 (a standalone sketch follows this walkthrough)
ep = epoll_create(cycle->connection_n / 2);
//2.2) Initialize the notify mechanism
ngx_epoll_notify_init()
//2.3) Initialize AIO (not enabled in our build)
#if (NGX_HAVE_FILE_AIO)
ngx_epoll_aio_init(cycle, epcf);
#endif
}
//3) Allocate event_list, which stores the events returned by epoll_wait()
//4) Save the relevant values
nevents = epcf->events;
ngx_io = ngx_os_io; // the TCP/UDP send and receive methods used under the epoll event mechanism
ngx_event_actions = ngx_epoll_module_ctx.actions;
// NGX_HAVE_CLEAR_EVENT is defined in our build, so edge-triggered mode (NGX_USE_CLEAR_EVENT) is used
// NGX_USE_GREEDY_EVENT: the event consumer must repeat the I/O operation until EAGAIN is returned
#if (NGX_HAVE_CLEAR_EVENT)
ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
|NGX_USE_GREEDY_EVENT
|NGX_USE_EPOLL_EVENT;
return NGX_OK;
}
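As a standalone illustration of steps 2.1 and 3, here is a minimal sketch (plain C, not nginx code) that performs the core of the initialization: create the epoll instance and allocate the batch buffer that epoll_wait() will later fill. The size hint passed to epoll_create() has been ignored by the kernel since Linux 2.6.8, but it must still be greater than zero:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
int main(void)
{
    int                  ep;
    int                  nevents = 512;   /* default of the epoll_events directive */
    struct epoll_event  *event_list;
    ep = epoll_create(512);               /* the argument is a hint only, but must be > 0 */
    if (ep == -1) {
        perror("epoll_create");
        return 1;
    }
    event_list = malloc(sizeof(struct epoll_event) * nevents);
    if (event_list == NULL) {
        close(ep);
        return 1;
    }
    printf("epoll fd: %d, batch size: %d\n", ep, nevents);
    free(event_list);
    close(ep);
    return 0;
}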
7. The ngx_epoll_notify_init() function
#if (NGX_HAVE_EVENTFD)
static ngx_int_t
ngx_epoll_notify_init(ngx_log_t *log)
{
struct epoll_event ee;
#if (NGX_HAVE_SYS_EVENTFD_H)
notify_fd = eventfd(0, 0);
#else
notify_fd = syscall(SYS_eventfd, 0);
#endif
if (notify_fd == -1) {
ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, "eventfd() failed");
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0,
"notify eventfd: %d", notify_fd);
notify_event.handler = ngx_epoll_notify_handler;
notify_event.log = log;
notify_event.active = 1;
notify_conn.fd = notify_fd;
notify_conn.read = &notify_event;
notify_conn.log = log;
ee.events = EPOLLIN|EPOLLET;
ee.data.ptr = &notify_conn;
if (epoll_ctl(ep, EPOLL_CTL_ADD, notify_fd, &ee) == -1) {
ngx_log_error(NGX_LOG_EMERG, log, ngx_errno,
"epoll_ctl(EPOLL_CTL_ADD, eventfd) failed");
if (close(notify_fd) == -1) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
"eventfd close() failed");
}
return NGX_ERROR;
}
return NGX_OK;
}
#endif
This function initializes the notification mechanism, which provides a wait/notify facility between processes or threads (similar in purpose to a pipe); see the separate analysis of eventfd for details and concrete examples. The function itself is straightforward; a minimal standalone sketch of the same pattern follows.
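The following self-contained sketch (plain C, not nginx code) reproduces the same pattern on a smaller scale: an eventfd registered with epoll in edge-triggered mode, a writer that notifies by incrementing the counter, and a waiter that is woken up and drains the counter with a single read():
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
int main(void)
{
    int                 ep, efd;
    uint64_t            inc = 1, count = 0;
    struct epoll_event  ee, out;
    ep = epoll_create(1);
    efd = eventfd(0, 0);
    if (ep == -1 || efd == -1) {
        perror("setup");
        return 1;
    }
    /* register the eventfd for edge-triggered read notifications,
     * just as ngx_epoll_notify_init() does for notify_fd */
    ee.events = EPOLLIN|EPOLLET;
    ee.data.fd = efd;
    if (epoll_ctl(ep, EPOLL_CTL_ADD, efd, &ee) == -1) {
        perror("epoll_ctl");
        return 1;
    }
    /* "notify": each write() adds its value to the eventfd counter */
    (void) write(efd, &inc, sizeof(inc));
    (void) write(efd, &inc, sizeof(inc));
    /* "wait": epoll reports the eventfd readable ... */
    if (epoll_wait(ep, &out, 1, -1) == 1) {
        /* ... and one read() returns and clears the accumulated counter */
        (void) read(efd, &count, sizeof(count));
        printf("notified, counter = %llu\n", (unsigned long long) count);
    }
    close(efd);
    close(ep);
    return 0;
}
Note that nginx stores a pointer to notify_conn in ee.data.ptr rather than the raw fd, so that the generic event loop can find the ngx_event_t and its handler.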
8. The ngx_epoll_notify_handler() function
#if (NGX_HAVE_EVENTFD)
static void
ngx_epoll_notify_handler(ngx_event_t *ev)
{
ssize_t n;
uint64_t count;
ngx_err_t err;
ngx_event_handler_pt handler;
if (++ev->index == NGX_MAX_UINT32_VALUE) {
ev->index = 0;
n = read(notify_fd, &count, sizeof(uint64_t));
err = ngx_errno;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"read() eventfd %d: %z count:%uL", notify_fd, n, count);
if ((size_t) n != sizeof(uint64_t)) {
ngx_log_error(NGX_LOG_ALERT, ev->log, err,
"read() eventfd %d failed", notify_fd);
}
}
handler = ev->data;
handler(ev);
}
#endif
This function is called whenever epoll reports the notify eventfd as readable. ev->index is incremented on every invocation; only when it wraps around at NGX_MAX_UINT32_VALUE is read() called once to drain the accumulated eventfd counter, which keeps the counter from overflowing while avoiding a read() system call on every notification. Finally, the handler stored in ev->data by ngx_epoll_notify() is invoked.
9. The ngx_epoll_done() function
static void
ngx_epoll_done(ngx_cycle_t *cycle)
{
if (close(ep) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"epoll close() failed");
}
ep = -1;
#if (NGX_HAVE_EVENTFD)
if (close(notify_fd) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"eventfd close() failed");
}
notify_fd = -1;
#endif
#if (NGX_HAVE_FILE_AIO)
if (ngx_eventfd != -1) {
if (io_destroy(ngx_aio_ctx) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"io_destroy() failed");
}
if (close(ngx_eventfd) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"eventfd close() failed");
}
ngx_eventfd = -1;
}
ngx_aio_ctx = 0;
#endif
ngx_free(event_list);
event_list = NULL;
nevents = 0;
}
When the nginx epoll event mechanism shuts down, this function is called to release the corresponding system resources: the epoll descriptor, the notify eventfd and the event_list buffer.
10. The ngx_epoll_add_event() function
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
int op;
uint32_t events, prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;
c = ev->data;
events = (uint32_t) event;
if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
events = EPOLLIN|EPOLLRDHUP;
#endif
} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
events = EPOLLOUT;
#endif
}
if (e->active) {
op = EPOLL_CTL_MOD;
events |= prev;
} else {
op = EPOLL_CTL_ADD;
}
ee.events = events | (uint32_t) flags;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"epoll add event: fd:%d op:%d ev:%08XD",
c->fd, op, ee.events);
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}
ev->active = 1;
#if 0
ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif
return NGX_OK;
}
This function adds an event to the epoll event mechanism. A brief walkthrough of the implementation:
static ngx_int_t
ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
if (event == NGX_READ_EVENT) {
//1) When adding the read event, examine the connection's *write* event: epoll keeps a single event mask per fd, so if the write event is already registered its flags must be preserved (see the sketch after this walkthrough)
e = c->write;
prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
events = EPOLLIN|EPOLLRDHUP;
#endif
}else{
//2) When adding the write event, examine the connection's *read* event for the same reason
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
events = EPOLLOUT;
#endif
}
//3) Decide whether to register the fd with EPOLL_CTL_ADD or to update it with EPOLL_CTL_MOD
if (e->active) {
op = EPOLL_CTL_MOD;
events |= prev;
} else {
op = EPOLL_CTL_ADD;
}
//4) Build the event data bound to the fd. Note that ev->instance, stored in the lowest bit of the pointer, is used to detect stale events in epoll:
//when a connection is closed its fd is set to -1, and when the ngx_connection_t is later reused the instance bit is toggled, so a leftover event from the previous use can be recognized
ee.events = events | (uint32_t) flags;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
//5) Register the event with epoll
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
}
}
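Why steps 1) and 2) look at the opposite event becomes clear with a small experiment: epoll keeps exactly one event mask per file descriptor, and a second EPOLL_CTL_ADD on an already registered fd fails with EEXIST. So when the other direction is already active, the only way to add this event without dropping the other one is EPOLL_CTL_MOD with the union of both masks. A minimal sketch (plain C, not nginx code), using a pipe as the watched descriptor:
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
int main(void)
{
    int                 ep, fds[2];
    struct epoll_event  ee;
    if (pipe(fds) == -1 || (ep = epoll_create(1)) == -1) {
        perror("setup");
        return 1;
    }
    /* register the write interest first, like adding NGX_WRITE_EVENT */
    ee.events = EPOLLOUT;
    ee.data.fd = fds[1];
    epoll_ctl(ep, EPOLL_CTL_ADD, fds[1], &ee);
    /* a second EPOLL_CTL_ADD on the same fd is rejected ... */
    ee.events = EPOLLIN;
    if (epoll_ctl(ep, EPOLL_CTL_ADD, fds[1], &ee) == -1) {
        printf("second ADD failed: %s\n", strerror(errno));    /* EEXIST */
    }
    /* ... so the read interest has to be merged in with EPOLL_CTL_MOD,
     * keeping the flags of the already active write event */
    ee.events = EPOLLIN|EPOLLOUT;
    if (epoll_ctl(ep, EPOLL_CTL_MOD, fds[1], &ee) == 0) {
        printf("fd %d now waits for EPOLLIN|EPOLLOUT\n", fds[1]);
    }
    close(fds[0]);
    close(fds[1]);
    close(ep);
    return 0;
}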
11. The ngx_epoll_del_event() function
static ngx_int_t
ngx_epoll_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
int op;
uint32_t prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;
/*
* when the file descriptor is closed, the epoll automatically deletes
* it from its queue, so we do not need to delete explicitly the event
* before the closing the file descriptor
*/
if (flags & NGX_CLOSE_EVENT) {
ev->active = 0;
return NGX_OK;
}
c = ev->data;
if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
}
if (e->active) {
op = EPOLL_CTL_MOD;
ee.events = prev | (uint32_t) flags;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
} else {
op = EPOLL_CTL_DEL;
ee.events = 0;
ee.data.ptr = NULL;
}
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"epoll del event: fd:%d op:%d ev:%08XD",
c->fd, op, ee.events);
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}
ev->active = 0;
return NGX_OK;
}
This function removes an event from the epoll event mechanism. A brief walkthrough of the implementation:
static ngx_int_t
ngx_epoll_del_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
{
//1) When the fd is being closed, the kernel removes it from epoll automatically, so nothing needs to be done here
if (flags & NGX_CLOSE_EVENT) {
ev->active = 0;
return NGX_OK;
}
if (event == NGX_READ_EVENT) {
//2) When removing the read event, examine the connection's *write* event: if it is still active, its flags must be kept and EPOLL_CTL_MOD used instead of EPOLL_CTL_DEL
e = c->write;
prev = EPOLLOUT;
}else{
//3) When removing the write event, examine the connection's *read* event for the same reason
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
}
//4) Decide whether to update the fd with EPOLL_CTL_MOD or remove it with EPOLL_CTL_DEL
//5) Apply the change
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
}
}
12. The ngx_epoll_add_connection() function
static ngx_int_t
ngx_epoll_add_connection(ngx_connection_t *c)
{
struct epoll_event ee;
ee.events = EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP;
ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"epoll add connection: fd:%d ev:%08XD", c->fd, ee.events);
if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
"epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
return NGX_ERROR;
}
c->read->active = 1;
c->write->active = 1;
return NGX_OK;
}
This function adds a connection to the event mechanism: the connection's read and write events are registered with a single epoll_ctl() call, in edge-triggered mode (EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP).
13. The ngx_epoll_del_connection() function
static ngx_int_t
ngx_epoll_del_connection(ngx_connection_t *c, ngx_uint_t flags)
{
int op;
struct epoll_event ee;
/*
* when the file descriptor is closed the epoll automatically deletes
* it from its queue so we do not need to delete explicitly the event
* before the closing the file descriptor
*/
if (flags & NGX_CLOSE_EVENT) {
c->read->active = 0;
c->write->active = 0;
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"epoll del connection: fd:%d", c->fd);
op = EPOLL_CTL_DEL;
ee.events = 0;
ee.data.ptr = NULL;
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}
c->read->active = 0;
c->write->active = 0;
return NGX_OK;
}
This function removes a connection's read and write events from the event mechanism. Note that when the connection is being closed (NGX_CLOSE_EVENT), the operating system is left to remove the fd from epoll automatically, as the sketch below demonstrates.
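The NGX_CLOSE_EVENT shortcut in both ngx_epoll_del_event() and ngx_epoll_del_connection() relies on a documented epoll property: when the last descriptor referring to an open file is closed, the kernel drops it from every epoll interest list by itself. A minimal sketch (plain C, not nginx code) demonstrating this:
#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
int main(void)
{
    int                 ep, fds[2];
    struct epoll_event  ee, out;
    if (pipe(fds) == -1 || (ep = epoll_create(1)) == -1) {
        perror("setup");
        return 1;
    }
    ee.events = EPOLLIN;
    ee.data.fd = fds[0];
    epoll_ctl(ep, EPOLL_CTL_ADD, fds[0], &ee);
    (void) write(fds[1], "x", 1);   /* make the read end readable ... */
    close(fds[0]);                  /* ... then close it without EPOLL_CTL_DEL */
    /* nothing is reported: the closed fd was removed from the interest list */
    printf("events after close(): %d\n", epoll_wait(ep, &out, 1, 0));
    close(fds[1]);
    close(ep);
    return 0;
}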
14. The ngx_epoll_notify() function
#if (NGX_HAVE_EVENTFD)
static ngx_int_t
ngx_epoll_notify(ngx_event_handler_pt handler)
{
static uint64_t inc = 1;
notify_event.data = handler;
if ((size_t) write(notify_fd, &inc, sizeof(uint64_t)) != sizeof(uint64_t)) {
ngx_log_error(NGX_LOG_ALERT, notify_event.log, ngx_errno,
"write() to eventfd %d failed", notify_fd);
return NGX_ERROR;
}
return NGX_OK;
}
#endif
This function sends a notification through the eventfd: the handler to run is stored in notify_event.data, and writing the 64-bit value 1 increments the eventfd counter, which wakes up epoll_wait() and eventually causes ngx_epoll_notify_handler() to call that handler.
15. The ngx_epoll_process_events() function
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;
/* NGX_TIMER_INFINITE == INFTIM */
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);
events = epoll_wait(ep, event_list, (int) nevents, timer);
err = (events == -1) ? ngx_errno : 0;
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}
if (err) {
if (err == NGX_EINTR) {
if (ngx_event_timer_alarm) {
ngx_event_timer_alarm = 0;
return NGX_OK;
}
level = NGX_LOG_INFO;
} else {
level = NGX_LOG_ALERT;
}
ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
return NGX_ERROR;
}
if (events == 0) {
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"epoll_wait() returned no events without timeout");
return NGX_ERROR;
}
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
rev = c->read;
if (c->fd == -1 || rev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
revents = event_list[i].events;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: fd:%d ev:%04XD d:%p",
c->fd, revents, event_list[i].data.ptr);
if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);
}
#if 0
if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"strange epoll_wait() events fd:%d ev:%04XD",
c->fd, revents);
}
#endif
if ((revents & (EPOLLERR|EPOLLHUP))
&& (revents & (EPOLLIN|EPOLLOUT)) == 0)
{
/*
* if the error events were returned without EPOLLIN or EPOLLOUT,
* then add these flags to handle the events at least in one
* active handler
*/
revents |= EPOLLIN|EPOLLOUT;
}
if ((revents & EPOLLIN) && rev->active) {
#if (NGX_HAVE_EPOLLRDHUP)
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}
#endif
rev->ready = 1;
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
rev->handler(rev);
}
}
wev = c->write;
if ((revents & EPOLLOUT) && wev->active) {
if (c->fd == -1 || wev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
if (flags & NGX_POST_EVENTS) {
ngx_post_event(wev, &ngx_posted_events);
} else {
wev->handler(wev);
}
}
}
return NGX_OK;
}
This is the main event-processing function of the epoll event mechanism. A brief walkthrough of the implementation:
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
//1) Wait for events
events = epoll_wait(ep, event_list, (int) nevents, timer);
//2) Save the error code, if any
err = (events == -1) ? ngx_errno : 0;
//3) Update nginx's cached time
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}
//4) Error handling
if (err){
if (err == NGX_EINTR) {
// interrupted by the SIGALRM signal used for time updates; the time was already updated above, so just reset ngx_event_timer_alarm
if (ngx_event_timer_alarm) {
ngx_event_timer_alarm = 0;
return NGX_OK;
}
}
}
//5) No events were returned
if (events == 0) {
// returned because the timeout expired
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}
}
//6) Iterate over the returned events
for (i = 0; i < events; i++) {
if (c->fd == -1 || rev->instance != instance) {
//6.1) The connection has already been closed (or reused), so this is a stale event (see the sketch after this walkthrough)
}
if (revents & (EPOLLERR|EPOLLHUP)) {
//6.2) An error occurred on the fd; log a debug message
}
if ((revents & (EPOLLERR|EPOLLHUP))
&& (revents & (EPOLLIN|EPOLLOUT)) == 0)
{
//6.3) An error was reported without EPOLLIN or EPOLLOUT; force revents |= EPOLLIN|EPOLLOUT
//so that at least one active handler gets a chance to see the error
}
//6.4) Handle the read event
if ((revents & EPOLLIN) && rev->active) {
// the peer closed its end of the connection
#if (NGX_HAVE_EPOLLRDHUP)
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}
#endif
// mark the event as ready for reading
rev->ready = 1;
// either post the event or handle it immediately
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
rev->handler(rev);
}
}
//6.5) Handle the write event
wev = c->write;
if ((revents & EPOLLOUT) && wev->active) {
if (c->fd == -1 || wev->instance != instance) {
// the connection was closed while handling the read event above; this is a stale event
continue;
}
// mark the event as ready for writing
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
// either post the event or handle it immediately
if (flags & NGX_POST_EVENTS) {
ngx_post_event(wev, &ngx_posted_events);
} else {
wev->handler(wev);
}
}
}
}
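Step 6.1 relies on the pointer-tagging trick set up back in ngx_epoll_add_event(): because ngx_connection_t objects are aligned, the lowest bit of the pointer stored in ee.data.ptr is always zero and can carry the one-bit instance flag; here the bit is stripped off again and compared with the event's current instance to filter out stale events. A minimal sketch (plain C, not nginx code) of the tag/untag round trip:
#include <stdio.h>
#include <stdint.h>
struct conn {
    int fd;
};
int main(void)
{
    struct conn   c = { 42 };
    unsigned      instance = 1;
    /* tag: store the flag in the low bit, as ngx_epoll_add_event() does */
    void *tagged = (void *) ((uintptr_t) &c | instance);
    /* untag: recover both pieces, as ngx_epoll_process_events() does */
    unsigned      got_instance = (uintptr_t) tagged & 1;
    struct conn  *got_conn = (struct conn *) ((uintptr_t) tagged & ~(uintptr_t) 1);
    printf("fd=%d instance=%u\n", got_conn->fd, got_instance);
    return 0;
}
In nginx the instance bit is toggled each time a connection object is reused, so an event that was queued for the previous use of the connection no longer matches and is skipped.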
16. The ngx_epoll_eventfd_handler() function
#if (NGX_HAVE_FILE_AIO)
static void
ngx_epoll_eventfd_handler(ngx_event_t *ev)
{
int n, events;
long i;
uint64_t ready;
ngx_err_t err;
ngx_event_t *e;
ngx_event_aio_t *aio;
struct io_event event[64];
struct timespec ts;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, 0, "eventfd handler");
n = read(ngx_eventfd, &ready, 8);
err = ngx_errno;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0, "eventfd: %d", n);
if (n != 8) {
if (n == -1) {
if (err == NGX_EAGAIN) {
return;
}
ngx_log_error(NGX_LOG_ALERT, ev->log, err, "read(eventfd) failed");
return;
}
ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
"read(eventfd) returned only %d bytes", n);
return;
}
ts.tv_sec = 0;
ts.tv_nsec = 0;
while (ready) {
events = io_getevents(ngx_aio_ctx, 1, 64, event, &ts);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"io_getevents: %d", events);
if (events > 0) {
ready -= events;
for (i = 0; i < events; i++) {
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"io_event: %XL %XL %L %L",
event[i].data, event[i].obj,
event[i].res, event[i].res2);
e = (ngx_event_t *) (uintptr_t) event[i].data;
e->complete = 1;
e->active = 0;
e->ready = 1;
aio = e->data;
aio->res = event[i].res;
ngx_post_event(e, &ngx_posted_events);
}
continue;
}
if (events == 0) {
return;
}
/* events == -1 */
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"io_getevents() failed");
return;
}
}
#endif
AIO is not enabled in our build, so this function is not discussed further.
17. The ngx_epoll_create_conf() function
static void *
ngx_epoll_create_conf(ngx_cycle_t *cycle)
{
ngx_epoll_conf_t *epcf;
epcf = ngx_palloc(cycle->pool, sizeof(ngx_epoll_conf_t));
if (epcf == NULL) {
return NULL;
}
epcf->events = NGX_CONF_UNSET;
epcf->aio_requests = NGX_CONF_UNSET;
return epcf;
}
This function allocates the configuration structure used by the epoll module; both fields are initialized to NGX_CONF_UNSET.
18. The ngx_epoll_init_conf() function
static char *
ngx_epoll_init_conf(ngx_cycle_t *cycle, void *conf)
{
ngx_epoll_conf_t *epcf = conf;
ngx_conf_init_uint_value(epcf->events, 512);
ngx_conf_init_uint_value(epcf->aio_requests, 32);
return NGX_CONF_OK;
}
This function fills in defaults for any settings not given in the configuration file: events defaults to 512 and aio_requests to 32, as sketched below.
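ngx_conf_init_uint_value() is a helper macro from core/ngx_conf_file.h; as far as I recall it expands roughly to the following, i.e. it only overwrites values that are still unset (and since events and aio_requests are ngx_uint_t, the NGX_CONF_UNSET assigned in ngx_epoll_create_conf() compares equal to NGX_CONF_UNSET_UINT here):
#define ngx_conf_init_uint_value(conf, default)                              \
    if (conf == NGX_CONF_UNSET_UINT) {                                       \
        conf = default;                                                      \
    }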
[See also]