2 from collections import deque
3 from typing import IO, Any, Callable, Optional, Sequence, Tuple, Union
5 from . import events, futures, protocols, transports
# Shorthand used by the annotations below: an int (presumably a file
# descriptor -- confirm), an IO object of any kind, or None.
_File = Union[int, IO[Any], None]
# Stub for the subprocess transport base class.  Every body is a `...`
# placeholder: only names, parameters and return types are declared here.
class BaseSubprocessTransport(transports.SubprocessTransport):

    # Private instance state.  The "# undocumented" tags are kept verbatim from
    # the original stub.
    _closed: bool  # undocumented
    _protocol: protocols.SubprocessProtocol  # undocumented
    _loop: events.AbstractEventLoop  # undocumented
    _proc: subprocess.Popen[Any] | None  # undocumented
    _pid: int | None  # undocumented
    _returncode: int | None  # undocumented
    _exit_waiters: list[futures.Future[Any]]  # undocumented
    # Queue of (callback, positional-args) pairs.  Uses the builtin `tuple`
    # generic throughout, consistent with `list[...]` / `dict[...]` nearby.
    _pending_calls: deque[tuple[Callable[..., Any], tuple[Any, ...]]]  # undocumented
    _pipes: dict[int, _File]  # undocumented
    _finished: bool  # undocumented

    # NOTE(review): in the reviewed text the two multi-line signatures below
    # had lost their `def ...(` / `self,` lines and the parameters between
    # `args` and `waiter` / the closing parenthesis.  They are restored here to
    # mirror asyncio.base_subprocess.BaseSubprocessTransport -- confirm against
    # the targeted CPython version.
    def __init__(
        self,
        loop: events.AbstractEventLoop,
        protocol: protocols.SubprocessProtocol,
        args: str | bytes | Sequence[str | bytes],
        shell: bool,
        stdin: _File,
        stdout: _File,
        stderr: _File,
        bufsize: int,
        waiter: futures.Future[Any] | None = ...,
        extra: Any | None = ...,
        **kwargs: Any,
    ) -> None: ...
    def _start(
        self,
        args: str | bytes | Sequence[str | bytes],
        shell: bool,
        stdin: _File,
        stdout: _File,
        stderr: _File,
        bufsize: int,
        **kwargs: Any,
    ) -> None: ...  # undocumented

    # Protocol accessors and transport lifecycle.
    def set_protocol(self, protocol: protocols.BaseProtocol) -> None: ...
    def get_protocol(self) -> protocols.BaseProtocol: ...
    def is_closing(self) -> bool: ...
    def close(self) -> None: ...

    # Child-process queries and signalling.  The bare `# type: ignore` markers
    # are carried over unchanged from the original stub.
    def get_pid(self) -> int | None: ...  # type: ignore
    def get_returncode(self) -> int | None: ...
    def get_pipe_transport(self, fd: int) -> _File: ...  # type: ignore
    def _check_proc(self) -> None: ...  # undocumented
    def send_signal(self, signal: int) -> None: ...  # type: ignore
    def terminate(self) -> None: ...
    def kill(self) -> None: ...

    # Private helpers; `_connect_pipes` and `_wait` are coroutines.
    async def _connect_pipes(self, waiter: futures.Future[Any] | None) -> None: ...  # undocumented
    def _call(self, cb: Callable[..., Any], *data: Any) -> None: ...  # undocumented
    def _pipe_connection_lost(self, fd: int, exc: BaseException | None) -> None: ...  # undocumented
    def _pipe_data_received(self, fd: int, data: bytes) -> None: ...  # undocumented
    def _process_exited(self, returncode: int) -> None: ...  # undocumented
    async def _wait(self) -> int: ...  # undocumented
    def _try_finish(self) -> None: ...  # undocumented
    def _call_connection_lost(self, exc: BaseException | None) -> None: ...  # undocumented
# Stub for a pipe protocol built from an owning BaseSubprocessTransport and an
# int `fd`.  Bodies are `...` placeholders; only the signatures are declared.
class WriteSubprocessPipeProto(protocols.BaseProtocol):  # undocumented
    def __init__(self, proc: BaseSubprocessTransport, fd: int) -> None: ...

    # Connection callbacks.
    def connection_made(self, transport: transports.BaseTransport) -> None: ...
    def connection_lost(self, exc: BaseException | None) -> None: ...

    # Write flow-control callbacks.
    def pause_writing(self) -> None: ...
    def resume_writing(self) -> None: ...
# Stub for the reading counterpart: inherits WriteSubprocessPipeProto and
# protocols.Protocol, and declares the one extra callback that takes bytes.
class ReadSubprocessPipeProto(WriteSubprocessPipeProto, protocols.Protocol):  # undocumented
    def data_received(self, data: bytes) -> None: ...