2 from socket import socket
\r
5 from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional, Sequence, Tuple, TypeVar, Union, overload
\r
6 from abc import ABCMeta, abstractmethod
\r
7 from asyncio.futures import Future
\r
8 from asyncio.coroutines import coroutine
\r
9 from asyncio.protocols import BaseProtocol
\r
10 from asyncio.tasks import Task
\r
11 from asyncio.transports import BaseTransport
\r
16 _Context = Dict[str, Any]
\r
17 _ExceptionHandler = Callable[[AbstractEventLoop, _Context], Any]
\r
18 _ProtocolFactory = Callable[[], BaseProtocol]
\r
19 _SSLContext = Union[bool, None, ssl.SSLContext]
\r
20 _TransProtPair = Tuple[BaseTransport, BaseProtocol]
\r
24 _args = ... # type: List[Any]
\r
25 def __init__(self, callback: Callable[..., Any], args: List[Any],
\r
26 loop: AbstractEventLoop) -> None: ...
\r
27 def __repr__(self) -> str: ...
\r
28 def cancel(self) -> None: ...
\r
29 def _run(self) -> None: ...
\r
31 class TimerHandle(Handle):
\r
32 def __init__(self, when: float, callback: Callable[..., Any], args: List[Any],
\r
33 loop: AbstractEventLoop) -> None: ...
\r
34 def __hash__(self) -> int: ...
\r
36 class AbstractServer:
\r
37 def close(self) -> None: ...
\r
39 def wait_closed(self) -> Generator[Any, None, None]: ...
\r
41 class AbstractEventLoop(metaclass=ABCMeta):
\r
43 def run_forever(self) -> None: ...
\r
45 # Can't use a union, see mypy issue # 1873.
\r
48 def run_until_complete(self, future: Generator[Any, None, _T]) -> _T: ...
\r
51 def run_until_complete(self, future: Awaitable[_T]) -> _T: ...
\r
54 def stop(self) -> None: ...
\r
56 def is_running(self) -> bool: ...
\r
58 def is_closed(self) -> bool: ...
\r
60 def close(self) -> None: ...
\r
61 if sys.version_info >= (3, 6):
\r
64 def shutdown_asyncgens(self) -> Generator[Any, None, None]: ...
\r
65 # Methods scheduling callbacks. All these return Handles.
\r
67 def call_soon(self, callback: Callable[..., Any], *args: Any) -> Handle: ...
\r
69 def call_later(self, delay: Union[int, float], callback: Callable[..., Any], *args: Any) -> Handle: ...
\r
71 def call_at(self, when: float, callback: Callable[..., Any], *args: Any) -> Handle: ...
\r
73 def time(self) -> float: ...
\r
75 if sys.version_info >= (3, 5):
\r
77 def create_future(self) -> Future[Any]: ...
\r
80 def create_task(self, coro: Union[Awaitable[_T], Generator[Any, None, _T]]) -> Task[_T]: ...
\r
82 def set_task_factory(self, factory: Optional[Callable[[AbstractEventLoop, Generator[Any, None, _T]], Future[_T]]]) -> None: ...
\r
84 def get_task_factory(self) -> Optional[Callable[[AbstractEventLoop, Generator[Any, None, _T]], Future[_T]]]: ...
\r
85 # Methods for interacting with threads
\r
87 def call_soon_threadsafe(self, callback: Callable[..., Any], *args: Any) -> Handle: ...
\r
90 def run_in_executor(self, executor: Any,
\r
91 func: Callable[..., _T], *args: Any) -> Generator[Any, None, _T]: ...
\r
93 def set_default_executor(self, executor: Any) -> None: ...
\r
94 # Network I/O methods returning Futures.
\r
97 # TODO the "Tuple[Any, ...]" should be "Union[Tuple[str, int], Tuple[str, int, int, int]]" but that triggers
\r
98 # https://github.com/python/mypy/issues/2509
\r
99 def getaddrinfo(self, host: Optional[str], port: Union[str, int, None], *,
\r
100 family: int = ..., type: int = ..., proto: int = ..., flags: int = ...) -> Generator[Any, None, List[Tuple[int, int, int, str, Tuple[Any, ...]]]]: ...
\r
103 def getnameinfo(self, sockaddr: tuple, flags: int = ...) -> Generator[Any, None, Tuple[str, int]]: ...
\r
106 def create_connection(self, protocol_factory: _ProtocolFactory, host: str = ..., port: int = ..., *,
\r
107 ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: Optional[socket] = ...,
\r
108 local_addr: str = ..., server_hostname: str = ...) -> Generator[Any, None, _TransProtPair]: ...
\r
111 def create_server(self, protocol_factory: _ProtocolFactory, host: Union[str, Sequence[str]] = ..., port: int = ..., *,
\r
112 family: int = ..., flags: int = ...,
\r
113 sock: Optional[socket] = ..., backlog: int = ..., ssl: _SSLContext = ...,
\r
114 reuse_address: Optional[bool] = ...,
\r
115 reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...
\r
118 def create_unix_connection(self, protocol_factory: _ProtocolFactory, path: str, *,
\r
119 ssl: _SSLContext = ..., sock: Optional[socket] = ...,
\r
120 server_hostname: str = ...) -> Generator[Any, None, _TransProtPair]: ...
\r
123 def create_unix_server(self, protocol_factory: _ProtocolFactory, path: str, *,
\r
124 sock: Optional[socket] = ..., backlog: int = ..., ssl: _SSLContext = ...) -> Generator[Any, None, AbstractServer]: ...
\r
127 def create_datagram_endpoint(self, protocol_factory: _ProtocolFactory,
\r
128 local_addr: Optional[Tuple[str, int]] = ..., remote_addr: Optional[Tuple[str, int]] = ..., *,
\r
129 family: int = ..., proto: int = ..., flags: int = ...,
\r
130 reuse_address: Optional[bool] = ..., reuse_port: Optional[bool] = ...,
\r
131 allow_broadcast: Optional[bool] = ...,
\r
132 sock: Optional[socket] = ...) -> Generator[Any, None, _TransProtPair]: ...
\r
135 def connect_accepted_socket(self, protocol_factory: _ProtocolFactory, sock: socket, *, ssl: _SSLContext = ...) -> Generator[Any, None, _TransProtPair]: ...
\r
136 # Pipes and subprocesses.
\r
139 def connect_read_pipe(self, protocol_factory: _ProtocolFactory, pipe: Any) -> Generator[Any, None, _TransProtPair]: ...
\r
142 def connect_write_pipe(self, protocol_factory: _ProtocolFactory, pipe: Any) -> Generator[Any, None, _TransProtPair]: ...
\r
145 def subprocess_shell(self, protocol_factory: _ProtocolFactory, cmd: Union[bytes, str], *, stdin: Any = ...,
\r
146 stdout: Any = ..., stderr: Any = ...,
\r
147 **kwargs: Any) -> Generator[Any, None, _TransProtPair]: ...
\r
150 def subprocess_exec(self, protocol_factory: _ProtocolFactory, *args: Any, stdin: Any = ...,
\r
151 stdout: Any = ..., stderr: Any = ...,
\r
152 **kwargs: Any) -> Generator[Any, None, _TransProtPair]: ...
\r
154 def add_reader(self, fd: selectors._FileObject, callback: Callable[..., Any], *args: Any) -> None: ...
\r
156 def remove_reader(self, fd: selectors._FileObject) -> None: ...
\r
158 def add_writer(self, fd: selectors._FileObject, callback: Callable[..., Any], *args: Any) -> None: ...
\r
160 def remove_writer(self, fd: selectors._FileObject) -> None: ...
\r
161 # Completion based I/O methods returning Futures.
\r
164 def sock_recv(self, sock: socket, nbytes: int) -> Generator[Any, None, bytes]: ...
\r
167 def sock_sendall(self, sock: socket, data: bytes) -> Generator[Any, None, None]: ...
\r
170 def sock_connect(self, sock: socket, address: str) -> Generator[Any, None, None]: ...
\r
173 def sock_accept(self, sock: socket) -> Generator[Any, None, Tuple[socket, Any]]: ...
\r
176 def add_signal_handler(self, sig: int, callback: Callable[..., Any], *args: Any) -> None: ...
\r
178 def remove_signal_handler(self, sig: int) -> None: ...
\r
181 def set_exception_handler(self, handler: _ExceptionHandler) -> None: ...
\r
183 def get_exception_handler(self) -> _ExceptionHandler: ...
\r
185 def default_exception_handler(self, context: _Context) -> None: ...
\r
187 def call_exception_handler(self, context: _Context) -> None: ...
\r
188 # Debug flag management.
\r
190 def get_debug(self) -> bool: ...
\r
192 def set_debug(self, enabled: bool) -> None: ...
\r
194 class AbstractEventLoopPolicy(metaclass=ABCMeta):
\r
196 def get_event_loop(self) -> AbstractEventLoop: ...
\r
198 def set_event_loop(self, loop: AbstractEventLoop) -> None: ...
\r
200 def new_event_loop(self) -> AbstractEventLoop: ...
\r
201 # Child processes handling (Unix only).
\r
203 def get_child_watcher(self) -> Any: ... # TODO: unix_events.AbstractChildWatcher
\r
205 def set_child_watcher(self, watcher: Any) -> None: ... # TODO: unix_events.AbstractChildWatcher
\r
207 class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy, metaclass=ABCMeta):
\r
208 def __init__(self) -> None: ...
\r
209 def get_event_loop(self) -> AbstractEventLoop: ...
\r
210 def set_event_loop(self, loop: AbstractEventLoop) -> None: ...
\r
211 def new_event_loop(self) -> AbstractEventLoop: ...
\r
213 def get_event_loop_policy() -> AbstractEventLoopPolicy: ...
\r
214 def set_event_loop_policy(policy: AbstractEventLoopPolicy) -> None: ...
\r
216 def get_event_loop() -> AbstractEventLoop: ...
\r
217 def set_event_loop(loop: AbstractEventLoop) -> None: ...
\r
218 def new_event_loop() -> AbstractEventLoop: ...
\r
220 def get_child_watcher() -> Any: ... # TODO: unix_events.AbstractChildWatcher
\r
221 def set_child_watcher(watcher: Any) -> None: ... # TODO: unix_events.AbstractChildWatcher
\r
223 def _set_running_loop(loop: AbstractEventLoop) -> None: ...
\r
224 def _get_running_loop() -> AbstractEventLoop: ...
\r