efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / selector_events.py
1 """Event loop using a selector and related classes.
2
3 A selector is a "notify-when-ready" multiplexer.  For a subclass which
4 also includes support for signal handling, see the unix_events sub-module.
5 """
6
7 __all__ = ['BaseSelectorEventLoop']
8
9 import collections
10 import errno
11 import functools
12 import socket
13 import sys
14 import warnings
15 try:
16     import ssl
17     from .py3_ssl import wrap_ssl_error, SSLWantReadError, SSLWantWriteError
18 except ImportError:  # pragma: no cover
19     ssl = None
20
21 from . import base_events
22 from . import compat
23 from . import constants
24 from . import events
25 from . import futures
26 from . import selectors
27 from . import sslproto
28 from . import transports
29 from .compat import flatten_bytes
30 from .coroutines import coroutine, From
31 from .log import logger
32 from .py33_exceptions import (wrap_error,
33     BlockingIOError, InterruptedError, ConnectionAbortedError, BrokenPipeError,
34     ConnectionResetError)
35
# On Mac OS 10.6 with Python 2.6.1 or OpenIndiana 148 with Python 2.6.4,
# _SelectorSslTransport._read_ready() hangs if the socket has no data.
# Example: test_events.test_create_server_ssl()
#
# The flag is true for every Python older than 2.6.6; the select module is
# only imported when the flag is set.
_SSL_REQUIRES_SELECT = (sys.version_info < (2, 6, 6))
if _SSL_REQUIRES_SELECT:
    import select
42
43
44 def _get_socket_error(sock, address):
45     err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
46     if err != 0:
47         # Jump to the except clause below.
48         raise OSError(err, 'Connect call failed %s' % (address,))
49
50
51 def _test_selector_event(selector, fd, event):
52     # Test if the selector is monitoring 'event' events
53     # for the file descriptor 'fd'.
54     try:
55         key = selector.get_key(fd)
56     except KeyError:
57         return False
58     else:
59         return bool(key.events & event)
60
61
62 class BaseSelectorEventLoop(base_events.BaseEventLoop):
63     """Selector event loop.
64
65     See events.EventLoop for API specification.
66     """
67
    def __init__(self, selector=None):
        """Create the event loop.

        'selector' is the selector object to use; when it is None,
        selectors.DefaultSelector() is instantiated.
        """
        super(BaseSelectorEventLoop, self).__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        # Create the self-pipe and register its reader.
        self._make_self_pipe()
76
    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        """Create and return a _SelectorSocketTransport for 'sock'."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)
81
    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                            server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Create an SSL transport on top of 'rawsock' and return it.

        Falls back to _make_legacy_ssl_transport() when
        sslproto._is_sslproto_available() is false.
        """
        if not sslproto._is_sslproto_available():
            return self._make_legacy_ssl_transport(
                rawsock, protocol, sslcontext, waiter,
                server_side=server_side, server_hostname=server_hostname,
                extra=extra, server=server)

        # ssl_protocol is given the application protocol and is itself used
        # as the protocol of the raw socket transport created below.
        ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
                                            server_side, server_hostname)
        # The socket transport object is not kept here; the caller gets the
        # application-side transport exposed by the SSL protocol.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport
96
    def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                                   waiter,
                                   server_side=False, server_hostname=None,
                                   extra=None, server=None):
        """Create and return a _SelectorSslTransport for 'rawsock'."""
        # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used
        # on Python 3.4 and older, when ssl.MemoryBIO is not available.
        return _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)
106
    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create and return a _SelectorDatagramTransport for 'sock'."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)
111
112     def close(self):
113         if self.is_running():
114             raise RuntimeError("Cannot close a running event loop")
115         if self.is_closed():
116             return
117         self._close_self_pipe()
118         super(BaseSelectorEventLoop, self).close()
119         if self._selector is not None:
120             self._selector.close()
121             self._selector = None
122
    def _socketpair(self):
        """Return a pair of sockets for the self-pipe; not implemented here.

        _make_self_pipe() unpacks the result as (ssock, csock).
        """
        raise NotImplementedError
125
    def _close_self_pipe(self):
        """Unregister and close both sockets of the self-pipe."""
        # Stop watching the read end before closing it.
        self.remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        # Undo the increment done by _make_self_pipe().
        self._internal_fds -= 1
133
    def _make_self_pipe(self):
        """Create the self-pipe and start reading from it.

        Both sockets are made non-blocking; _read_from_self() is registered
        as the reader callback of the read end.
        """
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self.add_reader(self._ssock.fileno(), self._read_from_self)
141
    def _process_self_data(self, data):
        """Handle 'data' read from the self-pipe; does nothing here.

        Called by _read_from_self() with every non-empty chunk.
        """
        pass
144
    def _read_from_self(self):
        """Drain the read end of the self-pipe.

        Every non-empty chunk is passed to _process_self_data().
        """
        while True:
            try:
                data = wrap_error(self._ssock.recv, 4096)
                if not data:
                    # Empty read: stop reading.
                    break
                self._process_self_data(data)
            except InterruptedError:
                # Retry the interrupted call.
                continue
            except BlockingIOError:
                # Nothing more to read for now.
                break
156
157     def _write_to_self(self):
158         # This may be called from a different thread, possibly after
159         # _close_self_pipe() has been called or even while it is
160         # running.  Guard for self._csock being None or closed.  When
161         # a socket is closed, send() raises OSError (with errno set to
162         # EBADF, but let's not rely on the exact error code).
163         csock = self._csock
164         if csock is not None:
165             try:
166                 wrap_error(csock.send, b'\0')
167             except OSError:
168                 if self._debug:
169                     logger.debug("Fail to write a null byte into the "
170                                  "self-pipe socket",
171                                  exc_info=True)
172
    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None):
        """Register _accept_connection() as the reader callback of 'sock'."""
        self.add_reader(sock.fileno(), self._accept_connection,
                        protocol_factory, sock, sslcontext, server)
177
    def _accept_connection(self, protocol_factory, sock,
                           sslcontext=None, server=None):
        """Accept one connection on the listening socket 'sock'.

        On success, a task running _accept_connection2() is created for the
        new connection.  When accept() fails because of a lack of system
        resources, the error is reported to the exception handler and
        serving is suspended for constants.ACCEPT_RETRY_DELAY seconds.
        """
        try:
            conn, addr = wrap_error(sock.accept)
            if self._debug:
                logger.debug("%r got a new connection from %r: %r",
                             server, addr, conn)
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
            pass  # False alarm.
        except socket.error as exc:
            # There's nowhere to send the error, so just log it.
            if exc.errno in (errno.EMFILE, errno.ENFILE,
                             errno.ENOBUFS, errno.ENOMEM):
                # Some platforms (e.g. Linux) keep reporting the FD as
                # ready, so we remove the read handler temporarily.
                # We'll try again in a while.
                self.call_exception_handler({
                    'message': 'socket.accept() out of system resource',
                    'exception': exc,
                    'socket': sock,
                })
                self.remove_reader(sock.fileno())
                self.call_later(constants.ACCEPT_RETRY_DELAY,
                                self._start_serving,
                                protocol_factory, sock, sslcontext, server)
            else:
                raise  # The event loop will catch, log and ignore it.
        else:
            extra = {'peername': addr}
            accept = self._accept_connection2(protocol_factory, conn, extra,
                                              sslcontext, server)
            self.create_task(accept)
211
    @coroutine
    def _accept_connection2(self, protocol_factory, conn, extra,
                            sslcontext=None, server=None):
        """Create the protocol and the transport of the accepted socket.

        An SSL transport is created if 'sslcontext' is true, a plain socket
        transport otherwise.  The coroutine then waits for the waiter future
        passed to the transport.

        Exceptions (subclasses of Exception) are not propagated: they are
        passed to the loop exception handler, but only in debug mode.
        """
        # Kept at None until created, so the error context below only
        # contains the objects which really exist.
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = futures.Future(loop=self)
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                yield From(waiter)
            except:
                # Close the transport whatever the exception is, then let
                # the exception continue.
                transport.close()
                raise

            # It's now up to the protocol to handle the connection.
        except Exception as exc:
            if self._debug:
                context = {
                    'message': ('Error on transport creation '
                                'for incoming connection'),
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)
248
249     def add_reader(self, fd, callback, *args):
250         """Add a reader callback."""
251         self._check_closed()
252         handle = events.Handle(callback, args, self)
253         try:
254             key = self._selector.get_key(fd)
255         except KeyError:
256             self._selector.register(fd, selectors.EVENT_READ,
257                                     (handle, None))
258         else:
259             mask, (reader, writer) = key.events, key.data
260             self._selector.modify(fd, mask | selectors.EVENT_READ,
261                                   (handle, writer))
262             if reader is not None:
263                 reader.cancel()
264
265     def remove_reader(self, fd):
266         """Remove a reader callback."""
267         if self.is_closed():
268             return False
269         try:
270             key = self._selector.get_key(fd)
271         except KeyError:
272             return False
273         else:
274             mask, (reader, writer) = key.events, key.data
275             mask &= ~selectors.EVENT_READ
276             if not mask:
277                 self._selector.unregister(fd)
278             else:
279                 self._selector.modify(fd, mask, (None, writer))
280
281             if reader is not None:
282                 reader.cancel()
283                 return True
284             else:
285                 return False
286
287     def add_writer(self, fd, callback, *args):
288         """Add a writer callback.."""
289         self._check_closed()
290         handle = events.Handle(callback, args, self)
291         try:
292             key = self._selector.get_key(fd)
293         except KeyError:
294             self._selector.register(fd, selectors.EVENT_WRITE,
295                                     (None, handle))
296         else:
297             mask, (reader, writer) = key.events, key.data
298             self._selector.modify(fd, mask | selectors.EVENT_WRITE,
299                                   (reader, handle))
300             if writer is not None:
301                 writer.cancel()
302
303     def remove_writer(self, fd):
304         """Remove a writer callback."""
305         if self.is_closed():
306             return False
307         try:
308             key = self._selector.get_key(fd)
309         except KeyError:
310             return False
311         else:
312             mask, (reader, writer) = key.events, key.data
313             # Remove both writer and connector.
314             mask &= ~selectors.EVENT_WRITE
315             if not mask:
316                 self._selector.unregister(fd)
317             else:
318                 self._selector.modify(fd, mask, (reader, None))
319
320             if writer is not None:
321                 writer.cancel()
322                 return True
323             else:
324                 return False
325
    def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.

        In debug mode, ValueError is raised if the socket is not
        non-blocking.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = futures.Future(loop=self)
        # First attempt: no reader callback has been registered yet.
        self._sock_recv(fut, False, sock, n)
        return fut
340
    def _sock_recv(self, fut, registered, sock, n):
        """Try to receive up to 'n' bytes and store the outcome in 'fut'.

        'registered' tells whether this call is the reader callback
        registered by a previous attempt.
        """
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        fd = sock.fileno()
        if registered:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            data = wrap_error(sock.recv, n)
        except (BlockingIOError, InterruptedError):
            # Not ready: retry when the socket becomes readable.
            self.add_reader(fd, self._sock_recv, fut, True, sock, n)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)
361
362     def sock_sendall(self, sock, data):
363         """Send data to the socket.
364
365         The socket must be connected to a remote socket. This method continues
366         to send data from data until either all data has been sent or an
367         error occurs. None is returned on success. On error, an exception is
368         raised, and there is no way to determine how much data, if any, was
369         successfully processed by the receiving end of the connection.
370
371         This method is a coroutine.
372         """
373         if self._debug and sock.gettimeout() != 0:
374             raise ValueError("the socket must be non-blocking")
375         fut = futures.Future(loop=self)
376         if data:
377             self._sock_sendall(fut, False, sock, data)
378         else:
379             fut.set_result(None)
380         return fut
381
382     def _sock_sendall(self, fut, registered, sock, data):
383         fd = sock.fileno()
384
385         if registered:
386             self.remove_writer(fd)
387         if fut.cancelled():
388             return
389
390         try:
391             n = wrap_error(sock.send, data)
392         except (BlockingIOError, InterruptedError):
393             n = 0
394         except Exception as exc:
395             fut.set_exception(exc)
396             return
397
398         if n == len(data):
399             fut.set_result(None)
400         else:
401             if n:
402                 data = data[n:]
403             self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
404
405     def sock_connect(self, sock, address):
406         """Connect to a remote socket at address.
407
408         The address must be already resolved to avoid the trap of hanging the
409         entire event loop when the address requires doing a DNS lookup. For
410         example, it must be an IP address, not an hostname, for AF_INET and
411         AF_INET6 address families. Use getaddrinfo() to resolve the hostname
412         asynchronously.
413
414         This method is a coroutine.
415         """
416         if self._debug and sock.gettimeout() != 0:
417             raise ValueError("the socket must be non-blocking")
418         fut = futures.Future(loop=self)
419         try:
420             if self._debug:
421                 base_events._check_resolved_address(sock, address)
422         except ValueError as err:
423             fut.set_exception(err)
424         else:
425             self._sock_connect(fut, sock, address)
426         return fut
427
    def _sock_connect(self, fut, sock, address):
        """Start connecting 'sock' to 'address'; the outcome goes to 'fut'.

        Don't use it directly, call sock_connect().
        """
        fd = sock.fileno()
        try:
            wrap_error(sock.connect, address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            #
            # The done callback unregisters the writer once the future is
            # done.
            fut.add_done_callback(functools.partial(self._sock_connect_done,
                                                    fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)
444
    def _sock_connect_done(self, fd, fut):
        """Done callback of the sock_connect() future: stop watching 'fd'.

        'fut' is the future which is done; it is not used.
        """
        self.remove_writer(fd)
447
    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback of _sock_connect(): store the outcome in 'fut'.

        The pending error of the socket is checked with _get_socket_error().
        """
        if fut.cancelled():
            return

        try:
            wrap_error(_get_socket_error, sock, address)
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)
461
    def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.

        In debug mode, ValueError is raised if the socket is not
        non-blocking.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = futures.Future(loop=self)
        # First attempt: no reader callback has been registered yet.
        self._sock_accept(fut, False, sock)
        return fut
477
    def _sock_accept(self, fut, registered, sock):
        """Try to accept a connection and store the outcome in 'fut'.

        'registered' tells whether this call is the reader callback
        registered by a previous attempt.  Don't use it directly, call
        sock_accept().
        """
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            conn, address = wrap_error(sock.accept)
            # The accepted socket is made non-blocking.
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            # Not ready: retry when the socket becomes readable.
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))
493
494     def _process_events(self, event_list):
495         for key, mask in event_list:
496             fileobj, (reader, writer) = key.fileobj, key.data
497             if mask & selectors.EVENT_READ and reader is not None:
498                 if reader._cancelled:
499                     self.remove_reader(fileobj)
500                 else:
501                     self._add_callback(reader)
502             if mask & selectors.EVENT_WRITE and writer is not None:
503                 if writer._cancelled:
504                     self.remove_writer(fileobj)
505                 else:
506                     self._add_callback(writer)
507
    def _stop_serving(self, sock):
        """Unregister the reader callback of 'sock' and close the socket."""
        self.remove_reader(sock.fileno())
        sock.close()
511
512
513 class _SelectorTransport(transports._FlowControlMixin,
514                          transports.Transport):
515
516     max_size = 256 * 1024  # Buffer size passed to recv().
517
518     _buffer_factory = bytearray  # Constructs initial value for self._buffer.
519
520     # Attribute used in the destructor: it must be set even if the constructor
521     # is not called (see _SelectorSslTransport which may start by raising an
522     # exception)
523     _sock = None
524
    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Initialize the state shared by the selector transports.

        The 'socket' and 'sockname' extra info are set from 'sock';
        'peername' is only computed if it is not already in the extra info.
        If 'server' is not None, its _attach() method is called.
        """
        super(_SelectorTransport, self).__init__(extra, loop)
        self._extra['socket'] = sock
        self._extra['sockname'] = sock.getsockname()
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except socket.error:
                self._extra['peername'] = None
        self._sock = sock
        # The file descriptor is cached.
        self._sock_fd = sock.fileno()
        self._protocol = protocol
        self._protocol_connected = True
        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
544
545     def __repr__(self):
546         info = [self.__class__.__name__]
547         if self._sock is None:
548             info.append('closed')
549         elif self._closing:
550             info.append('closing')
551         info.append('fd=%s' % self._sock_fd)
552         # test if the transport was closed
553         if self._loop is not None and not self._loop.is_closed():
554             polling = _test_selector_event(self._loop._selector,
555                                            self._sock_fd, selectors.EVENT_READ)
556             if polling:
557                 info.append('read=polling')
558             else:
559                 info.append('read=idle')
560
561             polling = _test_selector_event(self._loop._selector,
562                                            self._sock_fd,
563                                            selectors.EVENT_WRITE)
564             if polling:
565                 state = 'polling'
566             else:
567                 state = 'idle'
568
569             bufsize = self.get_write_buffer_size()
570             info.append('write=<%s, bufsize=%s>' % (state, bufsize))
571         return '<%s>' % ' '.join(info)
572
    def abort(self):
        """Close the transport immediately: calls _force_close(None)."""
        self._force_close(None)
575
    def close(self):
        """Start closing the transport; does nothing if already closing.

        Reading is stopped.  If the write buffer is empty, the call to
        _call_connection_lost(None) is scheduled right now.
        """
        if self._closing:
            return
        self._closing = True
        self._loop.remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop.call_soon(self._call_connection_lost, None)
584
    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed. This is no longer the case on
    # Python 3.4 thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            """Warn about an unclosed transport and close its socket."""
            if self._sock is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._sock.close()
593
594     def _fatal_error(self, exc, message='Fatal error on transport'):
595         # Should be called from exception handler only.
596         if isinstance(exc, (BrokenPipeError,
597                             ConnectionResetError, ConnectionAbortedError)):
598             if self._loop.get_debug():
599                 logger.debug("%r: %s", self, message, exc_info=True)
600         else:
601             self._loop.call_exception_handler({
602                 'message': message,
603                 'exception': exc,
604                 'transport': self,
605                 'protocol': self._protocol,
606             })
607         self._force_close(exc)
608
    def _force_close(self, exc):
        """Close the transport without flushing the write buffer.

        Does nothing if the connection is already marked as lost.  The call
        to _call_connection_lost(exc) is scheduled with call_soon().
        """
        if self._conn_lost:
            return
        if self._buffer:
            # Drop the pending data and stop watching for writability.
            del self._buffer[:]
            self._loop.remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop.remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)
620
    def _call_connection_lost(self, exc):
        """Notify the protocol of the lost connection and release resources.

        connection_lost(exc) is only called if the protocol is connected.
        The socket is closed and the references to the socket, protocol,
        loop and server are cleared even if connection_lost() raises.
        """
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                # Counterpart of the _attach() call made in __init__().
                server._detach()
                self._server = None
634
    def get_write_buffer_size(self):
        """Return the length of the write buffer."""
        return len(self._buffer)
637
638
639 class _SelectorSocketTransport(_SelectorTransport):
640
    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Create the transport and schedule its start-up callbacks.

        Three calls are scheduled with call_soon(), in this order:
        protocol.connection_made(), registration of the reader callback
        and, if 'waiter' is not None, wake-up of the waiter.
        """
        super(_SelectorSocketTransport, self).__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop.add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
654
    def pause_reading(self):
        """Stop reading from the socket until resume_reading() is called.

        Raises RuntimeError if the transport is closing or already paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        self._loop.remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)
664
    def resume_reading(self):
        """Resume reading from the socket after pause_reading().

        Raises RuntimeError if the transport is not paused.  The reader
        callback is not registered again if the transport is closing.
        """
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        self._loop.add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)
674
675     def _read_ready(self):
676         try:
677             data = wrap_error(self._sock.recv, self.max_size)
678         except (BlockingIOError, InterruptedError):
679             pass
680         except Exception as exc:
681             self._fatal_error(exc, 'Fatal read error on socket transport')
682         else:
683             if data:
684                 self._protocol.data_received(data)
685             else:
686                 if self._loop.get_debug():
687                     logger.debug("%r received EOF", self)
688                 keep_open = self._protocol.eof_received()
689                 if keep_open:
690                     # We're keeping the connection open so the
691                     # protocol can write more, but we still can't
692                     # receive more, so remove the reader callback.
693                     self._loop.remove_reader(self._sock_fd)
694                 else:
695                     self.close()
696
    def write(self, data):
        """Write 'data' to the socket, buffering what cannot be sent now.

        Raises RuntimeError if write_eof() has already been called.  Empty
        data is ignored.  Once the connection is lost, the data is dropped
        and a warning is logged when the number of such writes reaches
        constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES.
        """
        data = flatten_bytes(data)
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = wrap_error(self._sock.send, data)
            except (BlockingIOError, InterruptedError):
                pass
            except Exception as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                # Keep only what has not been sent.
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop.add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()
729
    def _write_ready(self):
        """Writer callback: try to send the content of the write buffer.

        When the buffer becomes empty, the writer callback is removed; then
        the connection is closed if close() was called, or the write side
        is shut down if write_eof() was called.
        """
        assert self._buffer, 'Data should not be empty'

        data = flatten_bytes(self._buffer)
        try:
            n = wrap_error(self._sock.send, data)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._loop.remove_writer(self._sock_fd)
            del self._buffer[:]
            self._fatal_error(exc, 'Fatal write error on socket transport')
        else:
            if n:
                # Drop the bytes which have been sent.
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop.remove_writer(self._sock_fd)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)
752
    def write_eof(self):
        """Shut down the write side of the socket once the buffer is empty.

        Does nothing if it has already been called.  If the write buffer is
        not empty, the shutdown is done later by _write_ready().
        """
        if self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)
759
    def can_write_eof(self):
        """Return True: this transport implements write_eof()."""
        return True
762
763
764 class _SelectorSslTransport(_SelectorTransport):
765
766     _buffer_factory = bytearray
767
    def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
                 server_side=False, server_hostname=None,
                 extra=None, server=None):
        """Wrap 'rawsock' into an SSL socket and start the SSL handshake.

        Raises RuntimeError if the ssl module is not available.  If
        'sslcontext' is false, a context is created with
        sslproto._create_transport_context().
        """
        if ssl is None:
            raise RuntimeError('stdlib ssl module not available')

        if not sslcontext:
            sslcontext = sslproto._create_transport_context(server_side, server_hostname)

        wrap_kwargs = {
            'server_side': server_side,
            # The handshake is done by _on_handshake().
            'do_handshake_on_connect': False,
        }
        if server_hostname and not server_side:
            wrap_kwargs['server_hostname'] = server_hostname
        sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)

        super(_SelectorSslTransport, self).__init__(loop, sslsock, protocol, extra, server)
        # the protocol connection is only made after the SSL handshake
        self._protocol_connected = False

        self._server_hostname = server_hostname
        self._waiter = waiter
        self._sslcontext = sslcontext
        self._paused = False

        # SSL-specific extra info.  (peercert is set later)
        self._extra.update(sslcontext=sslcontext)

        # The start time of the handshake is only recorded in debug mode.
        if self._loop.get_debug():
            logger.debug("%r starts SSL handshake", self)
            start_time = self._loop.time()
        else:
            start_time = None
        self._on_handshake(start_time)
803
804     def _wakeup_waiter(self, exc=None):
805         if self._waiter is None:
806             return
807         if not self._waiter.cancelled():
808             if exc is not None:
809                 self._waiter.set_exception(exc)
810             else:
811                 self._waiter.set_result(None)
812         self._waiter = None
813
814     def _on_handshake(self, start_time):
815         try:
816             wrap_ssl_error(self._sock.do_handshake)
817         except SSLWantReadError:
818             self._loop.add_reader(self._sock_fd,
819                                   self._on_handshake, start_time)
820             return
821         except SSLWantWriteError:
822             self._loop.add_writer(self._sock_fd,
823                                   self._on_handshake, start_time)
824             return
825         except BaseException as exc:
826             if self._loop.get_debug():
827                 logger.warning("%r: SSL handshake failed",
828                                self, exc_info=True)
829             self._loop.remove_reader(self._sock_fd)
830             self._loop.remove_writer(self._sock_fd)
831             self._sock.close()
832             self._wakeup_waiter(exc)
833             if isinstance(exc, Exception):
834                 return
835             else:
836                 raise
837
838         self._loop.remove_reader(self._sock_fd)
839         self._loop.remove_writer(self._sock_fd)
840
841         peercert = self._sock.getpeercert()
842         if not hasattr(self._sslcontext, 'check_hostname'):
843             # Verify hostname if requested, Python 3.4+ uses check_hostname
844             # and checks the hostname in do_handshake()
845             if (self._server_hostname and
846                 self._sslcontext.verify_mode != ssl.CERT_NONE):
847                 try:
848                     ssl.match_hostname(peercert, self._server_hostname)
849                 except Exception as exc:
850                     if self._loop.get_debug():
851                         logger.warning("%r: SSL handshake failed "
852                                        "on matching the hostname",
853                                        self, exc_info=True)
854                     self._sock.close()
855                     self._wakeup_waiter(exc)
856                     return
857
858         # Add extra info that becomes available after handshake.
859         self._extra.update(peercert=peercert,
860                            cipher=self._sock.cipher(),
861                            )
862         if hasattr(self._sock, 'compression'):
863             self._extra['compression'] = self._sock.compression()
864
865         self._read_wants_write = False
866         self._write_wants_read = False
867         self._loop.add_reader(self._sock_fd, self._read_ready)
868         self._protocol_connected = True
869         self._loop.call_soon(self._protocol.connection_made, self)
870         # only wake up the waiter when connection_made() has been called
871         self._loop.call_soon(self._wakeup_waiter)
872
873         if self._loop.get_debug():
874             dt = self._loop.time() - start_time
875             logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
876
877     def pause_reading(self):
878         # XXX This is a bit icky, given the comment at the top of
879         # _read_ready().  Is it possible to evoke a deadlock?  I don't
880         # know, although it doesn't look like it; write() will still
881         # accept more data for the buffer and eventually the app will
882         # call resume_reading() again, and things will flow again.
883
884         if self._closing:
885             raise RuntimeError('Cannot pause_reading() when closing')
886         if self._paused:
887             raise RuntimeError('Already paused')
888         self._paused = True
889         self._loop.remove_reader(self._sock_fd)
890         if self._loop.get_debug():
891             logger.debug("%r pauses reading", self)
892
893     def resume_reading(self):
894         if not self._paused:
895             raise RuntimeError('Not paused')
896         self._paused = False
897         if self._closing:
898             return
899         self._loop.add_reader(self._sock_fd, self._read_ready)
900         if self._loop.get_debug():
901             logger.debug("%r resumes reading", self)
902
903     def _sock_recv(self):
904         return wrap_ssl_error(self._sock.recv, self.max_size)
905
906     def _read_ready(self):
907         if self._write_wants_read:
908             self._write_wants_read = False
909             self._write_ready()
910
911             if self._buffer:
912                 self._loop.add_writer(self._sock_fd, self._write_ready)
913
914         try:
915             if _SSL_REQUIRES_SELECT:
916                 rfds = (self._sock.fileno(),)
917                 rfds = select.select(rfds, (), (), 0.0)[0]
918                 if not rfds:
919                     # False alarm.
920                     return
921             data = wrap_error(self._sock_recv)
922         except (BlockingIOError, InterruptedError, SSLWantReadError):
923             pass
924         except SSLWantWriteError:
925             self._read_wants_write = True
926             self._loop.remove_reader(self._sock_fd)
927             self._loop.add_writer(self._sock_fd, self._write_ready)
928         except Exception as exc:
929             self._fatal_error(exc, 'Fatal read error on SSL transport')
930         else:
931             if data:
932                 self._protocol.data_received(data)
933             else:
934                 try:
935                     if self._loop.get_debug():
936                         logger.debug("%r received EOF", self)
937                     keep_open = self._protocol.eof_received()
938                     if keep_open:
939                         logger.warning('returning true from eof_received() '
940                                        'has no effect when using ssl')
941                 finally:
942                     self.close()
943
944     def _write_ready(self):
945         if self._read_wants_write:
946             self._read_wants_write = False
947             self._read_ready()
948
949             if not (self._paused or self._closing):
950                 self._loop.add_reader(self._sock_fd, self._read_ready)
951
952         if self._buffer:
953             data = flatten_bytes(self._buffer)
954             try:
955                 n = wrap_error(self._sock.send, data)
956             except (BlockingIOError, InterruptedError, SSLWantWriteError):
957                 n = 0
958             except SSLWantReadError:
959                 n = 0
960                 self._loop.remove_writer(self._sock_fd)
961                 self._write_wants_read = True
962             except Exception as exc:
963                 self._loop.remove_writer(self._sock_fd)
964                 del self._buffer[:]
965                 self._fatal_error(exc, 'Fatal write error on SSL transport')
966                 return
967
968             if n:
969                 del self._buffer[:n]
970
971         self._maybe_resume_protocol()  # May append to buffer.
972
973         if not self._buffer:
974             self._loop.remove_writer(self._sock_fd)
975             if self._closing:
976                 self._call_connection_lost(None)
977
978     def write(self, data):
979         data = flatten_bytes(data)
980         if not data:
981             return
982
983         if self._conn_lost:
984             if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
985                 logger.warning('socket.send() raised exception.')
986             self._conn_lost += 1
987             return
988
989         if not self._buffer:
990             self._loop.add_writer(self._sock_fd, self._write_ready)
991
992         # Add it to the buffer.
993         self._buffer.extend(data)
994         self._maybe_pause_protocol()
995
996     def can_write_eof(self):
997         return False
998
999
1000 class _SelectorDatagramTransport(_SelectorTransport):
1001
1002     _buffer_factory = collections.deque
1003
1004     def __init__(self, loop, sock, protocol, address=None,
1005                  waiter=None, extra=None):
1006         super(_SelectorDatagramTransport, self).__init__(loop, sock,
1007                                                          protocol, extra)
1008         self._address = address
1009         self._loop.call_soon(self._protocol.connection_made, self)
1010         # only start reading when connection_made() has been called
1011         self._loop.call_soon(self._loop.add_reader,
1012                              self._sock_fd, self._read_ready)
1013         if waiter is not None:
1014             # only wake up the waiter when connection_made() has been called
1015             self._loop.call_soon(waiter._set_result_unless_cancelled, None)
1016
1017     def get_write_buffer_size(self):
1018         return sum(len(data) for data, _ in self._buffer)
1019
1020     def _read_ready(self):
1021         try:
1022             data, addr = wrap_error(self._sock.recvfrom, self.max_size)
1023         except (BlockingIOError, InterruptedError):
1024             pass
1025         except OSError as exc:
1026             self._protocol.error_received(exc)
1027         except Exception as exc:
1028             self._fatal_error(exc, 'Fatal read error on datagram transport')
1029         else:
1030             self._protocol.datagram_received(data, addr)
1031
1032     def sendto(self, data, addr=None):
1033         data = flatten_bytes(data)
1034         if not data:
1035             return
1036
1037         if self._address and addr not in (None, self._address):
1038             raise ValueError('Invalid address: must be None or %s' %
1039                              (self._address,))
1040
1041         if self._conn_lost and self._address:
1042             if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1043                 logger.warning('socket.send() raised exception.')
1044             self._conn_lost += 1
1045             return
1046
1047         if not self._buffer:
1048             # Attempt to send it right away first.
1049             try:
1050                 if self._address:
1051                     wrap_error(self._sock.send, data)
1052                 else:
1053                     wrap_error(self._sock.sendto, data, addr)
1054                 return
1055             except (BlockingIOError, InterruptedError):
1056                 self._loop.add_writer(self._sock_fd, self._sendto_ready)
1057             except OSError as exc:
1058                 self._protocol.error_received(exc)
1059                 return
1060             except Exception as exc:
1061                 self._fatal_error(exc,
1062                                   'Fatal write error on datagram transport')
1063                 return
1064
1065         # Ensure that what we buffer is immutable.
1066         self._buffer.append((bytes(data), addr))
1067         self._maybe_pause_protocol()
1068
1069     def _sendto_ready(self):
1070         while self._buffer:
1071             data, addr = self._buffer.popleft()
1072             try:
1073                 if self._address:
1074                     wrap_error(self._sock.send, data)
1075                 else:
1076                     wrap_error(self._sock.sendto, data, addr)
1077             except (BlockingIOError, InterruptedError):
1078                 self._buffer.appendleft((data, addr))  # Try again later.
1079                 break
1080             except OSError as exc:
1081                 self._protocol.error_received(exc)
1082                 return
1083             except Exception as exc:
1084                 self._fatal_error(exc,
1085                                   'Fatal write error on datagram transport')
1086                 return
1087
1088         self._maybe_resume_protocol()  # May append to buffer.
1089         if not self._buffer:
1090             self._loop.remove_writer(self._sock_fd)
1091             if self._closing:
1092                 self._call_connection_lost(None)