3 This module allows high-level and efficient I/O multiplexing, built upon the
4 `select` module primitives.
8 from abc import ABCMeta, abstractmethod
9 from collections import namedtuple, Mapping
14 from .py33_exceptions import wrap_error, InterruptedError
15 from .compat import integer_types
18 # generic events, that must be mapped to implementation-specific ones
20 EVENT_WRITE = (1 << 1)
23 def _fileobj_to_fd(fileobj):
24 """Return a file descriptor from a file object.
27 fileobj -- file object or file descriptor
30 corresponding file descriptor
33 ValueError if the object is invalid
35 if isinstance(fileobj, integer_types):
39 fd = int(fileobj.fileno())
40 except (AttributeError, TypeError, ValueError):
41 raise ValueError("Invalid file object: "
42 "{0!r}".format(fileobj))
44 raise ValueError("Invalid file descriptor: {0}".format(fd))
48 SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
49 """Object used to associate a file object to its backing file descriptor,
50 selected event mask and attached data."""
53 class _SelectorMapping(Mapping):
54 """Mapping of file objects to selector keys."""
56 def __init__(self, selector):
57 self._selector = selector
60 return len(self._selector._fd_to_key)
62 def __getitem__(self, fileobj):
64 fd = self._selector._fileobj_lookup(fileobj)
65 return self._selector._fd_to_key[fd]
67 raise KeyError("{0!r} is not registered".format(fileobj))
70 return iter(self._selector._fd_to_key)
73 class BaseSelector(object):
74 """Selector abstract base class.
76 A selector supports registering file objects to be monitored for specific
79 A file object is a file descriptor or any object with a `fileno()` method.
80 An arbitrary object can be attached to the file object, which can be used
81 for example to store context information, a callback, etc.
83 A selector can use various implementations (select(), poll(), epoll()...)
84 depending on the platform. The default `Selector` class uses the most
85 efficient implementation on the current platform.
87 __metaclass__ = ABCMeta
90 def register(self, fileobj, events, data=None):
91 """Register a file object.
94 fileobj -- file object or file descriptor
95 events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
102 ValueError if events is invalid
103 KeyError if fileobj is already registered
104 OSError if fileobj is closed or otherwise is unacceptable to
105 the underlying system call (if a system call is made)
108 OSError may or may not be raised
110 raise NotImplementedError
113 def unregister(self, fileobj):
114 """Unregister a file object.
117 fileobj -- file object or file descriptor
123 KeyError if fileobj is not registered
126 If fileobj is registered but has since been closed this does
127 *not* raise OSError (even if the wrapped syscall does)
129 raise NotImplementedError
131 def modify(self, fileobj, events, data=None):
132 """Change a registered file object monitored events or attached data.
135 fileobj -- file object or file descriptor
136 events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE)
137 data -- attached data
143 Anything that unregister() or register() raises
145 self.unregister(fileobj)
146 return self.register(fileobj, events, data)
149 def select(self, timeout=None):
150 """Perform the actual selection, until some monitored file objects are
151 ready or a timeout expires.
154 timeout -- if timeout > 0, this specifies the maximum wait time, in
156 if timeout <= 0, the select() call won't block, and will
157 report the currently ready file objects
158 if timeout is None, select() will block until a monitored
159 file object becomes ready
162 list of (key, events) for ready file objects
163 `events` is a bitwise mask of EVENT_READ|EVENT_WRITE
165 raise NotImplementedError
168 """Close the selector.
170 This must be called to make sure that any underlying resource is freed.
174 def get_key(self, fileobj):
175 """Return the key associated to a registered file object.
178 SelectorKey for this file object
180 mapping = self.get_map()
182 raise RuntimeError('Selector is closed')
184 return mapping[fileobj]
186 raise KeyError("{0!r} is not registered".format(fileobj))
190 """Return a mapping of file objects to selector keys."""
191 raise NotImplementedError
196 def __exit__(self, *args):
200 class _BaseSelectorImpl(BaseSelector):
201 """Base selector implementation."""
204 # this maps file descriptors to keys
206 # read-only mapping returned by get_map()
207 self._map = _SelectorMapping(self)
209 def _fileobj_lookup(self, fileobj):
210 """Return a file descriptor from a file object.
212 This wraps _fileobj_to_fd() to do an exhaustive search in case
213 the object is invalid but we still have it in our map. This
214 is used by unregister() so we can unregister an object that
215 was previously registered even if it is closed. It is also
216 used by _SelectorMapping.
219 return _fileobj_to_fd(fileobj)
221 # Do an exhaustive search.
222 for key in self._fd_to_key.values():
223 if key.fileobj is fileobj:
225 # Raise ValueError after all.
228 def register(self, fileobj, events, data=None):
229 if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
230 raise ValueError("Invalid events: {0!r}".format(events))
232 key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
234 if key.fd in self._fd_to_key:
235 raise KeyError("{0!r} (FD {1}) is already registered"
236 .format(fileobj, key.fd))
238 self._fd_to_key[key.fd] = key
241 def unregister(self, fileobj):
243 key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
245 raise KeyError("{0!r} is not registered".format(fileobj))
248 def modify(self, fileobj, events, data=None):
249 # TODO: Subclasses can probably optimize this even further.
251 key = self._fd_to_key[self._fileobj_lookup(fileobj)]
253 raise KeyError("{0!r} is not registered".format(fileobj))
254 if events != key.events:
255 self.unregister(fileobj)
256 key = self.register(fileobj, events, data)
257 elif data != key.data:
258 # Use a shortcut to update the data.
259 key = key._replace(data=data)
260 self._fd_to_key[key.fd] = key
264 self._fd_to_key.clear()
270 def _key_from_fd(self, fd):
271 """Return the key associated to a given file descriptor.
274 fd -- file descriptor
277 corresponding key, or None if not found
280 return self._fd_to_key[fd]
285 class SelectSelector(_BaseSelectorImpl):
286 """Select-based selector."""
289 super(SelectSelector, self).__init__()
290 self._readers = set()
291 self._writers = set()
293 def register(self, fileobj, events, data=None):
294 key = super(SelectSelector, self).register(fileobj, events, data)
295 if events & EVENT_READ:
296 self._readers.add(key.fd)
297 if events & EVENT_WRITE:
298 self._writers.add(key.fd)
301 def unregister(self, fileobj):
302 key = super(SelectSelector, self).unregister(fileobj)
303 self._readers.discard(key.fd)
304 self._writers.discard(key.fd)
307 if sys.platform == 'win32':
308 def _select(self, r, w, _, timeout=None):
309 r, w, x = select.select(r, w, w, timeout)
312 def _select(self, r, w, x, timeout=None):
313 return select.select(r, w, x, timeout)
315 def select(self, timeout=None):
316 timeout = None if timeout is None else max(timeout, 0)
319 r, w, _ = wrap_error(self._select,
320 self._readers, self._writers, [], timeout)
321 except InterruptedError:
330 events |= EVENT_WRITE
332 key = self._key_from_fd(fd)
334 ready.append((key, events & key.events))
338 if hasattr(select, 'poll'):
340 class PollSelector(_BaseSelectorImpl):
341 """Poll-based selector."""
344 super(PollSelector, self).__init__()
345 self._poll = select.poll()
347 def register(self, fileobj, events, data=None):
348 key = super(PollSelector, self).register(fileobj, events, data)
350 if events & EVENT_READ:
351 poll_events |= select.POLLIN
352 if events & EVENT_WRITE:
353 poll_events |= select.POLLOUT
354 self._poll.register(key.fd, poll_events)
357 def unregister(self, fileobj):
358 key = super(PollSelector, self).unregister(fileobj)
359 self._poll.unregister(key.fd)
362 def select(self, timeout=None):
368 # poll() has a resolution of 1 millisecond, round away from
369 # zero to wait *at least* timeout seconds.
370 timeout = int(math.ceil(timeout * 1e3))
373 fd_event_list = wrap_error(self._poll.poll, timeout)
374 except InterruptedError:
376 for fd, event in fd_event_list:
378 if event & ~select.POLLIN:
379 events |= EVENT_WRITE
380 if event & ~select.POLLOUT:
383 key = self._key_from_fd(fd)
385 ready.append((key, events & key.events))
389 if hasattr(select, 'epoll'):
391 class EpollSelector(_BaseSelectorImpl):
392 """Epoll-based selector."""
395 super(EpollSelector, self).__init__()
396 self._epoll = select.epoll()
399 return self._epoll.fileno()
401 def register(self, fileobj, events, data=None):
402 key = super(EpollSelector, self).register(fileobj, events, data)
404 if events & EVENT_READ:
405 epoll_events |= select.EPOLLIN
406 if events & EVENT_WRITE:
407 epoll_events |= select.EPOLLOUT
408 self._epoll.register(key.fd, epoll_events)
411 def unregister(self, fileobj):
412 key = super(EpollSelector, self).unregister(fileobj)
414 self._epoll.unregister(key.fd)
416 # This can happen if the FD was closed since it
421 def select(self, timeout=None):
427 # epoll_wait() has a resolution of 1 millisecond, round away
428 # from zero to wait *at least* timeout seconds.
429 timeout = math.ceil(timeout * 1e3) * 1e-3
431 # epoll_wait() expects `maxevents` to be greater than zero;
432 # we want to make sure that `select()` can be called when no
434 max_ev = max(len(self._fd_to_key), 1)
438 fd_event_list = wrap_error(self._epoll.poll, timeout, max_ev)
439 except InterruptedError:
441 for fd, event in fd_event_list:
443 if event & ~select.EPOLLIN:
444 events |= EVENT_WRITE
445 if event & ~select.EPOLLOUT:
448 key = self._key_from_fd(fd)
450 ready.append((key, events & key.events))
455 super(EpollSelector, self).close()
458 if hasattr(select, 'devpoll'):
460 class DevpollSelector(_BaseSelectorImpl):
461 """Solaris /dev/poll selector."""
464 super(DevpollSelector, self).__init__()
465 self._devpoll = select.devpoll()
468 return self._devpoll.fileno()
470 def register(self, fileobj, events, data=None):
471 key = super(DevpollSelector, self).register(fileobj, events, data)
473 if events & EVENT_READ:
474 poll_events |= select.POLLIN
475 if events & EVENT_WRITE:
476 poll_events |= select.POLLOUT
477 self._devpoll.register(key.fd, poll_events)
480 def unregister(self, fileobj):
481 key = super(DevpollSelector, self).unregister(fileobj)
482 self._devpoll.unregister(key.fd)
485 def select(self, timeout=None):
491 # devpoll() has a resolution of 1 millisecond, round away from
492 # zero to wait *at least* timeout seconds.
493 timeout = math.ceil(timeout * 1e3)
496 fd_event_list = self._devpoll.poll(timeout)
497 except InterruptedError:
499 for fd, event in fd_event_list:
501 if event & ~select.POLLIN:
502 events |= EVENT_WRITE
503 if event & ~select.POLLOUT:
506 key = self._key_from_fd(fd)
508 ready.append((key, events & key.events))
512 self._devpoll.close()
513 super(DevpollSelector, self).close()
516 if hasattr(select, 'kqueue'):
518 class KqueueSelector(_BaseSelectorImpl):
519 """Kqueue-based selector."""
522 super(KqueueSelector, self).__init__()
523 self._kqueue = select.kqueue()
526 return self._kqueue.fileno()
528 def register(self, fileobj, events, data=None):
529 key = super(KqueueSelector, self).register(fileobj, events, data)
530 if events & EVENT_READ:
531 kev = select.kevent(key.fd, select.KQ_FILTER_READ,
533 self._kqueue.control([kev], 0, 0)
534 if events & EVENT_WRITE:
535 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
537 self._kqueue.control([kev], 0, 0)
540 def unregister(self, fileobj):
541 key = super(KqueueSelector, self).unregister(fileobj)
542 if key.events & EVENT_READ:
543 kev = select.kevent(key.fd, select.KQ_FILTER_READ,
546 self._kqueue.control([kev], 0, 0)
548 # This can happen if the FD was closed since it
551 if key.events & EVENT_WRITE:
552 kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
555 self._kqueue.control([kev], 0, 0)
561 def select(self, timeout=None):
562 timeout = None if timeout is None else max(timeout, 0)
563 max_ev = len(self._fd_to_key)
566 kev_list = wrap_error(self._kqueue.control,
567 None, max_ev, timeout)
568 except InterruptedError:
574 if flag == select.KQ_FILTER_READ:
576 if flag == select.KQ_FILTER_WRITE:
577 events |= EVENT_WRITE
579 key = self._key_from_fd(fd)
581 ready.append((key, events & key.events))
586 super(KqueueSelector, self).close()
589 # Choose the best implementation, roughly:
590 # epoll|kqueue|devpoll > poll > select.
591 # select() also can't accept a FD > FD_SETSIZE (usually around 1024)
592 if 'KqueueSelector' in globals():
593 DefaultSelector = KqueueSelector
594 elif 'EpollSelector' in globals():
595 DefaultSelector = EpollSelector
596 elif 'DevpollSelector' in globals():
597 DefaultSelector = DevpollSelector
598 elif 'PollSelector' in globals():
599 DefaultSelector = PollSelector
601 DefaultSelector = SelectSelector