goodbye stuff
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / windows_events.py
diff --git a/.local/lib/python2.7/site-packages/trollius/windows_events.py b/.local/lib/python2.7/site-packages/trollius/windows_events.py
deleted file mode 100644 (file)
index 3102d23..0000000
+++ /dev/null
@@ -1,777 +0,0 @@
-"""Selector and proactor event loops for Windows."""
-
-import errno
-import math
-import socket
-import struct
-import weakref
-
-from . import events
-from . import base_subprocess
-from . import futures
-from . import proactor_events
-from . import py33_winapi as _winapi
-from . import selector_events
-from . import tasks
-from . import windows_utils
-from . import _overlapped
-from .coroutines import coroutine, From, Return
-from .log import logger
-from .py33_exceptions import wrap_error, BrokenPipeError, ConnectionResetError
-
-
-__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
-           'DefaultEventLoopPolicy',
-           ]
-
-
-NULL = 0
-INFINITE = 0xffffffff
-ERROR_CONNECTION_REFUSED = 1225
-ERROR_CONNECTION_ABORTED = 1236
-
-# Initial delay in seconds for connect_pipe() before retrying to connect
-CONNECT_PIPE_INIT_DELAY = 0.001
-
-# Maximum delay in seconds for connect_pipe() before retrying to connect
-CONNECT_PIPE_MAX_DELAY = 0.100
-
-
-class _OverlappedFuture(futures.Future):
-    """Subclass of Future which represents an overlapped operation.
-
-    Cancelling it will immediately cancel the overlapped operation.
-    """
-
-    def __init__(self, ov, loop=None):
-        super(_OverlappedFuture, self).__init__(loop=loop)
-        if self._source_traceback:
-            del self._source_traceback[-1]
-        self._ov = ov
-
-    def _repr_info(self):
-        info = super(_OverlappedFuture, self)._repr_info()
-        if self._ov is not None:
-            state = 'pending' if self._ov.pending else 'completed'
-            info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
-        return info
-
-    def _cancel_overlapped(self):
-        if self._ov is None:
-            return
-        try:
-            self._ov.cancel()
-        except OSError as exc:
-            context = {
-                'message': 'Cancelling an overlapped future failed',
-                'exception': exc,
-                'future': self,
-            }
-            if self._source_traceback:
-                context['source_traceback'] = self._source_traceback
-            self._loop.call_exception_handler(context)
-        self._ov = None
-
-    def cancel(self):
-        self._cancel_overlapped()
-        return super(_OverlappedFuture, self).cancel()
-
-    def set_exception(self, exception):
-        super(_OverlappedFuture, self).set_exception(exception)
-        self._cancel_overlapped()
-
-    def set_result(self, result):
-        super(_OverlappedFuture, self).set_result(result)
-        self._ov = None
-
-
-class _BaseWaitHandleFuture(futures.Future):
-    """Subclass of Future which represents a wait handle."""
-
-    def __init__(self, ov, handle, wait_handle, loop=None):
-        super(_BaseWaitHandleFuture, self).__init__(loop=loop)
-        if self._source_traceback:
-            del self._source_traceback[-1]
-        # Keep a reference to the Overlapped object to keep it alive until the
-        # wait is unregistered
-        self._ov = ov
-        self._handle = handle
-        self._wait_handle = wait_handle
-
-        # Should we call UnregisterWaitEx() if the wait completes
-        # or is cancelled?
-        self._registered = True
-
-    def _poll(self):
-        # non-blocking wait: use a timeout of 0 millisecond
-        return (_winapi.WaitForSingleObject(self._handle, 0) ==
-                _winapi.WAIT_OBJECT_0)
-
-    def _repr_info(self):
-        info = super(_BaseWaitHandleFuture, self)._repr_info()
-        info.append('handle=%#x' % self._handle)
-        if self._handle is not None:
-            state = 'signaled' if self._poll() else 'waiting'
-            info.append(state)
-        if self._wait_handle is not None:
-            info.append('wait_handle=%#x' % self._wait_handle)
-        return info
-
-    def _unregister_wait_cb(self, fut):
-        # The wait was unregistered: it's not safe to destroy the Overlapped
-        # object
-        self._ov = None
-
-    def _unregister_wait(self):
-        if not self._registered:
-            return
-        self._registered = False
-
-        wait_handle = self._wait_handle
-        self._wait_handle = None
-        try:
-            _overlapped.UnregisterWait(wait_handle)
-        except OSError as exc:
-            if exc.winerror != _overlapped.ERROR_IO_PENDING:
-                context = {
-                    'message': 'Failed to unregister the wait handle',
-                    'exception': exc,
-                    'future': self,
-                }
-                if self._source_traceback:
-                    context['source_traceback'] = self._source_traceback
-                self._loop.call_exception_handler(context)
-                return
-            # ERROR_IO_PENDING means that the unregister is pending
-
-        self._unregister_wait_cb(None)
-
-    def cancel(self):
-        self._unregister_wait()
-        return super(_BaseWaitHandleFuture, self).cancel()
-
-    def set_exception(self, exception):
-        self._unregister_wait()
-        super(_BaseWaitHandleFuture, self).set_exception(exception)
-
-    def set_result(self, result):
-        self._unregister_wait()
-        super(_BaseWaitHandleFuture, self).set_result(result)
-
-
-class _WaitCancelFuture(_BaseWaitHandleFuture):
-    """Subclass of Future which represents a wait for the cancellation of a
-    _WaitHandleFuture using an event.
-    """
-
-    def __init__(self, ov, event, wait_handle, loop=None):
-        super(_WaitCancelFuture, self).__init__(ov, event, wait_handle,
-                                                loop=loop)
-
-        self._done_callback = None
-
-    def cancel(self):
-        raise RuntimeError("_WaitCancelFuture must not be cancelled")
-
-    def _schedule_callbacks(self):
-        super(_WaitCancelFuture, self)._schedule_callbacks()
-        if self._done_callback is not None:
-            self._done_callback(self)
-
-
-class _WaitHandleFuture(_BaseWaitHandleFuture):
-    def __init__(self, ov, handle, wait_handle, proactor, loop=None):
-        super(_WaitHandleFuture, self).__init__(ov, handle, wait_handle,
-                                                loop=loop)
-        self._proactor = proactor
-        self._unregister_proactor = True
-        self._event = _overlapped.CreateEvent(None, True, False, None)
-        self._event_fut = None
-
-    def _unregister_wait_cb(self, fut):
-        if self._event is not None:
-            _winapi.CloseHandle(self._event)
-            self._event = None
-            self._event_fut = None
-
-        # If the wait was cancelled, the wait may never be signalled, so
-        # it's required to unregister it. Otherwise, IocpProactor.close() will
-        # wait forever for an event which will never come.
-        #
-        # If the IocpProactor already received the event, it's safe to call
-        # _unregister() because we kept a reference to the Overlapped object
-        # which is used as an unique key.
-        self._proactor._unregister(self._ov)
-        self._proactor = None
-
-        super(_WaitHandleFuture, self)._unregister_wait_cb(fut)
-
-    def _unregister_wait(self):
-        if not self._registered:
-            return
-        self._registered = False
-
-        wait_handle = self._wait_handle
-        self._wait_handle = None
-        try:
-            _overlapped.UnregisterWaitEx(wait_handle, self._event)
-        except OSError as exc:
-            if exc.winerror != _overlapped.ERROR_IO_PENDING:
-                context = {
-                    'message': 'Failed to unregister the wait handle',
-                    'exception': exc,
-                    'future': self,
-                }
-                if self._source_traceback:
-                    context['source_traceback'] = self._source_traceback
-                self._loop.call_exception_handler(context)
-                return
-            # ERROR_IO_PENDING is not an error, the wait was unregistered
-
-        self._event_fut = self._proactor._wait_cancel(self._event,
-                                                      self._unregister_wait_cb)
-
-
-class PipeServer(object):
-    """Class representing a pipe server.
-
-    This is much like a bound, listening socket.
-    """
-    def __init__(self, address):
-        self._address = address
-        self._free_instances = weakref.WeakSet()
-        # initialize the pipe attribute before calling _server_pipe_handle()
-        # because this function can raise an exception and the destructor calls
-        # the close() method
-        self._pipe = None
-        self._accept_pipe_future = None
-        self._pipe = self._server_pipe_handle(True)
-
-    def _get_unconnected_pipe(self):
-        # Create new instance and return previous one.  This ensures
-        # that (until the server is closed) there is always at least
-        # one pipe handle for address.  Therefore if a client attempt
-        # to connect it will not fail with FileNotFoundError.
-        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
-        return tmp
-
-    def _server_pipe_handle(self, first):
-        # Return a wrapper for a new pipe handle.
-        if self.closed():
-            return None
-        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
-        if first:
-            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
-        h = wrap_error(_winapi.CreateNamedPipe,
-            self._address, flags,
-            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
-            _winapi.PIPE_WAIT,
-            _winapi.PIPE_UNLIMITED_INSTANCES,
-            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
-            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
-        pipe = windows_utils.PipeHandle(h)
-        self._free_instances.add(pipe)
-        return pipe
-
-    def closed(self):
-        return (self._address is None)
-
-    def close(self):
-        if self._accept_pipe_future is not None:
-            self._accept_pipe_future.cancel()
-            self._accept_pipe_future = None
-        # Close all instances which have not been connected to by a client.
-        if self._address is not None:
-            for pipe in self._free_instances:
-                pipe.close()
-            self._pipe = None
-            self._address = None
-            self._free_instances.clear()
-
-    __del__ = close
-
-
-class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
-    """Windows version of selector event loop."""
-
-    def _socketpair(self):
-        return windows_utils.socketpair()
-
-
-class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
-    """Windows version of proactor event loop using IOCP."""
-
-    def __init__(self, proactor=None):
-        if proactor is None:
-            proactor = IocpProactor()
-        super(ProactorEventLoop, self).__init__(proactor)
-
-    def _socketpair(self):
-        return windows_utils.socketpair()
-
-    @coroutine
-    def create_pipe_connection(self, protocol_factory, address):
-        f = self._proactor.connect_pipe(address)
-        pipe = yield From(f)
-        protocol = protocol_factory()
-        trans = self._make_duplex_pipe_transport(pipe, protocol,
-                                                 extra={'addr': address})
-        raise Return(trans, protocol)
-
-    @coroutine
-    def start_serving_pipe(self, protocol_factory, address):
-        server = PipeServer(address)
-
-        def loop_accept_pipe(f=None):
-            pipe = None
-            try:
-                if f:
-                    pipe = f.result()
-                    server._free_instances.discard(pipe)
-
-                    if server.closed():
-                        # A client connected before the server was closed:
-                        # drop the client (close the pipe) and exit
-                        pipe.close()
-                        return
-
-                    protocol = protocol_factory()
-                    self._make_duplex_pipe_transport(
-                        pipe, protocol, extra={'addr': address})
-
-                pipe = server._get_unconnected_pipe()
-                if pipe is None:
-                    return
-
-                f = self._proactor.accept_pipe(pipe)
-            except OSError as exc:
-                if pipe and pipe.fileno() != -1:
-                    self.call_exception_handler({
-                        'message': 'Pipe accept failed',
-                        'exception': exc,
-                        'pipe': pipe,
-                    })
-                    pipe.close()
-                elif self._debug:
-                    logger.warning("Accept pipe failed on pipe %r",
-                                   pipe, exc_info=True)
-            except futures.CancelledError:
-                if pipe:
-                    pipe.close()
-            else:
-                server._accept_pipe_future = f
-                f.add_done_callback(loop_accept_pipe)
-
-        self.call_soon(loop_accept_pipe)
-        return [server]
-
-    @coroutine
-    def _make_subprocess_transport(self, protocol, args, shell,
-                                   stdin, stdout, stderr, bufsize,
-                                   extra=None, **kwargs):
-        waiter = futures.Future(loop=self)
-        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
-                                             stdin, stdout, stderr, bufsize,
-                                             waiter=waiter, extra=extra,
-                                             **kwargs)
-        try:
-            yield From(waiter)
-        except Exception as exc:
-            # Workaround CPython bug #23353: using yield/yield-from in an
-            # except block of a generator doesn't clear properly sys.exc_info()
-            err = exc
-        else:
-            err = None
-
-        if err is not None:
-            transp.close()
-            yield From(transp._wait())
-            raise err
-
-        raise Return(transp)
-
-
-class IocpProactor(object):
-    """Proactor implementation using IOCP."""
-
-    def __init__(self, concurrency=0xffffffff):
-        self._loop = None
-        self._results = []
-        self._iocp = _overlapped.CreateIoCompletionPort(
-            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
-        self._cache = {}
-        self._registered = weakref.WeakSet()
-        self._unregistered = []
-        self._stopped_serving = weakref.WeakSet()
-
-    def __repr__(self):
-        return ('<%s overlapped#=%s result#=%s>'
-                % (self.__class__.__name__, len(self._cache),
-                   len(self._results)))
-
-    def set_loop(self, loop):
-        self._loop = loop
-
-    def select(self, timeout=None):
-        if not self._results:
-            self._poll(timeout)
-        tmp = self._results
-        self._results = []
-        return tmp
-
-    def _result(self, value):
-        fut = futures.Future(loop=self._loop)
-        fut.set_result(value)
-        return fut
-
-    def recv(self, conn, nbytes, flags=0):
-        self._register_with_iocp(conn)
-        ov = _overlapped.Overlapped(NULL)
-        try:
-            if isinstance(conn, socket.socket):
-                wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags)
-            else:
-                wrap_error(ov.ReadFile, conn.fileno(), nbytes)
-        except BrokenPipeError:
-            return self._result(b'')
-
-        def finish_recv(trans, key, ov):
-            try:
-                return wrap_error(ov.getresult)
-            except WindowsError as exc:
-                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
-                    raise ConnectionResetError(*exc.args)
-                else:
-                    raise
-
-        return self._register(ov, conn, finish_recv)
-
-    def send(self, conn, buf, flags=0):
-        self._register_with_iocp(conn)
-        ov = _overlapped.Overlapped(NULL)
-        if isinstance(conn, socket.socket):
-            ov.WSASend(conn.fileno(), buf, flags)
-        else:
-            ov.WriteFile(conn.fileno(), buf)
-
-        def finish_send(trans, key, ov):
-            try:
-                return wrap_error(ov.getresult)
-            except WindowsError as exc:
-                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
-                    raise ConnectionResetError(*exc.args)
-                else:
-                    raise
-
-        return self._register(ov, conn, finish_send)
-
-    def accept(self, listener):
-        self._register_with_iocp(listener)
-        conn = self._get_accept_socket(listener.family)
-        ov = _overlapped.Overlapped(NULL)
-        ov.AcceptEx(listener.fileno(), conn.fileno())
-
-        def finish_accept(trans, key, ov):
-            wrap_error(ov.getresult)
-            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
-            buf = struct.pack('@P', listener.fileno())
-            conn.setsockopt(socket.SOL_SOCKET,
-                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
-            conn.settimeout(listener.gettimeout())
-            return conn, conn.getpeername()
-
-        @coroutine
-        def accept_coro(future, conn):
-            # Coroutine closing the accept socket if the future is cancelled
-            try:
-                yield From(future)
-            except futures.CancelledError:
-                conn.close()
-                raise
-
-        future = self._register(ov, listener, finish_accept)
-        coro = accept_coro(future, conn)
-        tasks.ensure_future(coro, loop=self._loop)
-        return future
-
-    def connect(self, conn, address):
-        self._register_with_iocp(conn)
-        # The socket needs to be locally bound before we call ConnectEx().
-        try:
-            _overlapped.BindLocal(conn.fileno(), conn.family)
-        except WindowsError as e:
-            if e.winerror != errno.WSAEINVAL:
-                raise
-            # Probably already locally bound; check using getsockname().
-            if conn.getsockname()[1] == 0:
-                raise
-        ov = _overlapped.Overlapped(NULL)
-        ov.ConnectEx(conn.fileno(), address)
-
-        def finish_connect(trans, key, ov):
-            wrap_error(ov.getresult)
-            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
-            conn.setsockopt(socket.SOL_SOCKET,
-                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
-            return conn
-
-        return self._register(ov, conn, finish_connect)
-
-    def accept_pipe(self, pipe):
-        self._register_with_iocp(pipe)
-        ov = _overlapped.Overlapped(NULL)
-        connected = ov.ConnectNamedPipe(pipe.fileno())
-
-        if connected:
-            # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
-            # that the pipe is connected. There is no need to wait for the
-            # completion of the connection.
-            return self._result(pipe)
-
-        def finish_accept_pipe(trans, key, ov):
-            wrap_error(ov.getresult)
-            return pipe
-
-        return self._register(ov, pipe, finish_accept_pipe)
-
-    @coroutine
-    def connect_pipe(self, address):
-        delay = CONNECT_PIPE_INIT_DELAY
-        while True:
-            # Unfortunately there is no way to do an overlapped connect to a pipe.
-            # Call CreateFile() in a loop until it doesn't fail with
-            # ERROR_PIPE_BUSY
-            try:
-                handle = wrap_error(_overlapped.ConnectPipe, address)
-                break
-            except WindowsError as exc:
-                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
-                    raise
-
-            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
-            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
-            yield From(tasks.sleep(delay, loop=self._loop))
-
-        raise Return(windows_utils.PipeHandle(handle))
-
-    def wait_for_handle(self, handle, timeout=None):
-        """Wait for a handle.
-
-        Return a Future object. The result of the future is True if the wait
-        completed, or False if the wait did not complete (on timeout).
-        """
-        return self._wait_for_handle(handle, timeout, False)
-
-    def _wait_cancel(self, event, done_callback):
-        fut = self._wait_for_handle(event, None, True)
-        # add_done_callback() cannot be used because the wait may only complete
-        # in IocpProactor.close(), while the event loop is not running.
-        fut._done_callback = done_callback
-        return fut
-
-    def _wait_for_handle(self, handle, timeout, _is_cancel):
-        if timeout is None:
-            ms = _winapi.INFINITE
-        else:
-            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
-            # round away from zero to wait *at least* timeout seconds.
-            ms = int(math.ceil(timeout * 1e3))
-
-        # We only create ov so we can use ov.address as a key for the cache.
-        ov = _overlapped.Overlapped(NULL)
-        wait_handle = _overlapped.RegisterWaitWithQueue(
-            handle, self._iocp, ov.address, ms)
-        if _is_cancel:
-            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
-        else:
-            f = _WaitHandleFuture(ov, handle, wait_handle, self,
-                                  loop=self._loop)
-        if f._source_traceback:
-            del f._source_traceback[-1]
-
-        def finish_wait_for_handle(trans, key, ov):
-            # Note that this second wait means that we should only use
-            # this with handles types where a successful wait has no
-            # effect.  So events or processes are all right, but locks
-            # or semaphores are not.  Also note if the handle is
-            # signalled and then quickly reset, then we may return
-            # False even though we have not timed out.
-            return f._poll()
-
-        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
-        return f
-
-    def _register_with_iocp(self, obj):
-        # To get notifications of finished ops on this objects sent to the
-        # completion port, were must register the handle.
-        if obj not in self._registered:
-            self._registered.add(obj)
-            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
-            # XXX We could also use SetFileCompletionNotificationModes()
-            # to avoid sending notifications to completion port of ops
-            # that succeed immediately.
-
-    def _register(self, ov, obj, callback):
-        # Return a future which will be set with the result of the
-        # operation when it completes.  The future's value is actually
-        # the value returned by callback().
-        f = _OverlappedFuture(ov, loop=self._loop)
-        if f._source_traceback:
-            del f._source_traceback[-1]
-        if not ov.pending:
-            # The operation has completed, so no need to postpone the
-            # work.  We cannot take this short cut if we need the
-            # NumberOfBytes, CompletionKey values returned by
-            # PostQueuedCompletionStatus().
-            try:
-                value = callback(None, None, ov)
-            except OSError as e:
-                f.set_exception(e)
-            else:
-                f.set_result(value)
-            # Even if GetOverlappedResult() was called, we have to wait for the
-            # notification of the completion in GetQueuedCompletionStatus().
-            # Register the overlapped operation to keep a reference to the
-            # OVERLAPPED object, otherwise the memory is freed and Windows may
-            # read uninitialized memory.
-
-        # Register the overlapped operation for later.  Note that
-        # we only store obj to prevent it from being garbage
-        # collected too early.
-        self._cache[ov.address] = (f, ov, obj, callback)
-        return f
-
-    def _unregister(self, ov):
-        """Unregister an overlapped object.
-
-        Call this method when its future has been cancelled. The event can
-        already be signalled (pending in the proactor event queue). It is also
-        safe if the event is never signalled (because it was cancelled).
-        """
-        self._unregistered.append(ov)
-
-    def _get_accept_socket(self, family):
-        s = socket.socket(family)
-        s.settimeout(0)
-        return s
-
-    def _poll(self, timeout=None):
-        if timeout is None:
-            ms = INFINITE
-        elif timeout < 0:
-            raise ValueError("negative timeout")
-        else:
-            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
-            # round away from zero to wait *at least* timeout seconds.
-            ms = int(math.ceil(timeout * 1e3))
-            if ms >= INFINITE:
-                raise ValueError("timeout too big")
-
-        while True:
-            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
-            if status is None:
-                break
-            ms = 0
-
-            err, transferred, key, address = status
-            try:
-                f, ov, obj, callback = self._cache.pop(address)
-            except KeyError:
-                if self._loop.get_debug():
-                    self._loop.call_exception_handler({
-                        'message': ('GetQueuedCompletionStatus() returned an '
-                                    'unexpected event'),
-                        'status': ('err=%s transferred=%s key=%#x address=%#x'
-                                   % (err, transferred, key, address)),
-                    })
-
-                # key is either zero, or it is used to return a pipe
-                # handle which should be closed to avoid a leak.
-                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
-                    _winapi.CloseHandle(key)
-                continue
-
-            if obj in self._stopped_serving:
-                f.cancel()
-            # Don't call the callback if _register() already read the result or
-            # if the overlapped has been cancelled
-            elif not f.done():
-                try:
-                    value = callback(transferred, key, ov)
-                except OSError as e:
-                    f.set_exception(e)
-                    self._results.append(f)
-                else:
-                    f.set_result(value)
-                    self._results.append(f)
-
-        # Remove unregisted futures
-        for ov in self._unregistered:
-            self._cache.pop(ov.address, None)
-        del self._unregistered[:]
-
-    def _stop_serving(self, obj):
-        # obj is a socket or pipe handle.  It will be closed in
-        # BaseProactorEventLoop._stop_serving() which will make any
-        # pending operations fail quickly.
-        self._stopped_serving.add(obj)
-
-    def close(self):
-        # Cancel remaining registered operations.
-        for address, (fut, ov, obj, callback) in list(self._cache.items()):
-            if fut.cancelled():
-                # Nothing to do with cancelled futures
-                pass
-            elif isinstance(fut, _WaitCancelFuture):
-                # _WaitCancelFuture must not be cancelled
-                pass
-            else:
-                try:
-                    fut.cancel()
-                except OSError as exc:
-                    if self._loop is not None:
-                        context = {
-                            'message': 'Cancelling a future failed',
-                            'exception': exc,
-                            'future': fut,
-                        }
-                        if fut._source_traceback:
-                            context['source_traceback'] = fut._source_traceback
-                        self._loop.call_exception_handler(context)
-
-        while self._cache:
-            if not self._poll(1):
-                logger.debug('taking long time to close proactor')
-
-        self._results = []
-        if self._iocp is not None:
-            _winapi.CloseHandle(self._iocp)
-            self._iocp = None
-
-    def __del__(self):
-        self.close()
-
-
-class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
-
-    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
-        self._proc = windows_utils.Popen(
-            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
-            bufsize=bufsize, **kwargs)
-
-        def callback(f):
-            returncode = self._proc.poll()
-            self._process_exited(returncode)
-
-        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
-        f.add_done_callback(callback)
-
-
-SelectorEventLoop = _WindowsSelectorEventLoop
-
-
-class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
-    _loop_factory = SelectorEventLoop
-
-
-DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy