import sys
from errno import (EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL,
                   ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED,
                   EPIPE, EAGAIN, errorcode)
from socket import SocketType
from typing import Tuple, Union, Optional, Any, Dict, overload
from typing import Optional
\r
15 # cyclic dependence with asynchat
\r
16 _maptype = Dict[str, Any]
\r
# Exception subclass exported by the module; the stub declares no members.
class ExitNow(Exception):
    ...
\r
# Module-level helper: accepts any object and returns nothing.
def read(obj: Any) -> None:
    ...
\r
# Module-level helper: accepts any object and returns nothing.
def write(obj: Any) -> None:
    ...
\r
# Module-level helper: takes any object plus an integer `flags` value and
# returns nothing.
def readwrite(
    obj: Any,
    flags: int,
) -> None:
    ...
\r
# timeout: float, optional.
# map: channel map to operate on. The runtime default is None (asyncore then
#      uses its module-level map), so None is accepted explicitly.
def poll(timeout: float = ..., map: Optional[_maptype] = ...) -> None: ...
\r
# timeout: float, optional.
# map: channel map to operate on. The runtime default is None (asyncore then
#      uses its module-level map), so None is accepted explicitly.
def poll2(timeout: float = ..., map: Optional[_maptype] = ...) -> None: ...
\r
# timeout:  float, optional.
# use_poll: bool, optional.
# map:      channel map to watch. The runtime default is None (asyncore then
#           uses its module-level map), so None is accepted explicitly.
# count:    optional int; None is already part of the declared type.
def loop(timeout: float = ..., use_poll: bool = ..., map: Optional[_maptype] = ...,
         count: Optional[int] = ...) -> None: ...
\r
# Not really a subclass of socket.socket; it only delegates to one.
# It is not covariant to it.
class dispatcher:
    # Class-level attributes declared by the stub.
    debug = ...  # type: bool
    connected = ...  # type: bool
    accepting = ...  # type: bool
    connecting = ...  # type: bool
    closing = ...  # type: bool
    ignore_log_types = ...  # type: frozenset[str]
    socket = ...  # type: Optional[SocketType]  # undocumented

    # `map` parameters default to None at runtime, hence Optional.
    def __init__(self, sock: Optional[SocketType] = ..., map: Optional[_maptype] = ...) -> None: ...
    def add_channel(self, map: Optional[_maptype] = ...) -> None: ...
    def del_channel(self, map: Optional[_maptype] = ...) -> None: ...
    # Both arguments have defaults in Python 3 (AF_INET / SOCK_STREAM).
    def create_socket(self, family: int = ..., type: int = ...) -> None: ...
    def set_socket(self, sock: SocketType, map: Optional[_maptype] = ...) -> None: ...
    def set_reuse_addr(self) -> None: ...
    def readable(self) -> bool: ...
    def writable(self) -> bool: ...
    def listen(self, backlog: int) -> None: ...
    def bind(self, address: Union[tuple, str]) -> None: ...
    def connect(self, address: Union[tuple, str]) -> None: ...
    def accept(self) -> Optional[Tuple[SocketType, Any]]: ...
    def send(self, data: bytes) -> int: ...
    def recv(self, buffer_size: int) -> bytes: ...
    def close(self) -> None: ...

    def log(self, message: Any) -> None: ...
    def log_info(self, message: Any, type: str = ...) -> None: ...
    def handle_read_event(self) -> None: ...
    def handle_connect_event(self) -> None: ...
    def handle_write_event(self) -> None: ...
    def handle_expt_event(self) -> None: ...
    def handle_error(self) -> None: ...
    def handle_expt(self) -> None: ...
    def handle_read(self) -> None: ...
    def handle_write(self) -> None: ...
    def handle_connect(self) -> None: ...
    def handle_accept(self) -> None: ...
    def handle_close(self) -> None: ...

    if sys.version_info < (3, 5):
        # Historically, some methods were "imported" from `self.socket` by
        # means of `__getattr__`. This was long deprecated, and as of Python
        # 3.5 has been removed; simply call the relevant methods directly on
        # self.socket if necessary.

        def detach(self) -> int: ...
        def fileno(self) -> int: ...

        # return value is an address
        def getpeername(self) -> Any: ...
        def getsockname(self) -> Any: ...

        # Two call shapes: without `buflen` the result is an int, with it
        # the result is bytes.
        @overload
        def getsockopt(self, level: int, optname: int) -> int: ...
        @overload
        def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...

        # NOTE(review): socket.gettimeout() can return None when no timeout
        # is set -- confirm whether Optional[float] is intended here.
        def gettimeout(self) -> float: ...
        def ioctl(self, control: object,
                  option: Tuple[int, int, int]) -> None: ...
        # TODO the return value may be BinaryIO or TextIO, depending on mode
        def makefile(self, mode: str = ..., buffering: int = ...,
                     encoding: str = ..., errors: str = ...,
                     newline: str = ...) -> Any:
            ...

        # return type is an address
        def recvfrom(self, bufsize: int, flags: int = ...) -> Any: ...
        def recvfrom_into(self, buffer: bytes, nbytes: int, flags: int = ...) -> Any: ...
        def recv_into(self, buffer: bytes, nbytes: int, flags: int = ...) -> Any: ...
        def sendall(self, data: bytes, flags: int = ...) -> None: ...
        def sendto(self, data: bytes, address: Union[tuple, str], flags: int = ...) -> int: ...
        def setblocking(self, flag: bool) -> None: ...
        def settimeout(self, value: Union[float, None]) -> None: ...
        def setsockopt(self, level: int, optname: int, value: Union[int, bytes]) -> None: ...
        def shutdown(self, how: int) -> None: ...
\r
class dispatcher_with_send(dispatcher):
    # `sock` accepts None, matching the Optional[SocketType] taken by
    # dispatcher.__init__; `map` defaults to None at runtime as well.
    def __init__(self, sock: Optional[SocketType] = ..., map: Optional[_maptype] = ...) -> None: ...
    def initiate_send(self) -> None: ...
    def handle_write(self) -> None: ...
    # incompatible signature:
    # def send(self, data: bytes) -> Optional[int]: ...
\r
# Declared to return a 4-tuple: a (str, str, str) triple, two `type` entries
# and a str.
# NOTE(review): presumably (file, function, line), exception type, exception
# value and a summary string -- if so the third entry would be an exception
# instance rather than a type; confirm against the implementation.
def compact_traceback() -> Tuple[
    Tuple[str, str, str], type, type, str
]:
    ...
\r
# map:        channel map to operate on. The runtime default is None, so None
#             is accepted explicitly.
# ignore_all: bool, optional.
def close_all(map: Optional[_maptype] = ..., ignore_all: bool = ...) -> None: ...
\r
122 # if os.name == 'posix':
\r
class file_wrapper:
    # Wraps an integer file descriptor and declares socket-style methods
    # (recv/send/getsockopt) next to file-style ones (read/write/close/fileno).
    fd = ...  # type: int

    def __init__(self, fd: int) -> None: ...
    def recv(self, bufsize: int, flags: int = ...) -> bytes: ...
    def send(self, data: bytes, flags: int = ...) -> int: ...

    # Two call shapes: without `buflen` the result is an int, with it the
    # result is bytes. Marked as overloads so the second definition does not
    # silently replace the first.
    @overload
    def getsockopt(self, level: int, optname: int) -> int: ...
    @overload
    def getsockopt(self, level: int, optname: int, buflen: int) -> bytes: ...

    def read(self, bufsize: int, flags: int = ...) -> bytes: ...
    def write(self, data: bytes, flags: int = ...) -> int: ...

    def close(self) -> None: ...
    def fileno(self) -> int: ...
\r
class file_dispatcher(dispatcher):
    # fd:  integer file descriptor.
    # map: channel map; defaults to None at runtime, so None is accepted.
    def __init__(self, fd: int, map: Optional[_maptype] = ...) -> None: ...
    def set_file(self, fd: int) -> None: ...
\r