1 from typing import Any, Iterable, List, Optional, Tuple, Type, Union
\r
# Alias for an address value: either a plain string or a (str, int) pair.
# The page below describes the address formats this alias models:
# https://docs.python.org/3/library/multiprocessing.html#address-formats
_Address = Union[str, Tuple[str, int]]
\r
# Type stub: accepts a Connection plus a bytes ``authkey`` and is declared to
# return nothing. The body is elided; only the signature is specified here.
def deliver_challenge(
    connection: Connection,
    authkey: bytes,
) -> None:
    ...
\r
# Type stub: takes a Connection and a bytes ``authkey``; the declared result
# is None. No implementation is given, only the typed signature.
def answer_challenge(
    connection: Connection,
    authkey: bytes,
) -> None:
    ...
\r
# Type stub: ``object_list`` is an iterable whose items may be Connection
# objects, sockets, or ints; ``timeout`` is an optional float with an elided
# default. The declared result is a list of those same three item types.
# NOTE(review): the int items are presumably OS-level handles/descriptors --
# confirm against the stdlib documentation.
def wait(
    object_list: Iterable[Union[Connection, socket.socket, int]],
    timeout: Optional[float] = ...,
) -> List[Union[Connection, socket.socket, int]]:
    ...
\r
# Type stub: ``address`` is required (a string or a (str, int) pair, per the
# _Address alias); ``family`` and ``authkey`` are optional with elided
# defaults. The declared result is a Connection.
def Client(
    address: _Address,
    family: Optional[str] = ...,
    authkey: Optional[bytes] = ...,
) -> Connection:
    ...
\r
# Type stub: a single optional boolean flag ``duplex`` (default elided); the
# declared result is a pair of Connection objects.
def Pipe(
    duplex: bool = ...,
) -> Tuple[Connection, Connection]:
    ...
\r
# Type stub for an initializer whose arguments are all optional: an address
# (string or (str, int) pair, or None), a family name, an integer backlog and
# a bytes authkey. Defaults are elided in the stub.
# NOTE(review): the enclosing class header is not visible in this excerpt.
def __init__(
    self,
    address: Optional[_Address] = ...,
    family: Optional[str] = ...,
    backlog: int = ...,
    authkey: Optional[bytes] = ...,
) -> None:
    ...
\r
# Type stub for a method taking no arguments besides ``self``; the declared
# result is a Connection.
def accept(self) -> Connection:
    ...
\r
# Type stub for a no-argument method that is declared to return nothing.
# NOTE(review): the enclosing class header is not visible in this excerpt.
def close(self) -> None:
    ...
\r
# Type stub for an accessor with no arguments besides ``self``; the declared
# result is an address value (string or (str, int) pair, per _Address).
def address(self) -> _Address:
    ...
\r
# Type stub for an accessor with no arguments besides ``self``; the declared
# result is an address value (per _Address) or None.
def last_accepted(self) -> Optional[_Address]:
    ...
\r
# Type stub for context-manager entry: no arguments besides ``self``; the
# declared result is a Listener.
def __enter__(self) -> Listener:
    ...
\r
# Type stub for context-manager exit. It receives the exception class, the
# exception instance and the traceback, each of which may be None, and is
# declared to return None.
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_value: Optional[BaseException],
    exc_tb: Optional[types.TracebackType],
) -> None:
    ...
\r
# Type stub: a method with no arguments besides ``self`` whose declared
# result is None. Only the signature is given; the body is elided.
def close(self) -> None:
    ...
\r
# Type stub for a no-argument method whose declared result is an int.
def fileno(self) -> int:
    ...
\r
# Type stub: ``timeout`` is an optional float (default elided); the declared
# result is a bool.
def poll(
    self,
    timeout: Optional[float] = ...,
) -> bool:
    ...
\r
# Type stub for a no-argument method; the declared result type is Any, i.e.
# the stub places no constraint on what is returned.
def recv(self) -> Any:
    ...
\r
# Type stub: ``maxlength`` is an optional int (default elided); the declared
# result is bytes.
def recv_bytes(
    self,
    maxlength: Optional[int] = ...,
) -> bytes:
    ...
\r
# Type stub: ``buf`` is required and typed as Any; ``offset`` is an int with
# an elided default. The declared result is an int.
def recv_bytes_into(
    self,
    buf: Any,
    offset: int = ...,
) -> int:
    ...
\r
# Type stub: accepts a single object of any type and is declared to return
# nothing.
def send(
    self,
    obj: Any,
) -> None:
    ...
\r
33 def send_bytes(self,
\r
36 size: Optional[int] = ...) -> None: ...
\r