efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / base_events.py
1 """Base implementation of event loop.
2
3 The event loop can be broken up into a multiplexer (the part
4 responsible for notifying us of I/O events) and the event loop proper,
5 which wraps a multiplexer with functionality for scheduling callbacks,
6 immediately or at a given time in the future.
7
8 Whenever a public API takes a callback, subsequent positional
9 arguments will be passed to the callback if/when it is called.  This
10 avoids the proliferation of trivial lambdas implementing closures.
11 Keyword arguments for the callback are not supported; this is a
12 conscious design decision, leaving the door open for keyword arguments
13 to modify the meaning of the API call itself.
14 """
15
16
17 import collections
18 import heapq
19 import inspect
20 import logging
21 import os
22 import socket
23 import subprocess
24 import sys
25 import traceback
26 import warnings
27 try:
28     from collections import OrderedDict
29 except ImportError:
30     # Python 2.6: use ordereddict backport
31     from ordereddict import OrderedDict
32 try:
33     from threading import get_ident as _get_thread_ident
34 except ImportError:
35     # Python 2
36     from threading import _get_ident as _get_thread_ident
37
38 from . import compat
39 from . import coroutines
40 from . import events
41 from . import futures
42 from . import tasks
43 from .coroutines import coroutine, From, Return
44 from .executor import get_default_executor
45 from .log import logger
46 from .time_monotonic import time_monotonic, time_monotonic_resolution
47
48
49 __all__ = ['BaseEventLoop']
50
51
52 # Argument for default thread pool executor creation.
53 _MAX_WORKERS = 5
54
55 # Minimum number of _scheduled timer handles before cleanup of
56 # cancelled handles is performed.
57 _MIN_SCHEDULED_TIMER_HANDLES = 100
58
59 # Minimum fraction of _scheduled timer handles that are cancelled
60 # before cleanup of cancelled handles is performed.
61 _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
62
63 def _format_handle(handle):
64     cb = handle._callback
65     if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
66         # format the task
67         return repr(cb.__self__)
68     else:
69         return str(handle)
70
71
72 def _format_pipe(fd):
73     if fd == subprocess.PIPE:
74         return '<pipe>'
75     elif fd == subprocess.STDOUT:
76         return '<stdout>'
77     else:
78         return repr(fd)
79
80
81 class _StopError(BaseException):
82     """Raised to stop the event loop."""
83
84
85 def _check_resolved_address(sock, address):
86     # Ensure that the address is already resolved to avoid the trap of hanging
87     # the entire event loop when the address requires doing a DNS lookup.
88     #
89     # getaddrinfo() is slow (around 10 us per call): this function should only
90     # be called in debug mode
91     family = sock.family
92
93     if family == socket.AF_INET:
94         host, port = address
95     elif family == socket.AF_INET6:
96         host, port = address[:2]
97     else:
98         return
99
100     # On Windows, socket.inet_pton() is only available since Python 3.4
101     if hasattr(socket, 'inet_pton'):
102         # getaddrinfo() is slow and has known issue: prefer inet_pton()
103         # if available
104         try:
105             socket.inet_pton(family, host)
106         except socket.error as exc:
107             raise ValueError("address must be resolved (IP address), "
108                              "got host %r: %s"
109                              % (host, exc))
110     else:
111         # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
112         # already resolved.
113         type_mask = 0
114         if hasattr(socket, 'SOCK_NONBLOCK'):
115             type_mask |= socket.SOCK_NONBLOCK
116         if hasattr(socket, 'SOCK_CLOEXEC'):
117             type_mask |= socket.SOCK_CLOEXEC
118         try:
119             socket.getaddrinfo(host, port,
120                                family,
121                                (sock.type & ~type_mask),
122                                sock.proto,
123                                socket.AI_NUMERICHOST)
124         except socket.gaierror as err:
125             raise ValueError("address must be resolved (IP address), "
126                              "got host %r: %s"
127                              % (host, err))
128
129 def _raise_stop_error(*args):
130     raise _StopError
131
132
133 def _run_until_complete_cb(fut):
134     exc = fut._exception
135     if (isinstance(exc, BaseException)
136     and not isinstance(exc, Exception)):
137         # Issue #22429: run_forever() already finished, no need to
138         # stop it.
139         return
140     _raise_stop_error()
141
142
143 class Server(events.AbstractServer):
144
145     def __init__(self, loop, sockets):
146         self._loop = loop
147         self.sockets = sockets
148         self._active_count = 0
149         self._waiters = []
150
151     def __repr__(self):
152         return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
153
154     def _attach(self):
155         assert self.sockets is not None
156         self._active_count += 1
157
158     def _detach(self):
159         assert self._active_count > 0
160         self._active_count -= 1
161         if self._active_count == 0 and self.sockets is None:
162             self._wakeup()
163
164     def close(self):
165         sockets = self.sockets
166         if sockets is None:
167             return
168         self.sockets = None
169         for sock in sockets:
170             self._loop._stop_serving(sock)
171         if self._active_count == 0:
172             self._wakeup()
173
174     def _wakeup(self):
175         waiters = self._waiters
176         self._waiters = None
177         for waiter in waiters:
178             if not waiter.done():
179                 waiter.set_result(waiter)
180
181     @coroutine
182     def wait_closed(self):
183         if self.sockets is None or self._waiters is None:
184             raise Return()
185         waiter = futures.Future(loop=self._loop)
186         self._waiters.append(waiter)
187         yield From(waiter)
188
189
190 class BaseEventLoop(events.AbstractEventLoop):
191
192     def __init__(self):
193         self._timer_cancelled_count = 0
194         self._closed = False
195         self._ready = collections.deque()
196         self._scheduled = []
197         self._default_executor = None
198         self._internal_fds = 0
199         # Identifier of the thread running the event loop, or None if the
200         # event loop is not running
201         self._thread_id = None
202         self._clock_resolution = time_monotonic_resolution
203         self._exception_handler = None
204         self.set_debug(bool(os.environ.get('TROLLIUSDEBUG')))
205         # In debug mode, if the execution of a callback or a step of a task
206         # exceed this duration in seconds, the slow callback/task is logged.
207         self.slow_callback_duration = 0.1
208         self._current_handle = None
209         self._task_factory = None
210         self._coroutine_wrapper_set = False
211
212     def __repr__(self):
213         return ('<%s running=%s closed=%s debug=%s>'
214                 % (self.__class__.__name__, self.is_running(),
215                    self.is_closed(), self.get_debug()))
216
217     def create_task(self, coro):
218         """Schedule a coroutine object.
219
220         Return a task object.
221         """
222         self._check_closed()
223         if self._task_factory is None:
224             task = tasks.Task(coro, loop=self)
225             if task._source_traceback:
226                 del task._source_traceback[-1]
227         else:
228             task = self._task_factory(self, coro)
229         return task
230
231     def set_task_factory(self, factory):
232         """Set a task factory that will be used by loop.create_task().
233
234         If factory is None the default task factory will be set.
235
236         If factory is a callable, it should have a signature matching
237         '(loop, coro)', where 'loop' will be a reference to the active
238         event loop, 'coro' will be a coroutine object.  The callable
239         must return a Future.
240         """
241         if factory is not None and not callable(factory):
242             raise TypeError('task factory must be a callable or None')
243         self._task_factory = factory
244
245     def get_task_factory(self):
246         """Return a task factory, or None if the default one is in use."""
247         return self._task_factory
248
249     def _make_socket_transport(self, sock, protocol, waiter=None,
250                                extra=None, server=None):
251         """Create socket transport."""
252         raise NotImplementedError
253
254     def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
255                             server_side=False, server_hostname=None,
256                             extra=None, server=None):
257         """Create SSL transport."""
258         raise NotImplementedError
259
260     def _make_datagram_transport(self, sock, protocol,
261                                  address=None, waiter=None, extra=None):
262         """Create datagram transport."""
263         raise NotImplementedError
264
265     def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
266                                   extra=None):
267         """Create read pipe transport."""
268         raise NotImplementedError
269
270     def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
271                                    extra=None):
272         """Create write pipe transport."""
273         raise NotImplementedError
274
275     @coroutine
276     def _make_subprocess_transport(self, protocol, args, shell,
277                                    stdin, stdout, stderr, bufsize,
278                                    extra=None, **kwargs):
279         """Create subprocess transport."""
280         raise NotImplementedError
281
282     def _write_to_self(self):
283         """Write a byte to self-pipe, to wake up the event loop.
284
285         This may be called from a different thread.
286
287         The subclass is responsible for implementing the self-pipe.
288         """
289         raise NotImplementedError
290
291     def _process_events(self, event_list):
292         """Process selector events."""
293         raise NotImplementedError
294
295     def _check_closed(self):
296         if self._closed:
297             raise RuntimeError('Event loop is closed')
298
299     def run_forever(self):
300         """Run until stop() is called."""
301         self._check_closed()
302         if self.is_running():
303             raise RuntimeError('Event loop is running.')
304         self._set_coroutine_wrapper(self._debug)
305         self._thread_id = _get_thread_ident()
306         try:
307             while True:
308                 try:
309                     self._run_once()
310                 except _StopError:
311                     break
312         finally:
313             self._thread_id = None
314             self._set_coroutine_wrapper(False)
315
316     def run_until_complete(self, future):
317         """Run until the Future is done.
318
319         If the argument is a coroutine, it is wrapped in a Task.
320
321         WARNING: It would be disastrous to call run_until_complete()
322         with the same coroutine twice -- it would wrap it in two
323         different Tasks and that can't be good.
324
325         Return the Future's result, or raise its exception.
326         """
327         self._check_closed()
328
329         new_task = not isinstance(future, futures._FUTURE_CLASSES)
330         future = tasks.ensure_future(future, loop=self)
331         if new_task:
332             # An exception is raised if the future didn't complete, so there
333             # is no need to log the "destroy pending task" message
334             future._log_destroy_pending = False
335
336         future.add_done_callback(_run_until_complete_cb)
337         try:
338             self.run_forever()
339         except:
340             if new_task and future.done() and not future.cancelled():
341                 # The coroutine raised a BaseException. Consume the exception
342                 # to not log a warning, the caller doesn't have access to the
343                 # local task.
344                 future.exception()
345             raise
346         future.remove_done_callback(_run_until_complete_cb)
347         if not future.done():
348             raise RuntimeError('Event loop stopped before Future completed.')
349
350         return future.result()
351
352     def stop(self):
353         """Stop running the event loop.
354
355         Every callback scheduled before stop() is called will run. Callbacks
356         scheduled after stop() is called will not run. However, those callbacks
357         will run if run_forever is called again later.
358         """
359         self.call_soon(_raise_stop_error)
360
361     def close(self):
362         """Close the event loop.
363
364         This clears the queues and shuts down the executor,
365         but does not wait for the executor to finish.
366
367         The event loop must not be running.
368         """
369         if self.is_running():
370             raise RuntimeError("Cannot close a running event loop")
371         if self._closed:
372             return
373         if self._debug:
374             logger.debug("Close %r", self)
375         self._closed = True
376         self._ready.clear()
377         del self._scheduled[:]
378         executor = self._default_executor
379         if executor is not None:
380             self._default_executor = None
381             executor.shutdown(wait=False)
382
383     def is_closed(self):
384         """Returns True if the event loop was closed."""
385         return self._closed
386
387     # On Python 3.3 and older, objects with a destructor part of a reference
388     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
389     # to the PEP 442.
390     if compat.PY34:
391         def __del__(self):
392             if not self.is_closed():
393                 warnings.warn("unclosed event loop %r" % self, ResourceWarning)
394                 if not self.is_running():
395                     self.close()
396
397     def is_running(self):
398         """Returns True if the event loop is running."""
399         return (self._thread_id is not None)
400
401     def time(self):
402         """Return the time according to the event loop's clock.
403
404         This is a float expressed in seconds since an epoch, but the
405         epoch, precision, accuracy and drift are unspecified and may
406         differ per event loop.
407         """
408         return time_monotonic()
409
410     def call_later(self, delay, callback, *args):
411         """Arrange for a callback to be called at a given time.
412
413         Return a Handle: an opaque object with a cancel() method that
414         can be used to cancel the call.
415
416         The delay can be an int or float, expressed in seconds.  It is
417         always relative to the current time.
418
419         Each callback will be called exactly once.  If two callbacks
420         are scheduled for exactly the same time, it undefined which
421         will be called first.
422
423         Any positional arguments after the callback will be passed to
424         the callback when it is called.
425         """
426         timer = self.call_at(self.time() + delay, callback, *args)
427         if timer._source_traceback:
428             del timer._source_traceback[-1]
429         return timer
430
431     def call_at(self, when, callback, *args):
432         """Like call_later(), but uses an absolute time.
433
434         Absolute time corresponds to the event loop's time() method.
435         """
436         if (coroutines.iscoroutine(callback)
437         or coroutines.iscoroutinefunction(callback)):
438             raise TypeError("coroutines cannot be used with call_at()")
439         self._check_closed()
440         if self._debug:
441             self._check_thread()
442         timer = events.TimerHandle(when, callback, args, self)
443         if timer._source_traceback:
444             del timer._source_traceback[-1]
445         heapq.heappush(self._scheduled, timer)
446         timer._scheduled = True
447         return timer
448
449     def call_soon(self, callback, *args):
450         """Arrange for a callback to be called as soon as possible.
451
452         This operates as a FIFO queue: callbacks are called in the
453         order in which they are registered.  Each callback will be
454         called exactly once.
455
456         Any positional arguments after the callback will be passed to
457         the callback when it is called.
458         """
459         if self._debug:
460             self._check_thread()
461         handle = self._call_soon(callback, args)
462         if handle._source_traceback:
463             del handle._source_traceback[-1]
464         return handle
465
466     def _call_soon(self, callback, args):
467         if (coroutines.iscoroutine(callback)
468         or coroutines.iscoroutinefunction(callback)):
469             raise TypeError("coroutines cannot be used with call_soon()")
470         self._check_closed()
471         handle = events.Handle(callback, args, self)
472         if handle._source_traceback:
473             del handle._source_traceback[-1]
474         self._ready.append(handle)
475         return handle
476
477     def _check_thread(self):
478         """Check that the current thread is the thread running the event loop.
479
480         Non-thread-safe methods of this class make this assumption and will
481         likely behave incorrectly when the assumption is violated.
482
483         Should only be called when (self._debug == True).  The caller is
484         responsible for checking this condition for performance reasons.
485         """
486         if self._thread_id is None:
487             return
488         thread_id = _get_thread_ident()
489         if thread_id != self._thread_id:
490             raise RuntimeError(
491                 "Non-thread-safe operation invoked on an event loop other "
492                 "than the current one")
493
494     def call_soon_threadsafe(self, callback, *args):
495         """Like call_soon(), but thread-safe."""
496         handle = self._call_soon(callback, args)
497         if handle._source_traceback:
498             del handle._source_traceback[-1]
499         self._write_to_self()
500         return handle
501
502     def run_in_executor(self, executor, func, *args):
503         if (coroutines.iscoroutine(func)
504         or coroutines.iscoroutinefunction(func)):
505             raise TypeError("coroutines cannot be used with run_in_executor()")
506         self._check_closed()
507         if isinstance(func, events.Handle):
508             assert not args
509             assert not isinstance(func, events.TimerHandle)
510             if func._cancelled:
511                 f = futures.Future(loop=self)
512                 f.set_result(None)
513                 return f
514             func, args = func._callback, func._args
515         if executor is None:
516             executor = self._default_executor
517             if executor is None:
518                 executor = get_default_executor()
519                 self._default_executor = executor
520         return futures.wrap_future(executor.submit(func, *args), loop=self)
521
522     def set_default_executor(self, executor):
523         self._default_executor = executor
524
525     def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
526         msg = ["%s:%r" % (host, port)]
527         if family:
528             msg.append('family=%r' % family)
529         if type:
530             msg.append('type=%r' % type)
531         if proto:
532             msg.append('proto=%r' % proto)
533         if flags:
534             msg.append('flags=%r' % flags)
535         msg = ', '.join(msg)
536         logger.debug('Get address info %s', msg)
537
538         t0 = self.time()
539         addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
540         dt = self.time() - t0
541
542         msg = ('Getting address info %s took %.3f ms: %r'
543                % (msg, dt * 1e3, addrinfo))
544         if dt >= self.slow_callback_duration:
545             logger.info(msg)
546         else:
547             logger.debug(msg)
548         return addrinfo
549
550     def getaddrinfo(self, host, port,
551                     family=0, type=0, proto=0, flags=0):
552         if self._debug:
553             return self.run_in_executor(None, self._getaddrinfo_debug,
554                                         host, port, family, type, proto, flags)
555         else:
556             return self.run_in_executor(None, socket.getaddrinfo,
557                                         host, port, family, type, proto, flags)
558
559     def getnameinfo(self, sockaddr, flags=0):
560         return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
561
562     @coroutine
563     def create_connection(self, protocol_factory, host=None, port=None,
564                           ssl=None, family=0, proto=0, flags=0, sock=None,
565                           local_addr=None, server_hostname=None):
566         """Connect to a TCP server.
567
568         Create a streaming transport connection to a given Internet host and
569         port: socket family AF_INET or socket.AF_INET6 depending on host (or
570         family if specified), socket type SOCK_STREAM. protocol_factory must be
571         a callable returning a protocol instance.
572
573         This method is a coroutine which will try to establish the connection
574         in the background.  When successful, the coroutine returns a
575         (transport, protocol) pair.
576         """
577         if server_hostname is not None and not ssl:
578             raise ValueError('server_hostname is only meaningful with ssl')
579
580         if server_hostname is None and ssl:
581             # Use host as default for server_hostname.  It is an error
582             # if host is empty or not set, e.g. when an
583             # already-connected socket was passed or when only a port
584             # is given.  To avoid this error, you can pass
585             # server_hostname='' -- this will bypass the hostname
586             # check.  (This also means that if host is a numeric
587             # IP/IPv6 address, we will attempt to verify that exact
588             # address; this will probably fail, but it is possible to
589             # create a certificate for a specific IP address, so we
590             # don't judge it here.)
591             if not host:
592                 raise ValueError('You must set server_hostname '
593                                  'when using ssl without a host')
594             server_hostname = host
595
596         if host is not None or port is not None:
597             if sock is not None:
598                 raise ValueError(
599                     'host/port and sock can not be specified at the same time')
600
601             f1 = self.getaddrinfo(
602                 host, port, family=family,
603                 type=socket.SOCK_STREAM, proto=proto, flags=flags)
604             fs = [f1]
605             if local_addr is not None:
606                 f2 = self.getaddrinfo(
607                     *local_addr, family=family,
608                     type=socket.SOCK_STREAM, proto=proto, flags=flags)
609                 fs.append(f2)
610             else:
611                 f2 = None
612
613             yield From(tasks.wait(fs, loop=self))
614
615             infos = f1.result()
616             if not infos:
617                 raise socket.error('getaddrinfo() returned empty list')
618             if f2 is not None:
619                 laddr_infos = f2.result()
620                 if not laddr_infos:
621                     raise socket.error('getaddrinfo() returned empty list')
622
623             exceptions = []
624             for family, type, proto, cname, address in infos:
625                 try:
626                     sock = socket.socket(family=family, type=type, proto=proto)
627                     sock.setblocking(False)
628                     if f2 is not None:
629                         for _, _, _, _, laddr in laddr_infos:
630                             try:
631                                 sock.bind(laddr)
632                                 break
633                             except socket.error as exc:
634                                 exc = socket.error(
635                                     exc.errno, 'error while '
636                                     'attempting to bind on address '
637                                     '{0!r}: {1}'.format(
638                                         laddr, exc.strerror.lower()))
639                                 exceptions.append(exc)
640                         else:
641                             sock.close()
642                             sock = None
643                             continue
644                     if self._debug:
645                         logger.debug("connect %r to %r", sock, address)
646                     yield From(self.sock_connect(sock, address))
647                 except socket.error as exc:
648                     if sock is not None:
649                         sock.close()
650                     exceptions.append(exc)
651                 except:
652                     if sock is not None:
653                         sock.close()
654                     raise
655                 else:
656                     break
657             else:
658                 if len(exceptions) == 1:
659                     raise exceptions[0]
660                 else:
661                     # If they all have the same str(), raise one.
662                     model = str(exceptions[0])
663                     if all(str(exc) == model for exc in exceptions):
664                         raise exceptions[0]
665                     # Raise a combined exception so the user can see all
666                     # the various error messages.
667                     raise socket.error('Multiple exceptions: {0}'.format(
668                         ', '.join(str(exc) for exc in exceptions)))
669
670         elif sock is None:
671             raise ValueError(
672                 'host and port was not specified and no sock specified')
673
674         sock.setblocking(False)
675
676         transport, protocol = yield From(self._create_connection_transport(
677             sock, protocol_factory, ssl, server_hostname))
678         if self._debug:
679             # Get the socket from the transport because SSL transport closes
680             # the old socket and creates a new SSL socket
681             sock = transport.get_extra_info('socket')
682             logger.debug("%r connected to %s:%r: (%r, %r)",
683                          sock, host, port, transport, protocol)
684         raise Return(transport, protocol)
685
686     @coroutine
687     def _create_connection_transport(self, sock, protocol_factory, ssl,
688                                      server_hostname):
689         protocol = protocol_factory()
690         waiter = futures.Future(loop=self)
691         if ssl:
692             sslcontext = None if isinstance(ssl, bool) else ssl
693             transport = self._make_ssl_transport(
694                 sock, protocol, sslcontext, waiter,
695                 server_side=False, server_hostname=server_hostname)
696         else:
697             transport = self._make_socket_transport(sock, protocol, waiter)
698
699         try:
700             yield From(waiter)
701         except:
702             transport.close()
703             raise
704
705         raise Return(transport, protocol)
706
707     @coroutine
708     def create_datagram_endpoint(self, protocol_factory,
709                                  local_addr=None, remote_addr=None,
710                                  family=0, proto=0, flags=0):
711         """Create datagram connection."""
712         if not (local_addr or remote_addr):
713             if family == 0:
714                 raise ValueError('unexpected address family')
715             addr_pairs_info = (((family, proto), (None, None)),)
716         else:
717             # join address by (family, protocol)
718             addr_infos = OrderedDict()
719             for idx, addr in ((0, local_addr), (1, remote_addr)):
720                 if addr is not None:
721                     assert isinstance(addr, tuple) and len(addr) == 2, (
722                         '2-tuple is expected')
723
724                     infos = yield From(self.getaddrinfo(
725                         *addr, family=family, type=socket.SOCK_DGRAM,
726                         proto=proto, flags=flags))
727                     if not infos:
728                         raise socket.error('getaddrinfo() returned empty list')
729
730                     for fam, _, pro, _, address in infos:
731                         key = (fam, pro)
732                         if key not in addr_infos:
733                             addr_infos[key] = [None, None]
734                         addr_infos[key][idx] = address
735
736             # each addr has to have info for each (family, proto) pair
737             addr_pairs_info = [
738                 (key, addr_pair) for key, addr_pair in addr_infos.items()
739                 if not ((local_addr and addr_pair[0] is None) or
740                         (remote_addr and addr_pair[1] is None))]
741
742             if not addr_pairs_info:
743                 raise ValueError('can not get address information')
744
745         exceptions = []
746
747         for ((family, proto),
748              (local_address, remote_address)) in addr_pairs_info:
749             sock = None
750             r_addr = None
751             try:
752                 sock = socket.socket(
753                     family=family, type=socket.SOCK_DGRAM, proto=proto)
754                 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
755                 sock.setblocking(False)
756
757                 if local_addr:
758                     sock.bind(local_address)
759                 if remote_addr:
760                     yield From(self.sock_connect(sock, remote_address))
761                     r_addr = remote_address
762             except socket.error as exc:
763                 if sock is not None:
764                     sock.close()
765                 exceptions.append(exc)
766             except:
767                 if sock is not None:
768                     sock.close()
769                 raise
770             else:
771                 break
772         else:
773             raise exceptions[0]
774
775         protocol = protocol_factory()
776         waiter = futures.Future(loop=self)
777         transport = self._make_datagram_transport(sock, protocol, r_addr,
778                                                   waiter)
779         if self._debug:
780             if local_addr:
781                 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
782                             "created: (%r, %r)",
783                             local_addr, remote_addr, transport, protocol)
784             else:
785                 logger.debug("Datagram endpoint remote_addr=%r created: "
786                              "(%r, %r)",
787                              remote_addr, transport, protocol)
788
789         try:
790             yield From(waiter)
791         except:
792             transport.close()
793             raise
794
795         raise Return(transport, protocol)
796
797     @coroutine
798     def create_server(self, protocol_factory, host=None, port=None,
799                       family=socket.AF_UNSPEC,
800                       flags=socket.AI_PASSIVE,
801                       sock=None,
802                       backlog=100,
803                       ssl=None,
804                       reuse_address=None):
805         """Create a TCP server bound to host and port.
806
807         Return a Server object which can be used to stop the service.
808
809         This method is a coroutine.
810         """
811         if isinstance(ssl, bool):
812             raise TypeError('ssl argument must be an SSLContext or None')
813         if host is not None or port is not None:
814             if sock is not None:
815                 raise ValueError(
816                     'host/port and sock can not be specified at the same time')
817
818             AF_INET6 = getattr(socket, 'AF_INET6', 0)
819             if reuse_address is None:
820                 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
821             sockets = []
822             if host == '':
823                 host = None
824
825             infos = yield From(self.getaddrinfo(
826                 host, port, family=family,
827                 type=socket.SOCK_STREAM, proto=0, flags=flags))
828             if not infos:
829                 raise socket.error('getaddrinfo() returned empty list')
830
831             completed = False
832             try:
833                 for res in infos:
834                     af, socktype, proto, canonname, sa = res
835                     try:
836                         sock = socket.socket(af, socktype, proto)
837                     except socket.error:
838                         # Assume it's a bad family/type/protocol combination.
839                         if self._debug:
840                             logger.warning('create_server() failed to create '
841                                            'socket.socket(%r, %r, %r)',
842                                            af, socktype, proto, exc_info=True)
843                         continue
844                     sockets.append(sock)
845                     if reuse_address:
846                         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
847                                         True)
848                     # Disable IPv4/IPv6 dual stack support (enabled by
849                     # default on Linux) which makes a single socket
850                     # listen on both address families.
851                     if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
852                         sock.setsockopt(socket.IPPROTO_IPV6,
853                                         socket.IPV6_V6ONLY,
854                                         True)
855                     try:
856                         sock.bind(sa)
857                     except socket.error as err:
858                         raise socket.error(err.errno,
859                                            'error while attempting '
860                                            'to bind on address %r: %s'
861                                            % (sa, err.strerror.lower()))
862                 completed = True
863             finally:
864                 if not completed:
865                     for sock in sockets:
866                         sock.close()
867         else:
868             if sock is None:
869                 raise ValueError('Neither host/port nor sock were specified')
870             sockets = [sock]
871
872         server = Server(self, sockets)
873         for sock in sockets:
874             sock.listen(backlog)
875             sock.setblocking(False)
876             self._start_serving(protocol_factory, sock, ssl, server)
877         if self._debug:
878             logger.info("%r is serving", server)
879         raise Return(server)
880
881     @coroutine
882     def connect_read_pipe(self, protocol_factory, pipe):
883         protocol = protocol_factory()
884         waiter = futures.Future(loop=self)
885         transport = self._make_read_pipe_transport(pipe, protocol, waiter)
886
887         try:
888             yield From(waiter)
889         except:
890             transport.close()
891             raise
892
893         if self._debug:
894             logger.debug('Read pipe %r connected: (%r, %r)',
895                          pipe.fileno(), transport, protocol)
896         raise Return(transport, protocol)
897
898     @coroutine
899     def connect_write_pipe(self, protocol_factory, pipe):
900         protocol = protocol_factory()
901         waiter = futures.Future(loop=self)
902         transport = self._make_write_pipe_transport(pipe, protocol, waiter)
903
904         try:
905             yield From(waiter)
906         except:
907             transport.close()
908             raise
909
910         if self._debug:
911             logger.debug('Write pipe %r connected: (%r, %r)',
912                          pipe.fileno(), transport, protocol)
913         raise Return(transport, protocol)
914
915     def _log_subprocess(self, msg, stdin, stdout, stderr):
916         info = [msg]
917         if stdin is not None:
918             info.append('stdin=%s' % _format_pipe(stdin))
919         if stdout is not None and stderr == subprocess.STDOUT:
920             info.append('stdout=stderr=%s' % _format_pipe(stdout))
921         else:
922             if stdout is not None:
923                 info.append('stdout=%s' % _format_pipe(stdout))
924             if stderr is not None:
925                 info.append('stderr=%s' % _format_pipe(stderr))
926         logger.debug(' '.join(info))
927
928     @coroutine
929     def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
930                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
931                          universal_newlines=False, shell=True, bufsize=0,
932                          **kwargs):
933         if not isinstance(cmd, compat.string_types):
934             raise ValueError("cmd must be a string")
935         if universal_newlines:
936             raise ValueError("universal_newlines must be False")
937         if not shell:
938             raise ValueError("shell must be True")
939         if bufsize != 0:
940             raise ValueError("bufsize must be 0")
941         protocol = protocol_factory()
942         if self._debug:
943             # don't log parameters: they may contain sensitive information
944             # (password) and may be too long
945             debug_log = 'run shell command %r' % cmd
946             self._log_subprocess(debug_log, stdin, stdout, stderr)
947         transport = yield From(self._make_subprocess_transport(
948             protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs))
949         if self._debug:
950             logger.info('%s: %r' % (debug_log, transport))
951         raise Return(transport, protocol)
952
953     @coroutine
954     def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
955         stdin = kwargs.pop('stdin', subprocess.PIPE)
956         stdout = kwargs.pop('stdout', subprocess.PIPE)
957         stderr = kwargs.pop('stderr', subprocess.PIPE)
958         universal_newlines = kwargs.pop('universal_newlines', False)
959         shell = kwargs.pop('shell', False)
960         bufsize = kwargs.pop('bufsize', 0)
961         if universal_newlines:
962             raise ValueError("universal_newlines must be False")
963         if shell:
964             raise ValueError("shell must be False")
965         if bufsize != 0:
966             raise ValueError("bufsize must be 0")
967         popen_args = (program,) + args
968         for arg in popen_args:
969             if not isinstance(arg, compat.string_types ):
970                 raise TypeError("program arguments must be "
971                                 "a bytes or text string, not %s"
972                                 % type(arg).__name__)
973         protocol = protocol_factory()
974         if self._debug:
975             # don't log parameters: they may contain sensitive information
976             # (password) and may be too long
977             debug_log = 'execute program %r' % program
978             self._log_subprocess(debug_log, stdin, stdout, stderr)
979         transport = yield From(self._make_subprocess_transport(
980             protocol, popen_args, False, stdin, stdout, stderr,
981             bufsize, **kwargs))
982         if self._debug:
983             logger.info('%s: %r' % (debug_log, transport))
984         raise Return(transport, protocol)
985
986     def set_exception_handler(self, handler):
987         """Set handler as the new event loop exception handler.
988
989         If handler is None, the default exception handler will
990         be set.
991
992         If handler is a callable object, it should have a
993         signature matching '(loop, context)', where 'loop'
994         will be a reference to the active event loop, 'context'
995         will be a dict object (see `call_exception_handler()`
996         documentation for details about context).
997         """
998         if handler is not None and not callable(handler):
999             raise TypeError('A callable object or None is expected, '
1000                             'got {0!r}'.format(handler))
1001         self._exception_handler = handler
1002
1003     def default_exception_handler(self, context):
1004         """Default exception handler.
1005
1006         This is called when an exception occurs and no exception
1007         handler is set, and can be called by a custom exception
1008         handler that wants to defer to the default behavior.
1009
1010         The context parameter has the same meaning as in
1011         `call_exception_handler()`.
1012         """
1013         message = context.get('message')
1014         if not message:
1015             message = 'Unhandled exception in event loop'
1016
1017         exception = context.get('exception')
1018         if exception is not None:
1019             if hasattr(exception, '__traceback__'):
1020                 # Python 3
1021                 tb = exception.__traceback__
1022             else:
1023                 # call_exception_handler() is usually called indirectly
1024                 # from an except block. If it's not the case, the traceback
1025                 # is undefined...
1026                 tb = sys.exc_info()[2]
1027             exc_info = (type(exception), exception, tb)
1028         else:
1029             exc_info = False
1030
1031         if ('source_traceback' not in context
1032         and self._current_handle is not None
1033         and self._current_handle._source_traceback):
1034             context['handle_traceback'] = self._current_handle._source_traceback
1035
1036         log_lines = [message]
1037         for key in sorted(context):
1038             if key in ('message', 'exception'):
1039                 continue
1040             value = context[key]
1041             if key == 'source_traceback':
1042                 tb = ''.join(traceback.format_list(value))
1043                 value = 'Object created at (most recent call last):\n'
1044                 value += tb.rstrip()
1045             elif key == 'handle_traceback':
1046                 tb = ''.join(traceback.format_list(value))
1047                 value = 'Handle created at (most recent call last):\n'
1048                 value += tb.rstrip()
1049             else:
1050                 value = repr(value)
1051             log_lines.append('{0}: {1}'.format(key, value))
1052
1053         logger.error('\n'.join(log_lines), exc_info=exc_info)
1054
1055     def call_exception_handler(self, context):
1056         """Call the current event loop's exception handler.
1057
1058         The context argument is a dict containing the following keys:
1059
1060         - 'message': Error message;
1061         - 'exception' (optional): Exception object;
1062         - 'future' (optional): Future instance;
1063         - 'handle' (optional): Handle instance;
1064         - 'protocol' (optional): Protocol instance;
1065         - 'transport' (optional): Transport instance;
1066         - 'socket' (optional): Socket instance.
1067
1068         New keys maybe introduced in the future.
1069
1070         Note: do not overload this method in an event loop subclass.
1071         For custom exception handling, use the
1072         `set_exception_handler()` method.
1073         """
1074         if self._exception_handler is None:
1075             try:
1076                 self.default_exception_handler(context)
1077             except Exception:
1078                 # Second protection layer for unexpected errors
1079                 # in the default implementation, as well as for subclassed
1080                 # event loops with overloaded "default_exception_handler".
1081                 logger.error('Exception in default exception handler',
1082                              exc_info=True)
1083         else:
1084             try:
1085                 self._exception_handler(self, context)
1086             except Exception as exc:
1087                 # Exception in the user set custom exception handler.
1088                 try:
1089                     # Let's try default handler.
1090                     self.default_exception_handler({
1091                         'message': 'Unhandled error in exception handler',
1092                         'exception': exc,
1093                         'context': context,
1094                     })
1095                 except Exception:
1096                     # Guard 'default_exception_handler' in case it is
1097                     # overloaded.
1098                     logger.error('Exception in default exception handler '
1099                                  'while handling an unexpected error '
1100                                  'in custom exception handler',
1101                                  exc_info=True)
1102
1103     def _add_callback(self, handle):
1104         """Add a Handle to _scheduled (TimerHandle) or _ready."""
1105         assert isinstance(handle, events.Handle), 'A Handle is required here'
1106         if handle._cancelled:
1107             return
1108         assert not isinstance(handle, events.TimerHandle)
1109         self._ready.append(handle)
1110
1111     def _add_callback_signalsafe(self, handle):
1112         """Like _add_callback() but called from a signal handler."""
1113         self._add_callback(handle)
1114         self._write_to_self()
1115
1116     def _timer_handle_cancelled(self, handle):
1117         """Notification that a TimerHandle has been cancelled."""
1118         if handle._scheduled:
1119             self._timer_cancelled_count += 1
1120
1121     def _run_once(self):
1122         """Run one full iteration of the event loop.
1123
1124         This calls all currently ready callbacks, polls for I/O,
1125         schedules the resulting callbacks, and finally schedules
1126         'call_later' callbacks.
1127         """
1128
1129         sched_count = len(self._scheduled)
1130         if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1131             float(self._timer_cancelled_count) / sched_count >
1132                 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1133             # Remove delayed calls that were cancelled if their number
1134             # is too high
1135             new_scheduled = []
1136             for handle in self._scheduled:
1137                 if handle._cancelled:
1138                     handle._scheduled = False
1139                 else:
1140                     new_scheduled.append(handle)
1141
1142             heapq.heapify(new_scheduled)
1143             self._scheduled = new_scheduled
1144             self._timer_cancelled_count = 0
1145         else:
1146             # Remove delayed calls that were cancelled from head of queue.
1147             while self._scheduled and self._scheduled[0]._cancelled:
1148                 self._timer_cancelled_count -= 1
1149                 handle = heapq.heappop(self._scheduled)
1150                 handle._scheduled = False
1151
1152         timeout = None
1153         if self._ready:
1154             timeout = 0
1155         elif self._scheduled:
1156             # Compute the desired timeout.
1157             when = self._scheduled[0]._when
1158             timeout = max(0, when - self.time())
1159
1160         if self._debug and timeout != 0:
1161             t0 = self.time()
1162             event_list = self._selector.select(timeout)
1163             dt = self.time() - t0
1164             if dt >= 1.0:
1165                 level = logging.INFO
1166             else:
1167                 level = logging.DEBUG
1168             nevent = len(event_list)
1169             if timeout is None:
1170                 logger.log(level, 'poll took %.3f ms: %s events',
1171                            dt * 1e3, nevent)
1172             elif nevent:
1173                 logger.log(level,
1174                            'poll %.3f ms took %.3f ms: %s events',
1175                            timeout * 1e3, dt * 1e3, nevent)
1176             elif dt >= 1.0:
1177                 logger.log(level,
1178                            'poll %.3f ms took %.3f ms: timeout',
1179                            timeout * 1e3, dt * 1e3)
1180         else:
1181             event_list = self._selector.select(timeout)
1182         self._process_events(event_list)
1183
1184         # Handle 'later' callbacks that are ready.
1185         end_time = self.time() + self._clock_resolution
1186         while self._scheduled:
1187             handle = self._scheduled[0]
1188             if handle._when >= end_time:
1189                 break
1190             handle = heapq.heappop(self._scheduled)
1191             handle._scheduled = False
1192             self._ready.append(handle)
1193
1194         # This is the only place where callbacks are actually *called*.
1195         # All other places just add them to ready.
1196         # Note: We run all currently scheduled callbacks, but not any
1197         # callbacks scheduled by callbacks run this time around --
1198         # they will be run the next time (after another I/O poll).
1199         # Use an idiom that is thread-safe without using locks.
1200         ntodo = len(self._ready)
1201         for i in range(ntodo):
1202             handle = self._ready.popleft()
1203             if handle._cancelled:
1204                 continue
1205             if self._debug:
1206                 try:
1207                     self._current_handle = handle
1208                     t0 = self.time()
1209                     handle._run()
1210                     dt = self.time() - t0
1211                     if dt >= self.slow_callback_duration:
1212                         logger.warning('Executing %s took %.3f seconds',
1213                                        _format_handle(handle), dt)
1214                 finally:
1215                     self._current_handle = None
1216             else:
1217                 handle._run()
1218         handle = None  # Needed to break cycles when an exception occurs.
1219
1220     def _set_coroutine_wrapper(self, enabled):
1221         try:
1222             set_wrapper = sys.set_coroutine_wrapper
1223             get_wrapper = sys.get_coroutine_wrapper
1224         except AttributeError:
1225             return
1226
1227         enabled = bool(enabled)
1228         if self._coroutine_wrapper_set == enabled:
1229             return
1230
1231         wrapper = coroutines.debug_wrapper
1232         current_wrapper = get_wrapper()
1233
1234         if enabled:
1235             if current_wrapper not in (None, wrapper):
1236                 warnings.warn(
1237                     "loop.set_debug(True): cannot set debug coroutine "
1238                     "wrapper; another wrapper is already set %r" %
1239                     current_wrapper, RuntimeWarning)
1240             else:
1241                 set_wrapper(wrapper)
1242                 self._coroutine_wrapper_set = True
1243         else:
1244             if current_wrapper not in (None, wrapper):
1245                 warnings.warn(
1246                     "loop.set_debug(False): cannot unset debug coroutine "
1247                     "wrapper; another wrapper was set %r" %
1248                     current_wrapper, RuntimeWarning)
1249             else:
1250                 set_wrapper(None)
1251                 self._coroutine_wrapper_set = False
1252
1253     def get_debug(self):
1254         return self._debug
1255
1256     def set_debug(self, enabled):
1257         self._debug = enabled
1258
1259         if self.is_running():
1260             self._set_coroutine_wrapper(enabled)