2 from typing import Any, Awaitable, Callable, Generator, Iterable, List, Optional, Tuple, Union
\r
4 from . import coroutines
\r
6 from . import protocols
\r
7 from . import transports
\r
9 _ClientConnectedCallback = Callable[[StreamReader, StreamWriter], Optional[Awaitable[None]]]
\r
14 class IncompleteReadError(EOFError):
\r
15 expected = ... # type: Optional[int]
\r
16 partial = ... # type: bytes
\r
17 def __init__(self, partial: bytes, expected: Optional[int]) -> None: ...
\r
19 class LimitOverrunError(Exception):
\r
20 consumed = ... # type: int
\r
21 def __init__(self, message: str, consumed: int) -> None: ...
\r
23 @coroutines.coroutine
\r
24 def open_connection(
\r
26 port: Union[int, str] = ...,
\r
28 loop: Optional[events.AbstractEventLoop] = ...,
\r
31 ) -> Generator[Any, None, Tuple[StreamReader, StreamWriter]]: ...
\r
33 @coroutines.coroutine
\r
35 client_connected_cb: _ClientConnectedCallback,
\r
36 host: Optional[str] = ...,
\r
37 port: Optional[Union[int, str]] = ...,
\r
39 loop: Optional[events.AbstractEventLoop] = ...,
\r
42 ) -> Generator[Any, None, events.AbstractServer]: ...
\r
44 if sys.platform != 'win32':
\r
45 @coroutines.coroutine
\r
46 def open_unix_connection(
\r
49 loop: Optional[events.AbstractEventLoop] = ...,
\r
52 ) -> Generator[Any, None, Tuple[StreamReader, StreamWriter]]: ...
\r
54 @coroutines.coroutine
\r
55 def start_unix_server(
\r
56 client_connected_cb: _ClientConnectedCallback,
\r
59 loop: Optional[events.AbstractEventLoop] = ...,
\r
61 **kwds: Any) -> Generator[Any, None, events.AbstractServer]: ...
\r
63 class FlowControlMixin(protocols.Protocol): ...
\r
65 class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
\r
67 stream_reader: StreamReader,
\r
68 client_connected_cb: _ClientConnectedCallback = ...,
\r
69 loop: Optional[events.AbstractEventLoop] = ...) -> None: ...
\r
70 def connection_made(self, transport: transports.BaseTransport) -> None: ...
\r
71 def connection_lost(self, exc: Exception) -> None: ...
\r
72 def data_received(self, data: bytes) -> None: ...
\r
73 def eof_received(self) -> bool: ...
\r
77 transport: transports.BaseTransport,
\r
78 protocol: protocols.BaseProtocol,
\r
79 reader: StreamReader,
\r
80 loop: events.AbstractEventLoop) -> None: ...
\r
82 def transport(self) -> transports.BaseTransport: ...
\r
83 def write(self, data: bytes) -> None: ...
\r
84 def writelines(self, data: Iterable[bytes]) -> None: ...
\r
85 def write_eof(self) -> None: ...
\r
86 def can_write_eof(self) -> bool: ...
\r
87 def close(self) -> None: ...
\r
88 def get_extra_info(self, name: str, default: Any = ...) -> Any: ...
\r
89 @coroutines.coroutine
\r
90 def drain(self) -> Generator[Any, None, None]: ...
\r
95 loop: Optional[events.AbstractEventLoop] = ...) -> None: ...
\r
96 def exception(self) -> Exception: ...
\r
97 def set_exception(self, exc: Exception) -> None: ...
\r
98 def set_transport(self, transport: transports.BaseTransport) -> None: ...
\r
99 def feed_eof(self) -> None: ...
\r
100 def at_eof(self) -> bool: ...
\r
101 def feed_data(self, data: bytes) -> None: ...
\r
102 @coroutines.coroutine
\r
103 def readline(self) -> Generator[Any, None, bytes]: ...
\r
104 @coroutines.coroutine
\r
105 def readuntil(self, separator: bytes = ...) -> Generator[Any, None, bytes]: ...
\r
106 @coroutines.coroutine
\r
107 def read(self, n: int = ...) -> Generator[Any, None, bytes]: ...
\r
108 @coroutines.coroutine
\r
109 def readexactly(self, n: int) -> Generator[Any, None, bytes]: ...
\r