精华内容
下载资源
问答
  • 网络IO模型:同步IO和异步IO,阻塞IO和非阻塞IO
  • 处理套接字网络编程的异步问题,本文详细描述了如何使用完成端口加线程池技术处理作为服务器端的高并发问题。
  • 某 课 网Python高级编程和异步IO并发编程,高级进阶技巧,内容全面,值得分享
  • 本文实例讲述了Python通过poll实现异步IO的方法。分享给大家供大家参考。具体分析如下: 在使用poll()后返回轮询对象,该对象支持以下方法: pollObj.register(fd,[,eventmask])第一个参数是注册新的文件描述符fd...
  • 主要介绍了浅谈Node 异步IO和事件循环,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • 同步(synchronous) IO异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non...
  • 一、glibc aio 1、名称 由于是glibc提供的aio函数库,所以称为glibc aio。 glibc是GNU发布的libc库,即c运行库。...提交一个异步读 int aio_write(struct aiocb *aiocbp); 提交一个异步写 int ai

    一、glibc aio

    1、名称

    由于是glibc提供的aio函数库,所以称为glibc aio

    glibc是GNU发布的libc库,即c运行库。

    另外网上还有其他叫法posix aio,都是指glibc提供的这套aio实现方案。

    2、主要接口

    glibc aio主要包含如下接口:

    函数功能
    int aio_read(struct aiocb *aiocbp);提交一个异步读
    int aio_write(struct aiocb *aiocbp);提交一个异步写
    int aio_cancel(int fildes, struct aiocb *aiocbp);取消一个异步请求(或基于一个fd的所有异步请求,aiocbp==NULL)
    int aio_error(const struct aiocb *aiocbp);查看一个异步请求的状态(进行中EINPROGRESS?还是已经结束或出错?)
    ssize_t aio_return(struct aiocb *aiocbp);查看一个异步请求的返回值(跟同步读写定义的一样)
    int aio_suspend(const struct aiocb * const list[], int nent, const struct timespec *timeout);阻塞等待请求完成

    glibc aio提供函数API,是比较通俗易懂的。

    3、实现原理

    在glibc aio的实现原理是,用多线程同步来模拟异步IO。实际上,为了避免线程的频繁创建、销毁,当有多个请求时,glibc aio会使用线程池,但以上原理是不会变的,尤其要注意的是:我们的回调函数是在一个单独线程中执行的。

    缺点: glibc aio 广受非议,存在一些难以忍受的缺陷和bug,饱受诟病,是极不推荐使用的。详见:http://davmac.org/davpage/linux/async-io.html

    二、libaio

    1、名称

    libaio是由linux内核提供的aio实现方案,类似于windows api。

    由于是linux kernel提供的api,故也叫linux kernel aio,或者原生aio
    native aio

    由于linux下aio实现方式较多,网上叫法很乱,所以这里特意总结下,方便大家区分。

    2、主要接口

    它主要包含如下系统调用接口:

    函数功能
    int io_setup(int maxevents, io_context_t *ctxp);创建一个异步IO上下文(io_context_t是一个句柄)
    int io_destroy(io_context_t ctx);销毁一个异步IO上下文(如果有正在进行的异步IO,取消并等待它们完成)
    long io_submit(aio_context_t ctx_id, long nr, struct iocb **iocbpp);提交异步IO请求
    long io_cancel(aio_context_t ctx_id, struct iocb *iocb, struct io_event *result);取消一个异步IO请求
    long io_getevents(aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout)等待并获取异步IO请求的事件(也就是异步请求的处理结果)

    其中,struct iocb主要包含以下字段:

    struct iocb {
        void     *data;  /* Return in the io completion event */
        unsigned key;   /*r use in identifying io requests */
        short           aio_lio_opcode;
        short           aio_reqprio;
        int             aio_fildes;
        union {
                struct io_iocb_common           c;
                struct io_iocb_vector           v;
                struct io_iocb_poll             poll;
                struct io_iocb_sockaddr saddr;
        } u;
    };
    
    struct io_iocb_common {
        void            *buf;
        unsigned long   nbytes;
        long long       offset;
        unsigned        flags;
        unsigned        resfd;
    };
    

    iocb是提交IO任务时用到的,可以完整地描述一个IO请求:

    • data是留给用来自定义的指针:可以设置为IO完成后的callback函数;
    • aio_lio_opcode表示操作的类型:IO_CMD_PWRITE | IO_CMD_PREAD;
    • aio_fildes是要操作的文件:fd;
    • io_iocb_common中的buf, nbytes, offset分别记录的IO请求的mem buffer,大小和偏移。
    struct io_event {
        void *data;
        struct iocb *obj;
        unsigned long res;
        unsigned long res2;
    };
    

    io_event是用来描述返回结果的:

    • obj就是之前提交IO任务时的iocb;
    • res和res2来表示IO任务完成的状态。

    3、实现原理

    libaio与Glibc的多线程模拟不同 ,它是真正做到内核的异步通知,是真正意义上的异步IO。

    听起来Kernel Native AIO几乎提供了近乎完美的异步方式,但如果你对它抱有太高期望的话,你会再一次感到失望。

    使用限制: 目前libaio仅支持O_DIRECT标志,即仅支持Direct I/O。

    Direct I/O可以简单理解为直接读写IO,读写期间无缓存。

    Linux中直接I/O机制介绍:
    https://www.ibm.com/developerworks/cn/linux/l-cn-directio/index.html

    缺点: 目前libaio仅支持Direct I/O方式来对磁盘读写,这意味着,你无法利用系统的缓存,同时它要求读写的的大小和偏移要以区块的方式对齐。

    三、libeio

    在当年,linux下已有的AIO (异步IO)解决方案:

    • Glibc的AIO,在用户态,多线程同步来模拟的异步IO;
    • libaio,需要linux内核2.6.22以上,且仅支持Direct I/O。

    但两者都存在让使用者望而却步的问题:

    • Glibc的AIO bug太多,而且IO发起者并不是最后的IO终结者(callbak是在单独的线程执行的);
    • libaio只支持O_DIRECT方式,无法利用Page cache。

    正是由于上述原因,Marc Alexander Lehmann大佬决定自己开发一个AIO库,即libeio

    libeio也是在用户态用多线程同步来模拟异步IO,但实现更高效,代码也更可靠,目前虽然是beta版,但已经可以上生产了(node.js底层就是用libev和libeio来驱动的)。

    还要强调点:libeio里IO的终结者正是当初IO的发起者(这一点非常重要,因为IO都是由用户的request而发起,而IO完成后返回给用户的response也能在处理request的线程中完成)。

    libeio提供全套异步文件操作的接口,让使用者能写出完全非阻塞的程序。

    缺点: 严格来讲,libeio也不属于真正的异步IO,仍然是通过用户态多线程来模拟的,性能上与真正的异步IO有差距。

    github地址:https://github.com/kindy/libeio

    代码量不大,几千行,感兴趣可以研究下。

    四、io_uring

    在过去的数年间,针对上述缺陷,限制的很多改进努力都未果,如Glibc AIO、libaio、libeio。

    虽然在使用和性能上提升了很多,但是,在Linux 上,依然没有比较完美的异步文件IO方案。

    直到,Linux 5.1合入了一个新的异步IO框架和实现:io_uring,由block IO大神Jens Axboe开发。

    这对当前异步IO领域无疑是一个喜大普奔的消息,这意味着,libaio的时代即将成为过去,io_uring的时代即将开启。

    为了方便使用,Jens Axboe还开发了一套liburing库,同时在fio中提供了ioengine=io_uring的支持。通过liburing库,应用不必了解诸多io_uring的细节就可以简单地使用起来。例如,无需担心memory barrier,或者是ring buffer管理之类等。

    一句话总结 io_uring 就是:一套全新的 syscall,一套全新的 async API,更高的性能,更好的兼容性,来迎接高 IOPS,高吞吐量的未来。

    这个特性,才出来,需要5.1以上内核才能支持,具体好不好用,后续才知道,现在似乎搜索到的内容较少。

    io_uring使用参考:

    《原生的 Linux 异步文件操作,io_uring 尝鲜体验》

    《Linux 5.1内核AIO 的新归宿:io_uring》

    五、总结

    linux异步IO实际上是利用了CPU和IO设备可以异步工作的特性(IO请求提交的过程主要还是在调用者线程上同步完成的,请求提交后由于CPU与IO设备可以并行工作,所以调用流程可以返回,调用者可以继续做其他事情)。

    linux的异步IO发展之路,还是比较曲折的,没有一个完美的实现。不像windows下异步IO,IOCP就是标杆,各种吊打。

    在目前情况下,libeio就是比较不错的方案了。

    但是如果你的程序中只用到Direct I/O,那么推荐使用libaio。

    在将来,随着Linux 5.1以上版本的更新,如果io_uring给力的话,linux异步IO很可能就一统天下了。

    参考链接:

    《linux AIO (异步IO) 那点事儿》

    《异步I/O – posix aio 从入门到放弃的吐血实践》

    《linux异步IO编程实例分析》

    《linux异步IO的两种方式》



    若对你有帮助,欢迎点赞、收藏、评论,你的支持就是我的最大动力!!!

    同时,阿超为大家准备了丰富的学习资料,欢迎关注公众号“超哥学编程”,即可领取。

    在这里插入图片描述

    展开全文
  • Go 与异步 IO - io_uring 的思考

    千次阅读 2020-10-22 13:58:49
    于是便借鉴 liburing,配合 Go 提供的并发机制实现了一个 golang 版本的异步 IO 库 —— iouring-go Golang 中并发 IO 的现状 对于 Go 这种本身便是为并发而生的语言来说,使用 io_uring 这种系统级异步接口也不是...

    本来准备写一篇详细关于 io_uring 的中文文章,不过在使用上官方的一些文章写的已经非常详细,简单的拿来翻译感觉又失去了乐趣
    于是便借鉴 liburing,配合 Go 提供的并发机制实现了一个 golang 版本的异步 IO 库 —— iouring-go 来学习 io_uring 的使用


    本文不会去详细介绍 io_uring 的一些细节,如果想对 io_uring 了解更多可以查看文末的推荐阅读

    Golang 中并发 IO 的现状

    对于 Go 这种本身便是为并发而生的语言来说,使用 io_uring 这种系统级异步接口也不是那么的迫切

    比如对于普通文件的读写以及 socket 的操作都会通过 netpoll 来进行优化,当文件/套接字可读可写时 netpoll 便会唤醒相应 goroutine 来对文件进行读写
    而对于可能会阻塞的系统调用,在 syscall 底层调用 syscall.Syscall/Syscall6 时配合 runtime 判断是否将P 和 G 绑定的 M 解绑,然后将 P 交给其他 M 来使用,通过这种机制可以减少系统调用从用户态切换到内核态对整个程序带来的损耗

    Go runtime 实际上已经实现了用户态的并发 IO,现在 Linux 内核提供了新的异步 IO 接口,那又该如何去利用这种新的技术呢

    我们首先先看一下当前 Go 是如何做到异步 IO 的

    IO 与 netpoll

    文件 IO 与 netpoll

    // src/os/file.go
    func OpenFile(name string, flag int, perm FileMode) (*File, error) {
        f, err := openFileNolog(name, flag, perm)
    }
    
    // src/os/file_unix.go
    func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
        // ...
        r, e = syscall.Open(name, flag|syscall.O_CLOEXEC, syscallMode(perm))
    
        // ...
    
        return newFile(uintptr(r), name, kindOpenFile), nil
    }
    
    // src/os/file_unix.go
    func newFile(fd uintptr, name string, kind newFileKind) *File {
        fdi := int(fd)
        f := &File{&file{
            pfd: poll.FD{
                Sysfd:     fdi,
                IsStream:      true,
                ZeroReadIsEOF: true,
            },
            name:    name,
            stdoutOrErr: fdi == 1 || fdi == 2,
        }}
    
        pollable := kind == kindOpenFile || kind == kindPipe || kind == kindNonBlock
        // ...
        if err := f.pfd.Init("file", pollable); err != nil {
            // ...
        } else if pollable {
            if err := syscall.SetNonblock(fdi, true); err == nil {
                f.nonblock = true
            }
            return f
        }
    }
    

    os.OpennewFile,可以看到文件的 文件描述符 被放到 poll.FD 进行初始化了, poll.FD.Init 便是将文件描述符注册到 netpoll(epoll)

    需要注意当文件被注册到 netpoll(epoll) 后,会将它置为非阻塞模式(SetNonblock),因为 netpoll(epoll) 采用的是边缘触发模式
    比如说非阻塞文件描述符中有可读事件时,epoll 只会通知一次(除非有新的数据被写入文件会再次通知),也就说需要所有数据读出来直到返回 -EAGAIN,对于阻塞模式的socket文件,当从socket中读取数据时就可能会阻塞等待,这样也就失去了 epoll 的意义

    我们可以再看一下 poll.FD 是如何利用 netpoll 进行读取的

    // src/internal/poll/fd_unix.go
    func (fd *FD) Read(p []byte) (int, error) {
        // ...
    
        for {
            n, err := ignoringEINTR(syscall.Read, fd.Sysfd, p)
            if err != nil {
                n = 0
                if err == syscall.EAGAIN && fd.pd.pollable() {
                    continue
                }
            }
            err = fd.eofError(n, err)
            return n, err
        }
    }
    

    可以看到 ignoringEINTR 中调用 syscall.Read 读取文件,如果出现 syscall.EAGAIN,那么就调用 fd.pd.waitRead 来等待数据可读

    // src/internal/poll/fd_unix.go
    type FD struct {
        // ...
        Sysfd int
        pd pollDesc
    }
    

    pollDesc Go 对 netpoll 的抽象

    // src/internal/poll/fd_poll_runtime.go
    
    func runtime_pollServerInit()
    func runtime_pollOpen(fd uintptr)(uintptr, int)
    func runtime_pollClose(ctx uintptr)
    // ...
    
    type pollDesc struct {
        runtimeCtx uintptr
    }
    
    func (pd *pollDesc) init(fd *FD) error {
        serverInit.Do(runtime_pollServerInit)
        ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
        // ...
        pd.runtimeCtx = ctx
        return nil
    }
    

    runtime_poll* 这些函数才是真正的 netpoll,而这些函数是在src/runtime/netpoll.go 中实现,并通过 go:linkname 来链接到 internal/poll

    // src/runtime/netpoll.go
    
    // go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
    func poll_runtime_pollServerInit() {
        // ...
    }
    

    根据具体的平台来实现 poller,对于 Linux,便是使用 epoll

    // src/runtime/netpoll_epoll.go
    
    // 注册文件到 netpoll 中
    func netpolllopen(fd uintptr, pd *pollDesc) int32 {
        var ev epollevent
        ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
        // ...
        return -epollctr(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
    }
    

    添加新的文件描述符时,可以发现fd是以 边缘触发 的方式注册到 netpoll(epoll)

    socket IO 与 netpoll

    netpoll 这个名字上就可以看出,netpoll 是 Go 为了高性能的异步网络而实现的

    看一下创建 TCPListener socket 的流程

    // src/net/tcpsock.go
    type TCPListener struct {
        fd *netFD
        // ...
    }
    
    // src/net/fd_posix.go
    type netFD struct {
        pfd poll.FD
        // ...
    }
    
    // 1.
    // src/net/tcpsock.go
    func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
        sl := &sysListener{network: network, address: laddr.String()}
        ln, err := sl.listenTCP(context.Background(), laddr)
        // ...
    }
    
    // 2.
    // src/net/tcpsock_posix.go
    func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
        fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
        // ...
        return &TCPListener{fd: fd, lc, sl.ListenConfig}, nil
    }
    
    // 3.
    // src/net/ipsock_posix.go
    func internelSocket(ctx context.Context, ...) (fd *netFD, err error) {
        // ...
        return socket(ctx, net, family, sotype, proto, ipv6only, laddr, radddr, ctrlFn)
    }
    
    // 4.
    // src/sock_posix.go
    func socket(...) (fd *netFD, err error) {
        s, err := sysSocket(family, sotype, proto)
        // ...
    
        fd, err = newFD(s, family, sotype, net)
    }
    
    // 5.
    // src/fd_unix.go
    func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
        ret := &netFD{
            pfd: poll.FD{
                Sysfd:    sysfd,
                IsStream: sotype == syscall.SOCK_STREAM,
                // ...
            },
            // ...
        }
        return ret, nil
    }
    

    创建 TCPListener 链路还是挺长的,不过在第四步 socket 函数中可以看到调用 newFD 来返回 netFD 实例,而 netFD.pfd 便是 poll.FD, 而对 netFD 的读写和文件IO一样便都会通过 poll.FD 来利用 netpoll

    netpoll 唤醒 goroutine

    挂起 goroutine

    通过 poll.pollDesc 将文件描述符加入到 netpoll 后,当对文件描述符进行读写时,如果 syscall.Read 返回 syscall.EAGAIN 的话就需要调用 pollDesc.waitRead/waitWrite 来等待可读可写

    // src/internal/poll/fd_poll_runtime.go
    
    func (pd *pollDesc) waitRead(isFile bool) error{
        return pd.wait('r', isFile)
    }
    
    func (pd *pollDesc) waitWrite(isFile bool) error{
        return pd.wait('w', isFile)
    }
    
    func (pd *pollDesc) wait(mode int, isFile bool) error{
        // ...
        res := runtime_pollWait(pd.runtimeCtx, mode)
        return convertErr(res, isFile)
    }
    
    // src/runtime/netpoll.go
    //go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
    func poll_runtime_pollWait(pd *pollDesc, mode int) int {
        // ...
        for !netpollblock(pd, int32(mode), false) {
            // ...
        }
        // ...
    }
    
    func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
        gpp := &pd.rg
        // ...
        // 状态检查
        if waitio || netpollcheckerr(pd, mode) == 0 {
            gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
        }
        // ...
    }
    
    func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
        r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
        // ...
    }
    

    等待文件可读写,最终会调用 netpollblock 函数,并不会直接调用 epoll wait 的系统调用,而是挂起当前 goroutine, 并等待唤醒

    唤醒 goroutine

    // src/runtime/netpoll_epoll.go
    func netpoll(delay int64) gList {
        // ...
    
        var waitms int32
        // 计算 waitms,大概规则:
        // delay < 0, waitms = -1,阻塞等待
        // delay == 0, waitms = 0, 不阻塞
        // delay > 0, delay 以纳秒为单位作为 waitms
    
        var events [128]epollevent
    retry:
        n := epollwait(epfd, &events[0], int32(len(events)), waitms)
        if n < 0 {
            // ...
        }
    
        var toRun gList
        for i := int32(0); i < n; i++ {
            ev := &events[i]
            // ...
    
            var mode int32
            if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
                mode += 'r'
            }
            if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
                mode += 'w'
            }
    
            if mode != 0 {
                pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
                pd.everr = false
                if ev.events == _EPOLLERR {
                    pd.everr = true
                }
                netpollready(&toRun, pd, mode)
            }
        }
        return toRun
    }
    

    netpoll 会调用 epollWait 来获取epoll事件,而在 runtime 中很多地方都会调用 netpoll 函数

    监控函数 sysmon

    // src/runtime/proc.go
    func sysmon() {
        // ....
        for {
            // ...
            list := netpoll(0)
            if !list.empty() {
                //... 
                injectglist(&list) // 将 goroutine 放到 runable 队列中
            }
        }
    }
    

    查找可运行的 goroutine

    // src/runtime/proc.go
    func findrunable() (gp *g, inheritTIme bool) {
    top:
        // ...
        if list := netpoll(0); !list.empty() {
            gp := list.pop()
            injectglist(&list)
            // ...
            return gp, false
        }
        // ....
    stop:
        // ...
        list := netpoll(delta) // block until new work is available
        // ...
    }
    

    GC 时调用 startTheWorld

    // src/runtime/proc.go
    func startTheWorld() {
        systemstack(func() {startTheWorldWithSema(false)})
        // ...
     }
    
    func startTheWorldWithSema(emitTraceEvent bool) int64 {
        // ...
        list := netpoll(0)
        injectglist(&list)
        // ...
    }
    

    通常获取可用的 goroutine 时都可能有机会去调用 netpoll,然后再调用 injectglist(&list) 将可运行 goroutine 加入到runq队列中

    系统级异步接口 —— io_uring

    本节不会详细介绍 io_uring 的具体操作,关于 io_uring 的使用,可以查看 Lord of the io_uring

    Linux kernel 5.1 新增了异步接口 io_uring,它是 Jens Axboe 以高效,可扩展,易用为目的设计的一种全新异步接口,为什么是全新呢,因为 Linux 已经提供了异步 IO 接口 —— AIO,不过就连 Linus 都对它一阵吐槽
    Re: [PATCH 09/13] aio: add support for async openat()

    So I think this is ridiculously ugly.
    AIO is a horrible ad-hoc design, with the main excuse being “other,
    less gifted people, made that design, and we are implementing it for
    compatibility because database people - who seldom have any shred of
    taste - actually use it”.
    But AIO was always really really ugly.

    io_uring 提供的异步接口,不仅仅可以使用 文件 IO,套接字 IO,甚至未来可以扩展加入其它系统调用
    而且 io_uring 采用应用程序和内核共享内存的方式,来提交请求和获取完成事件

    使用共享内存的方式可能是内核对接口优化的一种趋势

    io_uring 名称的意思便是 io use ring,而 ring 便是指和内核内存共享的 提交队列完成队列两个环形缓冲区

    SubmissionQueueEntry

    我们来看一下 io_uring 用来提交请求的结构

    struct io_uring_sqe {
            __u8    opcode;         /* 请求的操作类型 */
            __u8    flags;          /* IOSQE_ flags */
            __u16   ioprio;         /* ioprio for the request */
            __s32   fd;             /* 用于 IO 的文件描述符 */
            union {
                    __u64   off;    /* offset into file */
                    __u64   addr2;
            };
            union {
                    __u64   addr;   /* pointer to buffer or iovecs */
                    __u64   splice_off_in;
            };
            __u32   len;            /* buffer size or number of iovecs */
            
            /*
             * 用于特定操作的字段
             */
            union {
                    __kernel_rwf_t  rw_flags;
                    __u32           fsync_flags;
                    __u16           poll_events;    /* compatibility */
                    __u32           poll32_events;  /* word-reversed for BE */
                    __u32           sync_range_flags;
                    __u32           msg_flags;
                    __u32           timeout_flags;
                    __u32           accept_flags;
                    __u32           cancel_flags;
                    __u32           open_flags;
                    __u32           statx_flags;
                    __u32           fadvise_advice;
                    __u32           splice_flags;
            };
            __u64   user_data;      /* 用来关联请求的完成事件 */
            union {
                    struct {
                            /* pack this to avoid bogus arm OABI complaints */
                            union {
                                    /* index into fixed buffers, if used */
                                    __u16   buf_index;
                                    /* for grouped buffer selection */
                                    __u16   buf_group;
                            } __attribute__((packed));
                            /* personality to use, if used */
                            __u16   personality;
                            __s32   splice_fd_in;
                    };
                    __u64   __pad2[3];
            };
    };
    

    io_uring_sqe 一个非常复杂的结构,最核心的三个字段 opcodefduser_data

    • opcode 指定具体是什么操作,比如 IORING_OP_READVIORING_OP_ACCEPTIORING_OP_OPENAT,现在支持 35 种操作
    • fd 表示用于 IO 操作的文件描述符
    • user_data 当请求操作完成后,io_uring 会生成一个完成事件放到 完成队列(CompletionQueue) 中,而 user_data 便是用来和完成队列中的事件进行绑定的,他会原封不动的复制到完成队列的事件(cqe)
    • flags 字段用来实现链式请求之类的功能

    可以发现 opcode 是uint8,也就是现在来看最多支持 256 个系统调用,但是整个 io_uring_sqe 还为未来预留了一些空间 __pad2

    通过 opcode 结合结合其他 union 的字段,便实现了扩展性极强的 io_uring 接口

    CompletionQueueEvent

    完成队列事件(CompletionQueueEvent) 的结构就比较简单了,主要是表示提交的异步操作执行的结果

    struct io_uring_cqe {
            __u64   user_data;      /* 直接复制 sqe->data */
            __s32   res;            /* 异步操作的结果 */
            __u32   flags;
    };
    

    user_data 便是从 sqe->data 直接复制过来了,可以通过 user_data 绑定到对应的 sqe
    res 便是异步操作执行的结果,如果 res < 0,通常说明操作执行错误
    flags 暂时没有使用

    io_uring_setup

    io_uring 使用共享内存的方式,来提交请求和获取执行结果,减少内存拷贝带来的损耗
    io_uring_setup 接口会接受一个指定提交队列大小的 uint32 类型参数和一个 io_uring_params 对象

    #include <linux/io_uring.h>
    
    int io_uring_setup(u32 entries, struct io_uring_params *p);
    

    调用成功会返回一个文件描述符,用于后续的操作

    io_uring_params

    struct io_uring_params {
            __u32 sq_entries;
            __u32 cq_entries;
            __u32 flags;
            __u32 sq_thread_cpu;
            __u32 sq_thread_idle;
            __u32 features;
            __u32 wq_fd;
            __u32 resv[3];
            struct io_sqring_offsets sq_off;
            struct io_cqring_offsets cq_off;
    };
    

    io_uring_params 不只用来配置 io_uring 实例,内核也会填充 io_uring_params 中关于 io_uring 实例的信息,比如用来映射共享内存的请求队列和完成队列字段的偏移量 - io_sqring_offsetsio_cqring_offsets

    配置 io_uring

    flags 以位掩码的方式,结合相应 sq_thread_cpusq_thread_idlewq_fdcq_entries 字段来配置 io_uring 实例

    /*
     * io_uring_setup() flags
     */
    #define IORING_SETUP_IOPOLL     (1U << 0)       /* io_context is polled */
    #define IORING_SETUP_SQPOLL     (1U << 1)       /* SQ poll thread */
    #define IORING_SETUP_SQ_AFF     (1U << 2)       /* sq_thread_cpu is valid */
    #define IORING_SETUP_CQSIZE     (1U << 3)       /* app defines CQ size */
    #define IORING_SETUP_CLAMP      (1U << 4)       /* clamp SQ/CQ ring sizes */
    #define IORING_SETUP_ATTACH_WQ  (1U << 5)       /* attach to existing wq */
    #define IORING_SETUP_R_DISABLED (1U << 6)       /* start with ring disabled */
    

    通常 cq_entries 为 sq_entries 的两倍,通过 flags 指定 IORING_SETUP_CQSIZE ,然后设置 cq_entries 字段为指定大小

    cq_entries 不能小于 sq_entries

    iouring-go 提供了初始化 io_uring 对象时的配置函数,可以看一下这些函数的具体实现

    type IOURingOption func(*IOURing)
    
    func New(entries uint, opts ...IOURingOption) (iour *IOURing, err error)
    
    func WithParams(params *iouring_syscall.IOURingParams) IOURingOption
    func WithAsync() IOURingOption
    func WithDisableRing() IOURingOption
    func WithCQSize(size uint32) IOURingOption
    func WithSQPoll() IOURingOption
    func WithSQPollThreadCPU(cpu uint32) IOURingOption
    func WithSQPollThreadIdle(idle time.Duration) IOURingOption
    
    内核填充信息

    内核会向 io_uring_params 填充跟 io_uring 实例相关的信息
    sq_entries 请求队列的大小,io_uring_setup 会传递请求队列的大小 entries,io_uring 会根据 entries 设置 sq_entries 为 2 的次方大小
    cq_entries 完成队列的大小,通常为 sq_entries 的两倍,即使通过 IORING_SETUP_CQSIZE flag 设置了 cq_enries ,内核依然会以 2 的次方重新计算出 cq_entries 的大小
    features 记录了当前内核版本支持的一些功能

    /*
     * io_uring_params->features flags
     */
    #define IORING_FEAT_SINGLE_MMAP         (1U << 0)
    #define IORING_FEAT_NODROP              (1U << 1)
    #define IORING_FEAT_SUBMIT_STABLE       (1U << 2)
    #define IORING_FEAT_RW_CUR_POS          (1U << 3)
    #define IORING_FEAT_CUR_PERSONALITY     (1U << 4)
    #define IORING_FEAT_FAST_POLL           (1U << 5)
    #define IORING_FEAT_POLL_32BITS         (1U << 6)
    #define IORING_FEAT_SQPOLL_NONFIXED     (1U << 7)
    

    io_sqring_offsetsio_cqring_offsets 便是SQCQ在共享内存中的偏移量

    struct io_sqring_offsets {
            __u32 head;
            __u32 tail;
            __u32 ring_mask;
            __u32 ring_entries;
            __u32 flags;
            __u32 dropped;
            __u32 array;
            __u32 resv1;
            __u64 resv2;
    };
    
    struct io_cqring_offsets {
            __u32 head;
            __u32 tail;
            __u32 ring_mask;
            __u32 ring_entries;
            __u32 overflow;
            __u32 cqes;
            __u32 flags;
            __u32 resv1;
            __u64 resv2;
    };
    

    根据这些偏移量便可以调用 mmap 来映射 SQ 和 CQ

    ptr = mmap(0, sq_off.array + sq_entries * sizeof(__u32),
               PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE,
               ring_fd, IORING_OFF_SQ_RING);
    
    

    可以参考 iouring-go 对 IOURing 对象的初始化

    // iouring-go/iouring.go
    func New(entries uint, opts ...IOURingOption) (*IOURing, error) {
        iour := &IOURing{
            params:    &iouring_syscall.IOURingParams{},
            userDatas: make(map[uint64]*UserData),
            cqeSign:   make(chan struct{}, 1),
            closer:    make(chan struct{}),
            closed:    make(chan struct{}),
        }
    
        for _, opt := range opts {
            opt(iour)
        }
    
        var err error
        iour.fd, err = iouring_syscall.IOURingSetup(entries, iour.params)
        if err != nil {
            return nil, err
        }
    
        if err := mmapIOURing(iour); err != nil {
            munmapIOURing(iour)
            return nil, err
        }
        // ...
    }
    

    mmapIOURing 中实现了对请求队列以及完成队列的内存映射

    // iouring-go/mmap.go
    func mmapIOURing(iour *IOURing) (err error) {
        defer func() {
            if err != nil {
                munmapIOURing(iour)
            }
        }()
        iour.sq = new(SubmissionQueue)
        iour.cq = new(CompletionQueue)
    
        if err = mmapSQ(iour); err != nil {
            return err
        }
    
        if (iour.params.Features & iouring_syscall.IORING_FEAT_SINGLE_MMAP) != 0 {
            iour.cq.ptr = iour.sq.ptr
        }
    
        if err = mmapCQ(iour); err != nil {
            return err
        }
    
        if err = mmapSQEs(iour); err != nil {
            return err
        }
        return nil
    }
    

    这里不再详细介绍 io_uring 的使用 ,想要了解更多可以查看文末的推荐阅读

    io_uring 的功能

    这里简单介绍一下 io_uring 提供的一些功能,以及在 Go 中如何去使用

    顺序执行

    设置 sqe->flagsIOSQE_IO_DRAIN 标记,这样只有当该 sqe 之前所有的 sqes 都完成后,才会执行该 sqe,而后续的 sqe 也会在该 sqe 完成后才会执行

    iouring-go 中可以在构建 IOURing 对象时使用 WithDrain 来全局设置请求顺序执行

    iour := iouring.New(8, WithDrain())
    

    针对单一请求设置 WithDrain,保证请求会在之前所有的请求都完成才会执行,而后续的请求也都会在该请求完成之后才会开始执行

    request, err := iour.SubmitRequest(iouring.Read(fd, buf).WithDrain(), nil)
    

    链式请求

    io_uring 提供了一组请求的链式/顺序执行的方法,可以让链中的请求会在上一个请求执行完成后才会执行,而且不影响链外其他请求的并发执行

    设置 sqe->flagsIOSQE_IO_LINK 标记后,下一个 sqe 和当前 sqe 自动组成新链或者当前 sqe 的链中,链中没有设置 IOSQWE_IO_LINKsqe 便是链尾

    如果链中的有请求执行失败了,那么链中后续的 sqe 都会被取消( cqe.res-ECANCELED)
    io_uring 还提供了以外一种设置链式请求的方式,设置 sqe->flagsIOSQE_IO_HARDLINK flag,这种方式会让链中的请求忽略之前请求的结果,也就是说即使链中之前的请求执行失败了,也不会取消链中后边的请求

    iouring-go 中可以使用 SubmitLinkRequests 或者 SubmitHardLinkRequests 方法来设置链式请求

    preps := []iouring.PrepRequest{ iouring.Read(fd1, buf), iouring.Write(fd2, buf) }
    requests, err := iour.SubmitLinkRequest(preps, nil)
    

    请求取消

    当请求提交后,还可以提交取消请求的请求,这样如果请求还没有执行或者请求的操作可以被中断(比如 socket IO),那么就可以被异步的取消,而对于已经启动的磁盘IO请求则无法取消

    iouring-go 中,提交请求后会返回一个 iouring.Request 对象,通过request.Cancel 方法就可以取消请求

    request, err := iour.SubmitRequest(iouring.Timeout(1 * time.Second), nil)
    cancelRequest, err := request.Cancel()
    

    Cancel 方法会返回一个 cancelRequest 对象,表示提交的取消请求
    可以监听 request 的执行是否失败,并且失败原因是否为 iouring.ErrRequestCanceled

    <- request.Done()
    if err := request.Err(); if err != nil {
        if err == iouring.ErrRequestCanceled {
            fmt.Println("request is canceled")
        }
    }
    

    也可去监听 cancelRequest 的执行结果,如果cancelRequest.Err 方法返回 nil,便是可能成功取消了,注意是可能取消了,因为一些操作是无法被取消的

    <- cancelRequest.Done()
    if err := cancelRequest.Err(); if err != nil{
        if err == iouring.ErrRequestNotFound(){
            fmt.Println("canceled request is not found")
        }
        // do something
    }
    

    定时和请求完成计数

    io_uring 提供了 IORING_OP_TIMEOUT 请求,可以用来提交超时请求
    超时请求可以分为三种:

    • 相对时间超时
    • 绝对时间超时
    • 对请求完成计数,到达指定的完成事件数量后,超时请求就会完成

    iouring-go 对这三种情况封装了三个函数 iouring.Timeoutiouring.TimeoutWithTimeiouring.CountCompletionEvent 来分别代表三种超时请求

    now := time.Now()
    request, err := iouring.SubmitRequest(iouring.Timeout(2 * time.Second), nil)
    if err != nil {
        panic(err)
    }
    <- request.Done()
    fmt.Println(time.Now().Sub(now))
    

    根据 io_uring 提供的超时请求,可以实现系统级的异步定时器

    请求超时

    io_uring 通过 IOSQE_IO_LINK 将一个请求和 IORING_OP_LINK_TIMEOUT 请求链接在一起,那么就可以做到请求的超时控制

    iouring-go 同样提供了简便方法 WithTimeout

    preps := iouring.Read(fd, buf).WithTimeout()
    

    WithTimeout 方法会返回两个 PrepRequest 对象,所以需要使用 SubmitRequests 来提交

    iouring-go 中请求超时的一些操作使用起来感觉还不是特别友好,有待优化

    注册文件

    io_uring 的一些 IO 操作需要提供文件描述符,而频繁的将文件和内核之间进行映射也会导致一定的性能损耗,所以可以使用 io_uringio_uring_register 接口来提前注册文件描述符
    详细的概念可以参考 io_uring_register

    iouring-go 也提供了文件描述符的注册功能,而且对于已经注册的文件描述符会自动使用

    func (iour *IOURing) RegisterFile(file *os.File) error
    func (iour *IOURing) RegisterFiles(files []*os.File) error
    
    func (iour *IOURing) UnregisterFile(file *os.File) error
    func (iour *IOURing) UnregisterFiles(files []*os.File) error
    

    io_uring 的文件描述符被关闭后,这些注册的文件会自动注销

    需要注意,调用 io_uring_register 来注册文件描述符时,如果有其他的正在进行的请求的话,会等到这些请求都完成才会注册

    注册文件描述符在 Go 中带来的并发问题

    type fileRegister struct {
        lock sync.Mutex
        iouringFd int
    
        fds          []int32
        sparseindexs map[int]int
        
        registered bool
        indexs     sync.Map
    }
    

    需要注意由于存在对索引 fileRegister.indexs 的并发读写,所以使用 sync.Map,也就意味着,使用注册文件描述符,会带来一定的并发问题,经过简单的测试,sync.Map 带来的性能损耗导致注册文件描述符带来的优势并没有那么大的

    在 Go 中使用 io_uring 的最大问题便是对 io_uring 实例的竞争问题,而通过 Go 暴露给外部使用的并发机制,并不能让 io_uring 带来的异步 IO 发挥最大的性能

    io_uring 融入 runtime 中,才是最终的解决方案

    注册缓冲区

    和注册文件描述符类似, io_uring 为了减少 IO 请求中缓冲区的映射,同样可以使用 io_uring_register 来注册缓冲区
    如果要在请求中使用缓冲区的话,需要使用 IORING_OP_READ_FIXED 或者 IORING_OP_WRITE_FIXED 请求

    具体可以参考 io_uring_register

    内核侧请求队列轮询

    将请求放到SQ的环形缓冲区后,需要调用 io_uring_enter 来通知内核有请求需要处理
    io_uring 为了进一步减少系统调用,可以在 io_uring_setup 是设置 io_uring_params->flagsIORING_SETUP_SQPOLL flags,内核就会创建一个轮询请求队列的线程

    可以通过 ps 命令查看用来轮询的内核线程

    ps --ppid 2 | grep io_uring-sq
    

    需要注意在 5.10 之前的版本,需要使用特权用户来执行,而 5.10 以后只需 CAP_SYS_NICE 权限即可
    并且 5.10 之前,SQPoll 需要配合注册的文件描述符一起使用,而 5.10 以后则不需要,可以通过查看内核填充的 io_uring_params->features 是否设置了 IORING_FEAT_SQPOLL_NONFIXED

    // iouring-go/iouring.go
    func (iour *IOURing) doRequest(sqe *iouring_syscall.SubmissionQueueEntry, request PrepRequest, ch chan<- Result) (*UserData, error) {
        // ...
        if sqe.Fd() >= 0 {
            if index, ok := iour.fileRegister.GetFileIndex(int32(sqe.Fd())); ok {
                sqe.SetFdIndex(int32(index))
            } else if iour.Flags&iouring_syscall.IORING_SETUP_SQPOLL != 0 &&
                iour.Features&iouring_syscall.IORING_FEAT_SQPOLL_NONFIXED == 0 {
                return nil, ErrUnregisteredFile
            }
        }
        // ...
    }
    

    iouring-go 同样提供了开启 SQPoll 的 WithSQPoll 以及设置与 SQPoll 内核线程的相关配置 WithSQPollThreadCpuWithSQPollThreadIdle

    iour, err := iouring.New(8, iouring.WithSQPoll())
    

    但是在 Go 简单的设置 io_uring_params 并不能正常的工作,可能是由于 Go 的 GMP 模型导致的一些问题。暂时还在思考解决方案

    注册 eventfd,利用 epoll

    通过 io_uring_register 可以将 eventfd 注册到 io_uring 实例中,然后将 eventfd 加入到 epoll 中,如果当 io_uring 中有完成事件时,便会通知 eventfd

    iouring-go 中,对于完成事件的监听便是使用了 eventfdepoll

    type IOURing struct {
        eventfd int
        cqeSign chan struct{}
        // ...
    }
    
    func New() (*IOURing, error) {
        // ....
        if err := iour.registerEventfd(); err != nil {
            return nil, err
        }
        if err := registerIOURing(iour); err != nil {
            return nil, err
        }
        // ...
    }
    
    func (iour *IOURing) registerEventfd() error {
        eventfd, err := unix.Eventfd(0, unix.EFD_NONBLOCK|unix.FD_CLOEXEC)
        if err != nil {
             return os.NewSyscallError("eventfd", err)
        }
        iour.eventfd = eventfd
        return iouring_syscall.IOURingRegister(
            iour.fd,
            iouring_syscall.IOURING_REGISTER_EVENTFD,
            unsafe.Pointer(&iour.eventfd), 1,
        ) 
    }
    func registerIOURing(iour *IOURing) error {
        if err := initpoller(); err != nil {
            return err
        }
    
        if err := unix.EpollCtl(poller.fd, unix.EPOLL_CTL_ADD, iour.eventfd,
             &unix.EpollEvent{Fd: int32(iour.eventfd), Events: unix.EPOLLIN | unix.EPOLLET},
        ); err != nil {
             return os.NewSyscallError("epoll_ctl_add", err)
        }
    
        poller.Lock()
        poller.iours[iour.eventfd] = iour
        poller.Unlock()
        return nil
    }
    

    poller 会调用 EpollWait 等待完成队列中有完成事件,并通知相应的 IOURing 对象

    // iouring-go/iouring.go
    func (iour *IOURing) getCQEvent(wait bool) (cqe *iouring_syscall.CompletionQueueEvent, err error) {
        var tryPeeks int
        for {
            if cqe = iour.cq.peek(); cqe != nil {
                iour.cq.advance(1)
                return
            }
    
            if !wait && !iour.sq.cqOverflow() {
                err = syscall.EAGAIN
                return
            }
    
            if iour.sq.cqOverflow() {
                _, err = iouring_syscall.IOURingEnter(iour.fd, 0, 0, iouring_syscall.IORING_ENTER_FLAGS_GETEVENTS, nil)
                if err != nil {
                    return
                }
                continue
            }
    
            if tryPeeks++; tryPeeks < 3 {
                runtime.Gosched()
                continue
            }
    
            select {
            case <-iour.cqeSign:
            case <-iour.closer:
                return nil, ErrIOURingClosed
            }
        }
    }
    // iouring-go/poller.go
    func (poller *iourPoller) run() {
        for {
            n, err := unix.EpollWait(poller.fd, poller.events, -1)
            if err != nil {
                continue
            }
    
            for i := 0; i < n; i++ {
                fd := int(poller.events[i].Fd)
                poller.Lock()
                iour, ok := poller.iours[fd]
                poller.Unlock()
                if !ok {
                    continue
                }
    
                select {
                case iour.cqeSign <- struct{}{}:
                default:
                }
            }
    
            poller.adjust()
        }
    }
    
    

    保证数据不丢失

    默认情况下, CQ 环的大小是 SQ 环的 两倍,为什么 SQ 环的大小会小于 CQ 环,是因为 SQ 环中的 sqe 一旦被内核发现,便会被内核消耗掉,也就意味着 sqe 的生命周期很短,而请求的完成事件都会放到 CQ 环中
    我们也可以通过 IORING_SETUP_CQSIZE 或者 iouring-goWithCQSize Option 里设置 CQ 环的大小

    但是依然会存在 CQ 环溢出的情况,而内核会在内部存储溢出的时间,直到 CQ 环有空间容纳更多事件。

    可以通过 io_uring_params->features 是否设置 IORING_FEAT_NODROP 来判断当前内核是否支持该功能

    如果 CQ 环溢出,那么提交请求时可能会以 -EBUSY 错误失败,需要重新提交

    并且当 CQ 环中数据被消耗后,需要调用 io_uring_enter 来通知内核 CQ 环中有空余空间

    func (iour *IOURing) getCQEvent(wait bool) (cqe *iouring_syscall.CompletionQueueEvent, err error) {
        // ...
        if iour.sq.cqOverflow() {
            _, err := iour.syscall.IOURingEnter(iour.fd, 0, 0, iouring_syscall.IORING_ENTER_FLAGS_GETEVENTS, nil)
            if err != nil{
                return
            }
            continue
        }
        // ...
    }
    

    io_uring 与 Go —— iouring-go

    竞争问题

    在实现 iouring-go 中遇到的问题,一个是并发导致对 io_uring 的竞争问题
    对于 CQ 环的竞争是使用单一的 CQ 环消费 goroutine IOURing.run() 来完成 cqe 的消费

    func New(entries int, opts ...IOURingOption) (iour *IOURing, err error) {
        iour := &IOURing{...}
        // ...
        go iour.run()
        return
    }
    
    func (iour *IOURing) run() {
        for {
            cqe, err := iour.getCQEvent(true)
            // ...
        }
    }
    

    SQ 环的解决方案有两种

    1. 使用单独的提交 goroutine,将需要提交的请求通过内部 channel 发送给提交 goroutine,这样保证了 SQ 环的单一生产者
    2. 使用锁的方式,对于提交请求的函数加锁,保证同一时间只有一个 goroutine 在提交请求

    第一种方式听起来使用 channel 更优雅一些,但是 channel 内部依然使用锁的方式以及额外的内存复制
    另外最大的弊端就是将 IOURIng提交函数将请求发送给提交channel)和真正将请求提交给内核(调用 io_uring_enter通知内核有新的请求)分开
    当多个提交函数 向 channel 发送的请求的顺序无法保证,这样链式请求就无法实现(除非对于链式请求再次加锁)

    第二种方式,采用加锁的方式,保证了同一时间只有一个提交函数在处理 SQ 环,并且可以立即是否真正提交成功(调用 IOURing.submit 方法通知内核有新的请求
    iouring-go 采用了第二种方式

    真正去解决这个问题的方式,估计可能只有 runtime 才能给出答案,为每一个 P 创建一个 io_uring 实例在 runtime 内部解决竞争问题,内部使用 eventfd 注册到 netpoll 中来获取完成队列通知

    io_uring 与 channel

    对于 iouring-go 设计比较好的地方,我感觉便是对 channel 的利用,异步 IO 加上 channel,可以将异步在并发的程序中发挥出最大的作用
    当然,如果只是简单的使用 channel 的话又会引入其他一些问题,后续会进行说明

    func (iour *IOURing) SubmitRequest(request PrepRequest, ch chan<- Result) (Request, error)
    

    SubmitRequest 方法接收一个 channel,当请求完成后,会将结果发送到 channel 中,这样通过多个请求复用同一个 channel,程序便可以监听一组请求的完成情况

    func (iour *IOURing) run() {
        for {
            cqe, err := iour.getCQEvent(true)
            // ...
            userData := iour.userData[cqe.UserData]
            // ...
            userData.request.complate(cqe)
            if userData.resulter != nil {
                userData.resulter <- userData.request
            }
        }
    }
    

    SubmitRequest 方法同样会返回一个 Request 接口对象,通过 Request 我们同样可以去查看请求的是否完成已经它的完成结果

    type Request interface {
    	Result
    
    	Cancel() (Request, error)
    	Done() <-chan struct{}
    
    	GetRes() (int, error)
    	// Can Only be used in ResultResolver
    	SetResult(r0, r1 interface{}, err error) error
    }
    
    type Result interface {
    	Fd() int
    	Opcode() uint8
    	GetRequestBuffer() (b0, b1 []byte)
    	GetRequestBuffers() [][]byte
    	GetRequestInfo() interface{}
    	FreeRequestBuffer()
    
    	Err() error
    	ReturnValue0() interface{}
    	ReturnValue1() interface{}
    	ReturnFd() (int, error)
    	ReturnInt() (int, error)
    }
    

    利用 channel 便可以完成对异步 IO 的异步监听和同步监听

    channel 带来的问题

    当然使用 channel 又会带来其他的问题,比如 channel 满了以后,对 io_uring 完成队列的消费便会阻塞在向 channel 发送数据,阻塞时间过长也会导致 CQ 环溢出

    比较好的解决方案是,在 channel 上抽象出一层 ResulterResulter 会对完成事件进行自动缓冲,当然这也会带来一定的代码复杂度,所以 iouring-go 便将 channel 阻塞的问题交给使用者,要求 channel 的消费端尽快消费掉数据

    思考 io_uring 在 Go 中的发展

    netpoll 在 Linux 平台下使用了 epoll,而且 epoll 在使用上并没有竞争问题,当然如果要使用 io_uring 来替代 epoll 来实现 netpoll 的话并不是不可能,只是这样对于工作很好的 epoll 来说并没有什么必要,而且是否能够带来可观的性能收益也都是不确定的

    在高并发的情况下,有限的 SQ 环和 CQ 环,对于请求数量大于完成事件的消费速度的情况,CQ 环的大量溢出带来对内核的压力以及新的请求提交带来的错误处理,都会提高真正利用 io_uring 的难度

    对于 SQ 环和 CQ 环的大小限制,也许需要通过 Pool 的方式来解决,初始化多个 io_uring 实例,当一个实例的 SQ 环满,那么就使用另外的实例来提交请求
    而使用 Pool 又会增加一定的复杂度

    io_uring 的功能实际可以覆盖了 epoll 的,比如提交的阻塞 IO 请求便相当于 epoll + syscall,另外 io_uring 还提供了超时设置和请求的超时控制,相当于实现了系统级的定时器以及 netpoll 的 deadline

    但是 epoll 自身的优势,比如没有竞争问题,没有监听文件描述符的数量限制,都让 epoll 在实际的使用中更加好用,而这些问题对于 io_uring 在本身设计上就会导致的问题
    比如竞争问题,使用环形缓冲区可以协调应用和内核对请求队列的访问,但是应用中多个线程或者 goroutine 就会引发对环形缓冲区的竞争问题
    而请求数量的限制,那么就需要考虑到请求完成事件的溢出问题,内核不能无限制的去保存溢出的完成事件,当然这个问题通过应用中在 io_uring 实例上抽象出 io_uring 池的方式来解决

    使用 io_uring 来实现异步网络框架,对已有的网络模型会是非常大的冲击,怎么去使用 io_uring 来发挥最大的能力依然处于探索阶段,毕竟 io_uring 是一个出现才 1 年的技术
    而对于普通的磁盘 IO 来说,io_uring 还是有很大的发挥空间的,利用 Go 中已有的并发机制,结合具体的性能评估,对于文件服务器来说,也许会带来极大的提升

    另外一个问题便是,对于 5.1 引入,5.6 开始功能变得丰富成熟的 io_uring 来说,现在大量的环境处于 3.X,4.X,甚至 2.X , io_uring 仍然需要等待时机才能去发挥它真正的作用,而这段时间便是留给我们去探讨怎么让 io_uring 更好用

    推荐阅读

    彻底学会使用 epoll 系列
    曹春晖:谈一谈 Go 和 Syscall

    io_uring

    Efficient IO with io_uring
    What’s new with io_uring
    io_uring 在 LWN 中的讨论
    io_uring 在内核中的提交记录
    Lord of the io_uring
    liburing

    展开全文
  • 异步IO

    千次阅读 2017-03-02 09:53:41
    IO编程一节中,我们已经知道,CPU的速度远远快于磁盘、网络等IO。在一个线程中,CPU执行代码的速度极快,然而,一旦遇到IO操作,如读写文件、发送网络数据时,就需要等待IO操作完成,才能继续进行下一步操作。这种...

    在IO编程一节中,我们已经知道,CPU的速度远远快于磁盘、网络等IO。在一个线程中,CPU执行代码的速度极快,然而,一旦遇到IO操作,如读写文件、发送网络数据时,就需要等待IO操作完成,才能继续进行下一步操作。这种情况称为同步IO。

    在IO操作的过程中,当前线程被挂起,而其他需要CPU执行的代码就无法被当前线程执行了。

    因为一个IO操作就阻塞了当前线程,导致其他代码无法执行,所以我们必须使用多线程或者多进程来并发执行代码,为多个用户服务。每个用户都会分配一个线程,如果遇到IO导致线程被挂起,其他用户的线程不受影响。

    多线程和多进程的模型虽然解决了并发问题,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,所以,一旦线程数量过多,CPU的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。

    由于我们要解决的问题是CPU高速执行能力和IO设备的龟速严重不匹配,多线程和多进程只是解决这一问题的一种方法。

    另一种解决IO问题的方法是异步IO。当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。

    可以想象如果按普通顺序写出的代码实际上是没法完成异步IO的:

    do_some_code()
    f = open('/path/to/file', 'r')
    r = f.read() # <== 线程停在此处等待IO操作结果
    # IO操作完成后线程才能继续执行:
    do_some_code(r)
    

    所以,同步IO模型的代码是无法实现异步IO模型的。

    异步IO模型需要一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程:

    loop = get_event_loop()
    while True:
        event = loop.get_event()
        process_event(event)
    

    消息模型其实早在应用在桌面应用程序中了。一个GUI程序的主线程就负责不停地读取消息并处理消息。所有的键盘、鼠标等消息都被发送到GUI程序的消息队列中,然后由GUI程序的主线程处理。

    由于GUI线程处理键盘、鼠标等消息的速度非常快,所以用户感觉不到延迟。某些时候,GUI线程在一个消息处理的过程中遇到问题导致一次消息处理时间过长,此时,用户会感觉到整个GUI程序停止响应了,敲键盘、点鼠标都没有反应。这种情况说明在消息模型中,处理一个消息必须非常迅速,否则,主线程将无法及时处理消息队列中的其他消息,导致程序看上去停止响应。

    消息模型是如何解决同步IO必须等待IO操作这一问题的呢?当遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。

    在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。

    展开全文
  • windows下异步IO

    千次阅读 2020-02-16 15:33:17
    当进行一个异步的设备IO请求时,该线程可以先去做其他事,等到设备完成IO请求后通知该线程进行处理。本文讨论在windows平台下的异步设备IO。同时在一些示例中会对涉及到的知识进行讲解。 1.异步设备IO执行 进行异步...

    介绍

    简单讲解下我们程序进行IO的过程,当线程进行一个同步的设备IO请求时,他会被挂起,直到设备完成IO请求,返回给阻塞线程,线程激活继续处理。当进行一个异步的设备IO请求时,该线程可以先去做其他事,等到设备完成IO请求后通知该线程进行处理。本文讨论在windows平台下的异步设备IO。同时在一些示例中会对涉及到的知识进行讲解。

    1.异步IO执行

    进行异步设备io时我们来做一下下准备工作,首先针对不同的设备(文件,管道,套接字,控制台)的初始化和发出IO不太一样,以简单的文件为例,别的应该都是相通的。

    1.1 初始化设备(eg.CreateFile)

    首先我们来说下在windows下他的api大多数有后缀为W和A两种情况,W表示以unicode(utf-16)字符编码,
    A表示以ANSI字符编码,我们以W为例,然后我们使用CreateFile创建文件设备对象,CreateFile也可以用来创建目录,磁盘驱动器,串口,并口等设备对象。这里我们用最简单的文件为例。

    WINBASEAPI
    HANDLE
    WINAPI
    CreateFileW(
        _In_ LPCWSTR lpFileName,
        _In_ DWORD dwDesiredAccess,
        _In_ DWORD dwShareMode,
        _In_opt_ LPSECURITY_ATTRIBUTES lpSecurityAttributes,
        _In_ DWORD dwCreationDisposition,
        _In_ DWORD dwFlagsAndAttributes,
        _In_opt_ HANDLE hTemplateFile
        );
    
    • WINBASEAPI宏表示__declspec(dllimport)是用来导入导出时使用
    • HANDLE类型表示内核对象,比如线程,进程,事件,设备等,操作系统来维护的。
    • WINAPI 宏是__stdcall,VC编译器的指令,可以来设置传参的时入栈的参数顺序,栈内数据清除方式,函数签名等
    • lpFileName文件名
    • dwDesiredAccess访问方式,可读、可写等
    • dwShareMode,其他内核对象使用是的共享方式
    • lpSecurityAttributes 安全属性
    • dwCreationDisposition 打开方式,创建还是打开已有等

    • 我们如果使用CreateFile来进行异步IO,我们需要将dwFlagsAndAttributes设置带有FILE_FLAG_OVERLAPPED属性。OVERLAPPED重叠的意思,表示内核线程和应用线程重叠运行。

    1.2 执行(eg.ReadFile,WriteFile)

    WINBASEAPI
    _Must_inspect_result_
    BOOL
    WINAPI
    ReadFile(
        _In_ HANDLE hFile,
        _Out_writes_bytes_to_opt_(nNumberOfBytesToRead, *lpNumberOfBytesRead) __out_data_source(FILE) LPVOID lpBuffer,
        _In_ DWORD nNumberOfBytesToRead,
        _Out_opt_ LPDWORD lpNumberOfBytesRead,
        _Inout_opt_ LPOVERLAPPED lpOverlapped
        );
    
    WINBASEAPI
    BOOL
    WINAPI
    WriteFile(
        _In_ HANDLE hFile,
        _In_reads_bytes_opt_(nNumberOfBytesToWrite) LPCVOID lpBuffer,
        _In_ DWORD nNumberOfBytesToWrite,
        _Out_opt_ LPDWORD lpNumberOfBytesWritten,
        _Inout_opt_ LPOVERLAPPED lpOverlapped
        );
    
    

    来看下ReadFile的解释

    • hFile即为上一节的设备对象
    • lpBuffer是文件最后读到的缓冲区,或者要写到设备的缓冲区
    • nNumberOfBytesToRead要读取多少字节,nNumberOfBytesToWrite要写多少字节
    • lpNumberOfBytesRead指向一个DWORD的地址,表示最终读取了多少字节,lpNumberOfBytesWritten最终写了多少字节。

    然后就是lpOverlapped了,我们来看下LPOVERLAPPED的结构

    typedef struct _OVERLAPPED {
        ULONG_PTR Internal;
        ULONG_PTR InternalHigh;
        union {
            struct {
                DWORD Offset;
                DWORD OffsetHigh;
            } DUMMYSTRUCTNAME;
            PVOID Pointer;
        } DUMMYUNIONNAME;
    
        HANDLE  hEvent;
    } OVERLAPPED, *LPOVERLAPPED;
    
    • Internal用来保存等到已经处理完IO后的错误码
    • InternalHigh用来保存已传输的字节数
    • Offset和InternalHigh构成一个64位的偏移值,表示访问文件从哪里开始访问
    • Pointer系统保留字
    • hEvent用来接收I/O完成通知时使用,后边会说到

    2. IO请求完成通知

    然后我们来看下,等到IO完成后如何通知到线程中,有四种方式来通知,摘自《windows核心编程》:

    方法描述
    触发设备内核对象允许一个线程发出IO请求,另一个线程对结果处理,只能同时发出一个IO请求
    触发事件内核对象允许一个线程发出IO请求,另一个线程对结果处理 ,能同时发出多个IO请求
    可提醒I/O只允许一个线程发出IO请求,须发出请求的线程对结果处理,能同时发出多个IO请求
    I/O完成端口循序一个线程发出IO请求,另一个线程对结果处理,能同时发出多个IO请求

    2.1 触发设备内核对象

    先来看例子:

    int main()
    {
        HANDLE hFile = CreateFile(L"1.txt", GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
        if (hFile == INVALID_HANDLE_VALUE) {
            std::cout << "open error";
            return -1;
        }
    
        BYTE bBuffer[1024];
        OVERLAPPED o = { 0 };
        BOOL bReadDone = ReadFile(hFile, bBuffer, 1024, NULL, &o);
    
        DWORD dwError = GetLastError();
        if (!bReadDone && dwError == ERROR_IO_PENDING) {
            DWORD dw = WaitForSingleObject(hFile, INFINITE);
            bReadDone = TRUE;
        }
    
        if (bReadDone) {
            std::cout << o.Internal << std::endl;
            std::cout << o.InternalHigh << std::endl;
            bBuffer[o.InternalHigh] = '\0';
            std::cout << bBuffer << std::endl;
        }
        else {
            std::cout << "read error";
            return 0;
        }
    
        std::cout << "succ";
        return 0;
    }
    

    CreateFile用可读可写的权限;用OPEN_ALWAYS的打开方式,表示有文件打开,没有该文件创建文件。
    这个例子对一些判断比较完整,我们可以顺便来巩固下基础知识,CreateFile成功返回句柄,失败时返回INVALID_HANDLE_VALUE,而不是像许多windows返回句柄为NULL来表示失败了,但是CreateFile失败返回的是INVALID_HANDLE_VALUE(-1),大家可以注意下。
    然后进行初始化,声明的BYTE数组来存放读取到的数据;OVERLAPPED 对象初始化为0,即中的元素值都是0,这里要注意的是Offset为0即为从文件的开头读取数据。
    调用ReadFile后,由于是异步的,所以bReadDone 是FALSE,然后获取下错误信息,得知是ERROR_IO_PENDING,表示正在进行IO操作。
    最后我们调用WaitForSingleObject(hFile, INFINITE)来等待hFile设备内核对象触发,这里我们大概讲解下关于内核对象触发。

    在windows中,内核对象可以用来进行线程同步,内核对象有两个状态:触发和,未触发。比如说线程,进程,他们在创建时是未触发的,运行结束时变为触发状态。在比如Event对象,可以我们写代码来使他的程序变化,后边我们再说。
    这里我们说下文件内核对象,ReadFile和WriteFile函数在将IO请求添加到设备的队列之前,会先将状态设为未触发状态,当设备驱动程序完成了所谓请求后,会将对象状态设为触发状态。
    再来说WaitForSingleObject函数,就是等待第一个参数(内核对象句柄)状态变成触发,等待时间是第二个参数,等待该时间后或者内核对象状态变成触发该函数返回。

    我们先往文件中写入“01234567899876543210”
    最后我们打印出来读取结果,依次打印出错误码,读取的字节数,读取内容。另外我们首先在文件中写入了内容。
    在这里插入图片描述
    这个有一个缺点就是,只能同时处理一个IO请求。

    2.2 触发事件内核对象

    继续看例子:

    static bool readReady = false;
    void WaitResultThd(void *param)
    {
        HANDLE* hh = (HANDLE*)param;
        DWORD dw = WaitForMultipleObjects(2, hh, TRUE, INFINITE);
        if (dw == WAIT_OBJECT_0) {
            readReady = true;
        }
    }
    
    int main()
    {
        HANDLE hFile = CreateFile(L"1.txt", GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, NULL, 
        						  OPEN_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
        if (hFile == INVALID_HANDLE_VALUE) {
            std::cout << "open error";
            return -1;
        }
    
        BYTE bBuffer1[11] = {0};
        OVERLAPPED o1 = { 0 };
        o1.hEvent = CreateEvent(NULL, FALSE, FALSE, L"");
        o1.Offset = 0;
    
        ReadFile(hFile, bBuffer1, 10, NULL, &o1);
    
        BYTE bBuffer2[11] = { 0 };
        OVERLAPPED o2 = { 0 };
        o2.hEvent = CreateEvent(NULL, FALSE, FALSE, L"");
        o2.Offset = 10;
    
        ReadFile(hFile, bBuffer2, 10, NULL, &o2);
    
        HANDLE h[2];
        h[0] = o1.hEvent;
        h[1] = o2.hEvent;
        _beginthread(WaitResultThd, 0, h);
    
        while (1)
        {
            /* do somthing*/
            Sleep(500);
    
            if (readReady) {
                std::cout << bBuffer1 << std::endl;
                std::cout << bBuffer2 << std::endl;
                break;
            }
        }
    
        return 0;
    }
    

    我们看下这个和上一个的区别是用OVERLAPPED的hEvent变量来实现IO完成的通知,首先CreateEvent为每个OVERLAPPED的变量创建事件内核对象,看下CreateEvent:

    WINBASEAPI
    _Ret_maybenull_
    HANDLE
    WINAPI
    CreateEventW(
        _In_opt_ LPSECURITY_ATTRIBUTES lpEventAttributes,
        _In_ BOOL bManualReset,
        _In_ BOOL bInitialState,
        _In_opt_ LPCWSTR lpName
        );
    
    • lpEventAttributes设置的安全属性
    • bManualReset,意为是否为手动重置对象,为TRUE表示手动重置,事件触发时正在等待改事件的所有线程将都变成可调度状态。为FALSE为自动重置,事件触发时只有一个线程变成可调度状态。
    • bInitialState初始状态,TRUE是触发状态,FALSE为未触发状态
    • lpName是可以用次来共享该事件对象

    当我们创建成功了时间内核对象时,可以使用SetEvent将其设置为触发状态,可以使用ResetEvent将其设置为未触发状态

    我们继续,当异步IO请求完成后,设备驱动程序会检查OVERLAPPED的hEvent是不是为空,如果不是为空,调用SetEvent来触发该对象。
    为了演示可以多线程来进行操作,我们开启另一个线程来等待事件完成,使用WaitForMultipleObjects来等待多个事件触发,我们再来看下WaitForMultipleObjects

    WINBASEAPI
    DWORD
    WINAPI
    WaitForMultipleObjects(
        _In_ DWORD nCount,
        _In_reads_(nCount) CONST HANDLE* lpHandles,
        _In_ BOOL bWaitAll,
        _In_ DWORD dwMilliseconds
        );
    
    • nCount表示等待几个对象
    • lpHandles,等待的对象句柄数组
    • bWaitAll,表示是等待所有对象都变成触发状态再返回(TRUE),还是只要有一个对象触发就返回(FALSE)
    • dwMilliseconds 表示等待的时间
      如果bWaitAll为TRUE,返回值为WAIT_OBJECT_0表示全部触发
      如果bWaitAll为FALSE,返回值为WAIT_OBJECT_0表示lpHandles[0]对象触发,WAIT_OBJECT_0 + 1表示lpHandles[1]触发,以此类推。

    再继续,我们设置的两次IO读取请求是从文件的不同偏移开始读的,我们来看下读取结果:
    在这里插入图片描述

    2.3 可提醒的I/O

    可提醒IO是使用回调函数来实现,同时执行IO请求的函数有点变化,这里我们介绍RadFileEx和WriteFileEx,我们看下函数原型:

    WINBASEAPI
    _Must_inspect_result_
    BOOL
    WINAPI
    ReadFileEx(
        _In_ HANDLE hFile,
        _Out_writes_bytes_opt_(nNumberOfBytesToRead) __out_data_source(FILE) LPVOID lpBuffer,
        _In_ DWORD nNumberOfBytesToRead,
        _Inout_ LPOVERLAPPED lpOverlapped,
        _In_ LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
        );
    
    WINBASEAPI
    BOOL
    WINAPI
    WriteFileEx(
        _In_ HANDLE hFile,
        _In_reads_bytes_opt_(nNumberOfBytesToWrite) LPCVOID lpBuffer,
        _In_ DWORD nNumberOfBytesToWrite,
        _Inout_ LPOVERLAPPED lpOverlapped,
        _In_ LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
        );
    

    和ReadFile及WriteFile,有几点不一样。

    • 这两个函数没有指向DWORD地址的指针表示已传输多少字节,毕竟在异步中不能立即拿到,该信息在回调函数才能得到
    • lpCompletionRoutine增加了这个参数,即回调函数的函数指针,看下类型:
    typedef
    VOID
    (WINAPI *LPOVERLAPPED_COMPLETION_ROUTINE)(
        _In_    DWORD dwErrorCode,
        _In_    DWORD dwNumberOfBytesTransfered,
        _Inout_ LPOVERLAPPED lpOverlapped
        );
    

    错误码,传输的字节数,及LPOVERLAPPED 结构。
    然后我们来看下例子,通过此来讲解下。

    static bool readReady = false;
    static BYTE bBuffer1[11] = { 0 };
    static BYTE bBuffer2[11] = { 0 };
    
    VOID WINAPI ReadyFunction(ULONG_PTR param)
    {
        static int times = 0;
        times++;
        if (times == 2) {
            readReady = true;
        }
    }
    
    VOID WINAPI DoWorkRountine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, OVERLAPPED* lpOverlapped)
    {
        if (lpOverlapped->Offset == 0) {
            std::cout << bBuffer1 << std::endl;
        }
        else {
            std::cout << bBuffer2 << std::endl;
        }
    
        QueueUserAPC(ReadyFunction, GetCurrentThread(), NULL);
    }
    
    void DoWorkThd(void *param)
    {
        HANDLE hFile = CreateFile(L"1.txt", GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
        if (hFile == INVALID_HANDLE_VALUE) {
            std::cout << "open error";
            return;
        }
    
        OVERLAPPED o1 = { 0 };
        o1.Offset = 0;
        ReadFileEx(hFile, bBuffer1, 10, &o1, DoWorkRountine);
    
        OVERLAPPED o2 = { 0 };
        o2.Offset = 10;
        ReadFileEx(hFile, bBuffer2, 10, &o2, DoWorkRountine);
    
        while (1) {
            if (readReady) {
                break;
            }
    
            SleepEx(500, TRUE);
        }
    }
    
    int main()
    {
        HANDLE tHandle = (HANDLE)_beginthread(DoWorkThd, 0, NULL);
        WaitForSingleObject(tHandle, INFINITE);
        return 0;
    }
    

    我们将DoWorkRountine作为IO完成的回调函数传入,其读出来的数据我们用两个全局变量来缓冲,我们注意到了发起IO请求的线程使用了SleepEx函数进去睡眠,我们看下这个函数:

    WINBASEAPI
    DWORD
    WINAPI
    SleepEx(
        _In_ DWORD dwMilliseconds,
        _In_ BOOL bAlertable
        );
    

    和sleep相似,多了一个bAlertable参数,表示是否是可提醒的,如果是可提醒的,那么完成了IO请求完成后就会唤醒线程去执行回调函数。

    • 当系统创建一个线程,会创建一个与线程相关的待执行队列,这个队列被称为异步队列,在此当IO请求完成后,设备驱动程序就会在调用线程的异步队列中添加一项。当线程是可提醒的状态就会被激活去执行相关任务。且如果队列中至少有一项,那么系统就不会让线程进入到睡眠状态,当回调函数返回时,系统判断队列中是否有任务,如果有就会继续取出任务去执行,如果没有其他项,SleepEx等可提醒的函数返回,返回值是WAIT_IO_COMPLETION
    • Sleep函数内部也是调用了SleepEx,只是将bAlertable置为FALSE。其他可以将线程置为可提醒状态的还有WaitForSingleObjectEx,WaitForMultipleObjectEx,SingleObjectAndWaitEx,GetQueuedCompletionStatusEx,MsgWaitForMutipleObjectEx。

    QueueUserAPC是允许我们手动往编程里添加任务。原型是:

    WINBASEAPI
    DWORD
    WINAPI
    QueueUserAPC(
        _In_ PAPCFUNC pfnAPC,
        _In_ HANDLE hThread,
        _In_ ULONG_PTR dwData
        );
    
    • pfnAPC是待执行的函数
    • hThread要添加的线程
    • dwData回调函数的自定义参数
      可提醒IO的确定很明显,回调函数没有足够地方存放上下文信息,需要一些全局变量,如我们例子中的bBuffer;第二个就是只能一个线程来完成IO请求和完成通知,不能用上多线程,可能对资源利用率不足。
      最后我们看下运行结果:
      在这里插入图片描述

    2.4 注意事项

    由于篇幅限制,我们下一篇再讲述完成端口,剩下这里我们说下关于进行异步IO的时候注意事项

    • 当我们发起IO多个请求时,设备驱动程序并不会按照我们请求的顺序去执行(顺序是不一定的),所以大家尽量避免依靠顺序编码。
    • 当我们进行IO请求时,可能会同步返回,这是有可能系统之前有了这一部分的数据就会直接返回,所以大家需要在ReadFile等要判断返回值。
    • 我们在完成IO请求完成之前,一定要保证数据缓存和OVERLAPPED结构的存活,这些是在我们发起IO请求时只会传入地址,完成后会填充改地址的值。所以一定要保证他的存活性。

    好了,就到这里了,参考自《windows核心编程》,欢迎交流

    展开全文
  • 本文会先介绍并演示阻塞模式,然后引入非阻塞模式来对阻塞模式进行优化,最后再介绍 JDK7 引入的异步 IO,由于网上关于异步 IO 的介绍相对较少,所以这部分内容我会介绍得具体一些。 希望看完本文,读者可以对非阻塞
  • 异步IO和同步IO 当网卡有了数据,DMA会把数据拷贝到内核缓冲区(内核缓冲区的哪里呢);而从内核缓冲区拷贝到用户态需要用户调用read,同步地进行 异步则是注册个读完成事件,等其他用户态线程/内核进程拷贝到用户态后再...
  • Python异步IO实现全过程

    千次阅读 2019-05-25 20:43:37
    你可能会有一种疑问,“现在并发,并行,线程,多线程,这已经很多了,异步IO又适用于哪里呢?” 这篇教程将帮助你回答这个问题,让你更加深入地掌握Python的异步IO方法。 将介绍以下内容: 1. 异步IO (as...
  • wg-async-foundations 致力于改善Rust中异步I / O基础的工作组 请访问我们的以获取更多信息!
  • C++,串口通信,DLL,异步IO,可以自动识别需要串口号(不用手动点击打开串口),用于设备的自动识别中。
  • 异步io

    2016-08-29 20:35:24
    1何为异步IO (1)几乎可以认为:异步IO就是操作系统用软件实现的一套中断响应系统。 (2)异步IO的工作方法是:我们当前进程注册一个异步IO事件(使用signal注册一个信号SIGIO的处理函数),然后当前进程可以正常处理...
  • sk-async (WIP)基于Coro的无异常异步I / O库。 仅适用于Windows。 要求: MSVC 19.28(VS 16.9)或更高版本。 不支持Clang,因为clang-cl不支持标准协程(尚未)。 例子:
  • Linux中的异步IO

    2018-06-14 16:52:35
    Linux 系统中的异步IO包,在安装数据库的失手需要它。这样数据库系统就可以支持异步IO了。
  • 同步IO 和异步IO

    千次阅读 2018-07-25 19:08:12
    异步IO:当遇到IO操作时,CPU只是发送IO指令,不等待结果,然后继续执行其他代码。一段时间后,当IO返回结果时,再通知CPU进行处理。 异步IO模型需要一个消息循环,在消息循环中,主线程不断地重复“读取消息...
  • 想当然的改成了异步IO alter system set filesystemio_options=setall scope=spfile; 重新启动数据库后,检查,发现异步IO已经启用: SQL> SELECT NAME,ASYNCH_IO FROM V$DATAFILE F,V$IOSTAT_FILE I 2 WHERE ...
  • 同步IO,异步IO,阻塞IO,非阻塞IO

    千次阅读 2018-08-18 11:29:17
    概念说明 用户空间与内核空间 进程切换 进程的阻塞 文件描述符fd 缓存 I/O ...异步 I/O(asynchronous IO) 总结 blocking和non-blocking的区别 synchronous IO和asynchronous IO的区别 ...
  • NULL 博文链接:https://zhangshixi.iteye.com/blog/683767
  • linux异步IO的两种方式

    千次阅读 2017-12-26 00:35:46
    知道异步IO已经很久了,但是直到最近,才真正用它来解决一下实际问题(在一个CPU密集型的应用中,有一些需要处理的数据可能放在磁盘上。预先知道这些数据的位置,所以预先发起异步IO读请求。等到真正需要用到这些...
  • 一步步理解python的异步IO

    千次阅读 2018-06-28 18:57:46
    分享至:一步步理解python的异步IO 前言 看到越来越多的大佬都在使用python的异步IO,协程等概念来实现高效的IO处理过程,可是我对这些概念还不太懂,就学习了一下。 因为是初学者,在理解上有很多不到位的地方,...
  • 在这里告诉大家在.NET Framework4.5中支持异步IO的操作。大大简化之前些的异步方法代码。 使用backgroundworker代码 代码如下:View Code private void Button_Click_3(object sender, RoutedEventArgs e) { ...
  • 这篇文章个人觉得作者写得非常好,之前...同步(synchronous) IO异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,
  • 5种IO模型、阻塞IO和非阻塞IO、同步IO和异步IO

    万次阅读 多人点赞 2016-10-28 20:01:41
    5种IO模型、阻塞IO和非阻塞IO、同步IO和异步IO 看了一些文章,发现有很多不同的理解,可能是因为大家入切的角度、环境不一样。所以,我们先说明基本的IO操作及环境。本文是在《UNIX网络编程 卷1:套接字联网API》...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 266,488
精华内容 106,595
关键字:

异步io