1 """Selector and proactor event loops for Windows."""
10 from . import base_subprocess
12 from . import proactor_events
13 from . import py33_winapi as _winapi
14 from . import selector_events
16 from . import windows_utils
17 from . import _overlapped
18 from .coroutines import coroutine, From, Return
19 from .log import logger
20 from .py33_exceptions import wrap_error, BrokenPipeError, ConnectionResetError
23 __all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
24 'DefaultEventLoopPolicy',
30 ERROR_CONNECTION_REFUSED = 1225
31 ERROR_CONNECTION_ABORTED = 1236
33 # Initial delay in seconds for connect_pipe() before retrying to connect
34 CONNECT_PIPE_INIT_DELAY = 0.001
36 # Maximum delay in seconds for connect_pipe() before retrying to connect
37 CONNECT_PIPE_MAX_DELAY = 0.100
40 class _OverlappedFuture(futures.Future):
41 """Subclass of Future which represents an overlapped operation.
43 Cancelling it will immediately cancel the overlapped operation.
46 def __init__(self, ov, loop=None):
47 super(_OverlappedFuture, self).__init__(loop=loop)
48 if self._source_traceback:
49 del self._source_traceback[-1]
53 info = super(_OverlappedFuture, self)._repr_info()
54 if self._ov is not None:
55 state = 'pending' if self._ov.pending else 'completed'
56 info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
59 def _cancel_overlapped(self):
64 except OSError as exc:
66 'message': 'Cancelling an overlapped future failed',
70 if self._source_traceback:
71 context['source_traceback'] = self._source_traceback
72 self._loop.call_exception_handler(context)
76 self._cancel_overlapped()
77 return super(_OverlappedFuture, self).cancel()
79 def set_exception(self, exception):
80 super(_OverlappedFuture, self).set_exception(exception)
81 self._cancel_overlapped()
83 def set_result(self, result):
84 super(_OverlappedFuture, self).set_result(result)
88 class _BaseWaitHandleFuture(futures.Future):
89 """Subclass of Future which represents a wait handle."""
91 def __init__(self, ov, handle, wait_handle, loop=None):
92 super(_BaseWaitHandleFuture, self).__init__(loop=loop)
93 if self._source_traceback:
94 del self._source_traceback[-1]
95 # Keep a reference to the Overlapped object to keep it alive until the
96 # wait is unregistered
99 self._wait_handle = wait_handle
101 # Should we call UnregisterWaitEx() if the wait completes
103 self._registered = True
106 # non-blocking wait: use a timeout of 0 millisecond
107 return (_winapi.WaitForSingleObject(self._handle, 0) ==
108 _winapi.WAIT_OBJECT_0)
110 def _repr_info(self):
111 info = super(_BaseWaitHandleFuture, self)._repr_info()
112 info.append('handle=%#x' % self._handle)
113 if self._handle is not None:
114 state = 'signaled' if self._poll() else 'waiting'
116 if self._wait_handle is not None:
117 info.append('wait_handle=%#x' % self._wait_handle)
120 def _unregister_wait_cb(self, fut):
121 # The wait was unregistered: it's not safe to destroy the Overlapped
125 def _unregister_wait(self):
126 if not self._registered:
128 self._registered = False
130 wait_handle = self._wait_handle
131 self._wait_handle = None
133 _overlapped.UnregisterWait(wait_handle)
134 except OSError as exc:
135 if exc.winerror != _overlapped.ERROR_IO_PENDING:
137 'message': 'Failed to unregister the wait handle',
141 if self._source_traceback:
142 context['source_traceback'] = self._source_traceback
143 self._loop.call_exception_handler(context)
145 # ERROR_IO_PENDING means that the unregister is pending
147 self._unregister_wait_cb(None)
150 self._unregister_wait()
151 return super(_BaseWaitHandleFuture, self).cancel()
153 def set_exception(self, exception):
154 self._unregister_wait()
155 super(_BaseWaitHandleFuture, self).set_exception(exception)
157 def set_result(self, result):
158 self._unregister_wait()
159 super(_BaseWaitHandleFuture, self).set_result(result)
162 class _WaitCancelFuture(_BaseWaitHandleFuture):
163 """Subclass of Future which represents a wait for the cancellation of a
164 _WaitHandleFuture using an event.
167 def __init__(self, ov, event, wait_handle, loop=None):
168 super(_WaitCancelFuture, self).__init__(ov, event, wait_handle,
171 self._done_callback = None
174 raise RuntimeError("_WaitCancelFuture must not be cancelled")
176 def _schedule_callbacks(self):
177 super(_WaitCancelFuture, self)._schedule_callbacks()
178 if self._done_callback is not None:
179 self._done_callback(self)
182 class _WaitHandleFuture(_BaseWaitHandleFuture):
183 def __init__(self, ov, handle, wait_handle, proactor, loop=None):
184 super(_WaitHandleFuture, self).__init__(ov, handle, wait_handle,
186 self._proactor = proactor
187 self._unregister_proactor = True
188 self._event = _overlapped.CreateEvent(None, True, False, None)
189 self._event_fut = None
191 def _unregister_wait_cb(self, fut):
192 if self._event is not None:
193 _winapi.CloseHandle(self._event)
195 self._event_fut = None
197 # If the wait was cancelled, the wait may never be signalled, so
198 # it's required to unregister it. Otherwise, IocpProactor.close() will
199 # wait forever for an event which will never come.
201 # If the IocpProactor already received the event, it's safe to call
202 # _unregister() because we kept a reference to the Overlapped object
203 # which is used as an unique key.
204 self._proactor._unregister(self._ov)
205 self._proactor = None
207 super(_WaitHandleFuture, self)._unregister_wait_cb(fut)
209 def _unregister_wait(self):
210 if not self._registered:
212 self._registered = False
214 wait_handle = self._wait_handle
215 self._wait_handle = None
217 _overlapped.UnregisterWaitEx(wait_handle, self._event)
218 except OSError as exc:
219 if exc.winerror != _overlapped.ERROR_IO_PENDING:
221 'message': 'Failed to unregister the wait handle',
225 if self._source_traceback:
226 context['source_traceback'] = self._source_traceback
227 self._loop.call_exception_handler(context)
229 # ERROR_IO_PENDING is not an error, the wait was unregistered
231 self._event_fut = self._proactor._wait_cancel(self._event,
232 self._unregister_wait_cb)
235 class PipeServer(object):
236 """Class representing a pipe server.
238 This is much like a bound, listening socket.
240 def __init__(self, address):
241 self._address = address
242 self._free_instances = weakref.WeakSet()
243 # initialize the pipe attribute before calling _server_pipe_handle()
244 # because this function can raise an exception and the destructor calls
247 self._accept_pipe_future = None
248 self._pipe = self._server_pipe_handle(True)
250 def _get_unconnected_pipe(self):
251 # Create new instance and return previous one. This ensures
252 # that (until the server is closed) there is always at least
253 # one pipe handle for address. Therefore if a client attempt
254 # to connect it will not fail with FileNotFoundError.
255 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
258 def _server_pipe_handle(self, first):
259 # Return a wrapper for a new pipe handle.
262 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
264 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
265 h = wrap_error(_winapi.CreateNamedPipe,
266 self._address, flags,
267 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
269 _winapi.PIPE_UNLIMITED_INSTANCES,
270 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
271 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
272 pipe = windows_utils.PipeHandle(h)
273 self._free_instances.add(pipe)
277 return (self._address is None)
280 if self._accept_pipe_future is not None:
281 self._accept_pipe_future.cancel()
282 self._accept_pipe_future = None
283 # Close all instances which have not been connected to by a client.
284 if self._address is not None:
285 for pipe in self._free_instances:
289 self._free_instances.clear()
294 class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
295 """Windows version of selector event loop."""
297 def _socketpair(self):
298 return windows_utils.socketpair()
301 class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
302 """Windows version of proactor event loop using IOCP."""
304 def __init__(self, proactor=None):
306 proactor = IocpProactor()
307 super(ProactorEventLoop, self).__init__(proactor)
309 def _socketpair(self):
310 return windows_utils.socketpair()
313 def create_pipe_connection(self, protocol_factory, address):
314 f = self._proactor.connect_pipe(address)
316 protocol = protocol_factory()
317 trans = self._make_duplex_pipe_transport(pipe, protocol,
318 extra={'addr': address})
319 raise Return(trans, protocol)
322 def start_serving_pipe(self, protocol_factory, address):
323 server = PipeServer(address)
325 def loop_accept_pipe(f=None):
330 server._free_instances.discard(pipe)
333 # A client connected before the server was closed:
334 # drop the client (close the pipe) and exit
338 protocol = protocol_factory()
339 self._make_duplex_pipe_transport(
340 pipe, protocol, extra={'addr': address})
342 pipe = server._get_unconnected_pipe()
346 f = self._proactor.accept_pipe(pipe)
347 except OSError as exc:
348 if pipe and pipe.fileno() != -1:
349 self.call_exception_handler({
350 'message': 'Pipe accept failed',
356 logger.warning("Accept pipe failed on pipe %r",
358 except futures.CancelledError:
362 server._accept_pipe_future = f
363 f.add_done_callback(loop_accept_pipe)
365 self.call_soon(loop_accept_pipe)
369 def _make_subprocess_transport(self, protocol, args, shell,
370 stdin, stdout, stderr, bufsize,
371 extra=None, **kwargs):
372 waiter = futures.Future(loop=self)
373 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
374 stdin, stdout, stderr, bufsize,
375 waiter=waiter, extra=extra,
379 except Exception as exc:
380 # Workaround CPython bug #23353: using yield/yield-from in an
381 # except block of a generator doesn't clear properly sys.exc_info()
388 yield From(transp._wait())
394 class IocpProactor(object):
395 """Proactor implementation using IOCP."""
397 def __init__(self, concurrency=0xffffffff):
400 self._iocp = _overlapped.CreateIoCompletionPort(
401 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
403 self._registered = weakref.WeakSet()
404 self._unregistered = []
405 self._stopped_serving = weakref.WeakSet()
408 return ('<%s overlapped#=%s result#=%s>'
409 % (self.__class__.__name__, len(self._cache),
412 def set_loop(self, loop):
415 def select(self, timeout=None):
416 if not self._results:
422 def _result(self, value):
423 fut = futures.Future(loop=self._loop)
424 fut.set_result(value)
427 def recv(self, conn, nbytes, flags=0):
428 self._register_with_iocp(conn)
429 ov = _overlapped.Overlapped(NULL)
431 if isinstance(conn, socket.socket):
432 wrap_error(ov.WSARecv, conn.fileno(), nbytes, flags)
434 wrap_error(ov.ReadFile, conn.fileno(), nbytes)
435 except BrokenPipeError:
436 return self._result(b'')
438 def finish_recv(trans, key, ov):
440 return wrap_error(ov.getresult)
441 except WindowsError as exc:
442 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
443 raise ConnectionResetError(*exc.args)
447 return self._register(ov, conn, finish_recv)
449 def send(self, conn, buf, flags=0):
450 self._register_with_iocp(conn)
451 ov = _overlapped.Overlapped(NULL)
452 if isinstance(conn, socket.socket):
453 ov.WSASend(conn.fileno(), buf, flags)
455 ov.WriteFile(conn.fileno(), buf)
457 def finish_send(trans, key, ov):
459 return wrap_error(ov.getresult)
460 except WindowsError as exc:
461 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
462 raise ConnectionResetError(*exc.args)
466 return self._register(ov, conn, finish_send)
468 def accept(self, listener):
469 self._register_with_iocp(listener)
470 conn = self._get_accept_socket(listener.family)
471 ov = _overlapped.Overlapped(NULL)
472 ov.AcceptEx(listener.fileno(), conn.fileno())
474 def finish_accept(trans, key, ov):
475 wrap_error(ov.getresult)
476 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
477 buf = struct.pack('@P', listener.fileno())
478 conn.setsockopt(socket.SOL_SOCKET,
479 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
480 conn.settimeout(listener.gettimeout())
481 return conn, conn.getpeername()
484 def accept_coro(future, conn):
485 # Coroutine closing the accept socket if the future is cancelled
488 except futures.CancelledError:
492 future = self._register(ov, listener, finish_accept)
493 coro = accept_coro(future, conn)
494 tasks.ensure_future(coro, loop=self._loop)
497 def connect(self, conn, address):
498 self._register_with_iocp(conn)
499 # The socket needs to be locally bound before we call ConnectEx().
501 _overlapped.BindLocal(conn.fileno(), conn.family)
502 except WindowsError as e:
503 if e.winerror != errno.WSAEINVAL:
505 # Probably already locally bound; check using getsockname().
506 if conn.getsockname()[1] == 0:
508 ov = _overlapped.Overlapped(NULL)
509 ov.ConnectEx(conn.fileno(), address)
511 def finish_connect(trans, key, ov):
512 wrap_error(ov.getresult)
513 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
514 conn.setsockopt(socket.SOL_SOCKET,
515 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
518 return self._register(ov, conn, finish_connect)
520 def accept_pipe(self, pipe):
521 self._register_with_iocp(pipe)
522 ov = _overlapped.Overlapped(NULL)
523 connected = ov.ConnectNamedPipe(pipe.fileno())
526 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
527 # that the pipe is connected. There is no need to wait for the
528 # completion of the connection.
529 return self._result(pipe)
531 def finish_accept_pipe(trans, key, ov):
532 wrap_error(ov.getresult)
535 return self._register(ov, pipe, finish_accept_pipe)
538 def connect_pipe(self, address):
539 delay = CONNECT_PIPE_INIT_DELAY
541 # Unfortunately there is no way to do an overlapped connect to a pipe.
542 # Call CreateFile() in a loop until it doesn't fail with
545 handle = wrap_error(_overlapped.ConnectPipe, address)
547 except WindowsError as exc:
548 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
551 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
552 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
553 yield From(tasks.sleep(delay, loop=self._loop))
555 raise Return(windows_utils.PipeHandle(handle))
557 def wait_for_handle(self, handle, timeout=None):
558 """Wait for a handle.
560 Return a Future object. The result of the future is True if the wait
561 completed, or False if the wait did not complete (on timeout).
563 return self._wait_for_handle(handle, timeout, False)
565 def _wait_cancel(self, event, done_callback):
566 fut = self._wait_for_handle(event, None, True)
567 # add_done_callback() cannot be used because the wait may only complete
568 # in IocpProactor.close(), while the event loop is not running.
569 fut._done_callback = done_callback
572 def _wait_for_handle(self, handle, timeout, _is_cancel):
574 ms = _winapi.INFINITE
576 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
577 # round away from zero to wait *at least* timeout seconds.
578 ms = int(math.ceil(timeout * 1e3))
580 # We only create ov so we can use ov.address as a key for the cache.
581 ov = _overlapped.Overlapped(NULL)
582 wait_handle = _overlapped.RegisterWaitWithQueue(
583 handle, self._iocp, ov.address, ms)
585 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
587 f = _WaitHandleFuture(ov, handle, wait_handle, self,
589 if f._source_traceback:
590 del f._source_traceback[-1]
592 def finish_wait_for_handle(trans, key, ov):
593 # Note that this second wait means that we should only use
594 # this with handles types where a successful wait has no
595 # effect. So events or processes are all right, but locks
596 # or semaphores are not. Also note if the handle is
597 # signalled and then quickly reset, then we may return
598 # False even though we have not timed out.
601 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
604 def _register_with_iocp(self, obj):
605 # To get notifications of finished ops on this objects sent to the
606 # completion port, were must register the handle.
607 if obj not in self._registered:
608 self._registered.add(obj)
609 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
610 # XXX We could also use SetFileCompletionNotificationModes()
611 # to avoid sending notifications to completion port of ops
612 # that succeed immediately.
614 def _register(self, ov, obj, callback):
615 # Return a future which will be set with the result of the
616 # operation when it completes. The future's value is actually
617 # the value returned by callback().
618 f = _OverlappedFuture(ov, loop=self._loop)
619 if f._source_traceback:
620 del f._source_traceback[-1]
622 # The operation has completed, so no need to postpone the
623 # work. We cannot take this short cut if we need the
624 # NumberOfBytes, CompletionKey values returned by
625 # PostQueuedCompletionStatus().
627 value = callback(None, None, ov)
632 # Even if GetOverlappedResult() was called, we have to wait for the
633 # notification of the completion in GetQueuedCompletionStatus().
634 # Register the overlapped operation to keep a reference to the
635 # OVERLAPPED object, otherwise the memory is freed and Windows may
636 # read uninitialized memory.
638 # Register the overlapped operation for later. Note that
639 # we only store obj to prevent it from being garbage
640 # collected too early.
641 self._cache[ov.address] = (f, ov, obj, callback)
644 def _unregister(self, ov):
645 """Unregister an overlapped object.
647 Call this method when its future has been cancelled. The event can
648 already be signalled (pending in the proactor event queue). It is also
649 safe if the event is never signalled (because it was cancelled).
651 self._unregistered.append(ov)
653 def _get_accept_socket(self, family):
654 s = socket.socket(family)
658 def _poll(self, timeout=None):
662 raise ValueError("negative timeout")
664 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
665 # round away from zero to wait *at least* timeout seconds.
666 ms = int(math.ceil(timeout * 1e3))
668 raise ValueError("timeout too big")
671 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
676 err, transferred, key, address = status
678 f, ov, obj, callback = self._cache.pop(address)
680 if self._loop.get_debug():
681 self._loop.call_exception_handler({
682 'message': ('GetQueuedCompletionStatus() returned an '
684 'status': ('err=%s transferred=%s key=%#x address=%#x'
685 % (err, transferred, key, address)),
688 # key is either zero, or it is used to return a pipe
689 # handle which should be closed to avoid a leak.
690 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
691 _winapi.CloseHandle(key)
694 if obj in self._stopped_serving:
696 # Don't call the callback if _register() already read the result or
697 # if the overlapped has been cancelled
700 value = callback(transferred, key, ov)
703 self._results.append(f)
706 self._results.append(f)
708 # Remove unregisted futures
709 for ov in self._unregistered:
710 self._cache.pop(ov.address, None)
711 del self._unregistered[:]
713 def _stop_serving(self, obj):
714 # obj is a socket or pipe handle. It will be closed in
715 # BaseProactorEventLoop._stop_serving() which will make any
716 # pending operations fail quickly.
717 self._stopped_serving.add(obj)
720 # Cancel remaining registered operations.
721 for address, (fut, ov, obj, callback) in list(self._cache.items()):
723 # Nothing to do with cancelled futures
725 elif isinstance(fut, _WaitCancelFuture):
726 # _WaitCancelFuture must not be cancelled
731 except OSError as exc:
732 if self._loop is not None:
734 'message': 'Cancelling a future failed',
738 if fut._source_traceback:
739 context['source_traceback'] = fut._source_traceback
740 self._loop.call_exception_handler(context)
743 if not self._poll(1):
744 logger.debug('taking long time to close proactor')
747 if self._iocp is not None:
748 _winapi.CloseHandle(self._iocp)
755 class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
757 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
758 self._proc = windows_utils.Popen(
759 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
760 bufsize=bufsize, **kwargs)
763 returncode = self._proc.poll()
764 self._process_exited(returncode)
766 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
767 f.add_done_callback(callback)
770 SelectorEventLoop = _WindowsSelectorEventLoop
773 class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
774 _loop_factory = SelectorEventLoop
777 DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy