Cleanup for stow ---STOW
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / windows_events.py
diff --git a/.local/lib/python2.7/site-packages/trollius/windows_events.py b/.local/lib/python2.7/site-packages/trollius/windows_events.py
new file mode 100644 (file)
index 0000000..3102d23
--- /dev/null
@@ -0,0 +1,777 @@
+"""Selector and proactor event loops for Windows."""
+
+import errno
+import math
+import socket
+import struct
+import weakref
+
+from . import events
+from . import base_subprocess
+from . import futures
+from . import proactor_events
+from . import py33_winapi as _winapi
+from . import selector_events
+from . import tasks
+from . import windows_utils
+from . import _overlapped
+from .coroutines import coroutine, From, Return
+from .log import logger
+from .py33_exceptions import wrap_error, BrokenPipeError, ConnectionResetError
+
+
+__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
+           'DefaultEventLoopPolicy',
+           ]
+
+
+NULL = 0
+INFINITE = 0xffffffff
+ERROR_CONNECTION_REFUSED = 1225
+ERROR_CONNECTION_ABORTED = 1236
+
+# Initial delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_INIT_DELAY = 0.001
+
+# Maximum delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_MAX_DELAY = 0.100
+
+
+class _OverlappedFuture(futures.Future):
+    """Subclass of Future which represents an overlapped operation.
+
+    Cancelling it will immediately cancel the overlapped operation.
+    """
+
+    def __init__(self, ov, loop=None):
+        super(_OverlappedFuture, self).__init__(loop=loop)
+        if self._source_traceback:
+            del self._source_traceback[-1]
+        self._ov = ov
+
+    def _repr_info(self):
+        info = super(_OverlappedFuture, self)._repr_info()
+        if self._ov is not None:
+            state = 'pending' if self._ov.pending else 'completed'
+            info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
+        return info
+
+    def _cancel_overlapped(self):
+        if self._ov is None:
+            return
+        try:
+            self._ov.cancel()
+        except OSError as exc:
+            context = {
+                'message': 'Cancelling an overlapped future failed',
+                'exception': exc,
+                'future': self,
+            }
+            if self._source_traceback:
+                context['source_traceback'] = self._source_traceback
+            self._loop.call_exception_handler(context)
+        self._ov = None
+
+    def cancel(self):
+        self._cancel_overlapped()
+        return super(_OverlappedFuture, self).cancel()
+
+    def set_exception(self, exception):
+        super(_OverlappedFuture, self).set_exception(exception)
+        self._cancel_overlapped()
+
+    def set_result(self, result):
+        super(_OverlappedFuture, self).set_result(result)
+        self._ov = None
+
+
+class _BaseWaitHandleFuture(futures.Future):
+    """Subclass of Future which represents a wait handle."""
+
+    def __init__(self, ov, handle, wait_handle, loop=None):
+        super(_BaseWaitHandleFuture, self).__init__(loop=loop)
+        if self._source_traceback:
+            del self._source_traceback[-1]
+        # Keep a reference to the Overlapped object to keep it alive until the
+        # wait is unregistered
+        self._ov = ov
+        self._handle = handle
+        self._wait_handle = wait_handle
+
+        # Should we call UnregisterWaitEx() if the wait completes
+        # or is cancelled?
+        self._registered = True
+
+    def _poll(self):
+        # non-blocking wait: use a timeout of 0 millisecond
+        return (_winapi.WaitForSingleObject(self._handle, 0) ==
+                _winapi.WAIT_OBJECT_0)
+
+    def _repr_info(self):
+        info = super(_BaseWaitHandleFuture, self)._repr_info()
+        info.append('handle=%#x' % self._handle)
+        if self._handle is not None:
+            state = 'signaled' if self._poll() else 'waiting'
+            info.append(state)
+        if self._wait_handle is not None:
+            info.append('wait_handle=%#x' % self._wait_handle)
+        return info
+
+    def _unregister_wait_cb(self, fut):
+        # The wait was unregistered: it's not safe to destroy the Overlapped
+        # object
+        self._ov = None
+
+    def _unregister_wait(self):
+        if not self._registered:
+            return
+        self._registered = False
+
+        wait_handle = self._wait_handle
+        self._wait_handle = None
+        try:
+            _overlapped.UnregisterWait(wait_handle)
+        except OSError as exc:
+            if exc.winerror != _overlapped.ERROR_IO_PENDING:
+                context = {
+                    'message': 'Failed to unregister the wait handle',
+                    'exception': exc,
+                    'future': self,
+                }
+                if self._source_traceback:
+                    context['source_traceback'] = self._source_traceback
+                self._loop.call_exception_handler(context)
+                return
+            # ERROR_IO_PENDING means that the unregister is pending
+
+        self._unregister_wait_cb(None)
+
+    def cancel(self):
+        self._unregister_wait()
+        return super(_BaseWaitHandleFuture, self).cancel()
+
+    def set_exception(self, exception):
+        self._unregister_wait()
+        super(_BaseWaitHandleFuture, self).set_exception(exception)
+
+    def set_result(self, result):
+        self._unregister_wait()
+        super(_BaseWaitHandleFuture, self).set_result(result)
+
+
+class _WaitCancelFuture(_BaseWaitHandleFuture):
+    """Subclass of Future which represents a wait for the cancellation of a
+    _WaitHandleFuture using an event.
+    """
+
+    def __init__(self, ov, event, wait_handle, loop=None):
+        super(_WaitCancelFuture, self).__init__(ov, event, wait_handle,
+                                                loop=loop)
+
+        self._done_callback = None
+
+    def cancel(self):
+        raise RuntimeError("_WaitCancelFuture must not be cancelled")
+
+    def _schedule_callbacks(self):
+        super(_WaitCancelFuture, self)._schedule_callbacks()
+        if self._done_callback is not None:
+            self._done_callback(self)
+
+
+class _WaitHandleFuture(_BaseWaitHandleFuture):
+    def __init__(self, ov, handle, wait_handle, proactor, loop=None):
+        super(_WaitHandleFuture, self).__init__(ov, handle, wait_handle,
+                                                loop=loop)
+        self._proactor = proactor
+        self._unregister_proactor = True
+        self._event = _overlapped.CreateEvent(None, True, False, None)
+        self._event_fut = None
+
+    def _unregister_wait_cb(self, fut):
+        if self._event is not None:
+            _winapi.CloseHandle(self._event)
+            self._event = None
+            self._event_fut = None
+
+        # If the wait was cancelled, the wait may never be signalled, so
+        # it's required to unregister it. Otherwise, IocpProactor.close() will
+        # wait forever for an event which will never come.
+        #
+        # If the IocpProactor already received the event, it's safe to call
+        # _unregister() because we kept a reference to the Overlapped object
+        # which is used as an unique key.
+        self._proactor._unregister(self._ov)
+        self._proactor = None
+
+        super(_WaitHandleFuture, self)._unregister_wait_cb(fut)
+
+    def _unregister_wait(self):
+        if not self._registered:
+            return
+        self._registered = False
+
+        wait_handle = self._wait_handle
+        self._wait_handle = None
+        try:
+            _overlapped.UnregisterWaitEx(wait_handle, self._event)
+        except OSError as exc:
+            if exc.winerror != _overlapped.ERROR_IO_PENDING:
+                context = {
+                    'message': 'Failed to unregister the wait handle',
+                    'exception': exc,
+                    'future': self,
+                }
+                if self._source_traceback:
+                    context['source_traceback'] = self._source_traceback
+                self._loop.call_exception_handler(context)
+                return
+            # ERROR_IO_PENDING is not an error, the wait was unregistered
+
+        self._event_fut = self._proactor._wait_cancel(self._event,
+                                                      self._unregister_wait_cb)
+
+
+class PipeServer(object):
+    """Class representing a pipe server.
+
+    This is much like a bound, listening socket.
+    """
+    def __init__(self, address):
+        self._address = address
+        self._free_instances = weakref.WeakSet()
+        # initialize the pipe attribute before calling _server_pipe_handle()
+        # because this function can raise an exception and the destructor calls
+        # the close() method
+        self._pipe = None
+        self._accept_pipe_future = None
+        self._pipe = self._server_pipe_handle(True)
+
+    def _get_unconnected_pipe(self):
+        # Create new instance and return previous one.  This ensures
+        # that (until the server is closed) there is always at least
+        # one pipe handle for address.  Therefore if a client attempt
+        # to connect it will not fail with FileNotFoundError.
+        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
+        return tmp
+
+    def _server_pipe_handle(self, first):
+        # Return a wrapper for a new pipe handle.
+        if self.closed():
+            return None
+        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
+        if first:
+            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
+        h = wrap_error(_winapi.CreateNamedPipe,
+            self._address, flags,
+            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
+            _winapi.PIPE_WAIT,
+            _winapi.PIPE_UNLIMITED_INSTANCES,
+            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
+            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
+        pipe = windows_utils.PipeHandle(h)
+        self._free_instances.add(pipe)
+        return pipe
+
+    def closed(self):
+        return (self._address is None)
+
+    def close(self):
+        if self._accept_pipe_future is not None:
+            self._accept_pipe_future.cancel()
+            self._accept_pipe_future = None
+        # Close all instances which have not been connected to by a client.
+        if self._address is not None:
+            for pipe in self._free_instances:
+                pipe.close()
+            self._pipe = None
+            self._address = None
+            self._free_instances.clear()
+
+    __del__ = close
+
+
+class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
+    """Windows version of selector event loop."""
+
+    def _socketpair(self):
+        return windows_utils.socketpair()
+
+
+class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
+    """Windows version of proactor event loop using IOCP."""
+
+    def __init__(self, proactor=None):
+        if proactor is None:
+            proactor = IocpProactor()
+        super(ProactorEventLoop, self).__init__(proactor)
+
+    def _socketpair(self):
+        return windows_utils.socketpair()
+
+    @coroutine
+    def create_pipe_connection(self, protocol_factory, address):
+        f = self._proactor.connect_pipe(address)
+        pipe = yield From(f)
+        protocol = protocol_factory()
+        trans = self._make_duplex_pipe_transport(pipe, protocol,
+                                                 extra={'addr': address})
+        raise Return(trans, protocol)
+
+    @coroutine
+    def start_serving_pipe(self, protocol_factory, address):
+        server = PipeServer(address)
+
+        def loop_accept_pipe(f=None):
+            pipe = None
+            try:
+                if f:
+                    pipe = f.result()
+                    server._free_instances.discard(pipe)
+
+                    if server.closed():
+                        # A client connected before the server was closed:
+                        # drop the client (close the pipe) and exit
+                        pipe.close()
+                        return
+
+                    protocol = protocol_factory()
+                    self._make_duplex_pipe_transport(
+                        pipe, protocol, extra={'addr': address})
+
+                pipe = server._get_unconnected_pipe()
+                if pipe is None:
+                    return
+
+                f = self._proactor.accept_pipe(pipe)
+            except OSError as exc:
+                if pipe and pipe.fileno() != -1:
+                    self.call_exception_handler({
+                        'message': 'Pipe accept failed',
+                        'exception': exc,
+                        'pipe': pipe,
+                    })
+                    pipe.close()
+                elif self._debug:
+                    logger.warning("Accept pipe failed on pipe %r",
+                                   pipe, exc_info=True)
+            except futures.CancelledError:
+                if pipe:
+                    pipe.close()
+            else:
+                server._accept_pipe_future = f
+                f.add_done_callback(loop_accept_pipe)
+
+        self.call_soon(loop_accept_pipe)
+        return [server]
+
+    @coroutine
+    def _make_subprocess_transport(self, protocol, args, shell,
+                                   stdin, stdout, stderr, bufsize,
+                                   extra=None, **kwargs):
+        waiter = futures.Future(loop=self)
+        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
+                                             stdin, stdout, stderr, bufsize,
+                                             waiter=waiter, extra=extra,
+                                             **kwargs)
+        try:
+            yield From(waiter)
+        except Exception as exc:
+            # Workaround CPython bug #23353: using yield/yield-from in an
+            # except block of a generator doesn't clear properly sys.exc_info()
+            err = exc
+        else:
+            err = None
+
+        if err is not None:
+            transp.close()
+            yield From(transp._wait())
+            raise err
+
+        raise Return(transp)
+
+
+class IocpProactor(object):
+    """Proactor implementation using IOCP."""
+
+    def __init__(self, concurrency=0xffffffff):
+        self._loop = None
+        self._results = []
+        self._iocp = _overlapped.CreateIoCompletionPort(
+            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
+        self._cache = {}
+        self._registered = weakref.WeakSet()
+        self._unregistered = []
+        self._stopped_serving = weakref.WeakSet()
+
+    def __repr__(self):
+        return ('<%s overlapped#=%s result#=%s>'
+                % (self.__class__.__name__, len(self._cache),
+                   len(self._results)))
+
+    def set_loop(self, loop):
+        self._loop = loop
+
+    def select(self, timeout=None):
+        if not self._results:
+            self._poll(timeout)
+        tmp = self._results
+        self._results = []
+        return tmp
+
+    def _result(self, value):
+        fut = futures.Future(loop=self._loop)
+        fut.set_result(value)
+        return fut
+
+    def recv(self, conn, nbytes, flags=0):
+        self._register_with_iocp(conn)
+        ov = _overlapped.Overlapped(NULL)
+        try:
+            if isinstance(conn, socket.socket):
+                wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags)
+            else:
+                wrap_error(ov.ReadFile, conn.fileno(), nbytes)
+        except BrokenPipeError:
+            return self._result(b'')
+
+        def finish_recv(trans, key, ov):
+            try:
+                return wrap_error(ov.getresult)
+            except WindowsError as exc:
+                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
+                    raise ConnectionResetError(*exc.args)
+                else:
+                    raise
+
+        return self._register(ov, conn, finish_recv)
+
+    def send(self, conn, buf, flags=0):
+        self._register_with_iocp(conn)
+        ov = _overlapped.Overlapped(NULL)
+        if isinstance(conn, socket.socket):
+            ov.WSASend(conn.fileno(), buf, flags)
+        else:
+            ov.WriteFile(conn.fileno(), buf)
+
+        def finish_send(trans, key, ov):
+            try:
+                return wrap_error(ov.getresult)
+            except WindowsError as exc:
+                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
+                    raise ConnectionResetError(*exc.args)
+                else:
+                    raise
+
+        return self._register(ov, conn, finish_send)
+
+    def accept(self, listener):
+        self._register_with_iocp(listener)
+        conn = self._get_accept_socket(listener.family)
+        ov = _overlapped.Overlapped(NULL)
+        ov.AcceptEx(listener.fileno(), conn.fileno())
+
+        def finish_accept(trans, key, ov):
+            wrap_error(ov.getresult)
+            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
+            buf = struct.pack('@P', listener.fileno())
+            conn.setsockopt(socket.SOL_SOCKET,
+                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
+            conn.settimeout(listener.gettimeout())
+            return conn, conn.getpeername()
+
+        @coroutine
+        def accept_coro(future, conn):
+            # Coroutine closing the accept socket if the future is cancelled
+            try:
+                yield From(future)
+            except futures.CancelledError:
+                conn.close()
+                raise
+
+        future = self._register(ov, listener, finish_accept)
+        coro = accept_coro(future, conn)
+        tasks.ensure_future(coro, loop=self._loop)
+        return future
+
+    def connect(self, conn, address):
+        self._register_with_iocp(conn)
+        # The socket needs to be locally bound before we call ConnectEx().
+        try:
+            _overlapped.BindLocal(conn.fileno(), conn.family)
+        except WindowsError as e:
+            if e.winerror != errno.WSAEINVAL:
+                raise
+            # Probably already locally bound; check using getsockname().
+            if conn.getsockname()[1] == 0:
+                raise
+        ov = _overlapped.Overlapped(NULL)
+        ov.ConnectEx(conn.fileno(), address)
+
+        def finish_connect(trans, key, ov):
+            wrap_error(ov.getresult)
+            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
+            conn.setsockopt(socket.SOL_SOCKET,
+                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
+            return conn
+
+        return self._register(ov, conn, finish_connect)
+
+    def accept_pipe(self, pipe):
+        self._register_with_iocp(pipe)
+        ov = _overlapped.Overlapped(NULL)
+        connected = ov.ConnectNamedPipe(pipe.fileno())
+
+        if connected:
+            # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
+            # that the pipe is connected. There is no need to wait for the
+            # completion of the connection.
+            return self._result(pipe)
+
+        def finish_accept_pipe(trans, key, ov):
+            wrap_error(ov.getresult)
+            return pipe
+
+        return self._register(ov, pipe, finish_accept_pipe)
+
+    @coroutine
+    def connect_pipe(self, address):
+        delay = CONNECT_PIPE_INIT_DELAY
+        while True:
+            # Unfortunately there is no way to do an overlapped connect to a pipe.
+            # Call CreateFile() in a loop until it doesn't fail with
+            # ERROR_PIPE_BUSY
+            try:
+                handle = wrap_error(_overlapped.ConnectPipe, address)
+                break
+            except WindowsError as exc:
+                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
+                    raise
+
+            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
+            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
+            yield From(tasks.sleep(delay, loop=self._loop))
+
+        raise Return(windows_utils.PipeHandle(handle))
+
+    def wait_for_handle(self, handle, timeout=None):
+        """Wait for a handle.
+
+        Return a Future object. The result of the future is True if the wait
+        completed, or False if the wait did not complete (on timeout).
+        """
+        return self._wait_for_handle(handle, timeout, False)
+
+    def _wait_cancel(self, event, done_callback):
+        fut = self._wait_for_handle(event, None, True)
+        # add_done_callback() cannot be used because the wait may only complete
+        # in IocpProactor.close(), while the event loop is not running.
+        fut._done_callback = done_callback
+        return fut
+
+    def _wait_for_handle(self, handle, timeout, _is_cancel):
+        if timeout is None:
+            ms = _winapi.INFINITE
+        else:
+            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
+            # round away from zero to wait *at least* timeout seconds.
+            ms = int(math.ceil(timeout * 1e3))
+
+        # We only create ov so we can use ov.address as a key for the cache.
+        ov = _overlapped.Overlapped(NULL)
+        wait_handle = _overlapped.RegisterWaitWithQueue(
+            handle, self._iocp, ov.address, ms)
+        if _is_cancel:
+            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
+        else:
+            f = _WaitHandleFuture(ov, handle, wait_handle, self,
+                                  loop=self._loop)
+        if f._source_traceback:
+            del f._source_traceback[-1]
+
+        def finish_wait_for_handle(trans, key, ov):
+            # Note that this second wait means that we should only use
+            # this with handles types where a successful wait has no
+            # effect.  So events or processes are all right, but locks
+            # or semaphores are not.  Also note if the handle is
+            # signalled and then quickly reset, then we may return
+            # False even though we have not timed out.
+            return f._poll()
+
+        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
+        return f
+
+    def _register_with_iocp(self, obj):
+        # To get notifications of finished ops on this objects sent to the
+        # completion port, were must register the handle.
+        if obj not in self._registered:
+            self._registered.add(obj)
+            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
+            # XXX We could also use SetFileCompletionNotificationModes()
+            # to avoid sending notifications to completion port of ops
+            # that succeed immediately.
+
+    def _register(self, ov, obj, callback):
+        # Return a future which will be set with the result of the
+        # operation when it completes.  The future's value is actually
+        # the value returned by callback().
+        f = _OverlappedFuture(ov, loop=self._loop)
+        if f._source_traceback:
+            del f._source_traceback[-1]
+        if not ov.pending:
+            # The operation has completed, so no need to postpone the
+            # work.  We cannot take this short cut if we need the
+            # NumberOfBytes, CompletionKey values returned by
+            # PostQueuedCompletionStatus().
+            try:
+                value = callback(None, None, ov)
+            except OSError as e:
+                f.set_exception(e)
+            else:
+                f.set_result(value)
+            # Even if GetOverlappedResult() was called, we have to wait for the
+            # notification of the completion in GetQueuedCompletionStatus().
+            # Register the overlapped operation to keep a reference to the
+            # OVERLAPPED object, otherwise the memory is freed and Windows may
+            # read uninitialized memory.
+
+        # Register the overlapped operation for later.  Note that
+        # we only store obj to prevent it from being garbage
+        # collected too early.
+        self._cache[ov.address] = (f, ov, obj, callback)
+        return f
+
+    def _unregister(self, ov):
+        """Unregister an overlapped object.
+
+        Call this method when its future has been cancelled. The event can
+        already be signalled (pending in the proactor event queue). It is also
+        safe if the event is never signalled (because it was cancelled).
+        """
+        self._unregistered.append(ov)
+
+    def _get_accept_socket(self, family):
+        s = socket.socket(family)
+        s.settimeout(0)
+        return s
+
+    def _poll(self, timeout=None):
+        if timeout is None:
+            ms = INFINITE
+        elif timeout < 0:
+            raise ValueError("negative timeout")
+        else:
+            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
+            # round away from zero to wait *at least* timeout seconds.
+            ms = int(math.ceil(timeout * 1e3))
+            if ms >= INFINITE:
+                raise ValueError("timeout too big")
+
+        while True:
+            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
+            if status is None:
+                break
+            ms = 0
+
+            err, transferred, key, address = status
+            try:
+                f, ov, obj, callback = self._cache.pop(address)
+            except KeyError:
+                if self._loop.get_debug():
+                    self._loop.call_exception_handler({
+                        'message': ('GetQueuedCompletionStatus() returned an '
+                                    'unexpected event'),
+                        'status': ('err=%s transferred=%s key=%#x address=%#x'
+                                   % (err, transferred, key, address)),
+                    })
+
+                # key is either zero, or it is used to return a pipe
+                # handle which should be closed to avoid a leak.
+                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
+                    _winapi.CloseHandle(key)
+                continue
+
+            if obj in self._stopped_serving:
+                f.cancel()
+            # Don't call the callback if _register() already read the result or
+            # if the overlapped has been cancelled
+            elif not f.done():
+                try:
+                    value = callback(transferred, key, ov)
+                except OSError as e:
+                    f.set_exception(e)
+                    self._results.append(f)
+                else:
+                    f.set_result(value)
+                    self._results.append(f)
+
+        # Remove unregisted futures
+        for ov in self._unregistered:
+            self._cache.pop(ov.address, None)
+        del self._unregistered[:]
+
+    def _stop_serving(self, obj):
+        # obj is a socket or pipe handle.  It will be closed in
+        # BaseProactorEventLoop._stop_serving() which will make any
+        # pending operations fail quickly.
+        self._stopped_serving.add(obj)
+
+    def close(self):
+        # Cancel remaining registered operations.
+        for address, (fut, ov, obj, callback) in list(self._cache.items()):
+            if fut.cancelled():
+                # Nothing to do with cancelled futures
+                pass
+            elif isinstance(fut, _WaitCancelFuture):
+                # _WaitCancelFuture must not be cancelled
+                pass
+            else:
+                try:
+                    fut.cancel()
+                except OSError as exc:
+                    if self._loop is not None:
+                        context = {
+                            'message': 'Cancelling a future failed',
+                            'exception': exc,
+                            'future': fut,
+                        }
+                        if fut._source_traceback:
+                            context['source_traceback'] = fut._source_traceback
+                        self._loop.call_exception_handler(context)
+
+        while self._cache:
+            if not self._poll(1):
+                logger.debug('taking long time to close proactor')
+
+        self._results = []
+        if self._iocp is not None:
+            _winapi.CloseHandle(self._iocp)
+            self._iocp = None
+
+    def __del__(self):
+        self.close()
+
+
+class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
+
+    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+        self._proc = windows_utils.Popen(
+            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
+            bufsize=bufsize, **kwargs)
+
+        def callback(f):
+            returncode = self._proc.poll()
+            self._process_exited(returncode)
+
+        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
+        f.add_done_callback(callback)
+
+
+SelectorEventLoop = _WindowsSelectorEventLoop
+
+
+class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
+    _loop_factory = SelectorEventLoop
+
+
+DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy