diff --git a/klippy/reactor.py b/klippy/reactor.py index f9bedcf3f..db6a089e3 100644 --- a/klippy/reactor.py +++ b/klippy/reactor.py @@ -55,8 +55,6 @@ class ReactorFileHandler: self.fd = fd self.read_callback = read_callback self.write_callback = write_callback - def fileno(self): - return self.fd class ReactorGreenlet(greenlet.greenlet): def __init__(self, run): @@ -109,8 +107,13 @@ class SelectReactor: self._pipe_fds = None self._async_queue = queue.Queue() # File descriptors + self._dummy_fd_hdl = ReactorFileHandler(-1, (lambda e: None), + (lambda e: None)) + self._fds = {} self._read_fds = [] self._write_fds = [] + self._READ = 1 + self._WRITE = 2 # Greenlets self._g_dispatch = None self._greenlets = [] @@ -245,48 +248,54 @@ class SelectReactor: # File descriptors def register_fd(self, fd, read_callback, write_callback=None): file_handler = ReactorFileHandler(fd, read_callback, write_callback) + self._fds[fd] = file_handler self.set_fd_wake(file_handler, True, False) return file_handler def unregister_fd(self, file_handler): - if file_handler in self._read_fds: - self._read_fds.pop(self._read_fds.index(file_handler)) - if file_handler in self._write_fds: - self._write_fds.pop(self._write_fds.index(file_handler)) + self.set_fd_wake(file_handler, False, False) + del self._fds[file_handler.fd] def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False): - if file_handler in self._read_fds: + fd = file_handler.fd + if fd in self._read_fds: if not is_readable: - self._read_fds.pop(self._read_fds.index(file_handler)) + self._read_fds.remove(fd) elif is_readable: - self._read_fds.append(file_handler) - if file_handler in self._write_fds: + self._read_fds.append(fd) + if fd in self._write_fds: if not is_writeable: - self._write_fds.pop(self._write_fds.index(file_handler)) + self._write_fds.remove(fd) elif is_writeable: - self._write_fds.append(file_handler) + self._write_fds.append(fd) + def _check_fds(self, eventtime, hdls): + g_dispatch = self._g_dispatch + for fd, event in hdls: + hdl = self._fds.get(fd, self._dummy_fd_hdl) + if event & self._READ: + hdl.read_callback(eventtime) + if g_dispatch is not self._g_dispatch: + self._end_greenlet(g_dispatch) + return self.monotonic() + if event & self._WRITE: + hdl.write_callback(eventtime) + if g_dispatch is not self._g_dispatch: + self._end_greenlet(g_dispatch) + return self.monotonic() + return eventtime # Main loop def _dispatch_loop(self): - self._g_dispatch = g_dispatch = greenlet.getcurrent() + self._g_dispatch = greenlet.getcurrent() busy = True eventtime = self.monotonic() while self._process: timeout = self._check_timers(eventtime, busy) busy = False - res = select.select(self._read_fds, self.write_fds, [], timeout) + res = select.select(self._read_fds, self._write_fds, [], timeout) eventtime = self.monotonic() - for fd in res[0]: + if res[0] or res[1]: busy = True - fd.read_callback(eventtime) - if g_dispatch is not self._g_dispatch: - self._end_greenlet(g_dispatch) - eventtime = self.monotonic() - break - for fd in res[1]: - busy = True - fd.write_callback(eventtime) - if g_dispatch is not self._g_dispatch: - self._end_greenlet(g_dispatch) - eventtime = self.monotonic() - break + hdls = ([(fd, self._READ) for fd in res[0]] + + [(fd, self._WRITE) for fd in res[1]]) + eventtime = self._check_fds(eventtime, hdls) self._g_dispatch = None def run(self): if self._pipe_fds is None: @@ -315,30 +324,27 @@ class PollReactor(SelectReactor): def __init__(self, gc_checking=False): SelectReactor.__init__(self, gc_checking) self._poll = select.poll() - self._fds = {} + self._READ = select.POLLIN | select.POLLHUP + self._WRITE = select.POLLOUT # File descriptors def register_fd(self, fd, read_callback, write_callback=None): file_handler = ReactorFileHandler(fd, read_callback, write_callback) - fds = self._fds.copy() - fds[fd] = file_handler - self._fds = fds - self._poll.register(file_handler, select.POLLIN | select.POLLHUP) + self._fds[fd] = file_handler + self._poll.register(file_handler.fd, select.POLLIN | select.POLLHUP) return file_handler def unregister_fd(self, file_handler): - self._poll.unregister(file_handler) - fds = self._fds.copy() - del fds[file_handler.fd] - self._fds = fds + self._poll.unregister(file_handler.fd) + del self._fds[file_handler.fd] def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False): flags = select.POLLHUP if is_readable: flags |= select.POLLIN if is_writeable: flags |= select.POLLOUT - self._poll.modify(file_handler, flags) + self._poll.modify(file_handler.fd, flags) # Main loop def _dispatch_loop(self): - self._g_dispatch = g_dispatch = greenlet.getcurrent() + self._g_dispatch = greenlet.getcurrent() busy = True eventtime = self.monotonic() while self._process: @@ -346,50 +352,36 @@ class PollReactor(SelectReactor): busy = False res = self._poll.poll(int(math.ceil(timeout * 1000.))) eventtime = self.monotonic() - for fd, event in res: + if res: busy = True - if event & (select.POLLIN | select.POLLHUP): - self._fds[fd].read_callback(eventtime) - if g_dispatch is not self._g_dispatch: - self._end_greenlet(g_dispatch) - eventtime = self.monotonic() - break - if event & select.POLLOUT: - self._fds[fd].write_callback(eventtime) - if g_dispatch is not self._g_dispatch: - self._end_greenlet(g_dispatch) - eventtime = self.monotonic() - break + eventtime = self._check_fds(eventtime, res) self._g_dispatch = None class EPollReactor(SelectReactor): def __init__(self, gc_checking=False): SelectReactor.__init__(self, gc_checking) self._epoll = select.epoll() - self._fds = {} + self._READ = select.EPOLLIN | select.EPOLLHUP + self._WRITE = select.EPOLLOUT # File descriptors def register_fd(self, fd, read_callback, write_callback=None): file_handler = ReactorFileHandler(fd, read_callback, write_callback) - fds = self._fds.copy() - fds[fd] = read_callback - self._fds = fds + self._fds[fd] = file_handler self._epoll.register(fd, select.EPOLLIN | select.EPOLLHUP) return file_handler def unregister_fd(self, file_handler): self._epoll.unregister(file_handler.fd) - fds = self._fds.copy() - del fds[file_handler.fd] - self._fds = fds + del self._fds[file_handler.fd] def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False): - flags = select.POLLHUP + flags = select.EPOLLHUP if is_readable: flags |= select.EPOLLIN if is_writeable: flags |= select.EPOLLOUT - self._epoll.modify(file_handler, flags) + self._epoll.modify(file_handler.fd, flags) # Main loop def _dispatch_loop(self): - self._g_dispatch = g_dispatch = greenlet.getcurrent() + self._g_dispatch = greenlet.getcurrent() busy = True eventtime = self.monotonic() while self._process: @@ -397,20 +389,9 @@ class EPollReactor(SelectReactor): busy = False res = self._epoll.poll(timeout) eventtime = self.monotonic() - for fd, event in res: + if res: busy = True - if event & (select.EPOLLIN | select.EPOLLHUP): - self._fds[fd].read_callback(eventtime) - if g_dispatch is not self._g_dispatch: - self._end_greenlet(g_dispatch) - eventtime = self.monotonic() - break - if event & select.EPOLLOUT: - self._fds[fd].write_callback(eventtime) - if g_dispatch is not self._g_dispatch: - self._end_greenlet(g_dispatch) - eventtime = self.monotonic() - break + eventtime = self._check_fds(eventtime, res) self._g_dispatch = None # Use the poll based reactor if it is available