--- /dev/null
+from typing import Tuple, Union, Optional, Any, Dict, overload\r
+\r
+import os\r
+import select\r
+import sys\r
+import time\r
+import warnings\r
+from socket import SocketType\r
+from typing import Optional\r
+\r
+from errno import (EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL,\r
+ ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED,\r
+ EPIPE, EAGAIN, errorcode)\r
+\r
+# cyclic dependence with asynchat\r
+_maptype = Dict[str, Any]\r
+\r
+\r
+class ExitNow(Exception): ...\r
+\r
+def read(obj: Any) -> None: ...\r
+def write(obj: Any) -> None: ...\r
+def readwrite(obj: Any, flags: int) -> None: ...\r
+def poll(timeout: float = ..., map: _maptype = ...) -> None: ...\r
+def poll2(timeout: float = ..., map: _maptype = ...) -> None: ...\r
+\r
+poll3 = poll2\r
+\r
+def loop(timeout: float = ..., use_poll: bool = ..., map: _maptype = ..., count: Optional[int] = ...) -> None: ...\r
+\r
+\r
+# Not really subclass of socket.socket; it's only delegation.\r
+# It is not covariant to it.\r
+class dispatcher:\r
+\r
+ debug = ... # type: bool\r
+ connected = ... # type: bool\r
+ accepting = ... # type: bool\r
+ connecting = ... # type: bool\r
+ closing = ... # type: bool\r
+ ignore_log_types = ... # type: frozenset[str]\r
+ socket = ... # type: Optional[SocketType] # undocumented\r
+\r
+ def __init__(self, sock: Optional[SocketType] = ..., map: _maptype = ...) -> None: ...\r
+ def add_channel(self, map: _maptype = ...) -> None: ...\r
+ def del_channel(self, map: _maptype = ...) -> None: ...\r
+ def create_socket(self, family: int, type: int) -> None: ...\r
+ def set_socket(self, sock: SocketType, map: _maptype = ...) -> None: ...\r
+ def set_reuse_addr(self) -> None: ...\r
+ def readable(self) -> bool: ...\r
+ def writable(self) -> bool: ...\r
+ def listen(self, backlog: int) -> None: ...\r
+ def bind(self, address: Union[tuple, str]) -> None: ...\r
+ def connect(self, address: Union[tuple, str]) -> None: ...\r
+ def accept(self) -> Optional[Tuple[SocketType, Any]]: ...\r
+ def send(self, data: bytes) -> int: ...\r
+ def recv(self, buffer_size: int) -> bytes: ...\r
+ def close(self) -> None: ...\r
+\r
+ def log(self, message: Any) -> None: ...\r
+ def log_info(self, message: Any, type: str = ...) -> None: ...\r
+ def handle_read_event(self) -> None: ...\r
+ def handle_connect_event(self) -> None: ...\r
+ def handle_write_event(self) -> None: ...\r
+ def handle_expt_event(self) -> None: ...\r
+ def handle_error(self) -> None: ...\r
+ def handle_expt(self) -> None: ...\r
+ def handle_read(self) -> None: ...\r
+ def handle_write(self) -> None: ...\r
+ def handle_connect(self) -> None: ...\r
+ def handle_accept(self) -> None: ...\r
+ def handle_close(self) -> None: ...\r
+\r
+ if sys.version_info < (3, 5):\r
+ # Historically, some methods were "imported" from `self.socket` by\r
+ # means of `__getattr__`. This was long deprecated, and as of Python\r
+ # 3.5 has been removed; simply call the relevant methods directly on\r
+ # self.socket if necessary.\r
+\r
+ def detach(self) -> int: ...\r
+ def fileno(self) -> int: ...\r
+\r
+ # return value is an address\r
+ def getpeername(self) -> Any: ...\r
+ def getsockname(self) -> Any: ...\r
+\r
+ @overload\r
+ def getsockopt(self, level: int, optname: int) -> int: ...\r
+ @overload\r
+ def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...\r
+\r
+ def gettimeout(self) -> float: ...\r
+ def ioctl(self, control: object,\r
+ option: Tuple[int, int, int]) -> None: ...\r
+ # TODO the return value may be BinaryIO or TextIO, depending on mode\r
+ def makefile(self, mode: str = ..., buffering: int = ...,\r
+ encoding: str = ..., errors: str = ...,\r
+ newline: str = ...) -> Any:\r
+ ...\r
+\r
+ # return type is an address\r
+ def recvfrom(self, bufsize: int, flags: int = ...) -> Any: ...\r
+ def recvfrom_into(self, buffer: bytes, nbytes: int, flags: int = ...) -> Any: ...\r
+ def recv_into(self, buffer: bytes, nbytes: int, flags: int = ...) -> Any: ...\r
+ def sendall(self, data: bytes, flags: int = ...) -> None: ...\r
+ def sendto(self, data: bytes, address: Union[tuple, str], flags: int = ...) -> int: ...\r
+ def setblocking(self, flag: bool) -> None: ...\r
+ def settimeout(self, value: Union[float, None]) -> None: ...\r
+ def setsockopt(self, level: int, optname: int, value: Union[int, bytes]) -> None: ...\r
+ def shutdown(self, how: int) -> None: ...\r
+\r
+class dispatcher_with_send(dispatcher):\r
+ def __init__(self, sock: SocketType = ..., map: _maptype = ...) -> None: ...\r
+ def initiate_send(self) -> None: ...\r
+ def handle_write(self) -> None: ...\r
+ # incompatible signature:\r
+ # def send(self, data: bytes) -> Optional[int]: ...\r
+\r
+def compact_traceback() -> Tuple[Tuple[str, str, str], type, type, str]: ...\r
+def close_all(map: _maptype = ..., ignore_all: bool = ...) -> None: ...\r
+\r
+# if os.name == 'posix':\r
+# import fcntl\r
+class file_wrapper:\r
+ fd = ... # type: int\r
+\r
+ def __init__(self, fd: int) -> None: ...\r
+ def recv(self, bufsize: int, flags: int = ...) -> bytes: ...\r
+ def send(self, data: bytes, flags: int = ...) -> int: ...\r
+\r
+ @overload\r
+ def getsockopt(self, level: int, optname: int) -> int: ...\r
+ @overload\r
+ def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...\r
+\r
+ def read(self, bufsize: int, flags: int = ...) -> bytes: ...\r
+ def write(self, data: bytes, flags: int = ...) -> int: ...\r
+\r
+ def close(self) -> None: ...\r
+ def fileno(self) -> int: ...\r
+\r
+class file_dispatcher(dispatcher):\r
+ def __init__(self, fd: int, map: _maptype = ...) -> None: ...\r
+ def set_file(self, fd: int) -> None: ...\r