efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / windows_events.py
1 """Selector and proactor event loops for Windows."""
2
3 import errno
4 import math
5 import socket
6 import struct
7 import weakref
8
9 from . import events
10 from . import base_subprocess
11 from . import futures
12 from . import proactor_events
13 from . import py33_winapi as _winapi
14 from . import selector_events
15 from . import tasks
16 from . import windows_utils
17 from . import _overlapped
18 from .coroutines import coroutine, From, Return
19 from .log import logger
20 from .py33_exceptions import wrap_error, BrokenPipeError, ConnectionResetError
21
22
23 __all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
24            'DefaultEventLoopPolicy',
25            ]
26
27
28 NULL = 0
29 INFINITE = 0xffffffff
30 ERROR_CONNECTION_REFUSED = 1225
31 ERROR_CONNECTION_ABORTED = 1236
32
33 # Initial delay in seconds for connect_pipe() before retrying to connect
34 CONNECT_PIPE_INIT_DELAY = 0.001
35
36 # Maximum delay in seconds for connect_pipe() before retrying to connect
37 CONNECT_PIPE_MAX_DELAY = 0.100
38
39
40 class _OverlappedFuture(futures.Future):
41     """Subclass of Future which represents an overlapped operation.
42
43     Cancelling it will immediately cancel the overlapped operation.
44     """
45
46     def __init__(self, ov, loop=None):
47         super(_OverlappedFuture, self).__init__(loop=loop)
48         if self._source_traceback:
49             del self._source_traceback[-1]
50         self._ov = ov
51
52     def _repr_info(self):
53         info = super(_OverlappedFuture, self)._repr_info()
54         if self._ov is not None:
55             state = 'pending' if self._ov.pending else 'completed'
56             info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
57         return info
58
59     def _cancel_overlapped(self):
60         if self._ov is None:
61             return
62         try:
63             self._ov.cancel()
64         except OSError as exc:
65             context = {
66                 'message': 'Cancelling an overlapped future failed',
67                 'exception': exc,
68                 'future': self,
69             }
70             if self._source_traceback:
71                 context['source_traceback'] = self._source_traceback
72             self._loop.call_exception_handler(context)
73         self._ov = None
74
75     def cancel(self):
76         self._cancel_overlapped()
77         return super(_OverlappedFuture, self).cancel()
78
79     def set_exception(self, exception):
80         super(_OverlappedFuture, self).set_exception(exception)
81         self._cancel_overlapped()
82
83     def set_result(self, result):
84         super(_OverlappedFuture, self).set_result(result)
85         self._ov = None
86
87
88 class _BaseWaitHandleFuture(futures.Future):
89     """Subclass of Future which represents a wait handle."""
90
91     def __init__(self, ov, handle, wait_handle, loop=None):
92         super(_BaseWaitHandleFuture, self).__init__(loop=loop)
93         if self._source_traceback:
94             del self._source_traceback[-1]
95         # Keep a reference to the Overlapped object to keep it alive until the
96         # wait is unregistered
97         self._ov = ov
98         self._handle = handle
99         self._wait_handle = wait_handle
100
101         # Should we call UnregisterWaitEx() if the wait completes
102         # or is cancelled?
103         self._registered = True
104
105     def _poll(self):
106         # non-blocking wait: use a timeout of 0 millisecond
107         return (_winapi.WaitForSingleObject(self._handle, 0) ==
108                 _winapi.WAIT_OBJECT_0)
109
110     def _repr_info(self):
111         info = super(_BaseWaitHandleFuture, self)._repr_info()
112         info.append('handle=%#x' % self._handle)
113         if self._handle is not None:
114             state = 'signaled' if self._poll() else 'waiting'
115             info.append(state)
116         if self._wait_handle is not None:
117             info.append('wait_handle=%#x' % self._wait_handle)
118         return info
119
120     def _unregister_wait_cb(self, fut):
121         # The wait was unregistered: it's not safe to destroy the Overlapped
122         # object
123         self._ov = None
124
125     def _unregister_wait(self):
126         if not self._registered:
127             return
128         self._registered = False
129
130         wait_handle = self._wait_handle
131         self._wait_handle = None
132         try:
133             _overlapped.UnregisterWait(wait_handle)
134         except OSError as exc:
135             if exc.winerror != _overlapped.ERROR_IO_PENDING:
136                 context = {
137                     'message': 'Failed to unregister the wait handle',
138                     'exception': exc,
139                     'future': self,
140                 }
141                 if self._source_traceback:
142                     context['source_traceback'] = self._source_traceback
143                 self._loop.call_exception_handler(context)
144                 return
145             # ERROR_IO_PENDING means that the unregister is pending
146
147         self._unregister_wait_cb(None)
148
149     def cancel(self):
150         self._unregister_wait()
151         return super(_BaseWaitHandleFuture, self).cancel()
152
153     def set_exception(self, exception):
154         self._unregister_wait()
155         super(_BaseWaitHandleFuture, self).set_exception(exception)
156
157     def set_result(self, result):
158         self._unregister_wait()
159         super(_BaseWaitHandleFuture, self).set_result(result)
160
161
162 class _WaitCancelFuture(_BaseWaitHandleFuture):
163     """Subclass of Future which represents a wait for the cancellation of a
164     _WaitHandleFuture using an event.
165     """
166
167     def __init__(self, ov, event, wait_handle, loop=None):
168         super(_WaitCancelFuture, self).__init__(ov, event, wait_handle,
169                                                 loop=loop)
170
171         self._done_callback = None
172
173     def cancel(self):
174         raise RuntimeError("_WaitCancelFuture must not be cancelled")
175
176     def _schedule_callbacks(self):
177         super(_WaitCancelFuture, self)._schedule_callbacks()
178         if self._done_callback is not None:
179             self._done_callback(self)
180
181
182 class _WaitHandleFuture(_BaseWaitHandleFuture):
183     def __init__(self, ov, handle, wait_handle, proactor, loop=None):
184         super(_WaitHandleFuture, self).__init__(ov, handle, wait_handle,
185                                                 loop=loop)
186         self._proactor = proactor
187         self._unregister_proactor = True
188         self._event = _overlapped.CreateEvent(None, True, False, None)
189         self._event_fut = None
190
191     def _unregister_wait_cb(self, fut):
192         if self._event is not None:
193             _winapi.CloseHandle(self._event)
194             self._event = None
195             self._event_fut = None
196
197         # If the wait was cancelled, the wait may never be signalled, so
198         # it's required to unregister it. Otherwise, IocpProactor.close() will
199         # wait forever for an event which will never come.
200         #
201         # If the IocpProactor already received the event, it's safe to call
202         # _unregister() because we kept a reference to the Overlapped object
203         # which is used as an unique key.
204         self._proactor._unregister(self._ov)
205         self._proactor = None
206
207         super(_WaitHandleFuture, self)._unregister_wait_cb(fut)
208
209     def _unregister_wait(self):
210         if not self._registered:
211             return
212         self._registered = False
213
214         wait_handle = self._wait_handle
215         self._wait_handle = None
216         try:
217             _overlapped.UnregisterWaitEx(wait_handle, self._event)
218         except OSError as exc:
219             if exc.winerror != _overlapped.ERROR_IO_PENDING:
220                 context = {
221                     'message': 'Failed to unregister the wait handle',
222                     'exception': exc,
223                     'future': self,
224                 }
225                 if self._source_traceback:
226                     context['source_traceback'] = self._source_traceback
227                 self._loop.call_exception_handler(context)
228                 return
229             # ERROR_IO_PENDING is not an error, the wait was unregistered
230
231         self._event_fut = self._proactor._wait_cancel(self._event,
232                                                       self._unregister_wait_cb)
233
234
235 class PipeServer(object):
236     """Class representing a pipe server.
237
238     This is much like a bound, listening socket.
239     """
240     def __init__(self, address):
241         self._address = address
242         self._free_instances = weakref.WeakSet()
243         # initialize the pipe attribute before calling _server_pipe_handle()
244         # because this function can raise an exception and the destructor calls
245         # the close() method
246         self._pipe = None
247         self._accept_pipe_future = None
248         self._pipe = self._server_pipe_handle(True)
249
250     def _get_unconnected_pipe(self):
251         # Create new instance and return previous one.  This ensures
252         # that (until the server is closed) there is always at least
253         # one pipe handle for address.  Therefore if a client attempt
254         # to connect it will not fail with FileNotFoundError.
255         tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
256         return tmp
257
258     def _server_pipe_handle(self, first):
259         # Return a wrapper for a new pipe handle.
260         if self.closed():
261             return None
262         flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
263         if first:
264             flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
265         h = wrap_error(_winapi.CreateNamedPipe,
266             self._address, flags,
267             _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
268             _winapi.PIPE_WAIT,
269             _winapi.PIPE_UNLIMITED_INSTANCES,
270             windows_utils.BUFSIZE, windows_utils.BUFSIZE,
271             _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
272         pipe = windows_utils.PipeHandle(h)
273         self._free_instances.add(pipe)
274         return pipe
275
276     def closed(self):
277         return (self._address is None)
278
279     def close(self):
280         if self._accept_pipe_future is not None:
281             self._accept_pipe_future.cancel()
282             self._accept_pipe_future = None
283         # Close all instances which have not been connected to by a client.
284         if self._address is not None:
285             for pipe in self._free_instances:
286                 pipe.close()
287             self._pipe = None
288             self._address = None
289             self._free_instances.clear()
290
291     __del__ = close
292
293
294 class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
295     """Windows version of selector event loop."""
296
297     def _socketpair(self):
298         return windows_utils.socketpair()
299
300
301 class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
302     """Windows version of proactor event loop using IOCP."""
303
304     def __init__(self, proactor=None):
305         if proactor is None:
306             proactor = IocpProactor()
307         super(ProactorEventLoop, self).__init__(proactor)
308
309     def _socketpair(self):
310         return windows_utils.socketpair()
311
312     @coroutine
313     def create_pipe_connection(self, protocol_factory, address):
314         f = self._proactor.connect_pipe(address)
315         pipe = yield From(f)
316         protocol = protocol_factory()
317         trans = self._make_duplex_pipe_transport(pipe, protocol,
318                                                  extra={'addr': address})
319         raise Return(trans, protocol)
320
321     @coroutine
322     def start_serving_pipe(self, protocol_factory, address):
323         server = PipeServer(address)
324
325         def loop_accept_pipe(f=None):
326             pipe = None
327             try:
328                 if f:
329                     pipe = f.result()
330                     server._free_instances.discard(pipe)
331
332                     if server.closed():
333                         # A client connected before the server was closed:
334                         # drop the client (close the pipe) and exit
335                         pipe.close()
336                         return
337
338                     protocol = protocol_factory()
339                     self._make_duplex_pipe_transport(
340                         pipe, protocol, extra={'addr': address})
341
342                 pipe = server._get_unconnected_pipe()
343                 if pipe is None:
344                     return
345
346                 f = self._proactor.accept_pipe(pipe)
347             except OSError as exc:
348                 if pipe and pipe.fileno() != -1:
349                     self.call_exception_handler({
350                         'message': 'Pipe accept failed',
351                         'exception': exc,
352                         'pipe': pipe,
353                     })
354                     pipe.close()
355                 elif self._debug:
356                     logger.warning("Accept pipe failed on pipe %r",
357                                    pipe, exc_info=True)
358             except futures.CancelledError:
359                 if pipe:
360                     pipe.close()
361             else:
362                 server._accept_pipe_future = f
363                 f.add_done_callback(loop_accept_pipe)
364
365         self.call_soon(loop_accept_pipe)
366         return [server]
367
368     @coroutine
369     def _make_subprocess_transport(self, protocol, args, shell,
370                                    stdin, stdout, stderr, bufsize,
371                                    extra=None, **kwargs):
372         waiter = futures.Future(loop=self)
373         transp = _WindowsSubprocessTransport(self, protocol, args, shell,
374                                              stdin, stdout, stderr, bufsize,
375                                              waiter=waiter, extra=extra,
376                                              **kwargs)
377         try:
378             yield From(waiter)
379         except Exception as exc:
380             # Workaround CPython bug #23353: using yield/yield-from in an
381             # except block of a generator doesn't clear properly sys.exc_info()
382             err = exc
383         else:
384             err = None
385
386         if err is not None:
387             transp.close()
388             yield From(transp._wait())
389             raise err
390
391         raise Return(transp)
392
393
394 class IocpProactor(object):
395     """Proactor implementation using IOCP."""
396
397     def __init__(self, concurrency=0xffffffff):
398         self._loop = None
399         self._results = []
400         self._iocp = _overlapped.CreateIoCompletionPort(
401             _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
402         self._cache = {}
403         self._registered = weakref.WeakSet()
404         self._unregistered = []
405         self._stopped_serving = weakref.WeakSet()
406
407     def __repr__(self):
408         return ('<%s overlapped#=%s result#=%s>'
409                 % (self.__class__.__name__, len(self._cache),
410                    len(self._results)))
411
412     def set_loop(self, loop):
413         self._loop = loop
414
415     def select(self, timeout=None):
416         if not self._results:
417             self._poll(timeout)
418         tmp = self._results
419         self._results = []
420         return tmp
421
422     def _result(self, value):
423         fut = futures.Future(loop=self._loop)
424         fut.set_result(value)
425         return fut
426
427     def recv(self, conn, nbytes, flags=0):
428         self._register_with_iocp(conn)
429         ov = _overlapped.Overlapped(NULL)
430         try:
431             if isinstance(conn, socket.socket):
432                 wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags)
433             else:
434                 wrap_error(ov.ReadFile, conn.fileno(), nbytes)
435         except BrokenPipeError:
436             return self._result(b'')
437
438         def finish_recv(trans, key, ov):
439             try:
440                 return wrap_error(ov.getresult)
441             except WindowsError as exc:
442                 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
443                     raise ConnectionResetError(*exc.args)
444                 else:
445                     raise
446
447         return self._register(ov, conn, finish_recv)
448
449     def send(self, conn, buf, flags=0):
450         self._register_with_iocp(conn)
451         ov = _overlapped.Overlapped(NULL)
452         if isinstance(conn, socket.socket):
453             ov.WSASend(conn.fileno(), buf, flags)
454         else:
455             ov.WriteFile(conn.fileno(), buf)
456
457         def finish_send(trans, key, ov):
458             try:
459                 return wrap_error(ov.getresult)
460             except WindowsError as exc:
461                 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
462                     raise ConnectionResetError(*exc.args)
463                 else:
464                     raise
465
466         return self._register(ov, conn, finish_send)
467
468     def accept(self, listener):
469         self._register_with_iocp(listener)
470         conn = self._get_accept_socket(listener.family)
471         ov = _overlapped.Overlapped(NULL)
472         ov.AcceptEx(listener.fileno(), conn.fileno())
473
474         def finish_accept(trans, key, ov):
475             wrap_error(ov.getresult)
476             # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
477             buf = struct.pack('@P', listener.fileno())
478             conn.setsockopt(socket.SOL_SOCKET,
479                             _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
480             conn.settimeout(listener.gettimeout())
481             return conn, conn.getpeername()
482
483         @coroutine
484         def accept_coro(future, conn):
485             # Coroutine closing the accept socket if the future is cancelled
486             try:
487                 yield From(future)
488             except futures.CancelledError:
489                 conn.close()
490                 raise
491
492         future = self._register(ov, listener, finish_accept)
493         coro = accept_coro(future, conn)
494         tasks.ensure_future(coro, loop=self._loop)
495         return future
496
497     def connect(self, conn, address):
498         self._register_with_iocp(conn)
499         # The socket needs to be locally bound before we call ConnectEx().
500         try:
501             _overlapped.BindLocal(conn.fileno(), conn.family)
502         except WindowsError as e:
503             if e.winerror != errno.WSAEINVAL:
504                 raise
505             # Probably already locally bound; check using getsockname().
506             if conn.getsockname()[1] == 0:
507                 raise
508         ov = _overlapped.Overlapped(NULL)
509         ov.ConnectEx(conn.fileno(), address)
510
511         def finish_connect(trans, key, ov):
512             wrap_error(ov.getresult)
513             # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
514             conn.setsockopt(socket.SOL_SOCKET,
515                             _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
516             return conn
517
518         return self._register(ov, conn, finish_connect)
519
520     def accept_pipe(self, pipe):
521         self._register_with_iocp(pipe)
522         ov = _overlapped.Overlapped(NULL)
523         connected = ov.ConnectNamedPipe(pipe.fileno())
524
525         if connected:
526             # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
527             # that the pipe is connected. There is no need to wait for the
528             # completion of the connection.
529             return self._result(pipe)
530
531         def finish_accept_pipe(trans, key, ov):
532             wrap_error(ov.getresult)
533             return pipe
534
535         return self._register(ov, pipe, finish_accept_pipe)
536
537     @coroutine
538     def connect_pipe(self, address):
539         delay = CONNECT_PIPE_INIT_DELAY
540         while True:
541             # Unfortunately there is no way to do an overlapped connect to a pipe.
542             # Call CreateFile() in a loop until it doesn't fail with
543             # ERROR_PIPE_BUSY
544             try:
545                 handle = wrap_error(_overlapped.ConnectPipe, address)
546                 break
547             except WindowsError as exc:
548                 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
549                     raise
550
551             # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
552             delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
553             yield From(tasks.sleep(delay, loop=self._loop))
554
555         raise Return(windows_utils.PipeHandle(handle))
556
557     def wait_for_handle(self, handle, timeout=None):
558         """Wait for a handle.
559
560         Return a Future object. The result of the future is True if the wait
561         completed, or False if the wait did not complete (on timeout).
562         """
563         return self._wait_for_handle(handle, timeout, False)
564
565     def _wait_cancel(self, event, done_callback):
566         fut = self._wait_for_handle(event, None, True)
567         # add_done_callback() cannot be used because the wait may only complete
568         # in IocpProactor.close(), while the event loop is not running.
569         fut._done_callback = done_callback
570         return fut
571
572     def _wait_for_handle(self, handle, timeout, _is_cancel):
573         if timeout is None:
574             ms = _winapi.INFINITE
575         else:
576             # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
577             # round away from zero to wait *at least* timeout seconds.
578             ms = int(math.ceil(timeout * 1e3))
579
580         # We only create ov so we can use ov.address as a key for the cache.
581         ov = _overlapped.Overlapped(NULL)
582         wait_handle = _overlapped.RegisterWaitWithQueue(
583             handle, self._iocp, ov.address, ms)
584         if _is_cancel:
585             f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
586         else:
587             f = _WaitHandleFuture(ov, handle, wait_handle, self,
588                                   loop=self._loop)
589         if f._source_traceback:
590             del f._source_traceback[-1]
591
592         def finish_wait_for_handle(trans, key, ov):
593             # Note that this second wait means that we should only use
594             # this with handles types where a successful wait has no
595             # effect.  So events or processes are all right, but locks
596             # or semaphores are not.  Also note if the handle is
597             # signalled and then quickly reset, then we may return
598             # False even though we have not timed out.
599             return f._poll()
600
601         self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
602         return f
603
604     def _register_with_iocp(self, obj):
605         # To get notifications of finished ops on this objects sent to the
606         # completion port, were must register the handle.
607         if obj not in self._registered:
608             self._registered.add(obj)
609             _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
610             # XXX We could also use SetFileCompletionNotificationModes()
611             # to avoid sending notifications to completion port of ops
612             # that succeed immediately.
613
614     def _register(self, ov, obj, callback):
615         # Return a future which will be set with the result of the
616         # operation when it completes.  The future's value is actually
617         # the value returned by callback().
618         f = _OverlappedFuture(ov, loop=self._loop)
619         if f._source_traceback:
620             del f._source_traceback[-1]
621         if not ov.pending:
622             # The operation has completed, so no need to postpone the
623             # work.  We cannot take this short cut if we need the
624             # NumberOfBytes, CompletionKey values returned by
625             # PostQueuedCompletionStatus().
626             try:
627                 value = callback(None, None, ov)
628             except OSError as e:
629                 f.set_exception(e)
630             else:
631                 f.set_result(value)
632             # Even if GetOverlappedResult() was called, we have to wait for the
633             # notification of the completion in GetQueuedCompletionStatus().
634             # Register the overlapped operation to keep a reference to the
635             # OVERLAPPED object, otherwise the memory is freed and Windows may
636             # read uninitialized memory.
637
638         # Register the overlapped operation for later.  Note that
639         # we only store obj to prevent it from being garbage
640         # collected too early.
641         self._cache[ov.address] = (f, ov, obj, callback)
642         return f
643
644     def _unregister(self, ov):
645         """Unregister an overlapped object.
646
647         Call this method when its future has been cancelled. The event can
648         already be signalled (pending in the proactor event queue). It is also
649         safe if the event is never signalled (because it was cancelled).
650         """
651         self._unregistered.append(ov)
652
653     def _get_accept_socket(self, family):
654         s = socket.socket(family)
655         s.settimeout(0)
656         return s
657
658     def _poll(self, timeout=None):
659         if timeout is None:
660             ms = INFINITE
661         elif timeout < 0:
662             raise ValueError("negative timeout")
663         else:
664             # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
665             # round away from zero to wait *at least* timeout seconds.
666             ms = int(math.ceil(timeout * 1e3))
667             if ms >= INFINITE:
668                 raise ValueError("timeout too big")
669
670         while True:
671             status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
672             if status is None:
673                 break
674             ms = 0
675
676             err, transferred, key, address = status
677             try:
678                 f, ov, obj, callback = self._cache.pop(address)
679             except KeyError:
680                 if self._loop.get_debug():
681                     self._loop.call_exception_handler({
682                         'message': ('GetQueuedCompletionStatus() returned an '
683                                     'unexpected event'),
684                         'status': ('err=%s transferred=%s key=%#x address=%#x'
685                                    % (err, transferred, key, address)),
686                     })
687
688                 # key is either zero, or it is used to return a pipe
689                 # handle which should be closed to avoid a leak.
690                 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
691                     _winapi.CloseHandle(key)
692                 continue
693
694             if obj in self._stopped_serving:
695                 f.cancel()
696             # Don't call the callback if _register() already read the result or
697             # if the overlapped has been cancelled
698             elif not f.done():
699                 try:
700                     value = callback(transferred, key, ov)
701                 except OSError as e:
702                     f.set_exception(e)
703                     self._results.append(f)
704                 else:
705                     f.set_result(value)
706                     self._results.append(f)
707
708         # Remove unregisted futures
709         for ov in self._unregistered:
710             self._cache.pop(ov.address, None)
711         del self._unregistered[:]
712
713     def _stop_serving(self, obj):
714         # obj is a socket or pipe handle.  It will be closed in
715         # BaseProactorEventLoop._stop_serving() which will make any
716         # pending operations fail quickly.
717         self._stopped_serving.add(obj)
718
719     def close(self):
720         # Cancel remaining registered operations.
721         for address, (fut, ov, obj, callback) in list(self._cache.items()):
722             if fut.cancelled():
723                 # Nothing to do with cancelled futures
724                 pass
725             elif isinstance(fut, _WaitCancelFuture):
726                 # _WaitCancelFuture must not be cancelled
727                 pass
728             else:
729                 try:
730                     fut.cancel()
731                 except OSError as exc:
732                     if self._loop is not None:
733                         context = {
734                             'message': 'Cancelling a future failed',
735                             'exception': exc,
736                             'future': fut,
737                         }
738                         if fut._source_traceback:
739                             context['source_traceback'] = fut._source_traceback
740                         self._loop.call_exception_handler(context)
741
742         while self._cache:
743             if not self._poll(1):
744                 logger.debug('taking long time to close proactor')
745
746         self._results = []
747         if self._iocp is not None:
748             _winapi.CloseHandle(self._iocp)
749             self._iocp = None
750
751     def __del__(self):
752         self.close()
753
754
755 class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
756
757     def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
758         self._proc = windows_utils.Popen(
759             args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
760             bufsize=bufsize, **kwargs)
761
762         def callback(f):
763             returncode = self._proc.poll()
764             self._process_exited(returncode)
765
766         f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
767         f.add_done_callback(callback)
768
769
770 SelectorEventLoop = _WindowsSelectorEventLoop
771
772
773 class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
774     _loop_factory = SelectorEventLoop
775
776
777 DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy