# File descriptor and timer event helper
#
# Copyright (C) 2016-2025 Kevin O'Connor
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import os, gc, select, math, time, logging, queue
import greenlet
import chelper, util

# Special waketime values: _NOW compares below any real clock reading so
# the timer fires on the next check; _NEVER compares above any real clock
# reading so the timer stays dormant.
_NOW = 0.
_NEVER = 9999999999999999.


# Raised by the reactor on internal misuse (see verify_can_pause())
class ReactorError(Exception):
    pass


# Bookkeeping for one registered timer.
#   callback - called as callback(eventtime); its return value becomes the
#              timer's next waketime (see SelectReactor._check_timers)
#   waketime - time at (or after) which the callback should be invoked
#   timer_is_running - True only while the callback is executing
class ReactorTimer:
    def __init__(self, callback, waketime):
        self.callback = callback
        self.waketime = waketime
        self.timer_is_running = False


# A one-shot result holder that greenlets can block on until complete()
# is called.
class ReactorCompletion:
    # Marker for "no result yet" (so that None is a valid result)
    class sentinel: pass

    def __init__(self, reactor):
        self.reactor = reactor
        self.result = self.sentinel
        # Greenlets currently blocked in wait()
        self.waiting = []

    # Return True once complete() has stored a result
    def test(self):
        return self.result is not self.sentinel

    # Store the result and schedule every waiting greenlet to wake now
    def complete(self, result):
        self.result = result
        for wait in self.waiting:
            self.reactor.update_timer(wait.timer, self.reactor.NOW)

    # Block the calling greenlet until a result is available or until
    # 'waketime' is reached. Returns the result, or 'waketime_result' if
    # no result had been stored when the pause returned.
    def wait(self, waketime=_NEVER, waketime_result=None):
        if self.result is self.sentinel:
            wait = greenlet.getcurrent()
            self.waiting.append(wait)
            try:
                self.reactor.pause(waketime)
            finally:
                # Always drop the registration - if pause() raises (eg,
                # pausing is currently disabled) a stale entry would
                # make a later complete() poke a greenlet that is no
                # longer waiting here.
                self.waiting.remove(wait)
            if self.result is self.sentinel:
                return waketime_result
        return self.result


# Runs 'callback' once from a timer and publishes its return value via
# the 'completion' attribute.
class ReactorCallback:
    def __init__(self, reactor, callback, waketime):
        self.reactor = reactor
        self.timer = reactor.register_timer(self.invoke, waketime)
        self.callback = callback
        self.completion = ReactorCompletion(reactor)

    # Timer entry point: unregister (one-shot), run the callback, and
    # hand its result to anything waiting on the completion.
    def invoke(self, eventtime):
        self.reactor.unregister_timer(self.timer)
        res = self.callback(eventtime)
        self.completion.complete(res)
        return self.reactor.NEVER


# Associates a file descriptor with its read and write callbacks
class ReactorFileHandler:
    def __init__(self, fd, read_callback, write_callback):
        self.fd = fd
        self.read_callback = read_callback
        self.write_callback = write_callback


# A greenlet with an attached 'timer' slot; pause() stores the wake-up
# timer there and _end_greenlet() clears it.
class ReactorGreenlet(greenlet.greenlet):
    def __init__(self, run):
        greenlet.greenlet.__init__(self, run=run)
        self.timer = None


# Cooperative mutex for reactor greenlets. Usable as a context manager
# or via the lock()/unlock() aliases.
class ReactorMutex:
    def __init__(self, reactor, is_locked):
        self.reactor = reactor
        self.is_locked = is_locked
        # Set by __exit__ when ownership is handed to the queue head
        self.next_pending = False
        # Greenlets waiting for the lock, in arrival order
        self.queue = []
        self.lock = self.__enter__
        self.unlock = self.__exit__

    # Return True if the mutex is currently held
    def test(self):
        return self.is_locked

    def __enter__(self):
        if not self.is_locked:
            self.is_locked = True
            return
        g = greenlet.getcurrent()
        self.queue.append(g)
        while 1:
            self.reactor.pause(self.reactor.NEVER)
            # Only proceed when a release has been signaled and this
            # greenlet is first in line; otherwise go back to sleep.
            if self.next_pending and self.queue[0] is g:
                self.next_pending = False
                self.queue.pop(0)
                return

    def __exit__(self, type=None, value=None, tb=None):
        if not self.queue:
            self.is_locked = False
            return
        # Hand the lock directly to the first waiter (is_locked stays
        # True) and wake it.
        self.next_pending = True
        self.reactor.update_timer(self.queue[0].timer, self.reactor.NOW)


# Context manager that increments the reactor's "pause forbidden"
# counter on entry and decrements it on exit.
class ReactorPreventPause:
    def __init__(self, reactor):
        self.reactor = reactor

    def __enter__(self):
        self.reactor._prevent_pause_count += 1

    def __exit__(self, type=None, value=None, tb=None):
        self.reactor._prevent_pause_count -= 1


# Event loop built on select.select(). Dispatches timers and file
# descriptor callbacks, and lets callbacks pause by switching greenlets.
class SelectReactor:
    NOW = _NOW
    NEVER = _NEVER

    def __init__(self, gc_checking=False):
        # Main code
        self._process = False
        self.monotonic = chelper.get_ffi()[1].get_monotonic
        # Python garbage collection
        self._check_gc = gc_checking
        self._last_gc_times = [0., 0., 0.]
        # Timers
        self._timers = []
        self._next_timer = self.NEVER
        # Callbacks
        self._pipe_fds = None
        self._async_queue = queue.Queue()
        # File descriptors
        self._dummy_fd_hdl = ReactorFileHandler(-1, (lambda e: None),
                                                (lambda e: None))
        self._fds = {}
        self._read_fds = []
        self._write_fds = []
        self._READ = 1
        self._WRITE = 2
        # Greenlets
        self._g_dispatch = None
        self._greenlets = []
        self._all_greenlets = []
        self._prevent_pause_count = 0

    # Return the eventtime of the last explicit gc run for each of the
    # three gc levels (see _check_timers)
    def get_gc_stats(self):
        return tuple(self._last_gc_times)

    # Timers

    # Change the waketime of a registered timer. Ignored while the
    # timer's own callback is running, because the callback's return
    # value will set the waketime when it finishes.
    def update_timer(self, timer_handler, waketime):
        if timer_handler.timer_is_running:
            return
        timer_handler.waketime = waketime
        self._next_timer = min(self._next_timer, waketime)

    # Register 'callback' to be invoked at 'waketime'; returns the timer
    # handle to pass to update_timer()/unregister_timer().
    def register_timer(self, callback, waketime=NEVER):
        timer_handler = ReactorTimer(callback, waketime)
        # Replace the list instead of mutating it so that a loop in
        # _check_timers iterating the old list is not disturbed.
        timers = list(self._timers)
        timers.append(timer_handler)
        self._timers = timers
        self._next_timer = min(self._next_timer, waketime)
        return timer_handler

    # Remove a timer. Raises ValueError if the timer is not registered.
    def unregister_timer(self, timer_handler):
        # Disarm first - a _check_timers loop still iterating the old
        # list will then skip this timer.
        timer_handler.waketime = self.NEVER
        timers = list(self._timers)
        timers.remove(timer_handler)
        self._timers = timers

    # Run all due timers. Returns the number of seconds the caller may
    # block waiting for file descriptor events.
    def _check_timers(self, eventtime, busy):
        if eventtime < self._next_timer:
            # No timer is due yet
            if busy:
                return 0.
            if self._check_gc:
                gi = gc.get_count()
                # NOTE(review): 700/10/10 presumably mirror CPython's
                # traditional default gc thresholds - confirm.
                if gi[0] >= 700:
                    # Reactor looks idle and gc is due - run it
                    gc_level = 0
                    if gi[1] >= 10:
                        gc_level = 1
                        if gi[2] >= 10:
                            gc_level = 2
                    self._last_gc_times[gc_level] = eventtime
                    gc.collect(gc_level)
                    return 0.
            # Sleep until the next timer, clamped to [1ms, 1s]
            return min(1., max(.001, self._next_timer - eventtime))
        # Recompute _next_timer from scratch while walking the timers
        self._next_timer = self.NEVER
        g_dispatch = self._g_dispatch
        for t in self._timers:
            waketime = t.waketime
            if eventtime >= waketime:
                t.waketime = self.NEVER
                t.timer_is_running = True
                t.waketime = waketime = t.callback(eventtime)
                t.timer_is_running = False
                if g_dispatch is not self._g_dispatch:
                    # The callback paused and another greenlet took over
                    # dispatching - retire this greenlet.
                    self._next_timer = min(self._next_timer, waketime)
                    self._end_greenlet(g_dispatch)
                    return 0.
            self._next_timer = min(self._next_timer, waketime)
        return 0.

    # Callbacks and Completions

    # Create a new, not yet completed, ReactorCompletion
    def completion(self):
        return ReactorCompletion(self)

    # Schedule 'callback(eventtime)' to run once at 'waketime'. Returns
    # a completion that receives the callback's return value.
    def register_callback(self, callback, waketime=NOW):
        rcb = ReactorCallback(self, callback, waketime)
        return rcb.completion

    # Asynchronous (from another thread) callbacks and completions

    # Queue the creation of a ReactorCallback and wake the reactor by
    # writing a byte to the signal pipe (write errors are ignored).
    def register_async_callback(self, callback, waketime=NOW):
        self._async_queue.put_nowait(
            (ReactorCallback, (self, callback, waketime)))
        try:
            os.write(self._pipe_fds[1], b'.')
        except os.error:
            pass

    # Queue a call to completion.complete(result) and wake the reactor
    # by writing a byte to the signal pipe (write errors are ignored).
    def async_complete(self, completion, result):
        self._async_queue.put_nowait((completion.complete, (result,)))
        try:
            os.write(self._pipe_fds[1], b'.')
        except os.error:
            pass

    # Read callback for the signal pipe: drain the pending wake-up bytes
    # and then run every queued (func, args) request.
    def _got_pipe_signal(self, eventtime):
        try:
            os.read(self._pipe_fds[0], 4096)
        except os.error:
            pass
        while 1:
            try:
                func, args = self._async_queue.get_nowait()
            except queue.Empty:
                break
            func(*args)

    # Create the non-blocking signal pipe and watch its read end
    def _setup_async_callbacks(self):
        self._pipe_fds = os.pipe()
        util.set_nonblock(self._pipe_fds[0])
        util.set_nonblock(self._pipe_fds[1])
        self.register_fd(self._pipe_fds[0], self._got_pipe_signal)

    # Greenlets

    def _sys_pause(self, waketime):
        # Pause using system sleep for when reactor not running
        delay = waketime - self.monotonic()
        if delay > 0.:
            time.sleep(delay)
        return self.monotonic()

    # Suspend the calling greenlet until 'waketime' (or until its timer
    # is rescheduled earlier via update_timer). Returns the eventtime at
    # which the caller was resumed. Raises ReactorError if pausing is
    # currently disabled (see assert_no_pause()).
    def pause(self, waketime):
        g = greenlet.getcurrent()
        if g is not self._g_dispatch:
            if self._g_dispatch is None:
                return self._sys_pause(waketime)
            # Switch to _check_timers (via g.timer.callback return)
            if self._prevent_pause_count:
                self.verify_can_pause()
            return self._g_dispatch.switch(waketime)
        # Pausing the dispatch greenlet - prepare a new greenlet to do dispatch
        if self._prevent_pause_count:
            self.verify_can_pause()
        if self._greenlets:
            g_next = self._greenlets.pop()
        else:
            g_next = ReactorGreenlet(run=self._dispatch_loop)
            self._all_greenlets.append(g_next)
        g_next.parent = g.parent
        g.timer = self.register_timer(g.switch, waketime)
        # Force the new dispatcher to scan the timers immediately
        self._next_timer = self.NOW
        # Switch to _dispatch_loop (via _end_greenlet or direct)
        eventtime = g_next.switch()
        # This greenlet activated from g.timer.callback (via _check_timers)
        return eventtime

    # Retire a former dispatch greenlet whose callback has finished
    # after having paused; control goes back to the current dispatcher.
    def _end_greenlet(self, g_old):
        # Cache this greenlet for later use
        self._greenlets.append(g_old)
        self.unregister_timer(g_old.timer)
        g_old.timer = None
        # Switch to _check_timers (via g_old.timer.callback return)
        self._g_dispatch.switch(self.NEVER)
        # This greenlet reactivated from pause() - return to main dispatch loop
        self._g_dispatch = g_old

    # Support for temporarily disabling pauses

    # Return a context manager; pause() raises ReactorError while inside it
    def assert_no_pause(self):
        return ReactorPreventPause(self)

    # Raise ReactorError if pausing is currently disabled
    def verify_can_pause(self):
        if self._prevent_pause_count:
            raise ReactorError("Internal error - reactor pause disabled")

    # Mutexes

    # Create a ReactorMutex bound to this reactor
    def mutex(self, is_locked=False):
        return ReactorMutex(self, is_locked)

    # File descriptors

    # Start watching 'fd' for readability. Returns the handler object
    # to pass to set_fd_wake()/unregister_fd().
    def register_fd(self, fd, read_callback, write_callback=None):
        file_handler = ReactorFileHandler(fd, read_callback, write_callback)
        self._fds[fd] = file_handler
        self.set_fd_wake(file_handler, True, False)
        return file_handler

    # Stop watching the descriptor and forget its handler
    def unregister_fd(self, file_handler):
        self.set_fd_wake(file_handler, False, False)
        del self._fds[file_handler.fd]

    # Select which events (read and/or write) wake the reactor for the
    # given handler by adding/removing its fd from the select() lists.
    def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
        fd = file_handler.fd
        if fd in self._read_fds:
            if not is_readable:
                self._read_fds.remove(fd)
        elif is_readable:
            self._read_fds.append(fd)
        if fd in self._write_fds:
            if not is_writeable:
                self._write_fds.remove(fd)
        elif is_writeable:
            self._write_fds.append(fd)

    # Invoke the callbacks for a list of (fd, event_mask) pairs. Returns
    # the eventtime the dispatch loop should continue with.
    def _check_fds(self, eventtime, hdls):
        g_dispatch = self._g_dispatch
        for fd, event in hdls:
            # The fd may have been unregistered by an earlier callback in
            # this batch - fall back to a handler that does nothing.
            hdl = self._fds.get(fd, self._dummy_fd_hdl)
            if event & self._READ:
                hdl.read_callback(eventtime)
                if g_dispatch is not self._g_dispatch:
                    # Callback paused - this greenlet no longer dispatches
                    self._end_greenlet(g_dispatch)
                    return self.monotonic()
            if event & self._WRITE:
                hdl.write_callback(eventtime)
                if g_dispatch is not self._g_dispatch:
                    self._end_greenlet(g_dispatch)
                    return self.monotonic()
        return eventtime

    # Main loop

    # Body of every dispatch greenlet: alternate between running due
    # timers and waiting for fd events until end() clears _process.
    def _dispatch_loop(self):
        self._g_dispatch = greenlet.getcurrent()
        busy = True
        eventtime = self.monotonic()
        while self._process:
            timeout = self._check_timers(eventtime, busy)
            busy = False
            res = select.select(self._read_fds, self._write_fds, [], timeout)
            eventtime = self.monotonic()
            if res[0] or res[1]:
                busy = True
                hdls = ([(fd, self._READ) for fd in res[0]]
                        + [(fd, self._WRITE) for fd in res[1]])
                eventtime = self._check_fds(eventtime, hdls)
        self._g_dispatch = None

    # Run the event loop in a new dispatch greenlet. Sets up the async
    # signal pipe on first use and resets the pause-prevention counter.
    def run(self):
        if self._pipe_fds is None:
            self._setup_async_callbacks()
        self._process = True
        self._prevent_pause_count = 0
        g_next = ReactorGreenlet(run=self._dispatch_loop)
        self._all_greenlets.append(g_next)
        g_next.switch()

    # Request that the dispatch loop exit after its current iteration
    def end(self):
        self._process = False

    # Tear down: throw into every greenlet created by the reactor and
    # close the async signal pipe.
    def finalize(self):
        self._g_dispatch = None
        self._greenlets = []
        for g in self._all_greenlets:
            try:
                g.throw()
            except:
                # Deliberately broad - teardown is best-effort and must
                # continue with the remaining greenlets.
                logging.exception("reactor finalize greenlet terminate")
        self._all_greenlets = []
        if self._pipe_fds is not None:
            os.close(self._pipe_fds[0])
            os.close(self._pipe_fds[1])
            self._pipe_fds = None


# Reactor variant that waits for events with select.poll()
class PollReactor(SelectReactor):
    def __init__(self, gc_checking=False):
        SelectReactor.__init__(self, gc_checking)
        self._poll = select.poll()
        # Hang-up events are delivered to the read callback
        self._READ = select.POLLIN | select.POLLHUP
        self._WRITE = select.POLLOUT

    # File descriptors

    def register_fd(self, fd, read_callback, write_callback=None):
        file_handler = ReactorFileHandler(fd, read_callback, write_callback)
        self._fds[fd] = file_handler
        self._poll.register(file_handler.fd, select.POLLIN | select.POLLHUP)
        return file_handler

    def unregister_fd(self, file_handler):
        self._poll.unregister(file_handler.fd)
        del self._fds[file_handler.fd]

    def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
        flags = select.POLLHUP
        if is_readable:
            flags |= select.POLLIN
        if is_writeable:
            flags |= select.POLLOUT
        self._poll.modify(file_handler.fd, flags)

    # Main loop

    def _dispatch_loop(self):
        self._g_dispatch = greenlet.getcurrent()
        busy = True
        eventtime = self.monotonic()
        while self._process:
            timeout = self._check_timers(eventtime, busy)
            busy = False
            # poll() takes milliseconds; round up so the wait is never
            # shorter than requested.
            res = self._poll.poll(int(math.ceil(timeout * 1000.)))
            eventtime = self.monotonic()
            if res:
                busy = True
                eventtime = self._check_fds(eventtime, res)
        self._g_dispatch = None


# Reactor variant that waits for events with select.epoll()
class EPollReactor(SelectReactor):
    def __init__(self, gc_checking=False):
        SelectReactor.__init__(self, gc_checking)
        self._epoll = select.epoll()
        # Hang-up events are delivered to the read callback
        self._READ = select.EPOLLIN | select.EPOLLHUP
        self._WRITE = select.EPOLLOUT

    # File descriptors

    def register_fd(self, fd, read_callback, write_callback=None):
        file_handler = ReactorFileHandler(fd, read_callback, write_callback)
        self._fds[fd] = file_handler
        self._epoll.register(fd, select.EPOLLIN | select.EPOLLHUP)
        return file_handler

    def unregister_fd(self, file_handler):
        self._epoll.unregister(file_handler.fd)
        del self._fds[file_handler.fd]

    def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
        flags = select.EPOLLHUP
        if is_readable:
            flags |= select.EPOLLIN
        if is_writeable:
            flags |= select.EPOLLOUT
        self._epoll.modify(file_handler.fd, flags)

    # Main loop

    def _dispatch_loop(self):
        self._g_dispatch = greenlet.getcurrent()
        busy = True
        eventtime = self.monotonic()
        while self._process:
            timeout = self._check_timers(eventtime, busy)
            busy = False
            # Unlike poll(), epoll.poll() takes its timeout in seconds
            res = self._epoll.poll(timeout)
            eventtime = self.monotonic()
            if res:
                busy = True
                eventtime = self._check_fds(eventtime, res)
        self._g_dispatch = None


# Use the poll based reactor if it is available
try:
    select.poll
    Reactor = PollReactor
except AttributeError:
    # select.poll is not provided on every platform
    Reactor = SelectReactor