massive update, probably broken
[dotfiles/.git] / .config / coc / extensions / coc-python-data / languageServer.0.5.59 / Typeshed / stdlib / 3.4 / asyncio / events.pyi
1 import selectors\r
2 from socket import socket\r
3 import ssl\r
4 import sys\r
5 from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional, Sequence, Tuple, TypeVar, Union, overload\r
6 from abc import ABCMeta, abstractmethod\r
7 from asyncio.futures import Future\r
8 from asyncio.coroutines import coroutine\r
9 from asyncio.protocols import BaseProtocol\r
10 from asyncio.tasks import Task\r
11 from asyncio.transports import BaseTransport\r
12 \r
13 __all__: List[str]\r
14 \r
15 _T = TypeVar('_T')\r
16 _Context = Dict[str, Any]\r
17 _ExceptionHandler = Callable[[AbstractEventLoop, _Context], Any]\r
18 _ProtocolFactory = Callable[[], BaseProtocol]\r
19 _SSLContext = Union[bool, None, ssl.SSLContext]\r
20 _TransProtPair = Tuple[BaseTransport, BaseProtocol]\r
21 \r
22 class Handle:\r
23     _cancelled = False\r
24     _args = ...  # type: List[Any]\r
25     def __init__(self, callback: Callable[..., Any], args: List[Any],\r
26         loop: AbstractEventLoop) -> None: ...\r
27     def __repr__(self) -> str: ...\r
28     def cancel(self) -> None: ...\r
29     def _run(self) -> None: ...\r
30 \r
31 class TimerHandle(Handle):\r
32     def __init__(self, when: float, callback: Callable[..., Any], args: List[Any],\r
33                  loop: AbstractEventLoop) -> None: ...\r
34     def __hash__(self) -> int: ...\r
35 \r
36 class AbstractServer:\r
37     def close(self) -> None: ...\r
38     @coroutine\r
39     def wait_closed(self) -> Generator[Any, None, None]: ...\r
40 \r
41 class AbstractEventLoop(metaclass=ABCMeta):\r
42     @abstractmethod\r
43     def run_forever(self) -> None: ...\r
44 \r
45     # Can't use a union, see mypy issue  # 1873.\r
46     @overload\r
47     @abstractmethod\r
48     def run_until_complete(self, future: Generator[Any, None, _T]) -> _T: ...\r
49     @overload\r
50     @abstractmethod\r
51     def run_until_complete(self, future: Awaitable[_T]) -> _T: ...\r
52 \r
53     @abstractmethod\r
54     def stop(self) -> None: ...\r
55     @abstractmethod\r
56     def is_running(self) -> bool: ...\r
57     @abstractmethod\r
58     def is_closed(self) -> bool: ...\r
59     @abstractmethod\r
60     def close(self) -> None: ...\r
61     if sys.version_info >= (3, 6):\r
62         @abstractmethod\r
63         @coroutine\r
64         def shutdown_asyncgens(self) -> Generator[Any, None, None]: ...\r
65     # Methods scheduling callbacks.  All these return Handles.\r
66     @abstractmethod\r
67     def call_soon(self, callback: Callable[..., Any], *args: Any) -> Handle: ...\r
68     @abstractmethod\r
69     def call_later(self, delay: Union[int, float], callback: Callable[..., Any], *args: Any) -> Handle: ...\r
70     @abstractmethod\r
71     def call_at(self, when: float, callback: Callable[..., Any], *args: Any) -> Handle: ...\r
72     @abstractmethod\r
73     def time(self) -> float: ...\r
74     # Future methods\r
75     if sys.version_info >= (3, 5):\r
76         @abstractmethod\r
77         def create_future(self) -> Future[Any]: ...\r
78     # Tasks methods\r
79     @abstractmethod\r
80     def create_task(self, coro: Union[Awaitable[_T], Generator[Any, None, _T]]) -> Task[_T]: ...\r
81     @abstractmethod\r
82     def set_task_factory(self, factory: Optional[Callable[[AbstractEventLoop, Generator[Any, None, _T]], Future[_T]]]) -> None: ...\r
83     @abstractmethod\r
84     def get_task_factory(self) -> Optional[Callable[[AbstractEventLoop, Generator[Any, None, _T]], Future[_T]]]: ...\r
85     # Methods for interacting with threads\r
86     @abstractmethod\r
87     def call_soon_threadsafe(self, callback: Callable[..., Any], *args: Any) -> Handle: ...\r
88     @abstractmethod\r
89     @coroutine\r
90     def run_in_executor(self, executor: Any,\r
91         func: Callable[..., _T], *args: Any) -> Generator[Any, None, _T]: ...\r
92     @abstractmethod\r
93     def set_default_executor(self, executor: Any) -> None: ...\r
94     # Network I/O methods returning Futures.\r
95     @abstractmethod\r
96     @coroutine\r
97     # TODO the "Tuple[Any, ...]" should be "Union[Tuple[str, int], Tuple[str, int, int, int]]" but that triggers\r
98     # https://github.com/python/mypy/issues/2509\r
99     def getaddrinfo(self, host: Optional[str], port: Union[str, int, None], *,\r
100         family: int = ..., type: int = ..., proto: int = ..., flags: int = ...) -> Generator[Any, None, List[Tuple[int, int, int, str, Tuple[Any, ...]]]]: ...\r
101     @abstractmethod\r
102     @coroutine\r
103     def getnameinfo(self, sockaddr: tuple, flags: int = ...) -> Generator[Any, None, Tuple[str, int]]: ...\r
104     @abstractmethod\r
105     @coroutine\r
106     def create_connection(self, protocol_factory: _ProtocolFactory, host: str = ..., port: int = ..., *,\r
107                           ssl: _SSLContext = ..., family: int = ..., proto: int = ..., flags: int = ..., sock: Optional[socket] = ...,\r
108                           local_addr: str = ..., server_hostname: str = ...) -> Generator[Any, None, _TransProtPair]: ...\r
109     @abstractmethod\r
110     @coroutine\r
111     def create_server(self, protocol_factory: _ProtocolFactory, host: Union[str, Sequence[str]] = ..., port: int = ..., *,\r
112                       family: int = ..., flags: int = ...,\r
113                       sock: Optional[socket] = ..., backlog: int = ..., ssl: _SSLContext = ...,\r
114                       reuse_address: Optional[bool] = ...,\r
115                       reuse_port: Optional[bool] = ...) -> Generator[Any, None, AbstractServer]: ...\r
116     @abstractmethod\r
117     @coroutine\r
118     def create_unix_connection(self, protocol_factory: _ProtocolFactory, path: str, *,\r
119                                ssl: _SSLContext = ..., sock: Optional[socket] = ...,\r
120                                server_hostname: str = ...) -> Generator[Any, None, _TransProtPair]: ...\r
121     @abstractmethod\r
122     @coroutine\r
123     def create_unix_server(self, protocol_factory: _ProtocolFactory, path: str, *,\r
124                            sock: Optional[socket] = ..., backlog: int = ..., ssl: _SSLContext = ...) -> Generator[Any, None, AbstractServer]: ...\r
125     @abstractmethod\r
126     @coroutine\r
127     def create_datagram_endpoint(self, protocol_factory: _ProtocolFactory,\r
128                                  local_addr: Optional[Tuple[str, int]] = ..., remote_addr: Optional[Tuple[str, int]] = ..., *,\r
129                                  family: int = ..., proto: int = ..., flags: int = ...,\r
130                                  reuse_address: Optional[bool] = ..., reuse_port: Optional[bool] = ...,\r
131                                  allow_broadcast: Optional[bool] = ...,\r
132                                  sock: Optional[socket] = ...) -> Generator[Any, None, _TransProtPair]: ...\r
133     @abstractmethod\r
134     @coroutine\r
135     def connect_accepted_socket(self, protocol_factory: _ProtocolFactory, sock: socket, *, ssl: _SSLContext = ...) -> Generator[Any, None, _TransProtPair]: ...\r
136     # Pipes and subprocesses.\r
137     @abstractmethod\r
138     @coroutine\r
139     def connect_read_pipe(self, protocol_factory: _ProtocolFactory, pipe: Any) -> Generator[Any, None, _TransProtPair]: ...\r
140     @abstractmethod\r
141     @coroutine\r
142     def connect_write_pipe(self, protocol_factory: _ProtocolFactory, pipe: Any) -> Generator[Any, None, _TransProtPair]: ...\r
143     @abstractmethod\r
144     @coroutine\r
145     def subprocess_shell(self, protocol_factory: _ProtocolFactory, cmd: Union[bytes, str], *, stdin: Any = ...,\r
146                          stdout: Any = ..., stderr: Any = ...,\r
147                          **kwargs: Any) -> Generator[Any, None, _TransProtPair]: ...\r
148     @abstractmethod\r
149     @coroutine\r
150     def subprocess_exec(self, protocol_factory: _ProtocolFactory, *args: Any, stdin: Any = ...,\r
151                         stdout: Any = ..., stderr: Any = ...,\r
152                         **kwargs: Any) -> Generator[Any, None, _TransProtPair]: ...\r
153     @abstractmethod\r
154     def add_reader(self, fd: selectors._FileObject, callback: Callable[..., Any], *args: Any) -> None: ...\r
155     @abstractmethod\r
156     def remove_reader(self, fd: selectors._FileObject) -> None: ...\r
157     @abstractmethod\r
158     def add_writer(self, fd: selectors._FileObject, callback: Callable[..., Any], *args: Any) -> None: ...\r
159     @abstractmethod\r
160     def remove_writer(self, fd: selectors._FileObject) -> None: ...\r
161     # Completion based I/O methods returning Futures.\r
162     @abstractmethod\r
163     @coroutine\r
164     def sock_recv(self, sock: socket, nbytes: int) -> Generator[Any, None, bytes]: ...\r
165     @abstractmethod\r
166     @coroutine\r
167     def sock_sendall(self, sock: socket, data: bytes) -> Generator[Any, None, None]: ...\r
168     @abstractmethod\r
169     @coroutine\r
170     def sock_connect(self, sock: socket, address: str) -> Generator[Any, None, None]: ...\r
171     @abstractmethod\r
172     @coroutine\r
173     def sock_accept(self, sock: socket) -> Generator[Any, None, Tuple[socket, Any]]: ...\r
174     # Signal handling.\r
175     @abstractmethod\r
176     def add_signal_handler(self, sig: int, callback: Callable[..., Any], *args: Any) -> None: ...\r
177     @abstractmethod\r
178     def remove_signal_handler(self, sig: int) -> None: ...\r
179     # Error handlers.\r
180     @abstractmethod\r
181     def set_exception_handler(self, handler: _ExceptionHandler) -> None: ...\r
182     @abstractmethod\r
183     def get_exception_handler(self) -> _ExceptionHandler: ...\r
184     @abstractmethod\r
185     def default_exception_handler(self, context: _Context) -> None: ...\r
186     @abstractmethod\r
187     def call_exception_handler(self, context: _Context) -> None: ...\r
188     # Debug flag management.\r
189     @abstractmethod\r
190     def get_debug(self) -> bool: ...\r
191     @abstractmethod\r
192     def set_debug(self, enabled: bool) -> None: ...\r
193 \r
194 class AbstractEventLoopPolicy(metaclass=ABCMeta):\r
195     @abstractmethod\r
196     def get_event_loop(self) -> AbstractEventLoop: ...\r
197     @abstractmethod\r
198     def set_event_loop(self, loop: AbstractEventLoop) -> None: ...\r
199     @abstractmethod\r
200     def new_event_loop(self) -> AbstractEventLoop: ...\r
201     # Child processes handling (Unix only).\r
202     @abstractmethod\r
203     def get_child_watcher(self) -> Any: ...  # TODO: unix_events.AbstractChildWatcher\r
204     @abstractmethod\r
205     def set_child_watcher(self, watcher: Any) -> None: ...  # TODO: unix_events.AbstractChildWatcher\r
206 \r
207 class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy, metaclass=ABCMeta):\r
208     def __init__(self) -> None: ...\r
209     def get_event_loop(self) -> AbstractEventLoop: ...\r
210     def set_event_loop(self, loop: AbstractEventLoop) -> None: ...\r
211     def new_event_loop(self) -> AbstractEventLoop: ...\r
212 \r
213 def get_event_loop_policy() -> AbstractEventLoopPolicy: ...\r
214 def set_event_loop_policy(policy: AbstractEventLoopPolicy) -> None: ...\r
215 \r
216 def get_event_loop() -> AbstractEventLoop: ...\r
217 def set_event_loop(loop: AbstractEventLoop) -> None: ...\r
218 def new_event_loop() -> AbstractEventLoop: ...\r
219 \r
220 def get_child_watcher() -> Any: ...  # TODO: unix_events.AbstractChildWatcher\r
221 def set_child_watcher(watcher: Any) -> None: ...  # TODO: unix_events.AbstractChildWatcher\r
222 \r
223 def _set_running_loop(loop: AbstractEventLoop) -> None: ...\r
224 def _get_running_loop() -> AbstractEventLoop: ...\r