1 """Selector event loop for Unix with signal handling."""
2 from __future__ import absolute_import
15 from . import base_events
16 from . import base_subprocess
18 from . import constants
19 from . import coroutines
22 from . import selector_events
23 from . import selectors
24 from . import transports
25 from .compat import flatten_bytes
26 from .coroutines import coroutine, From, Return
27 from .log import logger
28 from .py33_exceptions import (
30 BlockingIOError, BrokenPipeError, ConnectionResetError,
31 InterruptedError, ChildProcessError)
34 __all__ = ['SelectorEventLoop',
35 'AbstractChildWatcher', 'SafeChildWatcher',
36 'FastChildWatcher', 'DefaultEventLoopPolicy',
39 if sys.platform == 'win32': # pragma: no cover
40 raise ImportError('Signals are not really supported on Windows')
44 def _sighandler_noop(signum, frame):
45 """Dummy signal handler."""
49 class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
52 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
55 def __init__(self, selector=None):
56 super(_UnixSelectorEventLoop, self).__init__(selector)
57 self._signal_handlers = {}
59 def _socketpair(self):
60 return socket.socketpair()
63 super(_UnixSelectorEventLoop, self).close()
64 for sig in list(self._signal_handlers):
65 self.remove_signal_handler(sig)
67 # On Python <= 3.2, the C signal handler of Python writes a null byte into
68 # the wakeup file descriptor. We cannot retrieve the signal numbers from
69 # the file descriptor.
71 def _process_self_data(self, data):
74 # ignore null bytes written by _write_to_self()
76 self._handle_signal(signum)
78 def add_signal_handler(self, sig, callback, *args):
79 """Add a handler for a signal. UNIX only.
81 Raise ValueError if the signal number is invalid or uncatchable.
82 Raise RuntimeError if there is a problem setting up the handler.
84 if (coroutines.iscoroutine(callback)
85 or coroutines.iscoroutinefunction(callback)):
86 raise TypeError("coroutines cannot be used "
87 "with add_signal_handler()")
88 self._check_signal(sig)
91 # set_wakeup_fd() raises ValueError if this is not the
92 # main thread. By calling it early we ensure that an
93 # event loop running in another thread cannot add a signal
95 signal.set_wakeup_fd(self._csock.fileno())
96 except (ValueError, OSError) as exc:
97 raise RuntimeError(str(exc))
99 handle = events.Handle(callback, args, self)
100 self._signal_handlers[sig] = handle
104 # On Python 3.3 and newer, the C signal handler writes the
105 # signal number into the wakeup file descriptor and then calls
106 # Py_AddPendingCall() to schedule the Python signal handler.
108 # Register a dummy signal handler to ask Python to write the
109 # signal number into the wakup file descriptor.
110 # _process_self_data() will read signal numbers from this file
111 # descriptor to handle signals.
112 signal.signal(sig, _sighandler_noop)
114 # On Python 3.2 and older, the C signal handler first calls
115 # Py_AddPendingCall() to schedule the Python signal handler,
116 # and then write a null byte into the wakeup file descriptor.
117 signal.signal(sig, self._handle_signal)
119 # Set SA_RESTART to limit EINTR occurrences.
120 signal.siginterrupt(sig, False)
121 except (RuntimeError, OSError) as exc:
122 # On Python 2, signal.signal(signal.SIGKILL, signal.SIG_IGN) raises
123 # RuntimeError(22, 'Invalid argument'). On Python 3,
124 # OSError(22, 'Invalid argument') is raised instead.
125 exc_type, exc_value, tb = sys.exc_info()
127 del self._signal_handlers[sig]
128 if not self._signal_handlers:
130 signal.set_wakeup_fd(-1)
131 except (ValueError, OSError) as nexc:
132 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
134 if isinstance(exc, RuntimeError) or exc.errno == errno.EINVAL:
135 raise RuntimeError('sig {0} cannot be caught'.format(sig))
137 reraise(exc_type, exc_value, tb)
139 def _handle_signal(self, sig, frame=None):
140 """Internal helper that is the actual signal handler."""
141 handle = self._signal_handlers.get(sig)
143 return # Assume it's some race condition.
144 if handle._cancelled:
145 self.remove_signal_handler(sig) # Remove it properly.
147 self._add_callback_signalsafe(handle)
149 def remove_signal_handler(self, sig):
150 """Remove a handler for a signal. UNIX only.
152 Return True if a signal handler was removed, False if not.
154 self._check_signal(sig)
156 del self._signal_handlers[sig]
160 if sig == signal.SIGINT:
161 handler = signal.default_int_handler
163 handler = signal.SIG_DFL
166 signal.signal(sig, handler)
167 except OSError as exc:
168 if exc.errno == errno.EINVAL:
169 raise RuntimeError('sig {0} cannot be caught'.format(sig))
173 if not self._signal_handlers:
175 signal.set_wakeup_fd(-1)
176 except (ValueError, OSError) as exc:
177 logger.info('set_wakeup_fd(-1) failed: %s', exc)
181 def _check_signal(self, sig):
182 """Internal helper to validate a signal.
184 Raise ValueError if the signal number is invalid or uncatchable.
185 Raise RuntimeError if there is a problem setting up the handler.
187 if not isinstance(sig, int):
188 raise TypeError('sig must be an int, not {0!r}'.format(sig))
190 if not (1 <= sig < signal.NSIG):
192 'sig {0} out of range(1, {1})'.format(sig, signal.NSIG))
194 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
196 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
198 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
200 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
203 def _make_subprocess_transport(self, protocol, args, shell,
204 stdin, stdout, stderr, bufsize,
205 extra=None, **kwargs):
206 with events.get_child_watcher() as watcher:
207 waiter = futures.Future(loop=self)
208 transp = _UnixSubprocessTransport(self, protocol, args, shell,
209 stdin, stdout, stderr, bufsize,
210 waiter=waiter, extra=extra,
213 watcher.add_child_handler(transp.get_pid(),
214 self._child_watcher_callback, transp)
217 except Exception as exc:
218 # Workaround CPython bug #23353: using yield/yield-from in an
219 # except block of a generator doesn't clear properly
227 yield From(transp._wait())
232 def _child_watcher_callback(self, pid, returncode, transp):
233 self.call_soon_threadsafe(transp._process_exited, returncode)
236 def create_unix_connection(self, protocol_factory, path,
238 server_hostname=None):
239 assert server_hostname is None or isinstance(server_hostname, str)
241 if server_hostname is None:
243 'you have to pass server_hostname when using ssl')
245 if server_hostname is not None:
246 raise ValueError('server_hostname is only meaningful with ssl')
251 'path and sock can not be specified at the same time')
253 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
255 sock.setblocking(False)
256 yield From(self.sock_connect(sock, path))
263 raise ValueError('no path and sock were specified')
264 sock.setblocking(False)
266 transport, protocol = yield From(self._create_connection_transport(
267 sock, protocol_factory, ssl, server_hostname))
268 raise Return(transport, protocol)
271 def create_unix_server(self, protocol_factory, path=None,
272 sock=None, backlog=100, ssl=None):
273 if isinstance(ssl, bool):
274 raise TypeError('ssl argument must be an SSLContext or None')
279 'path and sock can not be specified at the same time')
281 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
285 except socket.error as exc:
287 if exc.errno == errno.EADDRINUSE:
288 # Let's improve the error message by adding
289 # with what exact address it occurs.
290 msg = 'Address {0!r} is already in use'.format(path)
291 raise OSError(errno.EADDRINUSE, msg)
300 'path was not specified, and no sock specified')
302 if sock.family != socket.AF_UNIX:
304 'A UNIX Domain Socket was expected, got {0!r}'.format(sock))
306 server = base_events.Server(self, [sock])
308 sock.setblocking(False)
309 self._start_serving(protocol_factory, sock, ssl, server)
313 if hasattr(os, 'set_blocking'):
314 # Python 3.5 and newer
315 def _set_nonblocking(fd):
316 os.set_blocking(fd, False)
320 def _set_nonblocking(fd):
321 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
322 flags = flags | os.O_NONBLOCK
323 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
326 class _UnixReadPipeTransport(transports.ReadTransport):
328 max_size = 256 * 1024 # max bytes we read in one event loop iteration
330 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
331 super(_UnixReadPipeTransport, self).__init__(extra)
332 self._extra['pipe'] = pipe
335 self._fileno = pipe.fileno()
336 mode = os.fstat(self._fileno).st_mode
337 if not (stat.S_ISFIFO(mode) or
338 stat.S_ISSOCK(mode) or
340 raise ValueError("Pipe transport is for pipes/sockets only.")
341 _set_nonblocking(self._fileno)
342 self._protocol = protocol
343 self._closing = False
344 self._loop.call_soon(self._protocol.connection_made, self)
345 # only start reading when connection_made() has been called
346 self._loop.call_soon(self._loop.add_reader,
347 self._fileno, self._read_ready)
348 if waiter is not None:
349 # only wake up the waiter when connection_made() has been called
350 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
353 info = [self.__class__.__name__]
354 if self._pipe is None:
355 info.append('closed')
357 info.append('closing')
358 info.append('fd=%s' % self._fileno)
359 if self._pipe is not None:
360 polling = selector_events._test_selector_event(
361 self._loop._selector,
362 self._fileno, selectors.EVENT_READ)
364 info.append('polling')
368 info.append('closed')
369 return '<%s>' % ' '.join(info)
371 def _read_ready(self):
373 data = wrap_error(os.read, self._fileno, self.max_size)
374 except (BlockingIOError, InterruptedError):
376 except OSError as exc:
377 self._fatal_error(exc, 'Fatal read error on pipe transport')
380 self._protocol.data_received(data)
382 if self._loop.get_debug():
383 logger.info("%r was closed by peer", self)
385 self._loop.remove_reader(self._fileno)
386 self._loop.call_soon(self._protocol.eof_received)
387 self._loop.call_soon(self._call_connection_lost, None)
389 def pause_reading(self):
390 self._loop.remove_reader(self._fileno)
392 def resume_reading(self):
393 self._loop.add_reader(self._fileno, self._read_ready)
396 if not self._closing:
399 # On Python 3.3 and older, objects with a destructor part of a reference
400 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
404 if self._pipe is not None:
405 warnings.warn("unclosed transport %r" % self, ResourceWarning)
408 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
409 # should be called by exception handler only
410 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
411 if self._loop.get_debug():
412 logger.debug("%r: %s", self, message, exc_info=True)
414 self._loop.call_exception_handler({
418 'protocol': self._protocol,
422 def _close(self, exc):
424 self._loop.remove_reader(self._fileno)
425 self._loop.call_soon(self._call_connection_lost, exc)
427 def _call_connection_lost(self, exc):
429 self._protocol.connection_lost(exc)
433 self._protocol = None
437 class _UnixWritePipeTransport(transports._FlowControlMixin,
438 transports.WriteTransport):
440 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
441 super(_UnixWritePipeTransport, self).__init__(extra, loop)
442 self._extra['pipe'] = pipe
444 self._fileno = pipe.fileno()
445 mode = os.fstat(self._fileno).st_mode
446 is_socket = stat.S_ISSOCK(mode)
448 stat.S_ISFIFO(mode) or
450 raise ValueError("Pipe transport is only for "
451 "pipes, sockets and character devices")
452 _set_nonblocking(self._fileno)
453 self._protocol = protocol
456 self._closing = False # Set when close() or write_eof() called.
458 self._loop.call_soon(self._protocol.connection_made, self)
460 # On AIX, the reader trick (to be notified when the read end of the
461 # socket is closed) only works for sockets. On other platforms it
462 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
463 if is_socket or not sys.platform.startswith("aix"):
464 # only start reading when connection_made() has been called
465 self._loop.call_soon(self._loop.add_reader,
466 self._fileno, self._read_ready)
468 if waiter is not None:
469 # only wake up the waiter when connection_made() has been called
470 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
473 info = [self.__class__.__name__]
474 if self._pipe is None:
475 info.append('closed')
477 info.append('closing')
478 info.append('fd=%s' % self._fileno)
479 if self._pipe is not None:
480 polling = selector_events._test_selector_event(
481 self._loop._selector,
482 self._fileno, selectors.EVENT_WRITE)
484 info.append('polling')
488 bufsize = self.get_write_buffer_size()
489 info.append('bufsize=%s' % bufsize)
491 info.append('closed')
492 return '<%s>' % ' '.join(info)
494 def get_write_buffer_size(self):
495 return sum(len(data) for data in self._buffer)
497 def _read_ready(self):
498 # Pipe was closed by peer.
499 if self._loop.get_debug():
500 logger.info("%r was closed by peer", self)
502 self._close(BrokenPipeError())
506 def write(self, data):
507 data = flatten_bytes(data)
511 if self._conn_lost or self._closing:
512 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
513 logger.warning('pipe closed by peer or '
514 'os.write(pipe, data) raised exception.')
519 # Attempt to send it right away first.
521 n = wrap_error(os.write, self._fileno, data)
522 except (BlockingIOError, InterruptedError):
524 except Exception as exc:
526 self._fatal_error(exc, 'Fatal write error on pipe transport')
532 self._loop.add_writer(self._fileno, self._write_ready)
534 self._buffer.append(data)
535 self._maybe_pause_protocol()
537 def _write_ready(self):
538 data = b''.join(self._buffer)
539 assert data, 'Data should not be empty'
543 n = wrap_error(os.write, self._fileno, data)
544 except (BlockingIOError, InterruptedError):
545 self._buffer.append(data)
546 except Exception as exc:
548 # Remove writer here, _fatal_error() doesn't it
549 # because _buffer is empty.
550 self._loop.remove_writer(self._fileno)
551 self._fatal_error(exc, 'Fatal write error on pipe transport')
554 self._loop.remove_writer(self._fileno)
555 self._maybe_resume_protocol() # May append to buffer.
556 if not self._buffer and self._closing:
557 self._loop.remove_reader(self._fileno)
558 self._call_connection_lost(None)
563 self._buffer.append(data) # Try again later.
565 def can_write_eof(self):
574 self._loop.remove_reader(self._fileno)
575 self._loop.call_soon(self._call_connection_lost, None)
578 if self._pipe is not None and not self._closing:
579 # write_eof is all what we needed to close the write pipe
582 # On Python 3.3 and older, objects with a destructor part of a reference
583 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
587 if self._pipe is not None:
588 warnings.warn("unclosed transport %r" % self, ResourceWarning)
594 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
595 # should be called by exception handler only
596 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
597 if self._loop.get_debug():
598 logger.debug("%r: %s", self, message, exc_info=True)
600 self._loop.call_exception_handler({
604 'protocol': self._protocol,
608 def _close(self, exc=None):
611 self._loop.remove_writer(self._fileno)
613 self._loop.remove_reader(self._fileno)
614 self._loop.call_soon(self._call_connection_lost, exc)
616 def _call_connection_lost(self, exc):
618 self._protocol.connection_lost(exc)
622 self._protocol = None
626 if hasattr(os, 'set_inheritable'):
627 # Python 3.4 and newer
628 _set_inheritable = os.set_inheritable
632 def _set_inheritable(fd, inheritable):
633 cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
635 old = fcntl.fcntl(fd, fcntl.F_GETFD)
637 fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
639 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
642 class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
644 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
646 if stdin == subprocess.PIPE:
647 # Use a socket pair for stdin, since not all platforms
648 # support selecting read events on the write end of a
649 # socket (which we use in order to detect closing of the
650 # other end). Notably this is needed on AIX, and works
651 # just fine on other platforms.
652 stdin, stdin_w = self._loop._socketpair()
654 # Mark the write end of the stdin pipe as non-inheritable,
655 # needed by close_fds=False on Python 3.3 and older
656 # (Python 3.4 implements the PEP 446, socketpair returns
657 # non-inheritable sockets)
658 _set_inheritable(stdin_w.fileno(), False)
659 self._proc = subprocess.Popen(
660 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
661 universal_newlines=False, bufsize=bufsize, **kwargs)
662 if stdin_w is not None:
663 # Retrieve the file descriptor from stdin_w, stdin_w should not
664 # "own" the file descriptor anymore: closing stdin_fd file
665 # descriptor must close immediatly the file
667 if hasattr(stdin_w, 'detach'):
668 stdin_fd = stdin_w.detach()
669 self._proc.stdin = os.fdopen(stdin_fd, 'wb', bufsize)
671 stdin_dup = os.dup(stdin_w.fileno())
673 self._proc.stdin = os.fdopen(stdin_dup, 'wb', bufsize)
676 class AbstractChildWatcher(object):
677 """Abstract base class for monitoring child processes.
679 Objects derived from this class monitor a collection of subprocesses and
680 report their termination or interruption by a signal.
682 New callbacks are registered with .add_child_handler(). Starting a new
683 process must be done within a 'with' block to allow the watcher to suspend
684 its activity until the new process if fully registered (this is needed to
685 prevent a race condition in some implementations).
689 proc = subprocess.Popen("sleep 1")
690 watcher.add_child_handler(proc.pid, callback)
693 Implementations of this class must be thread-safe.
695 Since child watcher objects may catch the SIGCHLD signal and call
696 waitpid(-1), there should be only one active object per process.
699 def add_child_handler(self, pid, callback, *args):
700 """Register a new child handler.
702 Arrange for callback(pid, returncode, *args) to be called when
703 process 'pid' terminates. Specifying another callback for the same
704 process replaces the previous handler.
706 Note: callback() must be thread-safe.
708 raise NotImplementedError()
710 def remove_child_handler(self, pid):
711 """Removes the handler for process 'pid'.
713 The function returns True if the handler was successfully removed,
714 False if there was nothing to remove."""
716 raise NotImplementedError()
718 def attach_loop(self, loop):
719 """Attach the watcher to an event loop.
721 If the watcher was previously attached to an event loop, then it is
722 first detached before attaching to the new loop.
724 Note: loop may be None.
726 raise NotImplementedError()
729 """Close the watcher.
731 This must be called to make sure that any underlying resource is freed.
733 raise NotImplementedError()
736 """Enter the watcher's context and allow starting new processes
738 This function must return self"""
739 raise NotImplementedError()
741 def __exit__(self, a, b, c):
742 """Exit the watcher's context"""
743 raise NotImplementedError()
746 class BaseChildWatcher(AbstractChildWatcher):
752 self.attach_loop(None)
754 def _do_waitpid(self, expected_pid):
755 raise NotImplementedError()
757 def _do_waitpid_all(self):
758 raise NotImplementedError()
760 def attach_loop(self, loop):
761 assert loop is None or isinstance(loop, events.AbstractEventLoop)
763 if self._loop is not None:
764 self._loop.remove_signal_handler(signal.SIGCHLD)
768 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
770 # Prevent a race condition in case a child terminated
772 self._do_waitpid_all()
776 self._do_waitpid_all()
777 except Exception as exc:
778 # self._loop should always be available here
779 # as '_sig_chld' is added as a signal handler
781 self._loop.call_exception_handler({
782 'message': 'Unknown exception in SIGCHLD handler',
786 def _compute_returncode(self, status):
787 if os.WIFSIGNALED(status):
788 # The child process died because of a signal.
789 return -os.WTERMSIG(status)
790 elif os.WIFEXITED(status):
791 # The child process exited (e.g sys.exit()).
792 return os.WEXITSTATUS(status)
794 # The child exited, but we don't understand its status.
795 # This shouldn't happen, but if it does, let's just
796 # return that status; perhaps that helps debug it.
800 class SafeChildWatcher(BaseChildWatcher):
801 """'Safe' child watcher implementation.
803 This implementation avoids disrupting other code spawning processes by
804 polling explicitly each process in the SIGCHLD handler instead of calling
807 This is a safe solution but it has a significant overhead when handling a
808 big number of children (O(n) each time SIGCHLD is raised)
812 super(SafeChildWatcher, self).__init__()
816 self._callbacks.clear()
817 super(SafeChildWatcher, self).close()
822 def __exit__(self, a, b, c):
825 def add_child_handler(self, pid, callback, *args):
826 self._callbacks[pid] = (callback, args)
828 # Prevent a race condition in case the child is already terminated.
829 self._do_waitpid(pid)
831 def remove_child_handler(self, pid):
833 del self._callbacks[pid]
838 def _do_waitpid_all(self):
840 for pid in list(self._callbacks):
841 self._do_waitpid(pid)
843 def _do_waitpid(self, expected_pid):
844 assert expected_pid > 0
847 pid, status = os.waitpid(expected_pid, os.WNOHANG)
848 except ChildProcessError:
849 # The child process is already reaped
850 # (may happen if waitpid() is called elsewhere).
854 "Unknown child process pid %d, will report returncode 255",
858 # The child process is still alive.
861 returncode = self._compute_returncode(status)
862 if self._loop.get_debug():
863 logger.debug('process %s exited with returncode %s',
864 expected_pid, returncode)
867 callback, args = self._callbacks.pop(pid)
868 except KeyError: # pragma: no cover
869 # May happen if .remove_child_handler() is called
870 # after os.waitpid() returns.
871 if self._loop.get_debug():
872 logger.warning("Child watcher got an unexpected pid: %r",
875 callback(pid, returncode, *args)
878 class FastChildWatcher(BaseChildWatcher):
879 """'Fast' child watcher implementation.
881 This implementation reaps every terminated processes by calling
882 os.waitpid(-1) directly, possibly breaking other code spawning processes
883 and waiting for their termination.
885 There is no noticeable overhead when handling a big number of children
886 (O(1) each time a child terminates).
889 super(FastChildWatcher, self).__init__()
891 self._lock = threading.Lock()
896 self._callbacks.clear()
897 self._zombies.clear()
898 super(FastChildWatcher, self).close()
906 def __exit__(self, a, b, c):
910 if self._forks or not self._zombies:
913 collateral_victims = str(self._zombies)
914 self._zombies.clear()
917 "Caught subprocesses termination from unknown pids: %s",
920 def add_child_handler(self, pid, callback, *args):
921 assert self._forks, "Must use the context manager"
924 returncode = self._zombies.pop(pid)
926 # The child is running.
927 self._callbacks[pid] = callback, args
930 # The child is dead already. We can fire the callback.
931 callback(pid, returncode, *args)
933 def remove_child_handler(self, pid):
935 del self._callbacks[pid]
940 def _do_waitpid_all(self):
941 # Because of signal coalescing, we must keep calling waitpid() as
942 # long as we're able to reap a child.
945 pid, status = wrap_error(os.waitpid, -1, os.WNOHANG)
946 except ChildProcessError:
947 # No more child processes exist.
951 # A child process is still alive.
954 returncode = self._compute_returncode(status)
958 callback, args = self._callbacks.pop(pid)
962 # It may not be registered yet.
963 self._zombies[pid] = returncode
964 if self._loop.get_debug():
965 logger.debug('unknown process %s exited '
966 'with returncode %s',
971 if self._loop.get_debug():
972 logger.debug('process %s exited with returncode %s',
977 "Caught subprocess termination from unknown pid: "
978 "%d -> %d", pid, returncode)
980 callback(pid, returncode, *args)
983 class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
984 """UNIX event loop policy with a watcher for child processes."""
985 _loop_factory = _UnixSelectorEventLoop
988 super(_UnixDefaultEventLoopPolicy, self).__init__()
991 def _init_watcher(self):
993 if self._watcher is None: # pragma: no branch
994 self._watcher = SafeChildWatcher()
995 if isinstance(threading.current_thread(),
996 threading._MainThread):
997 self._watcher.attach_loop(self._local._loop)
999 def set_event_loop(self, loop):
1000 """Set the event loop.
1002 As a side effect, if a child watcher was set before, then calling
1003 .set_event_loop() from the main thread will call .attach_loop(loop) on
1007 super(_UnixDefaultEventLoopPolicy, self).set_event_loop(loop)
1009 if self._watcher is not None and \
1010 isinstance(threading.current_thread(), threading._MainThread):
1011 self._watcher.attach_loop(loop)
1013 def get_child_watcher(self):
1014 """Get the watcher for child processes.
1016 If not yet set, a SafeChildWatcher object is automatically created.
1018 if self._watcher is None:
1019 self._init_watcher()
1021 return self._watcher
1023 def set_child_watcher(self, watcher):
1024 """Set the watcher for child processes."""
1026 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1028 if self._watcher is not None:
1029 self._watcher.close()
1031 self._watcher = watcher
1033 SelectorEventLoop = _UnixSelectorEventLoop
1034 DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy