1 """Event loop using a selector and related classes.
3 A selector is a "notify-when-ready" multiplexer. For a subclass which
4 also includes support for signal handling, see the unix_events sub-module.
7 __all__ = ['BaseSelectorEventLoop']
17 from .py3_ssl import wrap_ssl_error, SSLWantReadError, SSLWantWriteError
18 except ImportError: # pragma: no cover
21 from . import base_events
23 from . import constants
26 from . import selectors
27 from . import sslproto
28 from . import transports
29 from .compat import flatten_bytes
30 from .coroutines import coroutine, From
31 from .log import logger
32 from .py33_exceptions import (wrap_error,
33 BlockingIOError, InterruptedError, ConnectionAbortedError, BrokenPipeError,
36 # On Mac OS 10.6 with Python 2.6.1 or OpenIndiana 148 with Python 2.6.4,
37 # _SelectorSslTransport._read_ready() hangs if the socket has no data.
38 # Example: test_events.test_create_server_ssl()
39 _SSL_REQUIRES_SELECT = (sys.version_info < (2, 6, 6))
40 if _SSL_REQUIRES_SELECT:
44 def _get_socket_error(sock, address):
45 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
47 # Jump to the except clause below.
48 raise OSError(err, 'Connect call failed %s' % (address,))
51 def _test_selector_event(selector, fd, event):
52 # Test if the selector is monitoring 'event' events
53 # for the file descriptor 'fd'.
55 key = selector.get_key(fd)
59 return bool(key.events & event)
62 class BaseSelectorEventLoop(base_events.BaseEventLoop):
63 """Selector event loop.
65 See events.EventLoop for API specification.
68 def __init__(self, selector=None):
69 super(BaseSelectorEventLoop, self).__init__()
72 selector = selectors.DefaultSelector()
73 logger.debug('Using selector: %s', selector.__class__.__name__)
74 self._selector = selector
75 self._make_self_pipe()
77 def _make_socket_transport(self, sock, protocol, waiter=None,
78 extra=None, server=None):
79 return _SelectorSocketTransport(self, sock, protocol, waiter,
82 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
83 server_side=False, server_hostname=None,
84 extra=None, server=None):
85 if not sslproto._is_sslproto_available():
86 return self._make_legacy_ssl_transport(
87 rawsock, protocol, sslcontext, waiter,
88 server_side=server_side, server_hostname=server_hostname,
89 extra=extra, server=server)
91 ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
92 server_side, server_hostname)
93 _SelectorSocketTransport(self, rawsock, ssl_protocol,
94 extra=extra, server=server)
95 return ssl_protocol._app_transport
97 def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
99 server_side=False, server_hostname=None,
100 extra=None, server=None):
101 # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used
102 # on Python 3.4 and older, when ssl.MemoryBIO is not available.
103 return _SelectorSslTransport(
104 self, rawsock, protocol, sslcontext, waiter,
105 server_side, server_hostname, extra, server)
107 def _make_datagram_transport(self, sock, protocol,
108 address=None, waiter=None, extra=None):
109 return _SelectorDatagramTransport(self, sock, protocol,
110 address, waiter, extra)
113 if self.is_running():
114 raise RuntimeError("Cannot close a running event loop")
117 self._close_self_pipe()
118 super(BaseSelectorEventLoop, self).close()
119 if self._selector is not None:
120 self._selector.close()
121 self._selector = None
123 def _socketpair(self):
124 raise NotImplementedError
126 def _close_self_pipe(self):
127 self.remove_reader(self._ssock.fileno())
132 self._internal_fds -= 1
134 def _make_self_pipe(self):
135 # A self-socket, really. :-)
136 self._ssock, self._csock = self._socketpair()
137 self._ssock.setblocking(False)
138 self._csock.setblocking(False)
139 self._internal_fds += 1
140 self.add_reader(self._ssock.fileno(), self._read_from_self)
142 def _process_self_data(self, data):
145 def _read_from_self(self):
148 data = wrap_error(self._ssock.recv, 4096)
151 self._process_self_data(data)
152 except InterruptedError:
154 except BlockingIOError:
157 def _write_to_self(self):
158 # This may be called from a different thread, possibly after
159 # _close_self_pipe() has been called or even while it is
160 # running. Guard for self._csock being None or closed. When
161 # a socket is closed, send() raises OSError (with errno set to
162 # EBADF, but let's not rely on the exact error code).
164 if csock is not None:
166 wrap_error(csock.send, b'\0')
169 logger.debug("Fail to write a null byte into the "
173 def _start_serving(self, protocol_factory, sock,
174 sslcontext=None, server=None):
175 self.add_reader(sock.fileno(), self._accept_connection,
176 protocol_factory, sock, sslcontext, server)
178 def _accept_connection(self, protocol_factory, sock,
179 sslcontext=None, server=None):
181 conn, addr = wrap_error(sock.accept)
183 logger.debug("%r got a new connection from %r: %r",
185 conn.setblocking(False)
186 except (BlockingIOError, InterruptedError, ConnectionAbortedError):
188 except socket.error as exc:
189 # There's nowhere to send the error, so just log it.
190 if exc.errno in (errno.EMFILE, errno.ENFILE,
191 errno.ENOBUFS, errno.ENOMEM):
192 # Some platforms (e.g. Linux keep reporting the FD as
193 # ready, so we remove the read handler temporarily.
194 # We'll try again in a while.
195 self.call_exception_handler({
196 'message': 'socket.accept() out of system resource',
200 self.remove_reader(sock.fileno())
201 self.call_later(constants.ACCEPT_RETRY_DELAY,
203 protocol_factory, sock, sslcontext, server)
205 raise # The event loop will catch, log and ignore it.
207 extra = {'peername': addr}
208 accept = self._accept_connection2(protocol_factory, conn, extra,
210 self.create_task(accept)
213 def _accept_connection2(self, protocol_factory, conn, extra,
214 sslcontext=None, server=None):
218 protocol = protocol_factory()
219 waiter = futures.Future(loop=self)
221 transport = self._make_ssl_transport(
222 conn, protocol, sslcontext, waiter=waiter,
223 server_side=True, extra=extra, server=server)
225 transport = self._make_socket_transport(
226 conn, protocol, waiter=waiter, extra=extra,
235 # It's now up to the protocol to handle the connection.
236 except Exception as exc:
239 'message': ('Error on transport creation '
240 'for incoming connection'),
243 if protocol is not None:
244 context['protocol'] = protocol
245 if transport is not None:
246 context['transport'] = transport
247 self.call_exception_handler(context)
249 def add_reader(self, fd, callback, *args):
250 """Add a reader callback."""
252 handle = events.Handle(callback, args, self)
254 key = self._selector.get_key(fd)
256 self._selector.register(fd, selectors.EVENT_READ,
259 mask, (reader, writer) = key.events, key.data
260 self._selector.modify(fd, mask | selectors.EVENT_READ,
262 if reader is not None:
265 def remove_reader(self, fd):
266 """Remove a reader callback."""
270 key = self._selector.get_key(fd)
274 mask, (reader, writer) = key.events, key.data
275 mask &= ~selectors.EVENT_READ
277 self._selector.unregister(fd)
279 self._selector.modify(fd, mask, (None, writer))
281 if reader is not None:
287 def add_writer(self, fd, callback, *args):
288 """Add a writer callback.."""
290 handle = events.Handle(callback, args, self)
292 key = self._selector.get_key(fd)
294 self._selector.register(fd, selectors.EVENT_WRITE,
297 mask, (reader, writer) = key.events, key.data
298 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
300 if writer is not None:
303 def remove_writer(self, fd):
304 """Remove a writer callback."""
308 key = self._selector.get_key(fd)
312 mask, (reader, writer) = key.events, key.data
313 # Remove both writer and connector.
314 mask &= ~selectors.EVENT_WRITE
316 self._selector.unregister(fd)
318 self._selector.modify(fd, mask, (reader, None))
320 if writer is not None:
326 def sock_recv(self, sock, n):
327 """Receive data from the socket.
329 The return value is a bytes object representing the data received.
330 The maximum amount of data to be received at once is specified by
333 This method is a coroutine.
335 if self._debug and sock.gettimeout() != 0:
336 raise ValueError("the socket must be non-blocking")
337 fut = futures.Future(loop=self)
338 self._sock_recv(fut, False, sock, n)
341 def _sock_recv(self, fut, registered, sock, n):
342 # _sock_recv() can add itself as an I/O callback if the operation can't
343 # be done immediately. Don't use it directly, call sock_recv().
346 # Remove the callback early. It should be rare that the
347 # selector says the fd is ready but the call still returns
348 # EAGAIN, and I am willing to take a hit in that case in
349 # order to simplify the common case.
350 self.remove_reader(fd)
354 data = wrap_error(sock.recv, n)
355 except (BlockingIOError, InterruptedError):
356 self.add_reader(fd, self._sock_recv, fut, True, sock, n)
357 except Exception as exc:
358 fut.set_exception(exc)
362 def sock_sendall(self, sock, data):
363 """Send data to the socket.
365 The socket must be connected to a remote socket. This method continues
366 to send data from data until either all data has been sent or an
367 error occurs. None is returned on success. On error, an exception is
368 raised, and there is no way to determine how much data, if any, was
369 successfully processed by the receiving end of the connection.
371 This method is a coroutine.
373 if self._debug and sock.gettimeout() != 0:
374 raise ValueError("the socket must be non-blocking")
375 fut = futures.Future(loop=self)
377 self._sock_sendall(fut, False, sock, data)
382 def _sock_sendall(self, fut, registered, sock, data):
386 self.remove_writer(fd)
391 n = wrap_error(sock.send, data)
392 except (BlockingIOError, InterruptedError):
394 except Exception as exc:
395 fut.set_exception(exc)
403 self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
405 def sock_connect(self, sock, address):
406 """Connect to a remote socket at address.
408 The address must be already resolved to avoid the trap of hanging the
409 entire event loop when the address requires doing a DNS lookup. For
410 example, it must be an IP address, not an hostname, for AF_INET and
411 AF_INET6 address families. Use getaddrinfo() to resolve the hostname
414 This method is a coroutine.
416 if self._debug and sock.gettimeout() != 0:
417 raise ValueError("the socket must be non-blocking")
418 fut = futures.Future(loop=self)
421 base_events._check_resolved_address(sock, address)
422 except ValueError as err:
423 fut.set_exception(err)
425 self._sock_connect(fut, sock, address)
428 def _sock_connect(self, fut, sock, address):
431 wrap_error(sock.connect, address)
432 except (BlockingIOError, InterruptedError):
433 # Issue #23618: When the C function connect() fails with EINTR, the
434 # connection runs in background. We have to wait until the socket
435 # becomes writable to be notified when the connection succeed or
437 fut.add_done_callback(functools.partial(self._sock_connect_done,
439 self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
440 except Exception as exc:
441 fut.set_exception(exc)
445 def _sock_connect_done(self, fd, fut):
446 self.remove_writer(fd)
448 def _sock_connect_cb(self, fut, sock, address):
453 wrap_error(_get_socket_error, sock, address)
454 except (BlockingIOError, InterruptedError):
455 # socket is still registered, the callback will be retried later
457 except Exception as exc:
458 fut.set_exception(exc)
462 def sock_accept(self, sock):
463 """Accept a connection.
465 The socket must be bound to an address and listening for connections.
466 The return value is a pair (conn, address) where conn is a new socket
467 object usable to send and receive data on the connection, and address
468 is the address bound to the socket on the other end of the connection.
470 This method is a coroutine.
472 if self._debug and sock.gettimeout() != 0:
473 raise ValueError("the socket must be non-blocking")
474 fut = futures.Future(loop=self)
475 self._sock_accept(fut, False, sock)
478 def _sock_accept(self, fut, registered, sock):
481 self.remove_reader(fd)
485 conn, address = wrap_error(sock.accept)
486 conn.setblocking(False)
487 except (BlockingIOError, InterruptedError):
488 self.add_reader(fd, self._sock_accept, fut, True, sock)
489 except Exception as exc:
490 fut.set_exception(exc)
492 fut.set_result((conn, address))
494 def _process_events(self, event_list):
495 for key, mask in event_list:
496 fileobj, (reader, writer) = key.fileobj, key.data
497 if mask & selectors.EVENT_READ and reader is not None:
498 if reader._cancelled:
499 self.remove_reader(fileobj)
501 self._add_callback(reader)
502 if mask & selectors.EVENT_WRITE and writer is not None:
503 if writer._cancelled:
504 self.remove_writer(fileobj)
506 self._add_callback(writer)
508 def _stop_serving(self, sock):
509 self.remove_reader(sock.fileno())
513 class _SelectorTransport(transports._FlowControlMixin,
514 transports.Transport):
516 max_size = 256 * 1024 # Buffer size passed to recv().
518 _buffer_factory = bytearray # Constructs initial value for self._buffer.
520 # Attribute used in the destructor: it must be set even if the constructor
521 # is not called (see _SelectorSslTransport which may start by raising an
525 def __init__(self, loop, sock, protocol, extra=None, server=None):
526 super(_SelectorTransport, self).__init__(extra, loop)
527 self._extra['socket'] = sock
528 self._extra['sockname'] = sock.getsockname()
529 if 'peername' not in self._extra:
531 self._extra['peername'] = sock.getpeername()
533 self._extra['peername'] = None
535 self._sock_fd = sock.fileno()
536 self._protocol = protocol
537 self._protocol_connected = True
538 self._server = server
539 self._buffer = self._buffer_factory()
540 self._conn_lost = 0 # Set when call to connection_lost scheduled.
541 self._closing = False # Set when close() called.
542 if self._server is not None:
543 self._server._attach()
546 info = [self.__class__.__name__]
547 if self._sock is None:
548 info.append('closed')
550 info.append('closing')
551 info.append('fd=%s' % self._sock_fd)
552 # test if the transport was closed
553 if self._loop is not None and not self._loop.is_closed():
554 polling = _test_selector_event(self._loop._selector,
555 self._sock_fd, selectors.EVENT_READ)
557 info.append('read=polling')
559 info.append('read=idle')
561 polling = _test_selector_event(self._loop._selector,
563 selectors.EVENT_WRITE)
569 bufsize = self.get_write_buffer_size()
570 info.append('write=<%s, bufsize=%s>' % (state, bufsize))
571 return '<%s>' % ' '.join(info)
574 self._force_close(None)
580 self._loop.remove_reader(self._sock_fd)
583 self._loop.call_soon(self._call_connection_lost, None)
585 # On Python 3.3 and older, objects with a destructor part of a reference
586 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
590 if self._sock is not None:
591 warnings.warn("unclosed transport %r" % self, ResourceWarning)
594 def _fatal_error(self, exc, message='Fatal error on transport'):
595 # Should be called from exception handler only.
596 if isinstance(exc, (BrokenPipeError,
597 ConnectionResetError, ConnectionAbortedError)):
598 if self._loop.get_debug():
599 logger.debug("%r: %s", self, message, exc_info=True)
601 self._loop.call_exception_handler({
605 'protocol': self._protocol,
607 self._force_close(exc)
609 def _force_close(self, exc):
614 self._loop.remove_writer(self._sock_fd)
615 if not self._closing:
617 self._loop.remove_reader(self._sock_fd)
619 self._loop.call_soon(self._call_connection_lost, exc)
621 def _call_connection_lost(self, exc):
623 if self._protocol_connected:
624 self._protocol.connection_lost(exc)
628 self._protocol = None
630 server = self._server
631 if server is not None:
635 def get_write_buffer_size(self):
636 return len(self._buffer)
639 class _SelectorSocketTransport(_SelectorTransport):
641 def __init__(self, loop, sock, protocol, waiter=None,
642 extra=None, server=None):
643 super(_SelectorSocketTransport, self).__init__(loop, sock, protocol, extra, server)
647 self._loop.call_soon(self._protocol.connection_made, self)
648 # only start reading when connection_made() has been called
649 self._loop.call_soon(self._loop.add_reader,
650 self._sock_fd, self._read_ready)
651 if waiter is not None:
652 # only wake up the waiter when connection_made() has been called
653 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
655 def pause_reading(self):
657 raise RuntimeError('Cannot pause_reading() when closing')
659 raise RuntimeError('Already paused')
661 self._loop.remove_reader(self._sock_fd)
662 if self._loop.get_debug():
663 logger.debug("%r pauses reading", self)
665 def resume_reading(self):
667 raise RuntimeError('Not paused')
671 self._loop.add_reader(self._sock_fd, self._read_ready)
672 if self._loop.get_debug():
673 logger.debug("%r resumes reading", self)
675 def _read_ready(self):
677 data = wrap_error(self._sock.recv, self.max_size)
678 except (BlockingIOError, InterruptedError):
680 except Exception as exc:
681 self._fatal_error(exc, 'Fatal read error on socket transport')
684 self._protocol.data_received(data)
686 if self._loop.get_debug():
687 logger.debug("%r received EOF", self)
688 keep_open = self._protocol.eof_received()
690 # We're keeping the connection open so the
691 # protocol can write more, but we still can't
692 # receive more, so remove the reader callback.
693 self._loop.remove_reader(self._sock_fd)
697 def write(self, data):
698 data = flatten_bytes(data)
700 raise RuntimeError('Cannot call write() after write_eof()')
705 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
706 logger.warning('socket.send() raised exception.')
711 # Optimization: try to send now.
713 n = wrap_error(self._sock.send, data)
714 except (BlockingIOError, InterruptedError):
716 except Exception as exc:
717 self._fatal_error(exc, 'Fatal write error on socket transport')
723 # Not all was written; register write handler.
724 self._loop.add_writer(self._sock_fd, self._write_ready)
726 # Add it to the buffer.
727 self._buffer.extend(data)
728 self._maybe_pause_protocol()
730 def _write_ready(self):
731 assert self._buffer, 'Data should not be empty'
733 data = flatten_bytes(self._buffer)
735 n = wrap_error(self._sock.send, data)
736 except (BlockingIOError, InterruptedError):
738 except Exception as exc:
739 self._loop.remove_writer(self._sock_fd)
741 self._fatal_error(exc, 'Fatal write error on socket transport')
745 self._maybe_resume_protocol() # May append to buffer.
747 self._loop.remove_writer(self._sock_fd)
749 self._call_connection_lost(None)
751 self._sock.shutdown(socket.SHUT_WR)
758 self._sock.shutdown(socket.SHUT_WR)
760 def can_write_eof(self):
764 class _SelectorSslTransport(_SelectorTransport):
766 _buffer_factory = bytearray
768 def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
769 server_side=False, server_hostname=None,
770 extra=None, server=None):
772 raise RuntimeError('stdlib ssl module not available')
775 sslcontext = sslproto._create_transport_context(server_side, server_hostname)
778 'server_side': server_side,
779 'do_handshake_on_connect': False,
781 if server_hostname and not server_side:
782 wrap_kwargs['server_hostname'] = server_hostname
783 sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
785 super(_SelectorSslTransport, self).__init__(loop, sslsock, protocol, extra, server)
786 # the protocol connection is only made after the SSL handshake
787 self._protocol_connected = False
789 self._server_hostname = server_hostname
790 self._waiter = waiter
791 self._sslcontext = sslcontext
794 # SSL-specific extra info. (peercert is set later)
795 self._extra.update(sslcontext=sslcontext)
797 if self._loop.get_debug():
798 logger.debug("%r starts SSL handshake", self)
799 start_time = self._loop.time()
802 self._on_handshake(start_time)
804 def _wakeup_waiter(self, exc=None):
805 if self._waiter is None:
807 if not self._waiter.cancelled():
809 self._waiter.set_exception(exc)
811 self._waiter.set_result(None)
814 def _on_handshake(self, start_time):
816 wrap_ssl_error(self._sock.do_handshake)
817 except SSLWantReadError:
818 self._loop.add_reader(self._sock_fd,
819 self._on_handshake, start_time)
821 except SSLWantWriteError:
822 self._loop.add_writer(self._sock_fd,
823 self._on_handshake, start_time)
825 except BaseException as exc:
826 if self._loop.get_debug():
827 logger.warning("%r: SSL handshake failed",
829 self._loop.remove_reader(self._sock_fd)
830 self._loop.remove_writer(self._sock_fd)
832 self._wakeup_waiter(exc)
833 if isinstance(exc, Exception):
838 self._loop.remove_reader(self._sock_fd)
839 self._loop.remove_writer(self._sock_fd)
841 peercert = self._sock.getpeercert()
842 if not hasattr(self._sslcontext, 'check_hostname'):
843 # Verify hostname if requested, Python 3.4+ uses check_hostname
844 # and checks the hostname in do_handshake()
845 if (self._server_hostname and
846 self._sslcontext.verify_mode != ssl.CERT_NONE):
848 ssl.match_hostname(peercert, self._server_hostname)
849 except Exception as exc:
850 if self._loop.get_debug():
851 logger.warning("%r: SSL handshake failed "
852 "on matching the hostname",
855 self._wakeup_waiter(exc)
858 # Add extra info that becomes available after handshake.
859 self._extra.update(peercert=peercert,
860 cipher=self._sock.cipher(),
862 if hasattr(self._sock, 'compression'):
863 self._extra['compression'] = self._sock.compression()
865 self._read_wants_write = False
866 self._write_wants_read = False
867 self._loop.add_reader(self._sock_fd, self._read_ready)
868 self._protocol_connected = True
869 self._loop.call_soon(self._protocol.connection_made, self)
870 # only wake up the waiter when connection_made() has been called
871 self._loop.call_soon(self._wakeup_waiter)
873 if self._loop.get_debug():
874 dt = self._loop.time() - start_time
875 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
877 def pause_reading(self):
878 # XXX This is a bit icky, given the comment at the top of
879 # _read_ready(). Is it possible to evoke a deadlock? I don't
880 # know, although it doesn't look like it; write() will still
881 # accept more data for the buffer and eventually the app will
882 # call resume_reading() again, and things will flow again.
885 raise RuntimeError('Cannot pause_reading() when closing')
887 raise RuntimeError('Already paused')
889 self._loop.remove_reader(self._sock_fd)
890 if self._loop.get_debug():
891 logger.debug("%r pauses reading", self)
893 def resume_reading(self):
895 raise RuntimeError('Not paused')
899 self._loop.add_reader(self._sock_fd, self._read_ready)
900 if self._loop.get_debug():
901 logger.debug("%r resumes reading", self)
903 def _sock_recv(self):
904 return wrap_ssl_error(self._sock.recv, self.max_size)
906 def _read_ready(self):
907 if self._write_wants_read:
908 self._write_wants_read = False
912 self._loop.add_writer(self._sock_fd, self._write_ready)
915 if _SSL_REQUIRES_SELECT:
916 rfds = (self._sock.fileno(),)
917 rfds = select.select(rfds, (), (), 0.0)[0]
921 data = wrap_error(self._sock_recv)
922 except (BlockingIOError, InterruptedError, SSLWantReadError):
924 except SSLWantWriteError:
925 self._read_wants_write = True
926 self._loop.remove_reader(self._sock_fd)
927 self._loop.add_writer(self._sock_fd, self._write_ready)
928 except Exception as exc:
929 self._fatal_error(exc, 'Fatal read error on SSL transport')
932 self._protocol.data_received(data)
935 if self._loop.get_debug():
936 logger.debug("%r received EOF", self)
937 keep_open = self._protocol.eof_received()
939 logger.warning('returning true from eof_received() '
940 'has no effect when using ssl')
944 def _write_ready(self):
945 if self._read_wants_write:
946 self._read_wants_write = False
949 if not (self._paused or self._closing):
950 self._loop.add_reader(self._sock_fd, self._read_ready)
953 data = flatten_bytes(self._buffer)
955 n = wrap_error(self._sock.send, data)
956 except (BlockingIOError, InterruptedError, SSLWantWriteError):
958 except SSLWantReadError:
960 self._loop.remove_writer(self._sock_fd)
961 self._write_wants_read = True
962 except Exception as exc:
963 self._loop.remove_writer(self._sock_fd)
965 self._fatal_error(exc, 'Fatal write error on SSL transport')
971 self._maybe_resume_protocol() # May append to buffer.
974 self._loop.remove_writer(self._sock_fd)
976 self._call_connection_lost(None)
978 def write(self, data):
979 data = flatten_bytes(data)
984 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
985 logger.warning('socket.send() raised exception.')
990 self._loop.add_writer(self._sock_fd, self._write_ready)
992 # Add it to the buffer.
993 self._buffer.extend(data)
994 self._maybe_pause_protocol()
996 def can_write_eof(self):
1000 class _SelectorDatagramTransport(_SelectorTransport):
1002 _buffer_factory = collections.deque
1004 def __init__(self, loop, sock, protocol, address=None,
1005 waiter=None, extra=None):
1006 super(_SelectorDatagramTransport, self).__init__(loop, sock,
1008 self._address = address
1009 self._loop.call_soon(self._protocol.connection_made, self)
1010 # only start reading when connection_made() has been called
1011 self._loop.call_soon(self._loop.add_reader,
1012 self._sock_fd, self._read_ready)
1013 if waiter is not None:
1014 # only wake up the waiter when connection_made() has been called
1015 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
1017 def get_write_buffer_size(self):
1018 return sum(len(data) for data, _ in self._buffer)
1020 def _read_ready(self):
1022 data, addr = wrap_error(self._sock.recvfrom, self.max_size)
1023 except (BlockingIOError, InterruptedError):
1025 except OSError as exc:
1026 self._protocol.error_received(exc)
1027 except Exception as exc:
1028 self._fatal_error(exc, 'Fatal read error on datagram transport')
1030 self._protocol.datagram_received(data, addr)
1032 def sendto(self, data, addr=None):
1033 data = flatten_bytes(data)
1037 if self._address and addr not in (None, self._address):
1038 raise ValueError('Invalid address: must be None or %s' %
1041 if self._conn_lost and self._address:
1042 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1043 logger.warning('socket.send() raised exception.')
1044 self._conn_lost += 1
1047 if not self._buffer:
1048 # Attempt to send it right away first.
1051 wrap_error(self._sock.send, data)
1053 wrap_error(self._sock.sendto, data, addr)
1055 except (BlockingIOError, InterruptedError):
1056 self._loop.add_writer(self._sock_fd, self._sendto_ready)
1057 except OSError as exc:
1058 self._protocol.error_received(exc)
1060 except Exception as exc:
1061 self._fatal_error(exc,
1062 'Fatal write error on datagram transport')
1065 # Ensure that what we buffer is immutable.
1066 self._buffer.append((bytes(data), addr))
1067 self._maybe_pause_protocol()
1069 def _sendto_ready(self):
1071 data, addr = self._buffer.popleft()
1074 wrap_error(self._sock.send, data)
1076 wrap_error(self._sock.sendto, data, addr)
1077 except (BlockingIOError, InterruptedError):
1078 self._buffer.appendleft((data, addr)) # Try again later.
1080 except OSError as exc:
1081 self._protocol.error_received(exc)
1083 except Exception as exc:
1084 self._fatal_error(exc,
1085 'Fatal write error on datagram transport')
1088 self._maybe_resume_protocol() # May append to buffer.
1089 if not self._buffer:
1090 self._loop.remove_writer(self._sock_fd)
1092 self._call_connection_lost(None)