efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / base_subprocess.py
1 import collections
2 import subprocess
3 import warnings
4
5 from . import compat
6 from . import futures
7 from . import protocols
8 from . import transports
9 from .coroutines import coroutine, From, Return
10 from .log import logger
11 from .py33_exceptions import ProcessLookupError
12
13
class BaseSubprocessTransport(transports.SubprocessTransport):
    """Common machinery shared by subprocess transports.

    Subclasses implement _start() to create the child process and store it
    in self._proc.  This class keeps track of the pipes requested with
    subprocess.PIPE, relays pipe and process events to the protocol, and
    calls the protocol's connection_lost() once the process has exited and
    every tracked pipe is disconnected.
    """

    def __init__(self, loop, protocol, args, shell,
                 stdin, stdout, stderr, bufsize,
                 waiter=None, extra=None, **kwargs):
        """Start the child process and schedule the connection of its pipes.

        args, shell, stdin, stdout, stderr, bufsize and **kwargs are
        forwarded unchanged to _start().  extra is passed to the base
        transport constructor.  If waiter is given, _connect_pipes() sets
        its result to None on success or sets its exception on failure.

        If _start() raises, the transport is closed and the exception is
        re-raised.
        """
        super(BaseSubprocessTransport, self).__init__(extra)
        self._closed = False
        self._protocol = protocol
        self._loop = loop
        self._proc = None
        self._pid = None
        self._returncode = None
        self._exit_waiters = []
        # Protocol callbacks queued until _connect_pipes() has scheduled
        # connection_made(); set to None afterwards (see _call()).
        self._pending_calls = collections.deque()
        # Maps fd (0, 1, 2) to its pipe protocol.  The value is None from
        # now until _connect_pipes() has connected that pipe.
        self._pipes = {}
        self._finished = False

        if stdin == subprocess.PIPE:
            self._pipes[0] = None
        if stdout == subprocess.PIPE:
            self._pipes[1] = None
        if stderr == subprocess.PIPE:
            self._pipes[2] = None

        # Create the child process: set the _proc attribute
        try:
            self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                        stderr=stderr, bufsize=bufsize, **kwargs)
        except:
            # Cleanup only: the exception is always re-raised.
            self.close()
            raise

        self._pid = self._proc.pid
        self._extra['subprocess'] = self._proc

        if self._loop.get_debug():
            if isinstance(args, (bytes, str)):
                program = args
            else:
                program = args[0]
            logger.debug('process %r created: pid %s',
                         program, self._pid)

        self._loop.create_task(self._connect_pipes(waiter))

    def __repr__(self):
        """Return a debug string with the state, pid and connected pipes."""
        info = [self.__class__.__name__]
        if self._closed:
            info.append('closed')
        if self._pid is not None:
            info.append('pid=%s' % self._pid)
        if self._returncode is not None:
            info.append('returncode=%s' % self._returncode)
        elif self._pid is not None:
            info.append('running')
        else:
            info.append('not started')

        stdin = self._pipes.get(0)
        if stdin is not None:
            info.append('stdin=%s' % stdin.pipe)

        stdout = self._pipes.get(1)
        stderr = self._pipes.get(2)
        if stdout is not None and stderr is stdout:
            info.append('stdout=stderr=%s' % stdout.pipe)
        else:
            if stdout is not None:
                info.append('stdout=%s' % stdout.pipe)
            if stderr is not None:
                info.append('stderr=%s' % stderr.pipe)

        return '<%s>' % ' '.join(info)

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Create the child process and set self._proc.

        Must be implemented by subclasses.
        """
        raise NotImplementedError

    def close(self):
        """Close the connected pipes and kill the child if still running.

        Calling close() more than once is a no-op.
        """
        if self._closed:
            return
        self._closed = True

        for proto in self._pipes.values():
            # None means the pipe was requested but is not connected (yet).
            if proto is None:
                continue
            proto.pipe.close()

        if (self._proc is not None
                # the child process finished?
                and self._returncode is None
                # the child process finished but the transport was not
                # notified yet?
                and self._proc.poll() is None):
            if self._loop.get_debug():
                logger.warning('Close running child process: kill %r', self)

            try:
                self._proc.kill()
            except ProcessLookupError:
                # The process is already gone: nothing left to kill.
                pass

            # Don't clear the _proc reference yet: _connect_pipes() may
            # still run and it reads self._proc.

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed. This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            """Warn about a transport that was never closed, then close it."""
            if not self._closed:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self.close()

    def get_pid(self):
        """Return the pid of the child process (None if not started)."""
        return self._pid

    def get_returncode(self):
        """Return the exit code, or None if no exit was reported yet."""
        return self._returncode

    def get_pipe_transport(self, fd):
        """Return the pipe transport for file descriptor fd, or None.

        None is returned when fd was not requested with subprocess.PIPE and
        also when the pipe was requested but is not connected yet (its
        entry in self._pipes is still the None placeholder).
        """
        proto = self._pipes.get(fd)
        if proto is None:
            return None
        return proto.pipe

    def _check_proc(self):
        """Raise ProcessLookupError if there is no process object."""
        if self._proc is None:
            raise ProcessLookupError()

    def send_signal(self, signal):
        """Send signal to the child process.

        Raises ProcessLookupError if there is no process object.
        """
        self._check_proc()
        self._proc.send_signal(signal)

    def terminate(self):
        """Terminate the child process.

        Raises ProcessLookupError if there is no process object.
        """
        self._check_proc()
        self._proc.terminate()

    def kill(self):
        """Kill the child process.

        Raises ProcessLookupError if there is no process object.
        """
        self._check_proc()
        self._proc.kill()

    @coroutine
    def _connect_pipes(self, waiter):
        """Connect the child's stdio pipes to the loop, then notify.

        On success connection_made() and every queued protocol callback are
        scheduled, and waiter (if given and not cancelled) gets the result
        None.  On failure the exception is stored in waiter instead.

        This method is a coroutine.
        """
        try:
            proc = self._proc
            loop = self._loop

            if proc.stdin is not None:
                _, pipe = yield From(loop.connect_write_pipe(
                    lambda: WriteSubprocessPipeProto(self, 0),
                    proc.stdin))
                self._pipes[0] = pipe

            if proc.stdout is not None:
                _, pipe = yield From(loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 1),
                    proc.stdout))
                self._pipes[1] = pipe

            if proc.stderr is not None:
                _, pipe = yield From(loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 2),
                    proc.stderr))
                self._pipes[2] = pipe

            assert self._pending_calls is not None

            # connection_made() is scheduled first so that the protocol
            # sees it before any of the callbacks queued by _call().
            loop.call_soon(self._protocol.connection_made, self)
            for callback, data in self._pending_calls:
                loop.call_soon(callback, *data)
            self._pending_calls = None
        except Exception as exc:
            if waiter is not None and not waiter.cancelled():
                waiter.set_exception(exc)
        else:
            if waiter is not None and not waiter.cancelled():
                waiter.set_result(None)

    def _call(self, cb, *data):
        """Schedule cb(*data), queueing it while pipes are being connected."""
        if self._pending_calls is not None:
            self._pending_calls.append((cb, data))
        else:
            self._loop.call_soon(cb, *data)

    def _pipe_connection_lost(self, fd, exc):
        """Relay the loss of pipe fd to the protocol and try to finish."""
        self._call(self._protocol.pipe_connection_lost, fd, exc)
        self._try_finish()

    def _pipe_data_received(self, fd, data):
        """Relay data read from pipe fd to the protocol."""
        self._call(self._protocol.pipe_data_received, fd, data)

    def _process_exited(self, returncode):
        """Record the exit code, notify the protocol and wake up waiters."""
        assert returncode is not None, returncode
        assert self._returncode is None, self._returncode
        if self._loop.get_debug():
            logger.info('%r exited with return code %r',
                        self, returncode)
        self._returncode = returncode
        self._call(self._protocol.process_exited)
        self._try_finish()

        # wake up futures waiting for wait()
        for waiter in self._exit_waiters:
            if not waiter.cancelled():
                waiter.set_result(returncode)
        # _wait() returns early once _returncode is set, so the list is
        # never used again.
        self._exit_waiters = None

    @coroutine
    def _wait(self):
        """Wait until the process exits and return its return code.

        This method is a coroutine."""
        if self._returncode is not None:
            raise Return(self._returncode)

        waiter = futures.Future(loop=self._loop)
        self._exit_waiters.append(waiter)
        returncode = yield From(waiter)
        raise Return(returncode)

    def _try_finish(self):
        """Schedule connection_lost() once the process and pipes are done.

        Nothing happens until the return code is known and every tracked
        pipe is connected and marked as disconnected.
        """
        assert not self._finished
        if self._returncode is None:
            return
        if all(p is not None and p.disconnected
               for p in self._pipes.values()):
            self._finished = True
            self._call(self._call_connection_lost, None)

    def _call_connection_lost(self, exc):
        """Call the protocol's connection_lost() and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Clear the references even if connection_lost() raised.
            self._loop = None
            self._proc = None
            self._protocol = None
249
250
class WriteSubprocessPipeProto(protocols.BaseProtocol):
    """Protocol bound to one pipe of a subprocess transport.

    proc is the owning subprocess transport and fd the file descriptor
    number of the pipe.  Events received from the pipe are relayed to the
    owning transport or to its protocol.
    """

    def __init__(self, proc, fd):
        # No pipe transport yet: it is stored by connection_made().
        self.disconnected = False
        self.pipe = None
        self.fd = fd
        self.proc = proc

    def connection_made(self, transport):
        """Remember the pipe transport this protocol is attached to."""
        self.pipe = transport

    def __repr__(self):
        cls_name = self.__class__.__name__
        return ('<%s fd=%s pipe=%r>'
                % (cls_name, self.fd, self.pipe))

    def connection_lost(self, exc):
        """Mark the pipe as disconnected and tell the owning transport."""
        owner = self.proc
        # The flag is set before notifying the owner, which is called next.
        self.disconnected = True
        owner._pipe_connection_lost(self.fd, exc)
        self.proc = None

    def pause_writing(self):
        """Relay pause_writing() to the protocol of the owning transport."""
        user_protocol = self.proc._protocol
        user_protocol.pause_writing()

    def resume_writing(self):
        """Relay resume_writing() to the protocol of the owning transport."""
        user_protocol = self.proc._protocol
        user_protocol.resume_writing()
276
277
class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
                              protocols.Protocol):
    """Pipe protocol that also relays received data to the owner."""

    def data_received(self, data):
        """Hand data read from the pipe to the owning transport."""
        owner = self.proc
        owner._pipe_data_received(self.fd, data)