Go语言 非阻塞io

2018-07-25 16:08 更新

Go提供的网络接口,在用户层是阻塞的,这样最符合人们的编程习惯。在runtime层面,是用epoll/kqueue实现的非阻塞io,为性能提供了保障。

如何实现

底层非阻塞io是如何实现的呢?简单地说,所有文件描述符都被设置成非阻塞的,某个goroutine进行io操作,读或者写文件描述符,如果此刻io还没准备好,则这个goroutine会被放到系统的等待队列中,这个goroutine失去了运行权,但并不是真正的整个系统“阻塞”于系统调用。

后台还有一个poller会不停地进行poll,所有的文件描述符都被添加到了这个poller中的,当某个时刻一个文件描述符准备好了,poller就会唤醒之前因它而阻塞的goroutine,于是goroutine重新运行起来。

这个poller是在后台一直运行的,前面分析系统调度章节时为了简化并没有提起它。其实在proc.c文件中,runtime.main函数的第一行代码就是

newm(sysmon, nil);

这个意思就是新建一个M并让它运行sysmon函数,前面说过M就是机器的抽象,它会直接开一个物理线程。sysmon里面是个死循环,每睡眠一小会儿就会调用runtime.epoll函数,这个sysmon就是所谓的poller。

poller是一个比gc更高优先级的东西,何以见得呢?首先,垃圾回收只是用runtime.newproc建立出来的,它仅仅是个goroutine任务,而poller是直接用newm建立出来的,它跟startm是平级的。也就相当于gc只是线程池里的任务,而poller自身直接就是worker。然后,gc只是被触发性地发生的,是被动的。而poller却是每隔很短时间就会主动运行。

封装层次

从最原始的epoll系统调用,到提供给用户的网络库函数,可以分成三个封装层次。这三个层次分别是,依赖于系统的api封装,平台独立的runtime封装,提供给用户的库的封装。

最下面一层是依赖于系统部分的封装。各个平台下的实现并不一样,比如linux下是封装的epoll,freebsd下是封装的kqueue。以linux为例,实现了一组调用epoll相关系统调用的封装:

int32 runtime·epollcreate(int32 size);
int32 runtime·epollcreate1(int32 flags);
int32 runtime·epollctl(int32 epfd, int32 op, int32 fd, EpollEvent *ev);
int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
void runtime·closeonexec(int32 fd);

它们都是直接使用汇编调用系统调用实现的,比如:

TEXT runtime·epollcreate1(SB),7,$0
    MOVL    8(SP), DI
    MOVL    $291, AX            // syscall entry
    SYSCALL
    RET

这些函数还要继续被封装成下面一组函数:

runtime·netpollinit(void);
runtime·netpollopen(int32 fd, PollDesc *pd);
runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);

runtime·netpollinit是对poller进行初始化。 runtime·netpollopen是对fd和pd进行关联,实现边沿触发通知。 runtime·netpollready,使用前必须调用这个函数来表示fd是就绪的

不管是哪个平台,最终都会将依赖于系统的部分封装好,提供上面这样一组函数供runtime使用。

接下来是平台独立的poller的封装,也就是runtime层的封装。这一层封装是最复杂的,它对外提供的一组接口是:

func runtime_pollServerInit()
func runtime_pollOpen(fd int) (pd *PollDesc, errno int)
func runtime_pollClose(pd *PollDesc)
func runtime_pollReset(pd *PollDesc, mode int) (err int)
func runtime_pollWait(pd *PollDesc, mode int) (err int)
func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int)
func runtime_pollUnblock(pd *PollDesc)

这一组函数是由runtime封装好,提供给net包调用的。里面定义了一个PollDesc的结构体,将fd和对应的goroutine封装起来,从而实现当goroutine读写fd阻塞时,将goroutine变为Gwaiting。等一下回头再看实现的细节。

最后一层封装层次是提供给用户的net包。在net包中网络文件描述符都是用一个netFD结构体来表示的,其中有个成员就是pollDesc。

// 网络文件描述符
type netFD struct {
    sysmu  sync.Mutex
    sysref int

    // must lock both sysmu and pollDesc to write
    // can lock either to read
    closing bool

    // immutable until Close
    sysfd       int
    family      int
    sotype      int
    isConnected bool
    sysfile     *os.File
    net         string
    laddr       Addr
    raddr       Addr

    // serialize access to Read and Write methods
    rio, wio sync.Mutex

    // wait server
    pd pollDesc
}

所有用户的net包的调用最终调用到pollDesc的上面那一组函数中,这样就实现了当goroutine读或写阻塞时会被放到等待队列。最终的效果就是用户层阻塞,底层非阻塞。

文件描述符和goroutine

当一个goroutine进行io阻塞时,会去被放到等待队列。这里面就关键的就是建立起文件描述符和goroutine之间的关联。pollDesc结构体就是完成这个任务的。它的结构体定义如下:

struct PollDesc
{
    PollDesc* link;    // in pollcache, protected by pollcache.Lock
    Lock;        // protectes the following fields
    int32    fd;
    bool    closing;
    uintptr    seq;    // protects from stale timers and ready notifications
    G*    rg;    // 因读这个fd而阻塞的G,等待READY信号
    Timer    rt;    // read deadline timer (set if rt.fv != nil)
    int64    rd;    // read deadline
    G*    wg;    // 因写这个fd而阻塞的goroutines
    Timer    wt;
    int64    wd;
};

这个结构体是重用的,其中link就是将它链起来。PollDesc对象必须是类型稳定的,因为在描述符关闭/重用之后我们会得到epoll/kqueue就绪通知。结构体中有一个seq序号,稳定的通知是通过使用这个序号实现的,当deadline改变或者描述符重用时,序号会增加。

runtime_pollServerInit的实现就是调用更下层的runtime·netpollinit函数。 runtime_pollOpen从PollDesc结构体缓存中拿一个出来,设置好它的fd。之所以叫Open而不是new,就是因为PollDesc结构体是重用的。 runtime_pollClose函数调用runtime·netpollclose后将PollDesc结构体放回缓存。

这些都还没涉及到fd与goroutine交互部分,仅仅是直接对epoll的调用。从下面这个函数可以看到fd与goroutine交互部分:

func runtime_pollWait(pd *PollDesc, mode int) (err int)

它会调用到netpollblock,这个函数是这样子的:

static void
netpollblock(PollDesc *pd, int32 mode)
{
    G **gpp;

    gpp = &pd->rg;
    if(mode == 'w')
        gpp = &pd->wg;
    if(*gpp == READY) {
        *gpp = nil;
        return;
    }
    if(*gpp != nil)
        runtime·throw("epoll: double wait");
    *gpp = g;
    runtime·park(runtime·unlock, &pd->Lock, "IO wait");
    runtime·lock(pd);
}

最后的runtime.park函数,就是将当前的goroutine(调用者)设置为waiting状态。

上面这一部分是goroutine被放到等待队列的部分,下面看它被唤醒的部分。在sysmon函数中,会不停地调用runtime.epoll,这个函数对就绪的网络连接进行poll,返回可运行的goroutine。epoll只能知道哪个fd就绪了,那么它怎么知道哪个goroutine就绪了呢?原来epoll的data域存放的就是PollDesc结构体指针。因此就可以得到其中的goroutine了。


以上内容是否对您有帮助:
在线笔记
App下载
App下载

扫描二维码

下载编程狮App

公众号
微信公众号

编程狮公众号