efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / unix_events.py
1 """Selector event loop for Unix with signal handling."""
2 from __future__ import absolute_import
3
4 import errno
5 import os
6 import signal
7 import socket
8 import stat
9 import subprocess
10 import sys
11 import threading
12 import warnings
13
14
15 from . import base_events
16 from . import base_subprocess
17 from . import compat
18 from . import constants
19 from . import coroutines
20 from . import events
21 from . import futures
22 from . import selector_events
23 from . import selectors
24 from . import transports
25 from .compat import flatten_bytes
26 from .coroutines import coroutine, From, Return
27 from .log import logger
28 from .py33_exceptions import (
29     reraise, wrap_error,
30     BlockingIOError, BrokenPipeError, ConnectionResetError,
31     InterruptedError, ChildProcessError)
32
33
34 __all__ = ['SelectorEventLoop',
35            'AbstractChildWatcher', 'SafeChildWatcher',
36            'FastChildWatcher', 'DefaultEventLoopPolicy',
37            ]
38
39 if sys.platform == 'win32':  # pragma: no cover
40     raise ImportError('Signals are not really supported on Windows')
41
42
43 if compat.PY33:
44     def _sighandler_noop(signum, frame):
45         """Dummy signal handler."""
46         pass
47
48
49 class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
50     """Unix event loop.
51
52     Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
53     """
54
55     def __init__(self, selector=None):
56         super(_UnixSelectorEventLoop, self).__init__(selector)
57         self._signal_handlers = {}
58
59     def _socketpair(self):
60         return socket.socketpair()
61
62     def close(self):
63         super(_UnixSelectorEventLoop, self).close()
64         for sig in list(self._signal_handlers):
65             self.remove_signal_handler(sig)
66
67     # On Python <= 3.2, the C signal handler of Python writes a null byte into
68     # the wakeup file descriptor. We cannot retrieve the signal numbers from
69     # the file descriptor.
70     if compat.PY33:
71         def _process_self_data(self, data):
72             for signum in data:
73                 if not signum:
74                     # ignore null bytes written by _write_to_self()
75                     continue
76                 self._handle_signal(signum)
77
78     def add_signal_handler(self, sig, callback, *args):
79         """Add a handler for a signal.  UNIX only.
80
81         Raise ValueError if the signal number is invalid or uncatchable.
82         Raise RuntimeError if there is a problem setting up the handler.
83         """
84         if (coroutines.iscoroutine(callback)
85         or coroutines.iscoroutinefunction(callback)):
86             raise TypeError("coroutines cannot be used "
87                             "with add_signal_handler()")
88         self._check_signal(sig)
89         self._check_closed()
90         try:
91             # set_wakeup_fd() raises ValueError if this is not the
92             # main thread.  By calling it early we ensure that an
93             # event loop running in another thread cannot add a signal
94             # handler.
95             signal.set_wakeup_fd(self._csock.fileno())
96         except (ValueError, OSError) as exc:
97             raise RuntimeError(str(exc))
98
99         handle = events.Handle(callback, args, self)
100         self._signal_handlers[sig] = handle
101
102         try:
103             if compat.PY33:
104                 # On Python 3.3 and newer, the C signal handler writes the
105                 # signal number into the wakeup file descriptor and then calls
106                 # Py_AddPendingCall() to schedule the Python signal handler.
107                 #
108                 # Register a dummy signal handler to ask Python to write the
109                 # signal number into the wakup file descriptor.
110                 # _process_self_data() will read signal numbers from this file
111                 # descriptor to handle signals.
112                 signal.signal(sig, _sighandler_noop)
113             else:
114                 # On Python 3.2 and older, the C signal handler first calls
115                 # Py_AddPendingCall() to schedule the Python signal handler,
116                 # and then write a null byte into the wakeup file descriptor.
117                 signal.signal(sig, self._handle_signal)
118
119             # Set SA_RESTART to limit EINTR occurrences.
120             signal.siginterrupt(sig, False)
121         except (RuntimeError, OSError) as exc:
122             # On Python 2, signal.signal(signal.SIGKILL, signal.SIG_IGN) raises
123             # RuntimeError(22, 'Invalid argument'). On Python 3,
124             # OSError(22, 'Invalid argument') is raised instead.
125             exc_type, exc_value, tb = sys.exc_info()
126
127             del self._signal_handlers[sig]
128             if not self._signal_handlers:
129                 try:
130                     signal.set_wakeup_fd(-1)
131                 except (ValueError, OSError) as nexc:
132                     logger.info('set_wakeup_fd(-1) failed: %s', nexc)
133
134             if isinstance(exc, RuntimeError) or exc.errno == errno.EINVAL:
135                 raise RuntimeError('sig {0} cannot be caught'.format(sig))
136             else:
137                 reraise(exc_type, exc_value, tb)
138
139     def _handle_signal(self, sig, frame=None):
140         """Internal helper that is the actual signal handler."""
141         handle = self._signal_handlers.get(sig)
142         if handle is None:
143             return  # Assume it's some race condition.
144         if handle._cancelled:
145             self.remove_signal_handler(sig)  # Remove it properly.
146         else:
147             self._add_callback_signalsafe(handle)
148
149     def remove_signal_handler(self, sig):
150         """Remove a handler for a signal.  UNIX only.
151
152         Return True if a signal handler was removed, False if not.
153         """
154         self._check_signal(sig)
155         try:
156             del self._signal_handlers[sig]
157         except KeyError:
158             return False
159
160         if sig == signal.SIGINT:
161             handler = signal.default_int_handler
162         else:
163             handler = signal.SIG_DFL
164
165         try:
166             signal.signal(sig, handler)
167         except OSError as exc:
168             if exc.errno == errno.EINVAL:
169                 raise RuntimeError('sig {0} cannot be caught'.format(sig))
170             else:
171                 raise
172
173         if not self._signal_handlers:
174             try:
175                 signal.set_wakeup_fd(-1)
176             except (ValueError, OSError) as exc:
177                 logger.info('set_wakeup_fd(-1) failed: %s', exc)
178
179         return True
180
181     def _check_signal(self, sig):
182         """Internal helper to validate a signal.
183
184         Raise ValueError if the signal number is invalid or uncatchable.
185         Raise RuntimeError if there is a problem setting up the handler.
186         """
187         if not isinstance(sig, int):
188             raise TypeError('sig must be an int, not {0!r}'.format(sig))
189
190         if not (1 <= sig < signal.NSIG):
191             raise ValueError(
192                 'sig {0} out of range(1, {1})'.format(sig, signal.NSIG))
193
194     def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
195                                   extra=None):
196         return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
197
198     def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
199                                    extra=None):
200         return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
201
202     @coroutine
203     def _make_subprocess_transport(self, protocol, args, shell,
204                                    stdin, stdout, stderr, bufsize,
205                                    extra=None, **kwargs):
206         with events.get_child_watcher() as watcher:
207             waiter = futures.Future(loop=self)
208             transp = _UnixSubprocessTransport(self, protocol, args, shell,
209                                               stdin, stdout, stderr, bufsize,
210                                               waiter=waiter, extra=extra,
211                                               **kwargs)
212
213             watcher.add_child_handler(transp.get_pid(),
214                                       self._child_watcher_callback, transp)
215             try:
216                 yield From(waiter)
217             except Exception as exc:
218                 # Workaround CPython bug #23353: using yield/yield-from in an
219                 # except block of a generator doesn't clear properly
220                 # sys.exc_info()
221                 err = exc
222             else:
223                 err = None
224
225             if err is not None:
226                 transp.close()
227                 yield From(transp._wait())
228                 raise err
229
230         raise Return(transp)
231
232     def _child_watcher_callback(self, pid, returncode, transp):
233         self.call_soon_threadsafe(transp._process_exited, returncode)
234
235     @coroutine
236     def create_unix_connection(self, protocol_factory, path,
237                                ssl=None, sock=None,
238                                server_hostname=None):
239         assert server_hostname is None or isinstance(server_hostname, str)
240         if ssl:
241             if server_hostname is None:
242                 raise ValueError(
243                     'you have to pass server_hostname when using ssl')
244         else:
245             if server_hostname is not None:
246                 raise ValueError('server_hostname is only meaningful with ssl')
247
248         if path is not None:
249             if sock is not None:
250                 raise ValueError(
251                     'path and sock can not be specified at the same time')
252
253             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
254             try:
255                 sock.setblocking(False)
256                 yield From(self.sock_connect(sock, path))
257             except:
258                 sock.close()
259                 raise
260
261         else:
262             if sock is None:
263                 raise ValueError('no path and sock were specified')
264             sock.setblocking(False)
265
266         transport, protocol = yield From(self._create_connection_transport(
267             sock, protocol_factory, ssl, server_hostname))
268         raise Return(transport, protocol)
269
270     @coroutine
271     def create_unix_server(self, protocol_factory, path=None,
272                            sock=None, backlog=100, ssl=None):
273         if isinstance(ssl, bool):
274             raise TypeError('ssl argument must be an SSLContext or None')
275
276         if path is not None:
277             if sock is not None:
278                 raise ValueError(
279                     'path and sock can not be specified at the same time')
280
281             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
282
283             try:
284                 sock.bind(path)
285             except socket.error as exc:
286                 sock.close()
287                 if exc.errno == errno.EADDRINUSE:
288                     # Let's improve the error message by adding
289                     # with what exact address it occurs.
290                     msg = 'Address {0!r} is already in use'.format(path)
291                     raise OSError(errno.EADDRINUSE, msg)
292                 else:
293                     raise
294             except:
295                 sock.close()
296                 raise
297         else:
298             if sock is None:
299                 raise ValueError(
300                     'path was not specified, and no sock specified')
301
302             if sock.family != socket.AF_UNIX:
303                 raise ValueError(
304                     'A UNIX Domain Socket was expected, got {0!r}'.format(sock))
305
306         server = base_events.Server(self, [sock])
307         sock.listen(backlog)
308         sock.setblocking(False)
309         self._start_serving(protocol_factory, sock, ssl, server)
310         return server
311
312
313 if hasattr(os, 'set_blocking'):
314     # Python 3.5 and newer
315     def _set_nonblocking(fd):
316         os.set_blocking(fd, False)
317 else:
318     import fcntl
319
320     def _set_nonblocking(fd):
321         flags = fcntl.fcntl(fd, fcntl.F_GETFL)
322         flags = flags | os.O_NONBLOCK
323         fcntl.fcntl(fd, fcntl.F_SETFL, flags)
324
325
326 class _UnixReadPipeTransport(transports.ReadTransport):
327
328     max_size = 256 * 1024  # max bytes we read in one event loop iteration
329
330     def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
331         super(_UnixReadPipeTransport, self).__init__(extra)
332         self._extra['pipe'] = pipe
333         self._loop = loop
334         self._pipe = pipe
335         self._fileno = pipe.fileno()
336         mode = os.fstat(self._fileno).st_mode
337         if not (stat.S_ISFIFO(mode) or
338                 stat.S_ISSOCK(mode) or
339                 stat.S_ISCHR(mode)):
340             raise ValueError("Pipe transport is for pipes/sockets only.")
341         _set_nonblocking(self._fileno)
342         self._protocol = protocol
343         self._closing = False
344         self._loop.call_soon(self._protocol.connection_made, self)
345         # only start reading when connection_made() has been called
346         self._loop.call_soon(self._loop.add_reader,
347                              self._fileno, self._read_ready)
348         if waiter is not None:
349             # only wake up the waiter when connection_made() has been called
350             self._loop.call_soon(waiter._set_result_unless_cancelled, None)
351
352     def __repr__(self):
353         info = [self.__class__.__name__]
354         if self._pipe is None:
355             info.append('closed')
356         elif self._closing:
357             info.append('closing')
358         info.append('fd=%s' % self._fileno)
359         if self._pipe is not None:
360             polling = selector_events._test_selector_event(
361                           self._loop._selector,
362                           self._fileno, selectors.EVENT_READ)
363             if polling:
364                 info.append('polling')
365             else:
366                 info.append('idle')
367         else:
368             info.append('closed')
369         return '<%s>' % ' '.join(info)
370
371     def _read_ready(self):
372         try:
373             data = wrap_error(os.read, self._fileno, self.max_size)
374         except (BlockingIOError, InterruptedError):
375             pass
376         except OSError as exc:
377             self._fatal_error(exc, 'Fatal read error on pipe transport')
378         else:
379             if data:
380                 self._protocol.data_received(data)
381             else:
382                 if self._loop.get_debug():
383                     logger.info("%r was closed by peer", self)
384                 self._closing = True
385                 self._loop.remove_reader(self._fileno)
386                 self._loop.call_soon(self._protocol.eof_received)
387                 self._loop.call_soon(self._call_connection_lost, None)
388
389     def pause_reading(self):
390         self._loop.remove_reader(self._fileno)
391
392     def resume_reading(self):
393         self._loop.add_reader(self._fileno, self._read_ready)
394
395     def close(self):
396         if not self._closing:
397             self._close(None)
398
399     # On Python 3.3 and older, objects with a destructor part of a reference
400     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
401     # to the PEP 442.
402     if compat.PY34:
403         def __del__(self):
404             if self._pipe is not None:
405                 warnings.warn("unclosed transport %r" % self, ResourceWarning)
406                 self._pipe.close()
407
408     def _fatal_error(self, exc, message='Fatal error on pipe transport'):
409         # should be called by exception handler only
410         if (isinstance(exc, OSError) and exc.errno == errno.EIO):
411             if self._loop.get_debug():
412                 logger.debug("%r: %s", self, message, exc_info=True)
413         else:
414             self._loop.call_exception_handler({
415                 'message': message,
416                 'exception': exc,
417                 'transport': self,
418                 'protocol': self._protocol,
419             })
420         self._close(exc)
421
422     def _close(self, exc):
423         self._closing = True
424         self._loop.remove_reader(self._fileno)
425         self._loop.call_soon(self._call_connection_lost, exc)
426
427     def _call_connection_lost(self, exc):
428         try:
429             self._protocol.connection_lost(exc)
430         finally:
431             self._pipe.close()
432             self._pipe = None
433             self._protocol = None
434             self._loop = None
435
436
437 class _UnixWritePipeTransport(transports._FlowControlMixin,
438                               transports.WriteTransport):
439
440     def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
441         super(_UnixWritePipeTransport, self).__init__(extra, loop)
442         self._extra['pipe'] = pipe
443         self._pipe = pipe
444         self._fileno = pipe.fileno()
445         mode = os.fstat(self._fileno).st_mode
446         is_socket = stat.S_ISSOCK(mode)
447         if not (is_socket or
448                 stat.S_ISFIFO(mode) or
449                 stat.S_ISCHR(mode)):
450             raise ValueError("Pipe transport is only for "
451                              "pipes, sockets and character devices")
452         _set_nonblocking(self._fileno)
453         self._protocol = protocol
454         self._buffer = []
455         self._conn_lost = 0
456         self._closing = False  # Set when close() or write_eof() called.
457
458         self._loop.call_soon(self._protocol.connection_made, self)
459
460         # On AIX, the reader trick (to be notified when the read end of the
461         # socket is closed) only works for sockets. On other platforms it
462         # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
463         if is_socket or not sys.platform.startswith("aix"):
464             # only start reading when connection_made() has been called
465             self._loop.call_soon(self._loop.add_reader,
466                                  self._fileno, self._read_ready)
467
468         if waiter is not None:
469             # only wake up the waiter when connection_made() has been called
470             self._loop.call_soon(waiter._set_result_unless_cancelled, None)
471
472     def __repr__(self):
473         info = [self.__class__.__name__]
474         if self._pipe is None:
475             info.append('closed')
476         elif self._closing:
477             info.append('closing')
478         info.append('fd=%s' % self._fileno)
479         if self._pipe is not None:
480             polling = selector_events._test_selector_event(
481                           self._loop._selector,
482                           self._fileno, selectors.EVENT_WRITE)
483             if polling:
484                 info.append('polling')
485             else:
486                 info.append('idle')
487
488             bufsize = self.get_write_buffer_size()
489             info.append('bufsize=%s' % bufsize)
490         else:
491             info.append('closed')
492         return '<%s>' % ' '.join(info)
493
494     def get_write_buffer_size(self):
495         return sum(len(data) for data in self._buffer)
496
497     def _read_ready(self):
498         # Pipe was closed by peer.
499         if self._loop.get_debug():
500             logger.info("%r was closed by peer", self)
501         if self._buffer:
502             self._close(BrokenPipeError())
503         else:
504             self._close()
505
506     def write(self, data):
507         data = flatten_bytes(data)
508         if not data:
509             return
510
511         if self._conn_lost or self._closing:
512             if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
513                 logger.warning('pipe closed by peer or '
514                                'os.write(pipe, data) raised exception.')
515             self._conn_lost += 1
516             return
517
518         if not self._buffer:
519             # Attempt to send it right away first.
520             try:
521                 n = wrap_error(os.write, self._fileno, data)
522             except (BlockingIOError, InterruptedError):
523                 n = 0
524             except Exception as exc:
525                 self._conn_lost += 1
526                 self._fatal_error(exc, 'Fatal write error on pipe transport')
527                 return
528             if n == len(data):
529                 return
530             elif n > 0:
531                 data = data[n:]
532             self._loop.add_writer(self._fileno, self._write_ready)
533
534         self._buffer.append(data)
535         self._maybe_pause_protocol()
536
537     def _write_ready(self):
538         data = b''.join(self._buffer)
539         assert data, 'Data should not be empty'
540
541         del self._buffer[:]
542         try:
543             n = wrap_error(os.write, self._fileno, data)
544         except (BlockingIOError, InterruptedError):
545             self._buffer.append(data)
546         except Exception as exc:
547             self._conn_lost += 1
548             # Remove writer here, _fatal_error() doesn't it
549             # because _buffer is empty.
550             self._loop.remove_writer(self._fileno)
551             self._fatal_error(exc, 'Fatal write error on pipe transport')
552         else:
553             if n == len(data):
554                 self._loop.remove_writer(self._fileno)
555                 self._maybe_resume_protocol()  # May append to buffer.
556                 if not self._buffer and self._closing:
557                     self._loop.remove_reader(self._fileno)
558                     self._call_connection_lost(None)
559                 return
560             elif n > 0:
561                 data = data[n:]
562
563             self._buffer.append(data)  # Try again later.
564
565     def can_write_eof(self):
566         return True
567
568     def write_eof(self):
569         if self._closing:
570             return
571         assert self._pipe
572         self._closing = True
573         if not self._buffer:
574             self._loop.remove_reader(self._fileno)
575             self._loop.call_soon(self._call_connection_lost, None)
576
577     def close(self):
578         if self._pipe is not None and not self._closing:
579             # write_eof is all what we needed to close the write pipe
580             self.write_eof()
581
582     # On Python 3.3 and older, objects with a destructor part of a reference
583     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
584     # to the PEP 442.
585     if compat.PY34:
586         def __del__(self):
587             if self._pipe is not None:
588                 warnings.warn("unclosed transport %r" % self, ResourceWarning)
589                 self._pipe.close()
590
591     def abort(self):
592         self._close(None)
593
594     def _fatal_error(self, exc, message='Fatal error on pipe transport'):
595         # should be called by exception handler only
596         if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
597             if self._loop.get_debug():
598                 logger.debug("%r: %s", self, message, exc_info=True)
599         else:
600             self._loop.call_exception_handler({
601                 'message': message,
602                 'exception': exc,
603                 'transport': self,
604                 'protocol': self._protocol,
605             })
606         self._close(exc)
607
608     def _close(self, exc=None):
609         self._closing = True
610         if self._buffer:
611             self._loop.remove_writer(self._fileno)
612         del self._buffer[:]
613         self._loop.remove_reader(self._fileno)
614         self._loop.call_soon(self._call_connection_lost, exc)
615
616     def _call_connection_lost(self, exc):
617         try:
618             self._protocol.connection_lost(exc)
619         finally:
620             self._pipe.close()
621             self._pipe = None
622             self._protocol = None
623             self._loop = None
624
625
626 if hasattr(os, 'set_inheritable'):
627     # Python 3.4 and newer
628     _set_inheritable = os.set_inheritable
629 else:
630     import fcntl
631
632     def _set_inheritable(fd, inheritable):
633         cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
634
635         old = fcntl.fcntl(fd, fcntl.F_GETFD)
636         if not inheritable:
637             fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
638         else:
639             fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
640
641
642 class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
643
644     def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
645         stdin_w = None
646         if stdin == subprocess.PIPE:
647             # Use a socket pair for stdin, since not all platforms
648             # support selecting read events on the write end of a
649             # socket (which we use in order to detect closing of the
650             # other end).  Notably this is needed on AIX, and works
651             # just fine on other platforms.
652             stdin, stdin_w = self._loop._socketpair()
653
654             # Mark the write end of the stdin pipe as non-inheritable,
655             # needed by close_fds=False on Python 3.3 and older
656             # (Python 3.4 implements the PEP 446, socketpair returns
657             # non-inheritable sockets)
658             _set_inheritable(stdin_w.fileno(), False)
659         self._proc = subprocess.Popen(
660             args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
661             universal_newlines=False, bufsize=bufsize, **kwargs)
662         if stdin_w is not None:
663             # Retrieve the file descriptor from stdin_w, stdin_w should not
664             # "own" the file descriptor anymore: closing stdin_fd file
665             # descriptor must close immediatly the file
666             stdin.close()
667             if hasattr(stdin_w, 'detach'):
668                 stdin_fd = stdin_w.detach()
669                 self._proc.stdin = os.fdopen(stdin_fd, 'wb', bufsize)
670             else:
671                 stdin_dup = os.dup(stdin_w.fileno())
672                 stdin_w.close()
673                 self._proc.stdin = os.fdopen(stdin_dup, 'wb', bufsize)
674
675
676 class AbstractChildWatcher(object):
677     """Abstract base class for monitoring child processes.
678
679     Objects derived from this class monitor a collection of subprocesses and
680     report their termination or interruption by a signal.
681
682     New callbacks are registered with .add_child_handler(). Starting a new
683     process must be done within a 'with' block to allow the watcher to suspend
684     its activity until the new process if fully registered (this is needed to
685     prevent a race condition in some implementations).
686
687     Example:
688         with watcher:
689             proc = subprocess.Popen("sleep 1")
690             watcher.add_child_handler(proc.pid, callback)
691
692     Notes:
693         Implementations of this class must be thread-safe.
694
695         Since child watcher objects may catch the SIGCHLD signal and call
696         waitpid(-1), there should be only one active object per process.
697     """
698
699     def add_child_handler(self, pid, callback, *args):
700         """Register a new child handler.
701
702         Arrange for callback(pid, returncode, *args) to be called when
703         process 'pid' terminates. Specifying another callback for the same
704         process replaces the previous handler.
705
706         Note: callback() must be thread-safe.
707         """
708         raise NotImplementedError()
709
710     def remove_child_handler(self, pid):
711         """Removes the handler for process 'pid'.
712
713         The function returns True if the handler was successfully removed,
714         False if there was nothing to remove."""
715
716         raise NotImplementedError()
717
718     def attach_loop(self, loop):
719         """Attach the watcher to an event loop.
720
721         If the watcher was previously attached to an event loop, then it is
722         first detached before attaching to the new loop.
723
724         Note: loop may be None.
725         """
726         raise NotImplementedError()
727
728     def close(self):
729         """Close the watcher.
730
731         This must be called to make sure that any underlying resource is freed.
732         """
733         raise NotImplementedError()
734
735     def __enter__(self):
736         """Enter the watcher's context and allow starting new processes
737
738         This function must return self"""
739         raise NotImplementedError()
740
741     def __exit__(self, a, b, c):
742         """Exit the watcher's context"""
743         raise NotImplementedError()
744
745
746 class BaseChildWatcher(AbstractChildWatcher):
747
748     def __init__(self):
749         self._loop = None
750
751     def close(self):
752         self.attach_loop(None)
753
754     def _do_waitpid(self, expected_pid):
755         raise NotImplementedError()
756
757     def _do_waitpid_all(self):
758         raise NotImplementedError()
759
760     def attach_loop(self, loop):
761         assert loop is None or isinstance(loop, events.AbstractEventLoop)
762
763         if self._loop is not None:
764             self._loop.remove_signal_handler(signal.SIGCHLD)
765
766         self._loop = loop
767         if loop is not None:
768             loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
769
770             # Prevent a race condition in case a child terminated
771             # during the switch.
772             self._do_waitpid_all()
773
774     def _sig_chld(self):
775         try:
776             self._do_waitpid_all()
777         except Exception as exc:
778             # self._loop should always be available here
779             # as '_sig_chld' is added as a signal handler
780             # in 'attach_loop'
781             self._loop.call_exception_handler({
782                 'message': 'Unknown exception in SIGCHLD handler',
783                 'exception': exc,
784             })
785
786     def _compute_returncode(self, status):
787         if os.WIFSIGNALED(status):
788             # The child process died because of a signal.
789             return -os.WTERMSIG(status)
790         elif os.WIFEXITED(status):
791             # The child process exited (e.g sys.exit()).
792             return os.WEXITSTATUS(status)
793         else:
794             # The child exited, but we don't understand its status.
795             # This shouldn't happen, but if it does, let's just
796             # return that status; perhaps that helps debug it.
797             return status
798
799
800 class SafeChildWatcher(BaseChildWatcher):
801     """'Safe' child watcher implementation.
802
803     This implementation avoids disrupting other code spawning processes by
804     polling explicitly each process in the SIGCHLD handler instead of calling
805     os.waitpid(-1).
806
807     This is a safe solution but it has a significant overhead when handling a
808     big number of children (O(n) each time SIGCHLD is raised)
809     """
810
811     def __init__(self):
812         super(SafeChildWatcher, self).__init__()
813         self._callbacks = {}
814
815     def close(self):
816         self._callbacks.clear()
817         super(SafeChildWatcher, self).close()
818
819     def __enter__(self):
820         return self
821
822     def __exit__(self, a, b, c):
823         pass
824
825     def add_child_handler(self, pid, callback, *args):
826         self._callbacks[pid] = (callback, args)
827
828         # Prevent a race condition in case the child is already terminated.
829         self._do_waitpid(pid)
830
831     def remove_child_handler(self, pid):
832         try:
833             del self._callbacks[pid]
834             return True
835         except KeyError:
836             return False
837
838     def _do_waitpid_all(self):
839
840         for pid in list(self._callbacks):
841             self._do_waitpid(pid)
842
843     def _do_waitpid(self, expected_pid):
844         assert expected_pid > 0
845
846         try:
847             pid, status = os.waitpid(expected_pid, os.WNOHANG)
848         except ChildProcessError:
849             # The child process is already reaped
850             # (may happen if waitpid() is called elsewhere).
851             pid = expected_pid
852             returncode = 255
853             logger.warning(
854                 "Unknown child process pid %d, will report returncode 255",
855                 pid)
856         else:
857             if pid == 0:
858                 # The child process is still alive.
859                 return
860
861             returncode = self._compute_returncode(status)
862             if self._loop.get_debug():
863                 logger.debug('process %s exited with returncode %s',
864                              expected_pid, returncode)
865
866         try:
867             callback, args = self._callbacks.pop(pid)
868         except KeyError:  # pragma: no cover
869             # May happen if .remove_child_handler() is called
870             # after os.waitpid() returns.
871             if self._loop.get_debug():
872                 logger.warning("Child watcher got an unexpected pid: %r",
873                                pid, exc_info=True)
874         else:
875             callback(pid, returncode, *args)
876
877
878 class FastChildWatcher(BaseChildWatcher):
879     """'Fast' child watcher implementation.
880
881     This implementation reaps every terminated processes by calling
882     os.waitpid(-1) directly, possibly breaking other code spawning processes
883     and waiting for their termination.
884
885     There is no noticeable overhead when handling a big number of children
886     (O(1) each time a child terminates).
887     """
888     def __init__(self):
889         super(FastChildWatcher, self).__init__()
890         self._callbacks = {}
891         self._lock = threading.Lock()
892         self._zombies = {}
893         self._forks = 0
894
895     def close(self):
896         self._callbacks.clear()
897         self._zombies.clear()
898         super(FastChildWatcher, self).close()
899
900     def __enter__(self):
901         with self._lock:
902             self._forks += 1
903
904             return self
905
906     def __exit__(self, a, b, c):
907         with self._lock:
908             self._forks -= 1
909
910             if self._forks or not self._zombies:
911                 return
912
913             collateral_victims = str(self._zombies)
914             self._zombies.clear()
915
916         logger.warning(
917             "Caught subprocesses termination from unknown pids: %s",
918             collateral_victims)
919
920     def add_child_handler(self, pid, callback, *args):
921         assert self._forks, "Must use the context manager"
922         with self._lock:
923             try:
924                 returncode = self._zombies.pop(pid)
925             except KeyError:
926                 # The child is running.
927                 self._callbacks[pid] = callback, args
928                 return
929
930         # The child is dead already. We can fire the callback.
931         callback(pid, returncode, *args)
932
933     def remove_child_handler(self, pid):
934         try:
935             del self._callbacks[pid]
936             return True
937         except KeyError:
938             return False
939
940     def _do_waitpid_all(self):
941         # Because of signal coalescing, we must keep calling waitpid() as
942         # long as we're able to reap a child.
943         while True:
944             try:
945                 pid, status = wrap_error(os.waitpid, -1, os.WNOHANG)
946             except ChildProcessError:
947                 # No more child processes exist.
948                 return
949             else:
950                 if pid == 0:
951                     # A child process is still alive.
952                     return
953
954                 returncode = self._compute_returncode(status)
955
956             with self._lock:
957                 try:
958                     callback, args = self._callbacks.pop(pid)
959                 except KeyError:
960                     # unknown child
961                     if self._forks:
962                         # It may not be registered yet.
963                         self._zombies[pid] = returncode
964                         if self._loop.get_debug():
965                             logger.debug('unknown process %s exited '
966                                          'with returncode %s',
967                                          pid, returncode)
968                         continue
969                     callback = None
970                 else:
971                     if self._loop.get_debug():
972                         logger.debug('process %s exited with returncode %s',
973                                      pid, returncode)
974
975             if callback is None:
976                 logger.warning(
977                     "Caught subprocess termination from unknown pid: "
978                     "%d -> %d", pid, returncode)
979             else:
980                 callback(pid, returncode, *args)
981
982
983 class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
984     """UNIX event loop policy with a watcher for child processes."""
985     _loop_factory = _UnixSelectorEventLoop
986
987     def __init__(self):
988         super(_UnixDefaultEventLoopPolicy, self).__init__()
989         self._watcher = None
990
991     def _init_watcher(self):
992         with events._lock:
993             if self._watcher is None:  # pragma: no branch
994                 self._watcher = SafeChildWatcher()
995                 if isinstance(threading.current_thread(),
996                               threading._MainThread):
997                     self._watcher.attach_loop(self._local._loop)
998
999     def set_event_loop(self, loop):
1000         """Set the event loop.
1001
1002         As a side effect, if a child watcher was set before, then calling
1003         .set_event_loop() from the main thread will call .attach_loop(loop) on
1004         the child watcher.
1005         """
1006
1007         super(_UnixDefaultEventLoopPolicy, self).set_event_loop(loop)
1008
1009         if self._watcher is not None and \
1010             isinstance(threading.current_thread(), threading._MainThread):
1011             self._watcher.attach_loop(loop)
1012
1013     def get_child_watcher(self):
1014         """Get the watcher for child processes.
1015
1016         If not yet set, a SafeChildWatcher object is automatically created.
1017         """
1018         if self._watcher is None:
1019             self._init_watcher()
1020
1021         return self._watcher
1022
1023     def set_child_watcher(self, watcher):
1024         """Set the watcher for child processes."""
1025
1026         assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1027
1028         if self._watcher is not None:
1029             self._watcher.close()
1030
1031         self._watcher = watcher
1032
1033 SelectorEventLoop = _UnixSelectorEventLoop
1034 DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy