Cleanup for stow
diff --git a/.local/lib/python2.7/site-packages/trollius/selector_events.py b/.local/lib/python2.7/site-packages/trollius/selector_events.py
new file mode 100644
index 0000000..67ef26e
--- /dev/null
+++ b/.local/lib/python2.7/site-packages/trollius/selector_events.py
@@ -0,0 +1,1092 @@
+"""Event loop using a selector and related classes.
+
+A selector is a "notify-when-ready" multiplexer.  For a subclass which
+also includes support for signal handling, see the unix_events sub-module.
+"""
+
+__all__ = ['BaseSelectorEventLoop']
+
+import collections
+import errno
+import functools
+import socket
+import sys
+import warnings
+try:
+    import ssl
+    from .py3_ssl import wrap_ssl_error, SSLWantReadError, SSLWantWriteError
+except ImportError:  # pragma: no cover
+    ssl = None
+
+from . import base_events
+from . import compat
+from . import constants
+from . import events
+from . import futures
+from . import selectors
+from . import sslproto
+from . import transports
+from .compat import flatten_bytes
+from .coroutines import coroutine, From
+from .log import logger
+from .py33_exceptions import (wrap_error,
+    BlockingIOError, InterruptedError, ConnectionAbortedError, BrokenPipeError,
+    ConnectionResetError)
+
+# On Mac OS 10.6 with Python 2.6.1 or OpenIndiana 148 with Python 2.6.4,
+# _SelectorSslTransport._read_ready() hangs if the socket has no data.
+# Example: test_events.test_create_server_ssl()
+_SSL_REQUIRES_SELECT = (sys.version_info < (2, 6, 6))
+if _SSL_REQUIRES_SELECT:
+    import select
+
+
+def _get_socket_error(sock, address):
+    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+    if err != 0:
+        # Raise an OSError so the caller's except clause handles the failure.
+        raise OSError(err, 'Connect call failed %s' % (address,))
+
+
+def _test_selector_event(selector, fd, event):
+    # Test if the selector is monitoring 'event' events
+    # for the file descriptor 'fd'.
+    try:
+        key = selector.get_key(fd)
+    except KeyError:
+        return False
+    else:
+        return bool(key.events & event)
+
+
+class BaseSelectorEventLoop(base_events.BaseEventLoop):
+    """Selector event loop.
+
+    See events.EventLoop for API specification.
+    """
+
+    def __init__(self, selector=None):
+        super(BaseSelectorEventLoop, self).__init__()
+
+        if selector is None:
+            selector = selectors.DefaultSelector()
+        logger.debug('Using selector: %s', selector.__class__.__name__)
+        self._selector = selector
+        self._make_self_pipe()
+
+    def _make_socket_transport(self, sock, protocol, waiter=None,
+                               extra=None, server=None):
+        return _SelectorSocketTransport(self, sock, protocol, waiter,
+                                        extra, server)
+
+    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
+                            server_side=False, server_hostname=None,
+                            extra=None, server=None):
+        if not sslproto._is_sslproto_available():
+            return self._make_legacy_ssl_transport(
+                rawsock, protocol, sslcontext, waiter,
+                server_side=server_side, server_hostname=server_hostname,
+                extra=extra, server=server)
+
+        ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
+                                            server_side, server_hostname)
+        _SelectorSocketTransport(self, rawsock, ssl_protocol,
+                                 extra=extra, server=server)
+        return ssl_protocol._app_transport
+
+    def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
+                                   waiter,
+                                   server_side=False, server_hostname=None,
+                                   extra=None, server=None):
+        # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used
+        # on Python 3.4 and older, when ssl.MemoryBIO is not available.
+        return _SelectorSslTransport(
+            self, rawsock, protocol, sslcontext, waiter,
+            server_side, server_hostname, extra, server)
+
+    def _make_datagram_transport(self, sock, protocol,
+                                 address=None, waiter=None, extra=None):
+        return _SelectorDatagramTransport(self, sock, protocol,
+                                          address, waiter, extra)
+
+    def close(self):
+        if self.is_running():
+            raise RuntimeError("Cannot close a running event loop")
+        if self.is_closed():
+            return
+        self._close_self_pipe()
+        super(BaseSelectorEventLoop, self).close()
+        if self._selector is not None:
+            self._selector.close()
+            self._selector = None
+
+    def _socketpair(self):
+        raise NotImplementedError
+
+    def _close_self_pipe(self):
+        self.remove_reader(self._ssock.fileno())
+        self._ssock.close()
+        self._ssock = None
+        self._csock.close()
+        self._csock = None
+        self._internal_fds -= 1
+
+    def _make_self_pipe(self):
+        # A self-socket, really. :-)
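+        # The write end (_csock) is what call_soon_threadsafe() and signal
+        # handlers poke to wake the selector from another thread or context;
+        # _read_from_self() drains the wake-up bytes on the loop side.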
+        self._ssock, self._csock = self._socketpair()
+        self._ssock.setblocking(False)
+        self._csock.setblocking(False)
+        self._internal_fds += 1
+        self.add_reader(self._ssock.fileno(), self._read_from_self)
+
+    def _process_self_data(self, data):
+        pass
+
+    def _read_from_self(self):
+        while True:
+            try:
+                data = wrap_error(self._ssock.recv, 4096)
+                if not data:
+                    break
+                self._process_self_data(data)
+            except InterruptedError:
+                continue
+            except BlockingIOError:
+                break
+
+    def _write_to_self(self):
+        # This may be called from a different thread, possibly after
+        # _close_self_pipe() has been called or even while it is
+        # running.  Guard for self._csock being None or closed.  When
+        # a socket is closed, send() raises OSError (with errno set to
+        # EBADF, but let's not rely on the exact error code).
+        csock = self._csock
+        if csock is not None:
+            try:
+                wrap_error(csock.send, b'\0')
+            except OSError:
+                if self._debug:
+                    logger.debug("Fail to write a null byte into the "
+                                 "self-pipe socket",
+                                 exc_info=True)
+
+    def _start_serving(self, protocol_factory, sock,
+                       sslcontext=None, server=None):
+        self.add_reader(sock.fileno(), self._accept_connection,
+                        protocol_factory, sock, sslcontext, server)
+
+    def _accept_connection(self, protocol_factory, sock,
+                           sslcontext=None, server=None):
+        try:
+            conn, addr = wrap_error(sock.accept)
+            if self._debug:
+                logger.debug("%r got a new connection from %r: %r",
+                             server, addr, conn)
+            conn.setblocking(False)
+        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
+            pass  # False alarm.
+        except socket.error as exc:
+            # There's nowhere to send the error, so just log it.
+            if exc.errno in (errno.EMFILE, errno.ENFILE,
+                             errno.ENOBUFS, errno.ENOMEM):
+                # Some platforms (e.g. Linux) keep reporting the FD as
+                # ready, so we remove the read handler temporarily.
+                # We'll try again in a while.
+                self.call_exception_handler({
+                    'message': 'socket.accept() out of system resource',
+                    'exception': exc,
+                    'socket': sock,
+                })
+                self.remove_reader(sock.fileno())
+                self.call_later(constants.ACCEPT_RETRY_DELAY,
+                                self._start_serving,
+                                protocol_factory, sock, sslcontext, server)
+            else:
+                raise  # The event loop will catch, log and ignore it.
+        else:
+            extra = {'peername': addr}
+            accept = self._accept_connection2(protocol_factory, conn, extra,
+                                              sslcontext, server)
+            self.create_task(accept)
+
+    @coroutine
+    def _accept_connection2(self, protocol_factory, conn, extra,
+                            sslcontext=None, server=None):
+        protocol = None
+        transport = None
+        try:
+            protocol = protocol_factory()
+            waiter = futures.Future(loop=self)
+            if sslcontext:
+                transport = self._make_ssl_transport(
+                    conn, protocol, sslcontext, waiter=waiter,
+                    server_side=True, extra=extra, server=server)
+            else:
+                transport = self._make_socket_transport(
+                    conn, protocol, waiter=waiter, extra=extra,
+                    server=server)
+
+            try:
+                yield From(waiter)
+            except:
+                transport.close()
+                raise
+
+            # It's now up to the protocol to handle the connection.
+        except Exception as exc:
+            if self._debug:
+                context = {
+                    'message': ('Error on transport creation '
+                                'for incoming connection'),
+                    'exception': exc,
+                }
+                if protocol is not None:
+                    context['protocol'] = protocol
+                if transport is not None:
+                    context['transport'] = transport
+                self.call_exception_handler(context)
+
+    def add_reader(self, fd, callback, *args):
+        """Add a reader callback."""
+        self._check_closed()
+        handle = events.Handle(callback, args, self)
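+        # Each registered fd stores a (reader_handle, writer_handle) pair as
+        # the selector key's data; only the reader slot is replaced here.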
+        try:
+            key = self._selector.get_key(fd)
+        except KeyError:
+            self._selector.register(fd, selectors.EVENT_READ,
+                                    (handle, None))
+        else:
+            mask, (reader, writer) = key.events, key.data
+            self._selector.modify(fd, mask | selectors.EVENT_READ,
+                                  (handle, writer))
+            if reader is not None:
+                reader.cancel()
+
+    def remove_reader(self, fd):
+        """Remove a reader callback."""
+        if self.is_closed():
+            return False
+        try:
+            key = self._selector.get_key(fd)
+        except KeyError:
+            return False
+        else:
+            mask, (reader, writer) = key.events, key.data
+            mask &= ~selectors.EVENT_READ
+            if not mask:
+                self._selector.unregister(fd)
+            else:
+                self._selector.modify(fd, mask, (None, writer))
+
+            if reader is not None:
+                reader.cancel()
+                return True
+            else:
+                return False
+
+    def add_writer(self, fd, callback, *args):
+        """Add a writer callback.."""
+        self._check_closed()
+        handle = events.Handle(callback, args, self)
+        try:
+            key = self._selector.get_key(fd)
+        except KeyError:
+            self._selector.register(fd, selectors.EVENT_WRITE,
+                                    (None, handle))
+        else:
+            mask, (reader, writer) = key.events, key.data
+            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
+                                  (reader, handle))
+            if writer is not None:
+                writer.cancel()
+
+    def remove_writer(self, fd):
+        """Remove a writer callback."""
+        if self.is_closed():
+            return False
+        try:
+            key = self._selector.get_key(fd)
+        except KeyError:
+            return False
+        else:
+            mask, (reader, writer) = key.events, key.data
+            # Remove both writer and connector.
+            mask &= ~selectors.EVENT_WRITE
+            if not mask:
+                self._selector.unregister(fd)
+            else:
+                self._selector.modify(fd, mask, (reader, None))
+
+            if writer is not None:
+                writer.cancel()
+                return True
+            else:
+                return False
+
+    def sock_recv(self, sock, n):
+        """Receive data from the socket.
+
+        The return value is a bytes object representing the data received.
+        The maximum amount of data to be received at once is specified by
+        the n argument.
+
+        This method is a coroutine.
+        """
+        if self._debug and sock.gettimeout() != 0:
+            raise ValueError("the socket must be non-blocking")
+        fut = futures.Future(loop=self)
+        self._sock_recv(fut, False, sock, n)
+        return fut
+
+    def _sock_recv(self, fut, registered, sock, n):
+        # _sock_recv() can add itself as an I/O callback if the operation can't
+        # be done immediately. Don't use it directly, call sock_recv().
+        fd = sock.fileno()
+        if registered:
+            # Remove the callback early.  It should be rare that the
+            # selector says the fd is ready but the call still returns
+            # EAGAIN, and I am willing to take a hit in that case in
+            # order to simplify the common case.
+            self.remove_reader(fd)
+        if fut.cancelled():
+            return
+        try:
+            data = wrap_error(sock.recv, n)
+        except (BlockingIOError, InterruptedError):
+            self.add_reader(fd, self._sock_recv, fut, True, sock, n)
+        except Exception as exc:
+            fut.set_exception(exc)
+        else:
+            fut.set_result(data)
+
+    def sock_sendall(self, sock, data):
+        """Send data to the socket.
+
+        The socket must be connected to a remote socket. This method continues
+        to send the given data until either all of it has been sent or an
+        error occurs. None is returned on success. On error, an exception is
+        raised, and there is no way to determine how much data, if any, was
+        successfully processed by the receiving end of the connection.
+
+        This method is a coroutine.
+        """
+        if self._debug and sock.gettimeout() != 0:
+            raise ValueError("the socket must be non-blocking")
+        fut = futures.Future(loop=self)
+        if data:
+            self._sock_sendall(fut, False, sock, data)
+        else:
+            fut.set_result(None)
+        return fut
+
+    def _sock_sendall(self, fut, registered, sock, data):
+        fd = sock.fileno()
+
+        if registered:
+            self.remove_writer(fd)
+        if fut.cancelled():
+            return
+
+        try:
+            n = wrap_error(sock.send, data)
+        except (BlockingIOError, InterruptedError):
+            n = 0
+        except Exception as exc:
+            fut.set_exception(exc)
+            return
+
+        if n == len(data):
+            fut.set_result(None)
+        else:
+            if n:
+                data = data[n:]
+            self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
+
+    def sock_connect(self, sock, address):
+        """Connect to a remote socket at address.
+
+        The address must already be resolved to avoid the trap of hanging the
+        entire event loop when the address requires doing a DNS lookup. For
+        example, it must be an IP address, not a hostname, for AF_INET and
+        AF_INET6 address families. Use getaddrinfo() to resolve the hostname
+        asynchronously.
+
+        This method is a coroutine.
+        """
+        if self._debug and sock.gettimeout() != 0:
+            raise ValueError("the socket must be non-blocking")
+        fut = futures.Future(loop=self)
+        try:
+            if self._debug:
+                base_events._check_resolved_address(sock, address)
+        except ValueError as err:
+            fut.set_exception(err)
+        else:
+            self._sock_connect(fut, sock, address)
+        return fut
+
+    def _sock_connect(self, fut, sock, address):
+        fd = sock.fileno()
+        try:
+            wrap_error(sock.connect, address)
+        except (BlockingIOError, InterruptedError):
+            # Issue #23618: When the C function connect() fails with EINTR,
+            # the connection runs in the background. We have to wait until the
+            # socket becomes writable to be notified when the connection
+            # succeeds or fails.
+            fut.add_done_callback(functools.partial(self._sock_connect_done,
+                                                    fd))
+            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
+        except Exception as exc:
+            fut.set_exception(exc)
+        else:
+            fut.set_result(None)
+
+    def _sock_connect_done(self, fd, fut):
+        self.remove_writer(fd)
+
+    def _sock_connect_cb(self, fut, sock, address):
+        if fut.cancelled():
+            return
+
+        try:
+            wrap_error(_get_socket_error, sock, address)
+        except (BlockingIOError, InterruptedError):
+            # socket is still registered, the callback will be retried later
+            pass
+        except Exception as exc:
+            fut.set_exception(exc)
+        else:
+            fut.set_result(None)
+
+    def sock_accept(self, sock):
+        """Accept a connection.
+
+        The socket must be bound to an address and listening for connections.
+        The return value is a pair (conn, address) where conn is a new socket
+        object usable to send and receive data on the connection, and address
+        is the address bound to the socket on the other end of the connection.
+
+        This method is a coroutine.
+        """
+        if self._debug and sock.gettimeout() != 0:
+            raise ValueError("the socket must be non-blocking")
+        fut = futures.Future(loop=self)
+        self._sock_accept(fut, False, sock)
+        return fut
+
+    def _sock_accept(self, fut, registered, sock):
+        fd = sock.fileno()
+        if registered:
+            self.remove_reader(fd)
+        if fut.cancelled():
+            return
+        try:
+            conn, address = wrap_error(sock.accept)
+            conn.setblocking(False)
+        except (BlockingIOError, InterruptedError):
+            self.add_reader(fd, self._sock_accept, fut, True, sock)
+        except Exception as exc:
+            fut.set_exception(exc)
+        else:
+            fut.set_result((conn, address))
+
+    def _process_events(self, event_list):
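+        # Dispatch the (reader, writer) handles registered through
+        # add_reader()/add_writer(); handles cancelled in the meantime are
+        # unregistered here instead of being scheduled.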
+        for key, mask in event_list:
+            fileobj, (reader, writer) = key.fileobj, key.data
+            if mask & selectors.EVENT_READ and reader is not None:
+                if reader._cancelled:
+                    self.remove_reader(fileobj)
+                else:
+                    self._add_callback(reader)
+            if mask & selectors.EVENT_WRITE and writer is not None:
+                if writer._cancelled:
+                    self.remove_writer(fileobj)
+                else:
+                    self._add_callback(writer)
+
+    def _stop_serving(self, sock):
+        self.remove_reader(sock.fileno())
+        sock.close()
+
+
+class _SelectorTransport(transports._FlowControlMixin,
+                         transports.Transport):
+
+    max_size = 256 * 1024  # Buffer size passed to recv().
+
+    _buffer_factory = bytearray  # Constructs initial value for self._buffer.
+
+    # Attribute used in the destructor: it must be set even if the constructor
+    # is not called (see _SelectorSslTransport which may start by raising an
+    # exception)
+    _sock = None
+
+    def __init__(self, loop, sock, protocol, extra=None, server=None):
+        super(_SelectorTransport, self).__init__(extra, loop)
+        self._extra['socket'] = sock
+        self._extra['sockname'] = sock.getsockname()
+        if 'peername' not in self._extra:
+            try:
+                self._extra['peername'] = sock.getpeername()
+            except socket.error:
+                self._extra['peername'] = None
+        self._sock = sock
+        self._sock_fd = sock.fileno()
+        self._protocol = protocol
+        self._protocol_connected = True
+        self._server = server
+        self._buffer = self._buffer_factory()
+        self._conn_lost = 0  # Set when call to connection_lost scheduled.
+        self._closing = False  # Set when close() called.
+        if self._server is not None:
+            self._server._attach()
+
+    def __repr__(self):
+        info = [self.__class__.__name__]
+        if self._sock is None:
+            info.append('closed')
+        elif self._closing:
+            info.append('closing')
+        info.append('fd=%s' % self._sock_fd)
+        # test if the transport was closed
+        if self._loop is not None and not self._loop.is_closed():
+            polling = _test_selector_event(self._loop._selector,
+                                           self._sock_fd, selectors.EVENT_READ)
+            if polling:
+                info.append('read=polling')
+            else:
+                info.append('read=idle')
+
+            polling = _test_selector_event(self._loop._selector,
+                                           self._sock_fd,
+                                           selectors.EVENT_WRITE)
+            if polling:
+                state = 'polling'
+            else:
+                state = 'idle'
+
+            bufsize = self.get_write_buffer_size()
+            info.append('write=<%s, bufsize=%s>' % (state, bufsize))
+        return '<%s>' % ' '.join(info)
+
+    def abort(self):
+        self._force_close(None)
+
+    def close(self):
+        if self._closing:
+            return
+        self._closing = True
+        self._loop.remove_reader(self._sock_fd)
+        if not self._buffer:
+            self._conn_lost += 1
+            self._loop.call_soon(self._call_connection_lost, None)
+
+    # On Python 3.3 and older, objects with a destructor that are part of a
+    # reference cycle are never destroyed. This is no longer the case on
+    # Python 3.4, thanks to PEP 442.
+    if compat.PY34:
+        def __del__(self):
+            if self._sock is not None:
+                warnings.warn("unclosed transport %r" % self, ResourceWarning)
+                self._sock.close()
+
+    def _fatal_error(self, exc, message='Fatal error on transport'):
+        # Should be called from exception handler only.
+        if isinstance(exc, (BrokenPipeError,
+                            ConnectionResetError, ConnectionAbortedError)):
+            if self._loop.get_debug():
+                logger.debug("%r: %s", self, message, exc_info=True)
+        else:
+            self._loop.call_exception_handler({
+                'message': message,
+                'exception': exc,
+                'transport': self,
+                'protocol': self._protocol,
+            })
+        self._force_close(exc)
+
+    def _force_close(self, exc):
+        if self._conn_lost:
+            return
+        if self._buffer:
+            del self._buffer[:]
+            self._loop.remove_writer(self._sock_fd)
+        if not self._closing:
+            self._closing = True
+            self._loop.remove_reader(self._sock_fd)
+        self._conn_lost += 1
+        self._loop.call_soon(self._call_connection_lost, exc)
+
+    def _call_connection_lost(self, exc):
+        try:
+            if self._protocol_connected:
+                self._protocol.connection_lost(exc)
+        finally:
+            self._sock.close()
+            self._sock = None
+            self._protocol = None
+            self._loop = None
+            server = self._server
+            if server is not None:
+                server._detach()
+                self._server = None
+
+    def get_write_buffer_size(self):
+        return len(self._buffer)
+
+
+class _SelectorSocketTransport(_SelectorTransport):
+
+    def __init__(self, loop, sock, protocol, waiter=None,
+                 extra=None, server=None):
+        super(_SelectorSocketTransport, self).__init__(
+            loop, sock, protocol, extra, server)
+        self._eof = False
+        self._paused = False
+
+        self._loop.call_soon(self._protocol.connection_made, self)
+        # only start reading when connection_made() has been called
+        self._loop.call_soon(self._loop.add_reader,
+                             self._sock_fd, self._read_ready)
+        if waiter is not None:
+            # only wake up the waiter when connection_made() has been called
+            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
+
+    def pause_reading(self):
+        if self._closing:
+            raise RuntimeError('Cannot pause_reading() when closing')
+        if self._paused:
+            raise RuntimeError('Already paused')
+        self._paused = True
+        self._loop.remove_reader(self._sock_fd)
+        if self._loop.get_debug():
+            logger.debug("%r pauses reading", self)
+
+    def resume_reading(self):
+        if not self._paused:
+            raise RuntimeError('Not paused')
+        self._paused = False
+        if self._closing:
+            return
+        self._loop.add_reader(self._sock_fd, self._read_ready)
+        if self._loop.get_debug():
+            logger.debug("%r resumes reading", self)
+
+    def _read_ready(self):
+        try:
+            data = wrap_error(self._sock.recv, self.max_size)
+        except (BlockingIOError, InterruptedError):
+            pass
+        except Exception as exc:
+            self._fatal_error(exc, 'Fatal read error on socket transport')
+        else:
+            if data:
+                self._protocol.data_received(data)
+            else:
+                if self._loop.get_debug():
+                    logger.debug("%r received EOF", self)
+                keep_open = self._protocol.eof_received()
+                if keep_open:
+                    # We're keeping the connection open so the
+                    # protocol can write more, but we still can't
+                    # receive more, so remove the reader callback.
+                    self._loop.remove_reader(self._sock_fd)
+                else:
+                    self.close()
+
+    def write(self, data):
+        data = flatten_bytes(data)
+        if self._eof:
+            raise RuntimeError('Cannot call write() after write_eof()')
+        if not data:
+            return
+
+        if self._conn_lost:
+            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+                logger.warning('socket.send() raised exception.')
+            self._conn_lost += 1
+            return
+
+        if not self._buffer:
+            # Optimization: try to send now.
+            try:
+                n = wrap_error(self._sock.send, data)
+            except (BlockingIOError, InterruptedError):
+                pass
+            except Exception as exc:
+                self._fatal_error(exc, 'Fatal write error on socket transport')
+                return
+            else:
+                data = data[n:]
+                if not data:
+                    return
+            # Not all was written; register write handler.
+            self._loop.add_writer(self._sock_fd, self._write_ready)
+
+        # Add it to the buffer.
+        self._buffer.extend(data)
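+        # _maybe_pause_protocol() (from _FlowControlMixin) asks the protocol
+        # to pause_writing() once the buffer passes the high-water mark;
+        # _write_ready() resumes it as the buffer drains.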
+        self._maybe_pause_protocol()
+
+    def _write_ready(self):
+        assert self._buffer, 'Data should not be empty'
+
+        data = flatten_bytes(self._buffer)
+        try:
+            n = wrap_error(self._sock.send, data)
+        except (BlockingIOError, InterruptedError):
+            pass
+        except Exception as exc:
+            self._loop.remove_writer(self._sock_fd)
+            del self._buffer[:]
+            self._fatal_error(exc, 'Fatal write error on socket transport')
+        else:
+            if n:
+                del self._buffer[:n]
+            self._maybe_resume_protocol()  # May append to buffer.
+            if not self._buffer:
+                self._loop.remove_writer(self._sock_fd)
+                if self._closing:
+                    self._call_connection_lost(None)
+                elif self._eof:
+                    self._sock.shutdown(socket.SHUT_WR)
+
+    def write_eof(self):
+        if self._eof:
+            return
+        self._eof = True
+        if not self._buffer:
+            self._sock.shutdown(socket.SHUT_WR)
+
+    def can_write_eof(self):
+        return True
+
+
+class _SelectorSslTransport(_SelectorTransport):
+
+    _buffer_factory = bytearray
+
+    def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
+                 server_side=False, server_hostname=None,
+                 extra=None, server=None):
+        if ssl is None:
+            raise RuntimeError('stdlib ssl module not available')
+
+        if not sslcontext:
+            sslcontext = sslproto._create_transport_context(
+                server_side, server_hostname)
+
+        wrap_kwargs = {
+            'server_side': server_side,
+            'do_handshake_on_connect': False,
+        }
+        if server_hostname and not server_side:
+            wrap_kwargs['server_hostname'] = server_hostname
+        sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
+
+        super(_SelectorSslTransport, self).__init__(
+            loop, sslsock, protocol, extra, server)
+        # the protocol connection is only made after the SSL handshake
+        self._protocol_connected = False
+
+        self._server_hostname = server_hostname
+        self._waiter = waiter
+        self._sslcontext = sslcontext
+        self._paused = False
+
+        # SSL-specific extra info.  (peercert is set later)
+        self._extra.update(sslcontext=sslcontext)
+
+        if self._loop.get_debug():
+            logger.debug("%r starts SSL handshake", self)
+            start_time = self._loop.time()
+        else:
+            start_time = None
+        self._on_handshake(start_time)
+
+    def _wakeup_waiter(self, exc=None):
+        if self._waiter is None:
+            return
+        if not self._waiter.cancelled():
+            if exc is not None:
+                self._waiter.set_exception(exc)
+            else:
+                self._waiter.set_result(None)
+        self._waiter = None
+
+    def _on_handshake(self, start_time):
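+        # Drive the non-blocking handshake: on SSLWantReadError or
+        # SSLWantWriteError this method re-registers itself as a reader or
+        # writer callback until do_handshake() completes.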
+        try:
+            wrap_ssl_error(self._sock.do_handshake)
+        except SSLWantReadError:
+            self._loop.add_reader(self._sock_fd,
+                                  self._on_handshake, start_time)
+            return
+        except SSLWantWriteError:
+            self._loop.add_writer(self._sock_fd,
+                                  self._on_handshake, start_time)
+            return
+        except BaseException as exc:
+            if self._loop.get_debug():
+                logger.warning("%r: SSL handshake failed",
+                               self, exc_info=True)
+            self._loop.remove_reader(self._sock_fd)
+            self._loop.remove_writer(self._sock_fd)
+            self._sock.close()
+            self._wakeup_waiter(exc)
+            if isinstance(exc, Exception):
+                return
+            else:
+                raise
+
+        self._loop.remove_reader(self._sock_fd)
+        self._loop.remove_writer(self._sock_fd)
+
+        peercert = self._sock.getpeercert()
+        if not hasattr(self._sslcontext, 'check_hostname'):
+            # Verify hostname if requested; Python 3.4+ uses check_hostname
+            # and checks the hostname in do_handshake().
+            if (self._server_hostname and
+                self._sslcontext.verify_mode != ssl.CERT_NONE):
+                try:
+                    ssl.match_hostname(peercert, self._server_hostname)
+                except Exception as exc:
+                    if self._loop.get_debug():
+                        logger.warning("%r: SSL handshake failed "
+                                       "on matching the hostname",
+                                       self, exc_info=True)
+                    self._sock.close()
+                    self._wakeup_waiter(exc)
+                    return
+
+        # Add extra info that becomes available after handshake.
+        self._extra.update(peercert=peercert,
+                           cipher=self._sock.cipher(),
+                           )
+        if hasattr(self._sock, 'compression'):
+            self._extra['compression'] = self._sock.compression()
+
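+        # SSL renegotiation can make a read require a write and vice versa;
+        # these flags let _read_ready() and _write_ready() hand control to
+        # each other when the SSL layer raises SSLWantWriteError or
+        # SSLWantReadError.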
+        self._read_wants_write = False
+        self._write_wants_read = False
+        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._protocol_connected = True
+        self._loop.call_soon(self._protocol.connection_made, self)
+        # only wake up the waiter when connection_made() has been called
+        self._loop.call_soon(self._wakeup_waiter)
+
+        if self._loop.get_debug():
+            dt = self._loop.time() - start_time
+            logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
+
+    def pause_reading(self):
+        # XXX This is a bit icky, given the comment at the top of
+        # _read_ready().  Is it possible to evoke a deadlock?  I don't
+        # know, although it doesn't look like it; write() will still
+        # accept more data for the buffer and eventually the app will
+        # call resume_reading() again, and things will flow again.
+
+        if self._closing:
+            raise RuntimeError('Cannot pause_reading() when closing')
+        if self._paused:
+            raise RuntimeError('Already paused')
+        self._paused = True
+        self._loop.remove_reader(self._sock_fd)
+        if self._loop.get_debug():
+            logger.debug("%r pauses reading", self)
+
+    def resume_reading(self):
+        if not self._paused:
+            raise RuntimeError('Not paused')
+        self._paused = False
+        if self._closing:
+            return
+        self._loop.add_reader(self._sock_fd, self._read_ready)
+        if self._loop.get_debug():
+            logger.debug("%r resumes reading", self)
+
+    def _sock_recv(self):
+        return wrap_ssl_error(self._sock.recv, self.max_size)
+
+    def _read_ready(self):
+        if self._write_wants_read:
+            self._write_wants_read = False
+            self._write_ready()
+
+            if self._buffer:
+                self._loop.add_writer(self._sock_fd, self._write_ready)
+
+        try:
+            if _SSL_REQUIRES_SELECT:
+                rfds = (self._sock.fileno(),)
+                rfds = select.select(rfds, (), (), 0.0)[0]
+                if not rfds:
+                    # False alarm.
+                    return
+            data = wrap_error(self._sock_recv)
+        except (BlockingIOError, InterruptedError, SSLWantReadError):
+            pass
+        except SSLWantWriteError:
+            self._read_wants_write = True
+            self._loop.remove_reader(self._sock_fd)
+            self._loop.add_writer(self._sock_fd, self._write_ready)
+        except Exception as exc:
+            self._fatal_error(exc, 'Fatal read error on SSL transport')
+        else:
+            if data:
+                self._protocol.data_received(data)
+            else:
+                try:
+                    if self._loop.get_debug():
+                        logger.debug("%r received EOF", self)
+                    keep_open = self._protocol.eof_received()
+                    if keep_open:
+                        logger.warning('returning true from eof_received() '
+                                       'has no effect when using ssl')
+                finally:
+                    self.close()
+
+    def _write_ready(self):
+        if self._read_wants_write:
+            self._read_wants_write = False
+            self._read_ready()
+
+            if not (self._paused or self._closing):
+                self._loop.add_reader(self._sock_fd, self._read_ready)
+
+        if self._buffer:
+            data = flatten_bytes(self._buffer)
+            try:
+                n = wrap_error(self._sock.send, data)
+            except (BlockingIOError, InterruptedError, SSLWantWriteError):
+                n = 0
+            except SSLWantReadError:
+                n = 0
+                self._loop.remove_writer(self._sock_fd)
+                self._write_wants_read = True
+            except Exception as exc:
+                self._loop.remove_writer(self._sock_fd)
+                del self._buffer[:]
+                self._fatal_error(exc, 'Fatal write error on SSL transport')
+                return
+
+            if n:
+                del self._buffer[:n]
+
+        self._maybe_resume_protocol()  # May append to buffer.
+
+        if not self._buffer:
+            self._loop.remove_writer(self._sock_fd)
+            if self._closing:
+                self._call_connection_lost(None)
+
+    def write(self, data):
+        data = flatten_bytes(data)
+        if not data:
+            return
+
+        if self._conn_lost:
+            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+                logger.warning('socket.send() raised exception.')
+            self._conn_lost += 1
+            return
+
+        if not self._buffer:
+            self._loop.add_writer(self._sock_fd, self._write_ready)
+
+        # Add it to the buffer.
+        self._buffer.extend(data)
+        self._maybe_pause_protocol()
+
+    def can_write_eof(self):
+        return False
+
+
+class _SelectorDatagramTransport(_SelectorTransport):
+
+    _buffer_factory = collections.deque
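+    # The buffer holds (data, addr) pairs so every queued datagram keeps its
+    # destination address; see sendto() and _sendto_ready().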
+
+    def __init__(self, loop, sock, protocol, address=None,
+                 waiter=None, extra=None):
+        super(_SelectorDatagramTransport, self).__init__(loop, sock,
+                                                         protocol, extra)
+        self._address = address
+        self._loop.call_soon(self._protocol.connection_made, self)
+        # only start reading when connection_made() has been called
+        self._loop.call_soon(self._loop.add_reader,
+                             self._sock_fd, self._read_ready)
+        if waiter is not None:
+            # only wake up the waiter when connection_made() has been called
+            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
+
+    def get_write_buffer_size(self):
+        return sum(len(data) for data, _ in self._buffer)
+
+    def _read_ready(self):
+        try:
+            data, addr = wrap_error(self._sock.recvfrom, self.max_size)
+        except (BlockingIOError, InterruptedError):
+            pass
+        except OSError as exc:
+            self._protocol.error_received(exc)
+        except Exception as exc:
+            self._fatal_error(exc, 'Fatal read error on datagram transport')
+        else:
+            self._protocol.datagram_received(data, addr)
+
+    def sendto(self, data, addr=None):
+        data = flatten_bytes(data)
+        if not data:
+            return
+
+        if self._address and addr not in (None, self._address):
+            raise ValueError('Invalid address: must be None or %s' %
+                             (self._address,))
+
+        if self._conn_lost and self._address:
+            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+                logger.warning('socket.send() raised exception.')
+            self._conn_lost += 1
+            return
+
+        if not self._buffer:
+            # Attempt to send it right away first.
+            try:
+                if self._address:
+                    wrap_error(self._sock.send, data)
+                else:
+                    wrap_error(self._sock.sendto, data, addr)
+                return
+            except (BlockingIOError, InterruptedError):
+                self._loop.add_writer(self._sock_fd, self._sendto_ready)
+            except OSError as exc:
+                self._protocol.error_received(exc)
+                return
+            except Exception as exc:
+                self._fatal_error(exc,
+                                  'Fatal write error on datagram transport')
+                return
+
+        # Ensure that what we buffer is immutable.
+        self._buffer.append((bytes(data), addr))
+        self._maybe_pause_protocol()
+
+    def _sendto_ready(self):
+        while self._buffer:
+            data, addr = self._buffer.popleft()
+            try:
+                if self._address:
+                    wrap_error(self._sock.send, data)
+                else:
+                    wrap_error(self._sock.sendto, data, addr)
+            except (BlockingIOError, InterruptedError):
+                self._buffer.appendleft((data, addr))  # Try again later.
+                break
+            except OSError as exc:
+                self._protocol.error_received(exc)
+                return
+            except Exception as exc:
+                self._fatal_error(exc,
+                                  'Fatal write error on datagram transport')
+                return
+
+        self._maybe_resume_protocol()  # May append to buffer.
+        if not self._buffer:
+            self._loop.remove_writer(self._sock_fd)
+            if self._closing:
+                self._call_connection_lost(None)