1 """Event loop and event loop policy."""
2 from __future__ import absolute_import
4 __all__ = ['AbstractEventLoopPolicy',
5 'AbstractEventLoop', 'AbstractServer',
6 'Handle', 'TimerHandle',
7 'get_event_loop_policy', 'set_event_loop_policy',
8 'get_event_loop', 'set_event_loop', 'new_event_loop',
9 'get_child_watcher', 'set_child_watcher',
20 import reprlib # Python 3
22 import repr as reprlib # Python 2
26 except (ImportError, SyntaxError):
27 # ignore SyntaxError for convenience: ignore SyntaxError caused by "yield
28 # from" if asyncio module is in the Python path
31 from trollius import compat
34 def _get_function_source(func):
36 func = inspect.unwrap(func)
37 elif hasattr(func, '__wrapped__'):
38 func = func.__wrapped__
39 if inspect.isfunction(func):
41 return (code.co_filename, code.co_firstlineno)
42 if isinstance(func, functools.partial):
43 return _get_function_source(func.func)
44 if compat.PY34 and isinstance(func, functools.partialmethod):
45 return _get_function_source(func.func)
49 def _format_args(args):
50 """Format function arguments.
52 Special case for a single parameter: ('hello',) is formatted as ('hello').
54 # use reprlib to limit the length of the output
55 args_repr = reprlib.repr(args)
56 if len(args) == 1 and args_repr.endswith(',)'):
57 args_repr = args_repr[:-2] + ')'
61 def _format_callback(func, args, suffix=''):
62 if isinstance(func, functools.partial):
64 suffix = _format_args(args) + suffix
65 return _format_callback(func.func, func.args, suffix)
67 if hasattr(func, '__qualname__'):
68 func_repr = getattr(func, '__qualname__')
69 elif hasattr(func, '__name__'):
70 func_repr = getattr(func, '__name__')
72 func_repr = repr(func)
75 func_repr += _format_args(args)
80 def _format_callback_source(func, args):
81 func_repr = _format_callback(func, args)
82 source = _get_function_source(func)
84 func_repr += ' at %s:%s' % source
89 """Object returned by callback registration methods."""
91 __slots__ = ('_callback', '_args', '_cancelled', '_loop',
92 '_source_traceback', '_repr', '__weakref__')
94 def __init__(self, callback, args, loop):
95 assert not isinstance(callback, Handle), 'A Handle is not a callback'
97 self._callback = callback
99 self._cancelled = False
101 if self._loop.get_debug():
102 self._source_traceback = traceback.extract_stack(sys._getframe(1))
104 self._source_traceback = None
106 def _repr_info(self):
107 info = [self.__class__.__name__]
109 info.append('cancelled')
110 if self._callback is not None:
111 info.append(_format_callback_source(self._callback, self._args))
112 if self._source_traceback:
113 frame = self._source_traceback[-1]
114 info.append('created at %s:%s' % (frame[0], frame[1]))
118 if self._repr is not None:
120 info = self._repr_info()
121 return '<%s>' % ' '.join(info)
124 if not self._cancelled:
125 self._cancelled = True
126 if self._loop.get_debug():
127 # Keep a representation in debug mode to keep callback and
128 # parameters. For example, to log the warning
129 # "Executing <Handle...> took 2.5 second"
130 self._repr = repr(self)
131 self._callback = None
136 self._callback(*self._args)
137 except Exception as exc:
138 cb = _format_callback_source(self._callback, self._args)
139 msg = 'Exception in callback {0}'.format(cb)
145 if self._source_traceback:
146 context['source_traceback'] = self._source_traceback
147 self._loop.call_exception_handler(context)
148 self = None # Needed to break cycles when an exception occurs.
151 class TimerHandle(Handle):
152 """Object returned by timed callback registration methods."""
154 __slots__ = ['_scheduled', '_when']
156 def __init__(self, when, callback, args, loop):
157 assert when is not None
158 super(TimerHandle, self).__init__(callback, args, loop)
159 if self._source_traceback:
160 del self._source_traceback[-1]
162 self._scheduled = False
164 def _repr_info(self):
165 info = super(TimerHandle, self)._repr_info()
166 pos = 2 if self._cancelled else 1
167 info.insert(pos, 'when=%s' % self._when)
171 return hash(self._when)
173 def __lt__(self, other):
174 return self._when < other._when
176 def __le__(self, other):
177 if self._when < other._when:
179 return self.__eq__(other)
181 def __gt__(self, other):
182 return self._when > other._when
184 def __ge__(self, other):
185 if self._when > other._when:
187 return self.__eq__(other)
189 def __eq__(self, other):
190 if isinstance(other, TimerHandle):
191 return (self._when == other._when and
192 self._callback == other._callback and
193 self._args == other._args and
194 self._cancelled == other._cancelled)
195 return NotImplemented
197 def __ne__(self, other):
198 equal = self.__eq__(other)
199 return NotImplemented if equal is NotImplemented else not equal
202 if not self._cancelled:
203 self._loop._timer_handle_cancelled(self)
204 super(TimerHandle, self).cancel()
207 class AbstractServer(object):
208 """Abstract server returned by create_server()."""
211 """Stop serving. This leaves existing connections open."""
212 return NotImplemented
214 def wait_closed(self):
215 """Coroutine to wait until service is closed."""
216 return NotImplemented
219 if asyncio is not None:
220 # Reuse asyncio classes so asyncio.set_event_loop() and
221 # asyncio.set_event_loop_policy() accept Trollius event loop and trollius
223 AbstractEventLoop = asyncio.AbstractEventLoop
224 AbstractEventLoopPolicy = asyncio.AbstractEventLoopPolicy
226 class AbstractEventLoop(object):
227 """Abstract event loop."""
229 # Running and stopping the event loop.
231 def run_forever(self):
232 """Run the event loop until stop() is called."""
233 raise NotImplementedError
235 def run_until_complete(self, future):
236 """Run the event loop until a Future is done.
238 Return the Future's result, or raise its exception.
240 raise NotImplementedError
243 """Stop the event loop as soon as reasonable.
245 Exactly how soon that is may depend on the implementation, but
246 no more I/O callbacks should be scheduled.
248 raise NotImplementedError
250 def is_running(self):
251 """Return whether the event loop is currently running."""
252 raise NotImplementedError
255 """Returns True if the event loop was closed."""
256 raise NotImplementedError
261 The loop should not be running.
263 This is idempotent and irreversible.
265 No other methods should be called after this one.
267 raise NotImplementedError
269 # Methods scheduling callbacks. All these return Handles.
271 def _timer_handle_cancelled(self, handle):
272 """Notification that a TimerHandle has been cancelled."""
273 raise NotImplementedError
275 def call_soon(self, callback, *args):
276 return self.call_later(0, callback, *args)
278 def call_later(self, delay, callback, *args):
279 raise NotImplementedError
281 def call_at(self, when, callback, *args):
282 raise NotImplementedError
285 raise NotImplementedError
287 # Method scheduling a coroutine object: create a task.
289 def create_task(self, coro):
290 raise NotImplementedError
292 # Methods for interacting with threads.
294 def call_soon_threadsafe(self, callback, *args):
295 raise NotImplementedError
297 def run_in_executor(self, executor, func, *args):
298 raise NotImplementedError
300 def set_default_executor(self, executor):
301 raise NotImplementedError
303 # Network I/O methods returning Futures.
305 def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
306 raise NotImplementedError
308 def getnameinfo(self, sockaddr, flags=0):
309 raise NotImplementedError
311 def create_connection(self, protocol_factory, host=None, port=None,
312 ssl=None, family=0, proto=0, flags=0, sock=None,
313 local_addr=None, server_hostname=None):
314 raise NotImplementedError
316 def create_server(self, protocol_factory, host=None, port=None,
317 family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
318 sock=None, backlog=100, ssl=None, reuse_address=None):
319 """A coroutine which creates a TCP server bound to host and port.
321 The return value is a Server object which can be used to stop
324 If host is an empty string or None all interfaces are assumed
325 and a list of multiple sockets will be returned (most likely
326 one for IPv4 and another one for IPv6).
328 family can be set to either AF_INET or AF_INET6 to force the
329 socket to use IPv4 or IPv6. If not set it will be determined
330 from host (defaults to AF_UNSPEC).
332 flags is a bitmask for getaddrinfo().
334 sock can optionally be specified in order to use a preexisting
337 backlog is the maximum number of queued connections passed to
338 listen() (defaults to 100).
340 ssl can be set to an SSLContext to enable SSL over the
341 accepted connections.
343 reuse_address tells the kernel to reuse a local socket in
344 TIME_WAIT state, without waiting for its natural timeout to
345 expire. If not specified will automatically be set to True on
348 raise NotImplementedError
350 def create_unix_connection(self, protocol_factory, path,
352 server_hostname=None):
353 raise NotImplementedError
355 def create_unix_server(self, protocol_factory, path,
356 sock=None, backlog=100, ssl=None):
357 """A coroutine which creates a UNIX Domain Socket server.
359 The return value is a Server object, which can be used to stop
362 path is a str, representing a file systsem path to bind the
365 sock can optionally be specified in order to use a preexisting
368 backlog is the maximum number of queued connections passed to
369 listen() (defaults to 100).
371 ssl can be set to an SSLContext to enable SSL over the
372 accepted connections.
374 raise NotImplementedError
376 def create_datagram_endpoint(self, protocol_factory,
377 local_addr=None, remote_addr=None,
378 family=0, proto=0, flags=0):
379 raise NotImplementedError
381 # Pipes and subprocesses.
383 def connect_read_pipe(self, protocol_factory, pipe):
384 """Register read pipe in event loop. Set the pipe to non-blocking mode.
386 protocol_factory should instantiate object with Protocol interface.
387 pipe is a file-like object.
388 Return pair (transport, protocol), where transport supports the
389 ReadTransport interface."""
390 # The reason to accept file-like object instead of just file descriptor
391 # is: we need to own pipe and close it at transport finishing
392 # Can got complicated errors if pass f.fileno(),
393 # close fd in pipe transport then close f and vise versa.
394 raise NotImplementedError
396 def connect_write_pipe(self, protocol_factory, pipe):
397 """Register write pipe in event loop.
399 protocol_factory should instantiate object with BaseProtocol interface.
400 Pipe is file-like object already switched to nonblocking.
401 Return pair (transport, protocol), where transport support
402 WriteTransport interface."""
403 # The reason to accept file-like object instead of just file descriptor
404 # is: we need to own pipe and close it at transport finishing
405 # Can got complicated errors if pass f.fileno(),
406 # close fd in pipe transport then close f and vise versa.
407 raise NotImplementedError
409 def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
410 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
412 raise NotImplementedError
414 def subprocess_exec(self, protocol_factory, *args, **kwargs):
415 raise NotImplementedError
417 # Ready-based callback registration methods.
418 # The add_*() methods return None.
419 # The remove_*() methods return True if something was removed,
420 # False if there was nothing to delete.
422 def add_reader(self, fd, callback, *args):
423 raise NotImplementedError
425 def remove_reader(self, fd):
426 raise NotImplementedError
428 def add_writer(self, fd, callback, *args):
429 raise NotImplementedError
431 def remove_writer(self, fd):
432 raise NotImplementedError
434 # Completion based I/O methods returning Futures.
436 def sock_recv(self, sock, nbytes):
437 raise NotImplementedError
439 def sock_sendall(self, sock, data):
440 raise NotImplementedError
442 def sock_connect(self, sock, address):
443 raise NotImplementedError
445 def sock_accept(self, sock):
446 raise NotImplementedError
450 def add_signal_handler(self, sig, callback, *args):
451 raise NotImplementedError
453 def remove_signal_handler(self, sig):
454 raise NotImplementedError
458 def set_task_factory(self, factory):
459 raise NotImplementedError
461 def get_task_factory(self):
462 raise NotImplementedError
466 def set_exception_handler(self, handler):
467 raise NotImplementedError
469 def default_exception_handler(self, context):
470 raise NotImplementedError
472 def call_exception_handler(self, context):
473 raise NotImplementedError
475 # Debug flag management.
478 raise NotImplementedError
480 def set_debug(self, enabled):
481 raise NotImplementedError
484 class AbstractEventLoopPolicy(object):
485 """Abstract policy for accessing the event loop."""
487 def get_event_loop(self):
488 """Get the event loop for the current context.
490 Returns an event loop object implementing the BaseEventLoop interface,
491 or raises an exception in case no event loop has been set for the
492 current context and the current policy does not specify to create one.
494 It should never return None."""
495 raise NotImplementedError
497 def set_event_loop(self, loop):
498 """Set the event loop for the current context to loop."""
499 raise NotImplementedError
501 def new_event_loop(self):
502 """Create and return a new event loop object according to this
503 policy's rules. If there's need to set this loop as the event loop for
504 the current context, set_event_loop must be called explicitly."""
505 raise NotImplementedError
507 # Child processes handling (Unix only).
509 def get_child_watcher(self):
510 "Get the watcher for child processes."
511 raise NotImplementedError
513 def set_child_watcher(self, watcher):
514 """Set the watcher for child processes."""
515 raise NotImplementedError
518 class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
519 """Default policy implementation for accessing the event loop.
521 In this policy, each thread has its own event loop. However, we
522 only automatically create an event loop by default for the main
523 thread; other threads by default have no event loop.
525 Other policies may have different rules (e.g. a single global
526 event loop, or automatically creating an event loop per thread, or
527 using some other notion of context to which an event loop is
533 class _Local(threading.local):
538 self._local = self._Local()
540 def get_event_loop(self):
541 """Get the event loop.
543 This may be None or an instance of EventLoop.
545 if (self._local._loop is None and
546 not self._local._set_called and
547 isinstance(threading.current_thread(), threading._MainThread)):
548 self.set_event_loop(self.new_event_loop())
549 if self._local._loop is None:
550 raise RuntimeError('There is no current event loop in thread %r.'
551 % threading.current_thread().name)
552 return self._local._loop
554 def set_event_loop(self, loop):
555 """Set the event loop."""
556 self._local._set_called = True
557 assert loop is None or isinstance(loop, AbstractEventLoop)
558 self._local._loop = loop
560 def new_event_loop(self):
561 """Create a new event loop.
563 You must call set_event_loop() to make this the current event
566 return self._loop_factory()
569 # Event loop policy. The policy itself is always global, even if the
570 # policy's rules say that there is an event loop per thread (or other
571 # notion of context). The default policy is installed by the first
572 # call to get_event_loop_policy().
573 _event_loop_policy = None
575 # Lock for protecting the on-the-fly creation of the event loop policy.
576 _lock = threading.Lock()
579 def _init_event_loop_policy():
580 global _event_loop_policy
582 if _event_loop_policy is None: # pragma: no branch
583 from . import DefaultEventLoopPolicy
584 _event_loop_policy = DefaultEventLoopPolicy()
587 def get_event_loop_policy():
588 """Get the current event loop policy."""
589 if _event_loop_policy is None:
590 _init_event_loop_policy()
591 return _event_loop_policy
594 def set_event_loop_policy(policy):
595 """Set the current event loop policy.
597 If policy is None, the default policy is restored."""
598 global _event_loop_policy
599 assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
600 _event_loop_policy = policy
603 def get_event_loop():
604 """Equivalent to calling get_event_loop_policy().get_event_loop()."""
605 return get_event_loop_policy().get_event_loop()
608 def set_event_loop(loop):
609 """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
610 get_event_loop_policy().set_event_loop(loop)
613 def new_event_loop():
614 """Equivalent to calling get_event_loop_policy().new_event_loop()."""
615 return get_event_loop_policy().new_event_loop()
618 def get_child_watcher():
619 """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
620 return get_event_loop_policy().get_child_watcher()
623 def set_child_watcher(watcher):
624 """Equivalent to calling
625 get_event_loop_policy().set_child_watcher(watcher)."""
626 return get_event_loop_policy().set_child_watcher(watcher)