--- /dev/null
+from typing import Any, Iterable, List, Optional, Tuple, Type, Union\r
+import socket\r
+import types\r
+\r
+# https://docs.python.org/3/library/multiprocessing.html#address-formats\r
+_Address = Union[str, Tuple[str, int]]\r
+\r
+def deliver_challenge(connection: Connection, authkey: bytes) -> None: ...\r
+def answer_challenge(connection: Connection, authkey: bytes) -> None: ...\r
+def wait(object_list: Iterable[Union[Connection, socket.socket, int]], timeout: Optional[float] = ...) -> List[Union[Connection, socket.socket, int]]: ...\r
+def Client(address: _Address, family: Optional[str] = ..., authkey: Optional[bytes] = ...) -> Connection: ...\r
+def Pipe(duplex: bool = ...) -> Tuple[Connection, Connection]: ...\r
+\r
+class Listener:\r
+ def __init__(self, address: Optional[_Address] = ..., family: Optional[str] = ..., backlog: int = ..., authkey: Optional[bytes] = ...) -> None: ...\r
+ def accept(self) -> Connection: ...\r
+ def close(self) -> None: ...\r
+ @property\r
+ def address(self) -> _Address: ...\r
+ @property\r
+ def last_accepted(self) -> Optional[_Address]: ...\r
+ def __enter__(self) -> Listener: ...\r
+ def __exit__(self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], exc_tb: Optional[types.TracebackType]) -> None: ...\r
+\r
+class Connection:\r
+ def close(self) -> None: ...\r
+ def fileno(self) -> int: ...\r
+ def poll(self, timeout: Optional[float] = ...) -> bool: ...\r
+ def recv(self) -> Any: ...\r
+ def recv_bytes(self, maxlength: Optional[int] = ...) -> bytes: ...\r
+ def recv_bytes_into(self, buf: Any, offset: int = ...) -> int: ...\r
+ def send(self, obj: Any) -> None: ...\r
+ def send_bytes(self,\r
+ buf: bytes,\r
+ offset: int = ...,\r
+ size: Optional[int] = ...) -> None: ...\r