1 """Base implementation of event loop.
3 The event loop can be broken up into a multiplexer (the part
4 responsible for notifying us of I/O events) and the event loop proper,
5 which wraps a multiplexer with functionality for scheduling callbacks,
6 immediately or at a given time in the future.
8 Whenever a public API takes a callback, subsequent positional
9 arguments will be passed to the callback if/when it is called. This
10 avoids the proliferation of trivial lambdas implementing closures.
11 Keyword arguments for the callback are not supported; this is a
12 conscious design decision, leaving the door open for keyword arguments
13 to modify the meaning of the API call itself.
28 from collections import OrderedDict
30 # Python 2.6: use ordereddict backport
31 from ordereddict import OrderedDict
33 from threading import get_ident as _get_thread_ident
36 from threading import _get_ident as _get_thread_ident
39 from . import coroutines
43 from .coroutines import coroutine, From, Return
44 from .executor import get_default_executor
45 from .log import logger
46 from .time_monotonic import time_monotonic, time_monotonic_resolution
49 __all__ = ['BaseEventLoop']
52 # Argument for default thread pool executor creation.
55 # Minimum number of _scheduled timer handles before cleanup of
56 # cancelled handles is performed.
57 _MIN_SCHEDULED_TIMER_HANDLES = 100
59 # Minimum fraction of _scheduled timer handles that are cancelled
60 # before cleanup of cancelled handles is performed.
61 _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
63 def _format_handle(handle):
65 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
67 return repr(cb.__self__)
73 if fd == subprocess.PIPE:
75 elif fd == subprocess.STDOUT:
81 class _StopError(BaseException):
82 """Raised to stop the event loop."""
85 def _check_resolved_address(sock, address):
86 # Ensure that the address is already resolved to avoid the trap of hanging
87 # the entire event loop when the address requires doing a DNS lookup.
89 # getaddrinfo() is slow (around 10 us per call): this function should only
90 # be called in debug mode
93 if family == socket.AF_INET:
95 elif family == socket.AF_INET6:
96 host, port = address[:2]
100 # On Windows, socket.inet_pton() is only available since Python 3.4
101 if hasattr(socket, 'inet_pton'):
102 # getaddrinfo() is slow and has known issue: prefer inet_pton()
105 socket.inet_pton(family, host)
106 except socket.error as exc:
107 raise ValueError("address must be resolved (IP address), "
111 # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
114 if hasattr(socket, 'SOCK_NONBLOCK'):
115 type_mask |= socket.SOCK_NONBLOCK
116 if hasattr(socket, 'SOCK_CLOEXEC'):
117 type_mask |= socket.SOCK_CLOEXEC
119 socket.getaddrinfo(host, port,
121 (sock.type & ~type_mask),
123 socket.AI_NUMERICHOST)
124 except socket.gaierror as err:
125 raise ValueError("address must be resolved (IP address), "
129 def _raise_stop_error(*args):
133 def _run_until_complete_cb(fut):
135 if (isinstance(exc, BaseException)
136 and not isinstance(exc, Exception)):
137 # Issue #22429: run_forever() already finished, no need to
143 class Server(events.AbstractServer):
145 def __init__(self, loop, sockets):
147 self.sockets = sockets
148 self._active_count = 0
152 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
155 assert self.sockets is not None
156 self._active_count += 1
159 assert self._active_count > 0
160 self._active_count -= 1
161 if self._active_count == 0 and self.sockets is None:
165 sockets = self.sockets
170 self._loop._stop_serving(sock)
171 if self._active_count == 0:
175 waiters = self._waiters
177 for waiter in waiters:
178 if not waiter.done():
179 waiter.set_result(waiter)
182 def wait_closed(self):
183 if self.sockets is None or self._waiters is None:
185 waiter = futures.Future(loop=self._loop)
186 self._waiters.append(waiter)
190 class BaseEventLoop(events.AbstractEventLoop):
193 self._timer_cancelled_count = 0
195 self._ready = collections.deque()
197 self._default_executor = None
198 self._internal_fds = 0
199 # Identifier of the thread running the event loop, or None if the
200 # event loop is not running
201 self._thread_id = None
202 self._clock_resolution = time_monotonic_resolution
203 self._exception_handler = None
204 self.set_debug(bool(os.environ.get('TROLLIUSDEBUG')))
205 # In debug mode, if the execution of a callback or a step of a task
206 # exceed this duration in seconds, the slow callback/task is logged.
207 self.slow_callback_duration = 0.1
208 self._current_handle = None
209 self._task_factory = None
210 self._coroutine_wrapper_set = False
213 return ('<%s running=%s closed=%s debug=%s>'
214 % (self.__class__.__name__, self.is_running(),
215 self.is_closed(), self.get_debug()))
217 def create_task(self, coro):
218 """Schedule a coroutine object.
220 Return a task object.
223 if self._task_factory is None:
224 task = tasks.Task(coro, loop=self)
225 if task._source_traceback:
226 del task._source_traceback[-1]
228 task = self._task_factory(self, coro)
231 def set_task_factory(self, factory):
232 """Set a task factory that will be used by loop.create_task().
234 If factory is None the default task factory will be set.
236 If factory is a callable, it should have a signature matching
237 '(loop, coro)', where 'loop' will be a reference to the active
238 event loop, 'coro' will be a coroutine object. The callable
239 must return a Future.
241 if factory is not None and not callable(factory):
242 raise TypeError('task factory must be a callable or None')
243 self._task_factory = factory
245 def get_task_factory(self):
246 """Return a task factory, or None if the default one is in use."""
247 return self._task_factory
249 def _make_socket_transport(self, sock, protocol, waiter=None,
250 extra=None, server=None):
251 """Create socket transport."""
252 raise NotImplementedError
254 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
255 server_side=False, server_hostname=None,
256 extra=None, server=None):
257 """Create SSL transport."""
258 raise NotImplementedError
260 def _make_datagram_transport(self, sock, protocol,
261 address=None, waiter=None, extra=None):
262 """Create datagram transport."""
263 raise NotImplementedError
265 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
267 """Create read pipe transport."""
268 raise NotImplementedError
270 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
272 """Create write pipe transport."""
273 raise NotImplementedError
276 def _make_subprocess_transport(self, protocol, args, shell,
277 stdin, stdout, stderr, bufsize,
278 extra=None, **kwargs):
279 """Create subprocess transport."""
280 raise NotImplementedError
282 def _write_to_self(self):
283 """Write a byte to self-pipe, to wake up the event loop.
285 This may be called from a different thread.
287 The subclass is responsible for implementing the self-pipe.
289 raise NotImplementedError
291 def _process_events(self, event_list):
292 """Process selector events."""
293 raise NotImplementedError
295 def _check_closed(self):
297 raise RuntimeError('Event loop is closed')
299 def run_forever(self):
300 """Run until stop() is called."""
302 if self.is_running():
303 raise RuntimeError('Event loop is running.')
304 self._set_coroutine_wrapper(self._debug)
305 self._thread_id = _get_thread_ident()
313 self._thread_id = None
314 self._set_coroutine_wrapper(False)
316 def run_until_complete(self, future):
317 """Run until the Future is done.
319 If the argument is a coroutine, it is wrapped in a Task.
321 WARNING: It would be disastrous to call run_until_complete()
322 with the same coroutine twice -- it would wrap it in two
323 different Tasks and that can't be good.
325 Return the Future's result, or raise its exception.
329 new_task = not isinstance(future, futures._FUTURE_CLASSES)
330 future = tasks.ensure_future(future, loop=self)
332 # An exception is raised if the future didn't complete, so there
333 # is no need to log the "destroy pending task" message
334 future._log_destroy_pending = False
336 future.add_done_callback(_run_until_complete_cb)
340 if new_task and future.done() and not future.cancelled():
341 # The coroutine raised a BaseException. Consume the exception
342 # to not log a warning, the caller doesn't have access to the
346 future.remove_done_callback(_run_until_complete_cb)
347 if not future.done():
348 raise RuntimeError('Event loop stopped before Future completed.')
350 return future.result()
353 """Stop running the event loop.
355 Every callback scheduled before stop() is called will run. Callbacks
356 scheduled after stop() is called will not run. However, those callbacks
357 will run if run_forever is called again later.
359 self.call_soon(_raise_stop_error)
362 """Close the event loop.
364 This clears the queues and shuts down the executor,
365 but does not wait for the executor to finish.
367 The event loop must not be running.
369 if self.is_running():
370 raise RuntimeError("Cannot close a running event loop")
374 logger.debug("Close %r", self)
377 del self._scheduled[:]
378 executor = self._default_executor
379 if executor is not None:
380 self._default_executor = None
381 executor.shutdown(wait=False)
384 """Returns True if the event loop was closed."""
387 # On Python 3.3 and older, objects with a destructor part of a reference
388 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
392 if not self.is_closed():
393 warnings.warn("unclosed event loop %r" % self, ResourceWarning)
394 if not self.is_running():
397 def is_running(self):
398 """Returns True if the event loop is running."""
399 return (self._thread_id is not None)
402 """Return the time according to the event loop's clock.
404 This is a float expressed in seconds since an epoch, but the
405 epoch, precision, accuracy and drift are unspecified and may
406 differ per event loop.
408 return time_monotonic()
410 def call_later(self, delay, callback, *args):
411 """Arrange for a callback to be called at a given time.
413 Return a Handle: an opaque object with a cancel() method that
414 can be used to cancel the call.
416 The delay can be an int or float, expressed in seconds. It is
417 always relative to the current time.
419 Each callback will be called exactly once. If two callbacks
420 are scheduled for exactly the same time, it undefined which
421 will be called first.
423 Any positional arguments after the callback will be passed to
424 the callback when it is called.
426 timer = self.call_at(self.time() + delay, callback, *args)
427 if timer._source_traceback:
428 del timer._source_traceback[-1]
431 def call_at(self, when, callback, *args):
432 """Like call_later(), but uses an absolute time.
434 Absolute time corresponds to the event loop's time() method.
436 if (coroutines.iscoroutine(callback)
437 or coroutines.iscoroutinefunction(callback)):
438 raise TypeError("coroutines cannot be used with call_at()")
442 timer = events.TimerHandle(when, callback, args, self)
443 if timer._source_traceback:
444 del timer._source_traceback[-1]
445 heapq.heappush(self._scheduled, timer)
446 timer._scheduled = True
449 def call_soon(self, callback, *args):
450 """Arrange for a callback to be called as soon as possible.
452 This operates as a FIFO queue: callbacks are called in the
453 order in which they are registered. Each callback will be
456 Any positional arguments after the callback will be passed to
457 the callback when it is called.
461 handle = self._call_soon(callback, args)
462 if handle._source_traceback:
463 del handle._source_traceback[-1]
466 def _call_soon(self, callback, args):
467 if (coroutines.iscoroutine(callback)
468 or coroutines.iscoroutinefunction(callback)):
469 raise TypeError("coroutines cannot be used with call_soon()")
471 handle = events.Handle(callback, args, self)
472 if handle._source_traceback:
473 del handle._source_traceback[-1]
474 self._ready.append(handle)
477 def _check_thread(self):
478 """Check that the current thread is the thread running the event loop.
480 Non-thread-safe methods of this class make this assumption and will
481 likely behave incorrectly when the assumption is violated.
483 Should only be called when (self._debug == True). The caller is
484 responsible for checking this condition for performance reasons.
486 if self._thread_id is None:
488 thread_id = _get_thread_ident()
489 if thread_id != self._thread_id:
491 "Non-thread-safe operation invoked on an event loop other "
492 "than the current one")
494 def call_soon_threadsafe(self, callback, *args):
495 """Like call_soon(), but thread-safe."""
496 handle = self._call_soon(callback, args)
497 if handle._source_traceback:
498 del handle._source_traceback[-1]
499 self._write_to_self()
502 def run_in_executor(self, executor, func, *args):
503 if (coroutines.iscoroutine(func)
504 or coroutines.iscoroutinefunction(func)):
505 raise TypeError("coroutines cannot be used with run_in_executor()")
507 if isinstance(func, events.Handle):
509 assert not isinstance(func, events.TimerHandle)
511 f = futures.Future(loop=self)
514 func, args = func._callback, func._args
516 executor = self._default_executor
518 executor = get_default_executor()
519 self._default_executor = executor
520 return futures.wrap_future(executor.submit(func, *args), loop=self)
522 def set_default_executor(self, executor):
523 self._default_executor = executor
525 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
526 msg = ["%s:%r" % (host, port)]
528 msg.append('family=%r' % family)
530 msg.append('type=%r' % type)
532 msg.append('proto=%r' % proto)
534 msg.append('flags=%r' % flags)
536 logger.debug('Get address info %s', msg)
539 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
540 dt = self.time() - t0
542 msg = ('Getting address info %s took %.3f ms: %r'
543 % (msg, dt * 1e3, addrinfo))
544 if dt >= self.slow_callback_duration:
550 def getaddrinfo(self, host, port,
551 family=0, type=0, proto=0, flags=0):
553 return self.run_in_executor(None, self._getaddrinfo_debug,
554 host, port, family, type, proto, flags)
556 return self.run_in_executor(None, socket.getaddrinfo,
557 host, port, family, type, proto, flags)
559 def getnameinfo(self, sockaddr, flags=0):
560 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
563 def create_connection(self, protocol_factory, host=None, port=None,
564 ssl=None, family=0, proto=0, flags=0, sock=None,
565 local_addr=None, server_hostname=None):
566 """Connect to a TCP server.
568 Create a streaming transport connection to a given Internet host and
569 port: socket family AF_INET or socket.AF_INET6 depending on host (or
570 family if specified), socket type SOCK_STREAM. protocol_factory must be
571 a callable returning a protocol instance.
573 This method is a coroutine which will try to establish the connection
574 in the background. When successful, the coroutine returns a
575 (transport, protocol) pair.
577 if server_hostname is not None and not ssl:
578 raise ValueError('server_hostname is only meaningful with ssl')
580 if server_hostname is None and ssl:
581 # Use host as default for server_hostname. It is an error
582 # if host is empty or not set, e.g. when an
583 # already-connected socket was passed or when only a port
584 # is given. To avoid this error, you can pass
585 # server_hostname='' -- this will bypass the hostname
586 # check. (This also means that if host is a numeric
587 # IP/IPv6 address, we will attempt to verify that exact
588 # address; this will probably fail, but it is possible to
589 # create a certificate for a specific IP address, so we
590 # don't judge it here.)
592 raise ValueError('You must set server_hostname '
593 'when using ssl without a host')
594 server_hostname = host
596 if host is not None or port is not None:
599 'host/port and sock can not be specified at the same time')
601 f1 = self.getaddrinfo(
602 host, port, family=family,
603 type=socket.SOCK_STREAM, proto=proto, flags=flags)
605 if local_addr is not None:
606 f2 = self.getaddrinfo(
607 *local_addr, family=family,
608 type=socket.SOCK_STREAM, proto=proto, flags=flags)
613 yield From(tasks.wait(fs, loop=self))
617 raise socket.error('getaddrinfo() returned empty list')
619 laddr_infos = f2.result()
621 raise socket.error('getaddrinfo() returned empty list')
624 for family, type, proto, cname, address in infos:
626 sock = socket.socket(family=family, type=type, proto=proto)
627 sock.setblocking(False)
629 for _, _, _, _, laddr in laddr_infos:
633 except socket.error as exc:
635 exc.errno, 'error while '
636 'attempting to bind on address '
638 laddr, exc.strerror.lower()))
639 exceptions.append(exc)
645 logger.debug("connect %r to %r", sock, address)
646 yield From(self.sock_connect(sock, address))
647 except socket.error as exc:
650 exceptions.append(exc)
658 if len(exceptions) == 1:
661 # If they all have the same str(), raise one.
662 model = str(exceptions[0])
663 if all(str(exc) == model for exc in exceptions):
665 # Raise a combined exception so the user can see all
666 # the various error messages.
667 raise socket.error('Multiple exceptions: {0}'.format(
668 ', '.join(str(exc) for exc in exceptions)))
672 'host and port was not specified and no sock specified')
674 sock.setblocking(False)
676 transport, protocol = yield From(self._create_connection_transport(
677 sock, protocol_factory, ssl, server_hostname))
679 # Get the socket from the transport because SSL transport closes
680 # the old socket and creates a new SSL socket
681 sock = transport.get_extra_info('socket')
682 logger.debug("%r connected to %s:%r: (%r, %r)",
683 sock, host, port, transport, protocol)
684 raise Return(transport, protocol)
687 def _create_connection_transport(self, sock, protocol_factory, ssl,
689 protocol = protocol_factory()
690 waiter = futures.Future(loop=self)
692 sslcontext = None if isinstance(ssl, bool) else ssl
693 transport = self._make_ssl_transport(
694 sock, protocol, sslcontext, waiter,
695 server_side=False, server_hostname=server_hostname)
697 transport = self._make_socket_transport(sock, protocol, waiter)
705 raise Return(transport, protocol)
708 def create_datagram_endpoint(self, protocol_factory,
709 local_addr=None, remote_addr=None,
710 family=0, proto=0, flags=0):
711 """Create datagram connection."""
712 if not (local_addr or remote_addr):
714 raise ValueError('unexpected address family')
715 addr_pairs_info = (((family, proto), (None, None)),)
717 # join address by (family, protocol)
718 addr_infos = OrderedDict()
719 for idx, addr in ((0, local_addr), (1, remote_addr)):
721 assert isinstance(addr, tuple) and len(addr) == 2, (
722 '2-tuple is expected')
724 infos = yield From(self.getaddrinfo(
725 *addr, family=family, type=socket.SOCK_DGRAM,
726 proto=proto, flags=flags))
728 raise socket.error('getaddrinfo() returned empty list')
730 for fam, _, pro, _, address in infos:
732 if key not in addr_infos:
733 addr_infos[key] = [None, None]
734 addr_infos[key][idx] = address
736 # each addr has to have info for each (family, proto) pair
738 (key, addr_pair) for key, addr_pair in addr_infos.items()
739 if not ((local_addr and addr_pair[0] is None) or
740 (remote_addr and addr_pair[1] is None))]
742 if not addr_pairs_info:
743 raise ValueError('can not get address information')
747 for ((family, proto),
748 (local_address, remote_address)) in addr_pairs_info:
752 sock = socket.socket(
753 family=family, type=socket.SOCK_DGRAM, proto=proto)
754 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
755 sock.setblocking(False)
758 sock.bind(local_address)
760 yield From(self.sock_connect(sock, remote_address))
761 r_addr = remote_address
762 except socket.error as exc:
765 exceptions.append(exc)
775 protocol = protocol_factory()
776 waiter = futures.Future(loop=self)
777 transport = self._make_datagram_transport(sock, protocol, r_addr,
781 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
783 local_addr, remote_addr, transport, protocol)
785 logger.debug("Datagram endpoint remote_addr=%r created: "
787 remote_addr, transport, protocol)
795 raise Return(transport, protocol)
798 def create_server(self, protocol_factory, host=None, port=None,
799 family=socket.AF_UNSPEC,
800 flags=socket.AI_PASSIVE,
805 """Create a TCP server bound to host and port.
807 Return a Server object which can be used to stop the service.
809 This method is a coroutine.
811 if isinstance(ssl, bool):
812 raise TypeError('ssl argument must be an SSLContext or None')
813 if host is not None or port is not None:
816 'host/port and sock can not be specified at the same time')
818 AF_INET6 = getattr(socket, 'AF_INET6', 0)
819 if reuse_address is None:
820 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
825 infos = yield From(self.getaddrinfo(
826 host, port, family=family,
827 type=socket.SOCK_STREAM, proto=0, flags=flags))
829 raise socket.error('getaddrinfo() returned empty list')
834 af, socktype, proto, canonname, sa = res
836 sock = socket.socket(af, socktype, proto)
838 # Assume it's a bad family/type/protocol combination.
840 logger.warning('create_server() failed to create '
841 'socket.socket(%r, %r, %r)',
842 af, socktype, proto, exc_info=True)
846 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
848 # Disable IPv4/IPv6 dual stack support (enabled by
849 # default on Linux) which makes a single socket
850 # listen on both address families.
851 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
852 sock.setsockopt(socket.IPPROTO_IPV6,
857 except socket.error as err:
858 raise socket.error(err.errno,
859 'error while attempting '
860 'to bind on address %r: %s'
861 % (sa, err.strerror.lower()))
869 raise ValueError('Neither host/port nor sock were specified')
872 server = Server(self, sockets)
875 sock.setblocking(False)
876 self._start_serving(protocol_factory, sock, ssl, server)
878 logger.info("%r is serving", server)
882 def connect_read_pipe(self, protocol_factory, pipe):
883 protocol = protocol_factory()
884 waiter = futures.Future(loop=self)
885 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
894 logger.debug('Read pipe %r connected: (%r, %r)',
895 pipe.fileno(), transport, protocol)
896 raise Return(transport, protocol)
899 def connect_write_pipe(self, protocol_factory, pipe):
900 protocol = protocol_factory()
901 waiter = futures.Future(loop=self)
902 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
911 logger.debug('Write pipe %r connected: (%r, %r)',
912 pipe.fileno(), transport, protocol)
913 raise Return(transport, protocol)
915 def _log_subprocess(self, msg, stdin, stdout, stderr):
917 if stdin is not None:
918 info.append('stdin=%s' % _format_pipe(stdin))
919 if stdout is not None and stderr == subprocess.STDOUT:
920 info.append('stdout=stderr=%s' % _format_pipe(stdout))
922 if stdout is not None:
923 info.append('stdout=%s' % _format_pipe(stdout))
924 if stderr is not None:
925 info.append('stderr=%s' % _format_pipe(stderr))
926 logger.debug(' '.join(info))
929 def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
930 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
931 universal_newlines=False, shell=True, bufsize=0,
933 if not isinstance(cmd, compat.string_types):
934 raise ValueError("cmd must be a string")
935 if universal_newlines:
936 raise ValueError("universal_newlines must be False")
938 raise ValueError("shell must be True")
940 raise ValueError("bufsize must be 0")
941 protocol = protocol_factory()
943 # don't log parameters: they may contain sensitive information
944 # (password) and may be too long
945 debug_log = 'run shell command %r' % cmd
946 self._log_subprocess(debug_log, stdin, stdout, stderr)
947 transport = yield From(self._make_subprocess_transport(
948 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs))
950 logger.info('%s: %r' % (debug_log, transport))
951 raise Return(transport, protocol)
954 def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
955 stdin = kwargs.pop('stdin', subprocess.PIPE)
956 stdout = kwargs.pop('stdout', subprocess.PIPE)
957 stderr = kwargs.pop('stderr', subprocess.PIPE)
958 universal_newlines = kwargs.pop('universal_newlines', False)
959 shell = kwargs.pop('shell', False)
960 bufsize = kwargs.pop('bufsize', 0)
961 if universal_newlines:
962 raise ValueError("universal_newlines must be False")
964 raise ValueError("shell must be False")
966 raise ValueError("bufsize must be 0")
967 popen_args = (program,) + args
968 for arg in popen_args:
969 if not isinstance(arg, compat.string_types ):
970 raise TypeError("program arguments must be "
971 "a bytes or text string, not %s"
972 % type(arg).__name__)
973 protocol = protocol_factory()
975 # don't log parameters: they may contain sensitive information
976 # (password) and may be too long
977 debug_log = 'execute program %r' % program
978 self._log_subprocess(debug_log, stdin, stdout, stderr)
979 transport = yield From(self._make_subprocess_transport(
980 protocol, popen_args, False, stdin, stdout, stderr,
983 logger.info('%s: %r' % (debug_log, transport))
984 raise Return(transport, protocol)
986 def set_exception_handler(self, handler):
987 """Set handler as the new event loop exception handler.
989 If handler is None, the default exception handler will
992 If handler is a callable object, it should have a
993 signature matching '(loop, context)', where 'loop'
994 will be a reference to the active event loop, 'context'
995 will be a dict object (see `call_exception_handler()`
996 documentation for details about context).
998 if handler is not None and not callable(handler):
999 raise TypeError('A callable object or None is expected, '
1000 'got {0!r}'.format(handler))
1001 self._exception_handler = handler
1003 def default_exception_handler(self, context):
1004 """Default exception handler.
1006 This is called when an exception occurs and no exception
1007 handler is set, and can be called by a custom exception
1008 handler that wants to defer to the default behavior.
1010 The context parameter has the same meaning as in
1011 `call_exception_handler()`.
1013 message = context.get('message')
1015 message = 'Unhandled exception in event loop'
1017 exception = context.get('exception')
1018 if exception is not None:
1019 if hasattr(exception, '__traceback__'):
1021 tb = exception.__traceback__
1023 # call_exception_handler() is usually called indirectly
1024 # from an except block. If it's not the case, the traceback
1026 tb = sys.exc_info()[2]
1027 exc_info = (type(exception), exception, tb)
1031 if ('source_traceback' not in context
1032 and self._current_handle is not None
1033 and self._current_handle._source_traceback):
1034 context['handle_traceback'] = self._current_handle._source_traceback
1036 log_lines = [message]
1037 for key in sorted(context):
1038 if key in ('message', 'exception'):
1040 value = context[key]
1041 if key == 'source_traceback':
1042 tb = ''.join(traceback.format_list(value))
1043 value = 'Object created at (most recent call last):\n'
1044 value += tb.rstrip()
1045 elif key == 'handle_traceback':
1046 tb = ''.join(traceback.format_list(value))
1047 value = 'Handle created at (most recent call last):\n'
1048 value += tb.rstrip()
1051 log_lines.append('{0}: {1}'.format(key, value))
1053 logger.error('\n'.join(log_lines), exc_info=exc_info)
1055 def call_exception_handler(self, context):
1056 """Call the current event loop's exception handler.
1058 The context argument is a dict containing the following keys:
1060 - 'message': Error message;
1061 - 'exception' (optional): Exception object;
1062 - 'future' (optional): Future instance;
1063 - 'handle' (optional): Handle instance;
1064 - 'protocol' (optional): Protocol instance;
1065 - 'transport' (optional): Transport instance;
1066 - 'socket' (optional): Socket instance.
1068 New keys maybe introduced in the future.
1070 Note: do not overload this method in an event loop subclass.
1071 For custom exception handling, use the
1072 `set_exception_handler()` method.
1074 if self._exception_handler is None:
1076 self.default_exception_handler(context)
1078 # Second protection layer for unexpected errors
1079 # in the default implementation, as well as for subclassed
1080 # event loops with overloaded "default_exception_handler".
1081 logger.error('Exception in default exception handler',
1085 self._exception_handler(self, context)
1086 except Exception as exc:
1087 # Exception in the user set custom exception handler.
1089 # Let's try default handler.
1090 self.default_exception_handler({
1091 'message': 'Unhandled error in exception handler',
1096 # Guard 'default_exception_handler' in case it is
1098 logger.error('Exception in default exception handler '
1099 'while handling an unexpected error '
1100 'in custom exception handler',
1103 def _add_callback(self, handle):
1104 """Add a Handle to _scheduled (TimerHandle) or _ready."""
1105 assert isinstance(handle, events.Handle), 'A Handle is required here'
1106 if handle._cancelled:
1108 assert not isinstance(handle, events.TimerHandle)
1109 self._ready.append(handle)
1111 def _add_callback_signalsafe(self, handle):
1112 """Like _add_callback() but called from a signal handler."""
1113 self._add_callback(handle)
1114 self._write_to_self()
1116 def _timer_handle_cancelled(self, handle):
1117 """Notification that a TimerHandle has been cancelled."""
1118 if handle._scheduled:
1119 self._timer_cancelled_count += 1
1121 def _run_once(self):
1122 """Run one full iteration of the event loop.
1124 This calls all currently ready callbacks, polls for I/O,
1125 schedules the resulting callbacks, and finally schedules
1126 'call_later' callbacks.
1129 sched_count = len(self._scheduled)
1130 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1131 float(self._timer_cancelled_count) / sched_count >
1132 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1133 # Remove delayed calls that were cancelled if their number
1136 for handle in self._scheduled:
1137 if handle._cancelled:
1138 handle._scheduled = False
1140 new_scheduled.append(handle)
1142 heapq.heapify(new_scheduled)
1143 self._scheduled = new_scheduled
1144 self._timer_cancelled_count = 0
1146 # Remove delayed calls that were cancelled from head of queue.
1147 while self._scheduled and self._scheduled[0]._cancelled:
1148 self._timer_cancelled_count -= 1
1149 handle = heapq.heappop(self._scheduled)
1150 handle._scheduled = False
1155 elif self._scheduled:
1156 # Compute the desired timeout.
1157 when = self._scheduled[0]._when
1158 timeout = max(0, when - self.time())
1160 if self._debug and timeout != 0:
1162 event_list = self._selector.select(timeout)
1163 dt = self.time() - t0
1165 level = logging.INFO
1167 level = logging.DEBUG
1168 nevent = len(event_list)
1170 logger.log(level, 'poll took %.3f ms: %s events',
1174 'poll %.3f ms took %.3f ms: %s events',
1175 timeout * 1e3, dt * 1e3, nevent)
1178 'poll %.3f ms took %.3f ms: timeout',
1179 timeout * 1e3, dt * 1e3)
1181 event_list = self._selector.select(timeout)
1182 self._process_events(event_list)
1184 # Handle 'later' callbacks that are ready.
1185 end_time = self.time() + self._clock_resolution
1186 while self._scheduled:
1187 handle = self._scheduled[0]
1188 if handle._when >= end_time:
1190 handle = heapq.heappop(self._scheduled)
1191 handle._scheduled = False
1192 self._ready.append(handle)
1194 # This is the only place where callbacks are actually *called*.
1195 # All other places just add them to ready.
1196 # Note: We run all currently scheduled callbacks, but not any
1197 # callbacks scheduled by callbacks run this time around --
1198 # they will be run the next time (after another I/O poll).
1199 # Use an idiom that is thread-safe without using locks.
1200 ntodo = len(self._ready)
1201 for i in range(ntodo):
1202 handle = self._ready.popleft()
1203 if handle._cancelled:
1207 self._current_handle = handle
1210 dt = self.time() - t0
1211 if dt >= self.slow_callback_duration:
1212 logger.warning('Executing %s took %.3f seconds',
1213 _format_handle(handle), dt)
1215 self._current_handle = None
1218 handle = None # Needed to break cycles when an exception occurs.
1220 def _set_coroutine_wrapper(self, enabled):
1222 set_wrapper = sys.set_coroutine_wrapper
1223 get_wrapper = sys.get_coroutine_wrapper
1224 except AttributeError:
1227 enabled = bool(enabled)
1228 if self._coroutine_wrapper_set == enabled:
1231 wrapper = coroutines.debug_wrapper
1232 current_wrapper = get_wrapper()
1235 if current_wrapper not in (None, wrapper):
1237 "loop.set_debug(True): cannot set debug coroutine "
1238 "wrapper; another wrapper is already set %r" %
1239 current_wrapper, RuntimeWarning)
1241 set_wrapper(wrapper)
1242 self._coroutine_wrapper_set = True
1244 if current_wrapper not in (None, wrapper):
1246 "loop.set_debug(False): cannot unset debug coroutine "
1247 "wrapper; another wrapper was set %r" %
1248 current_wrapper, RuntimeWarning)
1251 self._coroutine_wrapper_set = False
1253 def get_debug(self):
1256 def set_debug(self, enabled):
1257 self._debug = enabled
1259 if self.is_running():
1260 self._set_coroutine_wrapper(enabled)