本文源码以libev4.20为准,其他版本大同小异。
libev是广泛使用的事件库,是一个功能强大的reactor,可以把Timer、IO、进程线程事件放在一个统一的框架下进行管理。如果有其他的事件触发需求也可以改libev源码把该事件加入libev的框架中(当前前提是得理解libev的设计)。有文章说libev性能比libevent好,没实验过,但是从源码角度看,libev要更简洁,当然更费解一点。作者为了追求代码的整洁和统一使用了大量的宏,造成了阅读的不便。这里我们从宏观分析一下libev的设计实现,然后穿插分析一些小的trick。旨在学习总结libev设计中优雅的地方。
基本概念
首先是一些主要的概念和数据结构。
libev通过定义watcher来关注一个事件,并且把事件类型和对应的毁回调函数关联起来。libev定义了多种事件类型,同时可以在框架中自己添加感兴趣的事件,libev保证了事件触发的顺序性,并在多线程环境下保证事件的串行触发。
每一种类型的watcher都包含几个基本的成员,通过EV_WATCHER和EV_WATCHER_LIST宏实现。EV_WATCHER_LIST比EV_WATCHER多了一个纸箱下一个watcher的指针。EV_WATCHER_TIMER是定时器的基类,多一个timestamp。这几个宏这里留一个小的trick分析,在后面阐述。
/* shared by all watchers */
#define EV_WATCHER(type) \
int active; /* private */ \
int pending; /* private */ \
EV_DECL_PRIORITY /* private */ \
EV_COMMON /* rw */ \
EV_CB_DECLARE (type) /* private */
#define EV_WATCHER_LIST(type) \
EV_WATCHER (type) \
struct ev_watcher_list *next; /* private */
#define EV_WATCHER_TIME(type) \
EV_WATCHER (type) \
ev_tstamp at; /* private */
举一个例子,IO事件watcher:ev_io
/* invoked when fd is either EV_READable or EV_WRITEable */
/* revent EV_READ, EV_WRITE */
typedef struct ev_io
{
EV_WATCHER_LIST (ev_io)
int fd; /* ro */
int events; /* ro */
} ev_io;
可以看到ev_io相当于继承了基类ev_watcher_list,并派生出自己的成员,fd和events,分别用来存储文件描述符和事件标识。类似的watcher有:
- 基类ev_watcher,ev_watcher_list, ev_watcher_time.
- io:ev_io
- 周期触发定时器:ev_periodic
- 定时器:ev_timer
- 信号:ev_signal
- 子进程:ev_child
- 文件stat:ev_stat
- 一些内部流程watcher:ev_idle,ev_prepare,ev_check, ev_fork, ev_cleanup
- 异步触发:ev_async
使用流程
libev库的基本使用流程是:
- 生成一个循环(loop)对象,单线程情况下直接使用default_loop,多线程的情况下使用ev_loop_new来创建。
- 调用ev_xx_init先注册一个感兴趣的watcher。把这个watcher跟事件、回调关联起来。
- 调用ev_xx_start把事件添加到loop的待处理(后述)列表中。
- 调用ev_run执行循环。
一个简单的使用例程如下(来自官方sample,可见官方风格是大括号换行的….鄙视):
// a single header file is required
#include <ev.h>
#include <stdio.h> // for puts
// every watcher type has its own typedef'd struct
// with the name ev_TYPE
ev_io stdin_watcher;
ev_timer timeout_watcher;
// all watcher callbacks have a similar signature
// this callback is called when data is readable on stdin
static void
stdin_cb (EV_P_ ev_io *w, int revents)
{
puts ("stdin ready");
// for one-shot events, one must manually stop the watcher
// with its corresponding stop function.
ev_io_stop (EV_A_ w);
// this causes all nested ev_run's to stop iterating
ev_break (EV_A_ EVBREAK_ALL);
}
// another callback, this time for a time-out
static void
timeout_cb (EV_P_ ev_timer *w, int revents)
{
puts ("timeout");
// this causes the innermost ev_run to stop iterating
ev_break (EV_A_ EVBREAK_ONE);
}
int
main (void)
{
// use the default event loop unless you have special needs
struct ev_loop *loop = EV_DEFAULT;
// initialise an io watcher, then start it
// this one will watch for stdin to become readable
ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ);
ev_io_start (loop, &stdin_watcher);
// initialise a timer watcher, then start it
// simple non-repeating 5.5 second timeout
ev_timer_init (&timeout_watcher, timeout_cb, 5.5, 0.);
ev_timer_start (loop, &timeout_watcher);
// now wait for events to arrive
ev_run (loop, 0);
// break was called, so exit
return 0;
}
实现分析:
下面我们针对上述程序分析一下libev的实现。
首先直接采用了default_loop,这里没有使用多线程支持。如果需要开启的话,记得define一下EV_MULTIPLICITY,源码中有大量的对EV_MULTIPLICITY的判断,如果define了则增加一个loop入参来指定运行的线程,否则就直接用默认的。源码实现如下(这里只给出了ev_loop定义):
#if EV_MULTIPLICITY
struct ev_loop
{
ev_tstamp ev_rt_now;
#define ev_rt_now ((loop)->ev_rt_now)
#define VAR(name,decl) decl;
#include "ev_vars.h"
#undef VAR
};
#include "ev_wrap.h"
static struct ev_loop default_loop_struct;
EV_API_DECL struct ev_loop *ev_default_loop_ptr = 0; /* needs to be initialised to make it a definition despite extern */
#else
EV_API_DECL ev_tstamp ev_rt_now = 0; /* needs to be initialised to make it a definition despite extern */
#define VAR(name,decl) static decl;
#include "ev_vars.h"
#undef VAR
static int ev_default_loop_ptr;
#endif
var这里留一个小trick分析,在后面来阐述。
sample之后调用了两个init来关联事件、watcher和回调函数。对应的定义如下:
/* these may evaluate ev multiple times, and the other arguments at most once */
/* either use ev_init + ev_TYPE_set, or the ev_TYPE_init macro, below, to first initialise a watcher */
#define ev_init(ev,cb_) do { \
((ev_watcher *)(void *)(ev))->active = \
((ev_watcher *)(void *)(ev))->pending = 0; \
ev_set_priority ((ev), 0); \
ev_set_cb ((ev), cb_); \
} while (0)
#define ev_io_set(ev,fd_,events_) do { (ev)->fd = (fd_); (ev)->events = (events_) | EV__IOFDSET; } while (0)
#define ev_timer_set(ev,after_,repeat_) do { ((ev_watcher_time *)(ev))->at = (after_); (ev)->repeat = (repeat_); } while (0)
#define ev_io_init(ev,cb,fd,events) do { ev_init ((ev), (cb)); ev_io_set ((ev),(fd),(events)); } while (0)
#define ev_timer_init(ev,cb,after,repeat) do { ev_init ((ev), (cb)); ev_timer_set ((ev),(after),(repeat)); } while (0)
可以看到,每种类型的init都是定义了一些赋值操作,由于各种watcher都是从ev_watcher “派生” 而来的,所以可以用ev_watcher向上转换来访问公共成员。这里只是定义了对象,不涉及事件的注册等操作。
之后sample通过调用xx_start把事件添加到了关注列表中。ev_io_start的源码如下:
void noinline
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
int fd = w->fd;
if (expect_false (ev_is_active (w)))//当前watcher是否已经active
return;
assert (("libev: ev_io_start called with negative fd", fd >= 0));
assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))));
EV_FREQUENT_CHECK;//周期性的检查
ev_start (EV_A_ (W)w, 1);
array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
wlist_add (&anfds[fd].head, (WL)w);//添加到watcher list
/* common bug, apparently */
assert (("libev: ev_io_start called with corrupted watcher", ((WL)w)->next != (WL)w));
fd,change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);//加入事件变更
w->events &= ~EV__IOFDSET;
EV_FREQUENT_CHECK;//周期检查
}
代码主要的逻辑在三个地方,ev_start、wlist_add和fd_change。ev_start的比较简单,主要是标记了一下当前watcher已经actived,这是所有的xx_start函数都有的逻辑。
inline_speed void
ev_start (EV_P_ W w, int active)
{
pri_adjust (EV_A_ w);
w->active = active;
ev_ref (EV_A);
}
之后的部分每个不同的watcher实现不同。针对io_watcher,由于fd分配是连续的,所以这个长度可以进行大小限制的,我们用一个连续的数组来存储fd/watcher信息,如下图所示,用anfd[fd] 就可以找到对应的fd/watcher信息,如果遇到anfd超出我们的buffer长度情形,可以动态扩容。这里直接用了文献2里面的图。
wlist_add完成向anfd数组对应位置的链表增加事件的工作。更详细的过程可以参考文献2。
最后fd_change完成增加事件变更的任务。libev会根据之前的wlist来判断一个事件是否需要调用对应的处理函数向系统添加监听,比如针对epoll,如果第一次在一个watcher(fd)上调用io_start,那么fdchanges数组中会增加一项,表明下个事件循环周期内需要调用epoll_ctl增加监听。如果之前已经有对应的事件监听存在,则判断是否要替换,不需要再调用epoll_ctl更改epoll的事件注册。源码如下:
/* something about the given fd changed */
inline_size void
fd_change (EV_P_ int fd, int flags)
{
unsigned char reify = anfds [fd].reify;
anfds [fd].reify |= flags;
if (expect_true (!reify))
{
++fdchangecnt;
array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2);
fdchanges [fdchangecnt - 1] = fd;
}
}
到底位置libev只是完成了一些初始化操作,表明需要对什么事件进行什么处理,但事件流程并没有run起来,最后需要做的是调用ev_run来起线程,并进入事件循环。整个ev_run函数较长,大概有两百多行,这里就不列出代码了,只把程序的主要逻辑列出来。
int
ev_run (EV_P_ int flags)
{
...
do
{
...
//如果这个进程是新fork出来的,执行ev_fork事件的回调
...
//执行ev_prepare回调,也就是每次poll前执行的函数
...
//执行监听有改变的事件
...
//计算poll应该等待的时间,这个时间和设置以及定时器超时时间有关
...
//调用后台I/O复用端口等待事件触发
backend_poll (EV_A_ waittime);
...
//将定时器事件放入pending数组中
...
//将ev_check事件翻入pending数组中
...
//执行pending数组中所有的回调
EV_INVOKE_PENDING;
}
while (调用了stop);
}
backend_poll封装了不同系统的多路复用机制,在不同的情况下会映射成不同的实现,如epoll、kequque等。对于epoll而言,在每次epoll_wait之前会执行fd_reify(loop)。 fd_reify中会遍历fdchanges数组,把对fd事件的修改通过调用epoll_modify来做真正的修改,这里才真正完成了事件监听向系统的注册。这里有个小的trick再后面分析。
具体的代码中,程序使用queue_events将要运行的事件放入一个叫做pending的二维数组中,其第一维是优先级,第二维是动态分配的,存放具体事件。之后程序会在适当的地方调用宏EV_INVOKE_PENDING,将pending数组中的事件按优先级从高到低依次执行。
基本流程图可以看这里,转自阿里核心系统团队博客:
一些技巧
-
首先是通过define来模拟了继承。libev用宏定义了ev_watcher等基类的成员,实现派生类的时候只需要先用宏把公共成员包含进来,然后定义各个子类自己的成员即可。这种技巧也广泛用在其他一些开源项目中。
-
通过重新define var关键字和重新包含vars头文件的方式,可以把一组变量变换成不同的形式:
#define VAR(name,decl) decl; #include "ev_vars.h" #undef VAR #define VAR(name,decl) static decl; #include "ev_vars.h" #undef VAR
- 最后编译libev的时候会发现像epoll.c poll.c等平台相关的backend定义实际上没有加入Makefile。libev实现的时候其实直接在源码里面根据define来包含了c文件。大部分时候我们都是只include头文件,所以这里在使用的时候需要稍加注意。
总结
libev虽然代码比较晦涩,但是实现还是很清楚的,设计思想对于实现底层系统很有启发,值得仔细研读。时间有限,只涵盖了一下基本框架,如果有兴趣还是自己改写一下,会更有收获。
参考文献:
[1] libev ev_io源码分析, http://csrd.aliapp.com/?p=1604
[2] libev 设计分析, https://cnodejs.org/topic/4f16442ccae1f4aa270010a3