IO多路复用之select、poll、epoll版本一
select
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
poll
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
epoll
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
python select实现
python的select()方法直接调用操作系统的IO接口,它监控sockets、open、files和pipes(所有带fileno()方法的文件句柄)何时变成readable和writeable,或者通信错误,select()是得同时监控多个连接变得简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过python解释器。
注:select()只用于Unix的文件对象,不适用于windows
下面通过echo server例子来理解select是如何通过单进程实现同时处理多个非阻塞的socket连接的
服务端代码:
import select import socket import queue server = socket.socket() server.bind(('0.0.0.0',9090)) server.listen(1000) server.setblocking(False) # 不阻塞 inputs = [server,] #inputs = [server, conn, conn2] outputs = [] msg_dict = {} while True: readable, writeable, excepttional = select.select(inputs, outputs, inputs) print(readable,writeable,excepttional) for i in readable: if i is server: # 代表来了一个新连接 conn,addr = server.accept() print("client: ", addr) inputs.append(conn) msg_dict[conn] = queue.Queue() # 初始化一个队列,存后面返回给客户端的数据 else: data = i.recv(1024) print("recv: ",data) msg_dict[i].put(data) outputs.append(i) # 放入返回的连接队列 # i.send(data) # print("send done") for w in writeable: # 要返回给客户端的连接列表 data_to_client = msg_dict[w].get() w.send(data_to_client) # 返回给客户端源数据 outputs.remove(w) # 发送完成后删除数据 for e in excepttional: if e in outputs: outputs.remove(e) inputs.remove(e) del msg_dict[e]
客户端完整代码
import socket HOST = 'localhost' # The remote host PORT = 9090 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"), encoding="utf8") s.sendall(msg) data = s.recv(1024) print('Received', data) s.close()
python epoll实现
#_*_coding:utf-8_*_ import socket, logging import select, errno logger = logging.getLogger("network-server") def InitLog(): logger.setLevel(logging.DEBUG) fh = logging.FileHandler("network-server.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.ERROR) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) if __name__ == "__main__": InitLog() try: # 创建 TCP socket 作为监听 socket listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) except socket.error as msg: logger.error("create socket failed") try: # 设置 SO_REUSEADDR 选项 对unix套接字的设置 listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except socket.error as msg: logger.error("setsocketopt SO_REUSEADDR failed") try: # 进行 bind -- 此处未指定 ip 地址,即 bind 了全部网卡 ip 上 listen_fd.bind(('', 2003)) except socket.error as msg: logger.error("bind failed") try: # 设置 listen 的 backlog 数 listen_fd.listen(10) except socket.error as msg: logger.error(msg) try: # 创建 epoll 句柄 epoll_fd = select.epoll() # 向 epoll 句柄中注册 监听 socket 的 可读 事件 #登记一个新的文件描述符,如果文件描述符已经被创建则引发一个OSError错误 #fd是目标文件描述符的操作 #register(fd[, eventmask]) #events是由不同的EPOLL常熟组成的,EPOLLIN | EPOLLOUT | EPOLLPRI epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) except select.error as msg: logger.error(msg) connections = {} addresses = {} datalist = {} while True: # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待 #poll([timeout=-1[, maxevents=-1]]) -> [(fd, events), (...)] #Wait for events on the epoll file descriptor(文件描述符) for a maximum time of timeout #in seconds (as float). -1 makes poll wait indefinitely. #Up to maxevents are returned to the caller. epoll_list = epoll_fd.poll() for fd, events in epoll_list: # 若为监听 fd 被激活 if fd == listen_fd.fileno(): # 进行 accept -- 获得连接上来 client 的 ip 和 port,以及 socket 句柄 conn, addr = listen_fd.accept() logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno())) # 将连接 socket 设置为 非阻塞 conn.setblocking(0) # 向 epoll 句柄中注册 连接 socket 的 可读 事件 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) # 将 conn 和 addr 信息分别保存起来 connections[conn.fileno()] = conn addresses[conn.fileno()] = addr elif select.EPOLLIN & events: # 有 可读 事件激活 datas = '' while True: try: # 从激活 fd 上 recv 10 字节数据 data = connections[fd].recv(10) # 若当前没有接收到数据,并且之前的累计数据也没有 if not data and not datas: # 从 epoll 句柄中移除该 连接 fd epoll_fd.unregister(fd) # server 侧主动关闭该 连接 fd connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) break else: # 将接收到的数据拼接保存在 datas 中 datas += data except socket.error as msg: # 在 非阻塞 socket 上进行 recv 需要处理 读穿 的情况 # 这里实际上是利用 读穿 出 异常 的方式跳到这里进行后续处理 if msg.errno == errno.EAGAIN: logger.debug("%s receive %s" % (fd, datas)) # 将已接收数据保存起来 datalist[fd] = datas # 更新 epoll 句柄中连接d 注册事件为 可写 epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) break else: # 出错处理 epoll_fd.unregister(fd) connections[fd].close() logger.error(msg) break elif select.EPOLLHUP & events: # 有 HUP 事件激活 epoll_fd.unregister(fd) connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) elif select.EPOLLOUT & events: # 有 可写 事件激活 sendLen = 0 # 通过 while 循环确保将 buf 中的数据全部发送出去 while True: # 将之前收到的数据发回 client -- 通过 sendLen 来控制发送位置 sendLen += connections[fd].send(datalist[fd][sendLen:]) # 在全部发送完毕后退出 while 循环 if sendLen == len(datalist[fd]): break # 更新 epoll 句柄中连接 fd 注册事件为 可读 epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) else: # 其他 epoll 事件不进行处理 continue
selectors模块
它具有根据平台选出最佳的IO多路机制,比如在win的系统上他默认的是select模式而在linux上它默认的epoll。
该模块允许基于select模块原语构建的高级别和高效的/输出多路复用。鼓励用户使用这个模块,除非他们希望对使用的os级别原语进行精确控制。
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 10000)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)