1 from __future__ import absolute_import
3 __all__ = ['create_subprocess_exec', 'create_subprocess_shell']
8 from . import protocols
11 from .coroutines import coroutine, From, Return
12 from .py33_exceptions import BrokenPipeError, ConnectionResetError
13 from .log import logger
16 PIPE = subprocess.PIPE
17 STDOUT = subprocess.STDOUT
18 if hasattr(subprocess, 'DEVNULL'):
19 DEVNULL = subprocess.DEVNULL
22 class SubprocessStreamProtocol(streams.FlowControlMixin,
23 protocols.SubprocessProtocol):
24 """Like StreamReaderProtocol, but for a subprocess."""
26 def __init__(self, limit, loop):
27 super(SubprocessStreamProtocol, self).__init__(loop=loop)
29 self.stdin = self.stdout = self.stderr = None
30 self._transport = None
33 info = [self.__class__.__name__]
34 if self.stdin is not None:
35 info.append('stdin=%r' % self.stdin)
36 if self.stdout is not None:
37 info.append('stdout=%r' % self.stdout)
38 if self.stderr is not None:
39 info.append('stderr=%r' % self.stderr)
40 return '<%s>' % ' '.join(info)
42 def connection_made(self, transport):
43 self._transport = transport
45 stdout_transport = transport.get_pipe_transport(1)
46 if stdout_transport is not None:
47 self.stdout = streams.StreamReader(limit=self._limit,
49 self.stdout.set_transport(stdout_transport)
51 stderr_transport = transport.get_pipe_transport(2)
52 if stderr_transport is not None:
53 self.stderr = streams.StreamReader(limit=self._limit,
55 self.stderr.set_transport(stderr_transport)
57 stdin_transport = transport.get_pipe_transport(0)
58 if stdin_transport is not None:
59 self.stdin = streams.StreamWriter(stdin_transport,
64 def pipe_data_received(self, fd, data):
71 if reader is not None:
72 reader.feed_data(data)
74 def pipe_connection_lost(self, fd, exc):
79 self.connection_lost(exc)
91 reader.set_exception(exc)
93 def process_exited(self):
94 self._transport.close()
95 self._transport = None
99 def __init__(self, transport, protocol, loop):
100 self._transport = transport
101 self._protocol = protocol
103 self.stdin = protocol.stdin
104 self.stdout = protocol.stdout
105 self.stderr = protocol.stderr
106 self.pid = transport.get_pid()
109 return '<%s %s>' % (self.__class__.__name__, self.pid)
112 def returncode(self):
113 return self._transport.get_returncode()
117 """Wait until the process exit and return the process return code.
119 This method is a coroutine."""
120 return_code = yield From(self._transport._wait())
121 raise Return(return_code)
123 def send_signal(self, signal):
124 self._transport.send_signal(signal)
127 self._transport.terminate()
130 self._transport.kill()
133 def _feed_stdin(self, input):
134 debug = self._loop.get_debug()
135 self.stdin.write(input)
137 logger.debug('%r communicate: feed stdin (%s bytes)',
140 yield From(self.stdin.drain())
141 except (BrokenPipeError, ConnectionResetError) as exc:
142 # communicate() ignores BrokenPipeError and ConnectionResetError
144 logger.debug('%r communicate: stdin got %r', self, exc)
147 logger.debug('%r communicate: close stdin', self)
155 def _read_stream(self, fd):
156 transport = self._transport.get_pipe_transport(fd)
162 if self._loop.get_debug():
163 name = 'stdout' if fd == 1 else 'stderr'
164 logger.debug('%r communicate: read %s', self, name)
165 output = yield From(stream.read())
166 if self._loop.get_debug():
167 name = 'stdout' if fd == 1 else 'stderr'
168 logger.debug('%r communicate: close %s', self, name)
173 def communicate(self, input=None):
175 stdin = self._feed_stdin(input)
178 if self.stdout is not None:
179 stdout = self._read_stream(1)
181 stdout = self._noop()
182 if self.stderr is not None:
183 stderr = self._read_stream(2)
185 stderr = self._noop()
186 stdin, stdout, stderr = yield From(tasks.gather(stdin, stdout, stderr,
188 yield From(self.wait())
189 raise Return(stdout, stderr)
193 def create_subprocess_shell(cmd, **kwds):
194 stdin = kwds.pop('stdin', None)
195 stdout = kwds.pop('stdout', None)
196 stderr = kwds.pop('stderr', None)
197 loop = kwds.pop('loop', None)
198 limit = kwds.pop('limit', streams._DEFAULT_LIMIT)
200 loop = events.get_event_loop()
201 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
203 transport, protocol = yield From(loop.subprocess_shell(
205 cmd, stdin=stdin, stdout=stdout,
206 stderr=stderr, **kwds))
207 raise Return(Process(transport, protocol, loop))
210 def create_subprocess_exec(program, *args, **kwds):
211 stdin = kwds.pop('stdin', None)
212 stdout = kwds.pop('stdout', None)
213 stderr = kwds.pop('stderr', None)
214 loop = kwds.pop('loop', None)
215 limit = kwds.pop('limit', streams._DEFAULT_LIMIT)
217 loop = events.get_event_loop()
218 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
220 transport, protocol = yield From(loop.subprocess_exec(
223 stdin=stdin, stdout=stdout,
224 stderr=stderr, **kwds))
225 raise Return(Process(transport, protocol, loop))