这篇文章我们再进一步,搞定高性能网络编程最后一块拼图——epoll。
前面我们在介绍select和poll的时候发现,它们总是有这样或者那样的缺点。比如,select有最大描述符限制。poll虽然突破了最大描述符限制,但它本质上还是通过遍历最大描述字个数的方式去监听描述符。
下面这张图展示了select、poll、epoll在不同数量描述符的情况下的耗时
可以看到,select和poll在相同描述符数量时表现几乎一致,这是由于它们要找到可用描述符都要去遍历一遍集合中所有的元素。而epoll在10和10000的描述符的对比下,变化微乎其微,这足以看出epoll的强大。
首先,还是老规矩,我们先来看一下epoll的函数签名,首先是创建一个epoll实例:
int epoll_create(int size);
int epoll_create1(int flags);
和select、poll不同,epoll首先需要先创建一个epoll实例。函数返回值大于0表示epoll实例,返回-1表示出错。
size参数在之前的版本中含义是期望监听的描述符大小,在Linux2.6.8之后这个参数会被忽略,内核会动态分配描述符的大小,但我们在传参的时候还是会传一个大于0的数。
epoll_create1的flags传0和epoll_create是一样的效果。当然如果传非0就可以进一步设置epoll的行为。我们使用man命令看一下epoll_create1的文档。如下:
man epoll_create1
...
epoll_create1()
If flags is 0, then, other than the fact that the obsolete size argument is dropped, epoll_create1() is the
same as epoll_create(). The following value can be included in flags to obtain different behavior:
EPOLL_CLOEXEC
Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor. See the description of the
O_CLOEXEC flag in open(2) for reasons why this may be useful.
...
其中EPOLL_CLOEXEC一般用在执行exec系列函数启动新进程的时候,自动关闭原描述符,防止新的进程访问到原来老的描述符。
简单来说,当你创建一个子进程,但又不希望子进程访问到父进程创建的描述符,就可以使用EPOLL_CLOECEC。
创建完epoll实例之后,我们就可以对它进行各种控制了,控制epoll实例的函数是epoll_ctl,它的签名如下:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数返回0表示成功,返回-1表示出错。这个函数有4个参数:
第一个参数就是我们使用poll_create创建出来的epoll实例。
第二个参数表示增、删、改分别对应
- EPOLL_CTL_ADD: 向epoll实例注册文件描述符对应的事件
- EPOLL_CTL_DEL: 删除epoll实例中文件描述符对应的事件
- EPOLL_CTL_MOD: 修改epoll实例中文件描述符对应的事件
第三个参数是我们要注册事件的描述符,在这个系列文章中通常都是网络套接字
第四个参数是对事件的描述,是一个结构体,如下:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll 事件*/
epoll_data_t data;
};
这里的事件和前面讲poll的时候基本上是一样的,下面是在使用epoll的时候,我们常用的事件类型:
- EPOLLIN: 表示描述符可读
- EPOLLOUT: 表示描述符可写
- EPOLLRDHUP: 表示描述符一端已经关闭或者半关闭
- EPOLLHUP: 表示对应描述符被挂起
- EPOLLET: 边缘触发模式edge-triggered,不设置默认level-triggered
事件设置好了之后,我们就可以调用epoll_wait来等待可用的描述符了,它的原型如下:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
函数返回大于0表示事件个数,返回0表示超时,出错返回-1
有4个参数:
第一个参数是epoll实例
第二个参数是返回的要处理的I/O事件,是一个数组,大小是epoll_wait返回值,每一个元素是一个待处理的I/O事件。
第三个参数表示epoll_wait可以返回的最大事件数。
第四个参数是超时时间,如果设置-1表示不超时,设置0表示立即返回,和select基本是一致的。
下面,我们将poll那篇文章中的例子通过epoll改造一下,改造后的代码如下:
#include "stdio.h"
#include "stdlib.h"
#include "sys/socket.h"
#include "sys/types.h"
#include "sys/epoll.h"
#include "unistd.h"
#include "string.h"
#include "arpa/inet.h"
#include "netinet/in.h"
#include "errno.h"
#include "fcntl.h"
#define MAX_EVENTS 1024
int create_sock();
void make_nonblocking(int fd);
int main(int argc, char *argv[]) {
int read_num, conn_fd, n, efd, i;
char buf[1024];
struct sockaddr_in client_addr;
int sock_fd = create_sock();
make_nonblocking(sock_fd);
struct epoll_event event;
struct epoll_event *events;
efd = epoll_create1(0);
if (efd == -1) {
perror("epoll_create1 failed.");
exit(1);
}
event.data.fd = sock_fd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, sock_fd, &event) == -1) {
perror("epoll_ctl failed.");
exit(1);
}
events = calloc(MAX_EVENTS, sizeof(event));
for (;;) {
read_num = epoll_wait(efd, events, MAX_EVENTS, -1);
for (i = 0; i < read_num; i++) {
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP ||
(!(events[i].events & EPOLLIN))))
{
perror("epoll error.");
close(events[i].data.fd);
continue;
} else if (sock_fd == events[i].data.fd) {
socklen_t cli_len = sizeof(client_addr);
conn_fd = accept(sock_fd, (struct sockaddr *)&client_addr, &cli_len);
if (conn_fd == -1) {
perror("accept error.");
continue;
} else {
event.data.fd = conn_fd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_fd, &event) == -1) {
perror("epoll_ctl error.");
}
}
continue;
} else {
int client_fd = events[i].data.fd;
if (n = read(client_fd, buf, sizeof(buf)) > 0) {
if (write(client_fd, buf, n) < 0) {
perror("write failed.");
} else if (n == 0 || errno == ECONNRESET) {
close(client_fd);
continue;
} else {
write(client_fd, buf, n);
}
}
}
}
}
}
int create_sock() {
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(3000);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
listen(sock_fd, 1024);
printf("listening on port 3000.\\n");
return sock_fd;
}
void make_nonblocking(int fd) {
fcntl(fd, F_SETFL, O_NONBLOCK);
}
我们重点看一下epoll和poll不一样的地方
第一,epoll需要使用poll_create创建一个实例,后续所的操作都是基于这个实例。
第二,和poll不一样,epoll不再是将fd设置成-1来表示忽略当前描述,而是关心哪个就设置哪个,使用epoll_ctl函数,如下:
event.data.fd = sock_fd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, sock_fd, &event) == -1) {
perror("epoll_ctl failed.");
exit(1);
}
第三,也是最重要的,events返回的是所有实际产生事件集合,大小就是epoll_wait返回值。所以,只要poll_wait返回,我们就可以确定从0到read_num所有位置都是有事件发生的。我们回忆一下poll,每次都是从0遍历到最大描述字。而这中间有很多是没有事件发生的描述字。
你可能会觉得很神奇,epoll是怎么做到每次返回的描述字都是有事件发生的呢?这个问题要展开就绕不开它背后的实现以及支掌它的数据结构,也就是大名鼎鼎的红黑树。这里我们不展开,后面有机会专门出一篇文章来分析一下epoll到底是怎么实现的。
这里你暂时可以简单理解为epoll将我们监听的事件和已发生的事件保存在了不同的地方,这样epoll就可以只返回那些已经有事件发生的描述符了。
上面我们在设置监听事件的时候,用到了我们前面提到的EPOLLET
event.events = EPOLLIN | EPOLLET;
这个宏表示的是边缘触发,epoll有两种触发方式,它们分别是边缘触发(edge-triggered)和条件触发(level-triggered),默认为条件触发,比如像下面这样:
event.events = EPOLLIN;
假如我们监听可读事件,对于条件触发来说,每次有数据可读的时候都会触发事件,这在某些情况下会造成内核频发触发事件,比如,代码中有大量小数据包的情况下。
边缘触发表示的是,只在第一次有数据可读的情况下通知一次。后面的处理就完全靠自己了,很显然这种触发方式能够明显减少触发次数,从而减轻内核的压力,这在一些大数据量的传输场景下非常有用,比如我们要上传一个文件。
好了,关于epoll相关的内容就介绍到这里了,我将上一篇文章中http-server的代码使用epoll改造了一下,代码地址:
https://github.com/benggee/c-program/tree/main/epoll
如果你有兴趣,可以尝试自己先改造一下,相信会有epoll有不一样的理解。
总结
这篇文章介绍了epoll的使用,通过和select、poll的对比,我们很直观可以看到epoll相比select和epoll有诸多的优势。
所以,现在几乎所有涉及到高性能的网络程序都在使用epoll,比如nginx、redis、甚至在Linux下Golang的网络方案底层也是epoll。