efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / proactor_events.py
1 """Event loop using a proactor and related classes.
2
3 A proactor is a "notify-on-completion" multiplexer.  Currently a
4 proactor is only implemented on Windows with IOCP.
5 """
6
7 __all__ = ['BaseProactorEventLoop']
8
9 import socket
10 import warnings
11
12 from . import base_events
13 from . import compat
14 from . import constants
15 from . import futures
16 from . import sslproto
17 from . import transports
18 from .log import logger
19 from .compat import flatten_bytes
20 from .py33_exceptions import (BrokenPipeError,
21     ConnectionAbortedError, ConnectionResetError)
22
23
class _ProactorBasePipeTransport(transports._FlowControlMixin,
                                 transports.BaseTransport):
    """Base class for pipe and socket transports.

    Tracks the state shared by the reading and writing subclasses: the
    wrapped socket/pipe, the protocol, the outstanding read and write
    futures, the write buffer and the closing flags.
    """

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up the transport state and schedule connection_made().

        If *server* is not None, server._attach() is called now and
        server._detach() once the connection has been lost.  If *waiter*
        is not None, waiter._set_result_unless_cancelled(None) is scheduled
        right after protocol.connection_made().
        """
        super(_ProactorBasePipeTransport, self).__init__(extra, loop)
        self._set_extra(sock)
        self._sock = sock
        self._protocol = protocol
        self._server = server
        self._buffer = None  # None or bytearray.
        self._read_fut = None
        self._write_fut = None
        self._pending_write = 0
        self._conn_lost = 0
        self._closing = False  # Set when close() called.
        # Set once _call_connection_lost() has started running, so that the
        # protocol is notified and the socket is closed exactly once.
        self._called_connection_lost = False
        self._eof_written = False
        if self._server is not None:
            self._server._attach()
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        """Return a debug representation listing the transport state."""
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        if self._sock is not None:
            info.append('fd=%s' % self._sock.fileno())
        if self._read_fut is not None:
            info.append('read=%s' % self._read_fut)
        if self._write_fut is not None:
            info.append("write=%r" % self._write_fut)
        if self._buffer:
            bufsize = len(self._buffer)
            info.append('write_bufsize=%s' % bufsize)
        if self._eof_written:
            info.append('EOF written')
        return '<%s>' % ' '.join(info)

    def _set_extra(self, sock):
        """Record *sock* in the transport's extra info under 'pipe'."""
        self._extra['pipe'] = sock

    def close(self):
        """Close the transport gracefully.

        connection_lost(None) is scheduled immediately only when nothing
        is buffered and no write is in flight; otherwise the write loop
        schedules it once the pending data has been sent.  Any outstanding
        read is cancelled.  Calling close() again is a no-op.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        if not self._buffer and self._write_fut is None:
            self._loop.call_soon(self._call_connection_lost, None)
        if self._read_fut is not None:
            self._read_fut.cancel()
            self._read_fut = None

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed.  This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._sock is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and force the transport closed.

        BrokenPipeError and ConnectionResetError are only logged (at debug
        level, when the loop is in debug mode); every other exception is
        passed to the loop's exception handler.
        """
        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Close immediately, dropping buffered data and pending I/O.

        Unlike close(), this must still take effect when close() has
        already been called but the connection has not been torn down yet
        (for example a write was pending and then failed, or abort() is
        called after close()).  Bailing out on _closing alone would leave
        connection_lost() uncalled and the socket open, so the call is a
        no-op only once the connection-lost handling has run.
        """
        if self._closing and self._called_connection_lost:
            return
        self._closing = True
        self._conn_lost += 1
        if self._write_fut:
            self._write_fut.cancel()
            self._write_fut = None
        if self._read_fut:
            self._read_fut.cancel()
            self._read_fut = None
        self._pending_write = 0
        self._buffer = None
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol and release the socket and server, once.

        close() and _force_close() may both have scheduled this callback;
        only the first invocation does any work.
        """
        if self._called_connection_lost:
            return
        self._called_connection_lost = True
        try:
            self._protocol.connection_lost(exc)
        finally:
            # XXX If there is a pending overlapped read on the other
            # end then it may fail with ERROR_NETNAME_DELETED if we
            # just close our end.  First calling shutdown() seems to
            # cure it, but maybe using DisconnectEx() would be better.
            if hasattr(self._sock, 'shutdown'):
                self._sock.shutdown(socket.SHUT_RDWR)
            self._sock.close()
            self._sock = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes in flight plus the bytes buffered."""
        size = self._pending_write
        if self._buffer is not None:
            size += len(self._buffer)
        return size
141
142
class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
                                 transports.ReadTransport):
    """Transport for read pipes."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Initialize the base transport and schedule the first read."""
        super(_ProactorReadPipeTransport, self).__init__(loop, sock, protocol,
                                                         waiter, extra, server)
        self._paused = False
        self._loop.call_soon(self._loop_reading)

    def pause_reading(self):
        """Stop delivering data to the protocol until resume_reading().

        Raises RuntimeError if the transport is closing or already paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume delivering data to the protocol.

        Raises RuntimeError if the transport is not paused.
        """
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        fut = self._read_fut
        # Only restart the read loop by hand when there is no read in
        # flight, or when the read finished while we were paused (its
        # callback returned early without consuming the result).  A read
        # that is still pending already has _loop_reading registered as
        # its done callback; passing it to _loop_reading now would call
        # result() on an unfinished future and raise InvalidStateError.
        if fut is None or fut.done():
            self._loop.call_soon(self._loop_reading, fut)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _loop_reading(self, fut=None):
        """Consume the result of a finished read and start the next one.

        *fut* is the completed read future, or None for the initial call.
        Received data is handed to the protocol in the "finally" clause so
        that the next read is already scheduled when the protocol runs.
        """
        if self._paused:
            # Leave self._read_fut untouched; resume_reading() picks it up.
            return
        data = None

        try:
            if fut is not None:
                assert self._read_fut is fut or (self._read_fut is None and
                                                 self._closing)
                self._read_fut = None
                data = fut.result()  # deliver data later in "finally" clause

            if self._closing:
                # since close() has been called we ignore any read data
                data = None
                return

            if data == b'':
                # we got end-of-file so no need to reschedule a new read
                return

            # reschedule a new read
            self._read_fut = self._loop._proactor.recv(self._sock, 4096)
        except ConnectionAbortedError as exc:
            if not self._closing:
                self._fatal_error(exc, 'Fatal read error on pipe transport')
            elif self._loop.get_debug():
                logger.debug("Read error on pipe transport while closing",
                             exc_info=True)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        except futures.CancelledError:
            # close() cancels the pending read; that is expected.
            if not self._closing:
                raise
        else:
            self._read_fut.add_done_callback(self._loop_reading)
        finally:
            if data:
                self._protocol.data_received(data)
            elif data is not None:
                # Empty data means end-of-file.
                if self._loop.get_debug():
                    logger.debug("%r received EOF", self)
                keep_open = self._protocol.eof_received()
                if not keep_open:
                    self.close()
220
221
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
                                      transports.WriteTransport):
    """Transport for write pipes."""

    def write(self, data):
        """Send *data*, or buffer it while an earlier write is in flight.

        Raises RuntimeError if write_eof() has already been called.  Empty
        data is ignored.  Once the connection is lost the data is dropped
        and, past a threshold of such calls, a warning is logged.
        """
        data = flatten_bytes(data)
        if self._eof_written:
            raise RuntimeError('write_eof() already called')

        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        # Observable states:
        # 1. IDLE: _write_fut and _buffer both None
        # 2. WRITING: _write_fut set; _buffer None
        # 3. BACKED UP: _write_fut set; _buffer a bytearray
        # We always copy the data, so the caller can't modify it
        # while we're still waiting for the I/O to happen.
        if self._write_fut is None:  # IDLE -> WRITING
            assert self._buffer is None
            # Pass a copy, except if it's already immutable.
            self._loop_writing(data=bytes(data))
        elif not self._buffer:  # WRITING -> BACKED UP
            # Make a mutable copy which we can extend.
            self._buffer = bytearray(data)
            self._maybe_pause_protocol()
        else:  # BACKED UP
            # Append to buffer (also copies).
            self._buffer.extend(data)
            self._maybe_pause_protocol()

    def _loop_writing(self, f=None, data=None):
        """Finish the write *f* (if any) and start sending the next chunk.

        Called directly by write() with *data* when idle, and as the done
        callback of each write future, in which case the buffered bytes
        (if any) are sent next.
        """
        try:
            if f is not None and self._write_fut is None and self._closing:
                # _force_close() cancelled the write and already reset
                # self._write_fut; this is the cancelled future's callback
                # arriving late.  There is nothing left to do, and falling
                # through would trip the assertion below.
                return
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                f.result()
            if data is None:
                data = self._buffer
                self._buffer = None
            if not data:
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Now that we've reduced the buffer size, tell the
                # protocol to resume writing if it was paused.  Note that
                # we do this last since the callback is called immediately
                # and it may add more data to the buffer (even causing the
                # protocol to be paused again).
                self._maybe_resume_protocol()
            else:
                self._write_fut = self._loop._proactor.send(self._sock, data)
                if not self._write_fut.done():
                    assert self._pending_write == 0
                    self._pending_write = len(data)
                    self._write_fut.add_done_callback(self._loop_writing)
                    self._maybe_pause_protocol()
                else:
                    self._write_fut.add_done_callback(self._loop_writing)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal write error on pipe transport')

    def can_write_eof(self):
        """Return True: write_eof() is supported."""
        return True

    def write_eof(self):
        """Signal end-of-file by closing the transport."""
        self.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)
302
303
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
    """Write pipe transport that also watches for the pipe being closed."""

    def __init__(self, *args, **kw):
        """Initialize the write transport and post a small watcher read."""
        super(_ProactorWritePipeTransport, self).__init__(*args, **kw)
        # Completion of this read is handled by _pipe_closed().
        watcher = self._loop._proactor.recv(self._sock, 16)
        self._read_fut = watcher
        watcher.add_done_callback(self._pipe_closed)

    def _pipe_closed(self, fut):
        """Done callback of the watcher read; shut the transport down."""
        if fut.cancelled():
            # the transport has been closed
            return
        assert fut.result() == b''
        if self._closing:
            assert self._read_fut is None
            return
        assert fut is self._read_fut, (fut, self._read_fut)
        self._read_fut = None
        if self._write_fut is None:
            # Nothing in flight: a regular close is enough.
            self.close()
        else:
            # A write is still pending and can no longer succeed.
            self._force_close(BrokenPipeError())
324
325
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
                                   _ProactorBaseWritePipeTransport,
                                   transports.Transport):
    """Transport for duplex pipes."""

    def write_eof(self):
        """Not supported for duplex pipes; always raises."""
        raise NotImplementedError

    def can_write_eof(self):
        """Return False: write_eof() is not available on this transport."""
        return False
336
337
class _ProactorSocketTransport(_ProactorReadPipeTransport,
                               _ProactorBaseWritePipeTransport,
                               transports.Transport):
    """Transport for connected sockets."""

    def _set_extra(self, sock):
        """Store the socket and its local/peer addresses in the extra info.

        A failing address lookup is not fatal; it is only logged when the
        loop runs in debug mode.  An already present 'peername' entry is
        kept as is.
        """
        extra = self._extra
        extra['socket'] = sock

        try:
            sockname = sock.getsockname()
        except (socket.error, AttributeError):
            if self._loop.get_debug():
                logger.warning("getsockname() failed on %r",
                               sock, exc_info=True)
        else:
            extra['sockname'] = sockname

        if 'peername' in extra:
            return
        try:
            peername = sock.getpeername()
        except (socket.error, AttributeError):
            if self._loop.get_debug():
                logger.warning("getpeername() failed on %r",
                               sock, exc_info=True)
        else:
            extra['peername'] = peername

    def can_write_eof(self):
        """Return True: sockets support write_eof()."""
        return True

    def write_eof(self):
        """Mark EOF as written and shut down the sending side.

        Does nothing when the transport is closing or EOF was already
        written.  With a write still in flight only the flag is set here.
        """
        already_done = self._closing or self._eof_written
        if already_done:
            return
        self._eof_written = True
        if self._write_fut is not None:
            return
        self._sock.shutdown(socket.SHUT_WR)
368
369
class BaseProactorEventLoop(base_events.BaseEventLoop):
    """Event loop that delegates all I/O to a proactor object.

    Subclasses must implement _socketpair(), which is used to build the
    self-pipe that wakes the loop up.
    """

    def __init__(self, proactor):
        """Bind *proactor* to this loop and create the self-pipe."""
        super(BaseProactorEventLoop, self).__init__()
        logger.debug('Using proactor: %s', proactor.__class__.__name__)
        self._proactor = proactor
        self._selector = proactor   # convenient alias
        self._self_reading_future = None
        self._accept_futures = {}   # socket file descriptor => Future
        proactor.set_loop(self)
        self._make_self_pipe()

    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        """Return a socket transport wrapping *sock*."""
        return _ProactorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                            server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Return the application-side transport of an SSL connection.

        Raises NotImplementedError when sslproto reports that it is not
        available.
        """
        if not sslproto._is_sslproto_available():
            raise NotImplementedError("Proactor event loop requires Python 3.5"
                                      " or newer (ssl.MemoryBIO) to support "
                                      "SSL")

        ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
                                            server_side, server_hostname)
        # The raw socket transport feeds the SSL protocol; callers only get
        # the transport exposed by the SSL protocol.
        _ProactorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
                                    extra=None):
        """Return a duplex pipe transport wrapping *sock*."""
        return _ProactorDuplexPipeTransport(self,
                                            sock, protocol, waiter, extra)

    def _make_read_pipe_transport(self, sock, protocol, waiter=None,
                                  extra=None):
        """Return a read pipe transport wrapping *sock*."""
        return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)

    def _make_write_pipe_transport(self, sock, protocol, waiter=None,
                                   extra=None):
        """Return a write pipe transport wrapping *sock*."""
        # We want connection_lost() to be called when other end closes
        return _ProactorWritePipeTransport(self,
                                           sock, protocol, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and the proactor.

        Raises RuntimeError if the loop is running; does nothing if it is
        already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return

        # Call these methods before closing the event loop (before calling
        # BaseEventLoop.close), because they can schedule callbacks with
        # call_soon(), which is forbidden when the event loop is closed.
        self._stop_accept_futures()
        self._close_self_pipe()
        self._proactor.close()
        self._proactor = None
        self._selector = None

        # Close the event loop
        super(BaseProactorEventLoop, self).close()

    def sock_recv(self, sock, n):
        """Return the proactor's future for receiving up to *n* bytes."""
        return self._proactor.recv(sock, n)

    def sock_sendall(self, sock, data):
        """Return the proactor's future for sending *data* on *sock*."""
        return self._proactor.send(sock, data)

    def sock_connect(self, sock, address):
        """Return a future for connecting *sock* to *address*.

        In debug mode the address is checked first; a ValueError from that
        check is delivered through the returned future instead of raised.
        """
        try:
            if self._debug:
                base_events._check_resolved_address(sock, address)
        except ValueError as err:
            fut = futures.Future(loop=self)
            fut.set_exception(err)
            return fut
        else:
            return self._proactor.connect(sock, address)

    def sock_accept(self, sock):
        """Return the proactor's future for accepting on *sock*."""
        return self._proactor.accept(sock)

    def _socketpair(self):
        """Return a pair of connected sockets; must be overridden."""
        raise NotImplementedError

    def _close_self_pipe(self):
        """Cancel the self-pipe read and close both ends of the pipe."""
        if self._self_reading_future is not None:
            self._self_reading_future.cancel()
            self._self_reading_future = None
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and start reading from it."""
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self.call_soon(self._loop_self_reading)

    def _loop_self_reading(self, f=None):
        """Drain the self-pipe: consume read *f* and post the next read."""
        try:
            if f is not None:
                f.result()  # may raise
            f = self._proactor.recv(self._ssock, 4096)
        except futures.CancelledError:
            # _close_self_pipe() has been called, stop waiting for data
            return
        except Exception as exc:
            self.call_exception_handler({
                'message': 'Error on reading from the event loop self pipe',
                'exception': exc,
                'loop': self,
            })
        else:
            self._self_reading_future = f
            f.add_done_callback(self._loop_self_reading)

    def _write_to_self(self):
        """Write one byte to the self-pipe.

        The write is best effort.  The socket is non-blocking, so send()
        can fail; a failed send is logged in debug mode and otherwise
        ignored rather than raised to the caller.
        """
        try:
            self._csock.send(b'\0')
        except (OSError, socket.error):
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None):
        """Start accepting connections on the listening socket *sock*.

        Each accepted connection gets a protocol from *protocol_factory*
        and a socket (or SSL, if *sslcontext* is given) transport.
        """

        def loop(f=None):
            # Handle the finished accept *f* (if any), then post a new one.
            try:
                if f is not None:
                    conn, addr = f.result()
                    if self._debug:
                        logger.debug("%r got a new connection from %r: %r",
                                     server, addr, conn)
                    protocol = protocol_factory()
                    if sslcontext is not None:
                        self._make_ssl_transport(
                            conn, protocol, sslcontext, server_side=True,
                            extra={'peername': addr}, server=server)
                    else:
                        self._make_socket_transport(
                            conn, protocol,
                            extra={'peername': addr}, server=server)
                if self.is_closed():
                    return
                f = self._proactor.accept(sock)
            except OSError as exc:
                if sock.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Accept failed on a socket',
                        'exception': exc,
                        'socket': sock,
                    })
                    sock.close()
                elif self._debug:
                    logger.debug("Accept failed on socket %r",
                                 sock, exc_info=True)
            except futures.CancelledError:
                sock.close()
            else:
                self._accept_futures[sock.fileno()] = f
                f.add_done_callback(loop)

        self.call_soon(loop)

    def _process_events(self, event_list):
        """Do nothing; the proactor handles events in its own poll."""
        # Events are processed in the IocpProactor._poll() method
        pass

    def _stop_accept_futures(self):
        """Cancel the pending accept of every listening socket."""
        for future in self._accept_futures.values():
            future.cancel()
        self._accept_futures.clear()

    def _stop_serving(self, sock):
        """Stop accepting connections on *sock* and close it.

        Only the accept future registered for *sock* is cancelled.
        Cancelling all of them would make the accept loops of the other
        listening sockets see CancelledError and close their sockets too.
        """
        future = self._accept_futures.pop(sock.fileno(), None)
        if future is not None:
            future.cancel()
        self._proactor._stop_serving(sock)
        sock.close()