import subprocess
import sys
from _typeshed import StrOrBytesPath
from asyncio import events, protocols, streams, transports
from typing import IO, Any, Callable, Union
from typing_extensions import Literal
# Type alias selected by interpreter version: `StrOrBytesPath` on Python 3.8+,
# plain `str`/`bytes` on older versions.
# NOTE(review): the `else:` line was absent from the damaged source (leaving two
# unconditional assignments, the second always winning); restored here.
# Presumably this alias types the program/arguments of create_subprocess_exec —
# confirm against callers.
if sys.version_info >= (3, 8):
    _ExecArg = StrOrBytesPath
else:
    _ExecArg = Union[str, bytes]
17 class SubprocessStreamProtocol(streams.FlowControlMixin, protocols.SubprocessProtocol):
18 stdin: streams.StreamWriter | None
19 stdout: streams.StreamReader | None
20 stderr: streams.StreamReader | None
21 def __init__(self, limit: int, loop: events.AbstractEventLoop) -> None: ...
22 def connection_made(self, transport: transports.BaseTransport) -> None: ...
23 def pipe_data_received(self, fd: int, data: bytes | str) -> None: ...
24 def pipe_connection_lost(self, fd: int, exc: Exception | None) -> None: ...
25 def process_exited(self) -> None: ...
28 stdin: streams.StreamWriter | None
29 stdout: streams.StreamReader | None
30 stderr: streams.StreamReader | None
33 self, transport: transports.BaseTransport, protocol: protocols.BaseProtocol, loop: events.AbstractEventLoop
36 def returncode(self) -> int | None: ...
37 async def wait(self) -> int: ...
38 def send_signal(self, signal: int) -> None: ...
39 def terminate(self) -> None: ...
40 def kill(self) -> None: ...
41 async def communicate(self, input: bytes | None = ...) -> tuple[bytes, bytes]: ...
# Signatures for Python 3.10+, which take no `loop` parameter.
# NOTE(review): the damaged source lacked the leading `cmd` / `program, *args`
# parameters, `limit`, the bare `*`, `encoding`, `errors`, `pass_fds` and the
# closing `) -> Process: ...` lines. They are restored from the documented
# asyncio signatures — confirm against upstream.
if sys.version_info >= (3, 10):
    # Shell variant: `cmd` is a single command string.
    async def create_subprocess_shell(
        cmd: str | bytes,
        stdin: int | IO[Any] | None = ...,
        stdout: int | IO[Any] | None = ...,
        stderr: int | IO[Any] | None = ...,
        limit: int = ...,
        *,
        # These parameters are forced to these values by BaseEventLoop.subprocess_shell
        universal_newlines: Literal[False] = ...,
        shell: Literal[True] = ...,
        bufsize: Literal[0] = ...,
        encoding: None = ...,
        errors: None = ...,
        text: Literal[False, None] = ...,
        # These parameters are taken by subprocess.Popen, which this ultimately delegates to
        executable: StrOrBytesPath | None = ...,
        preexec_fn: Callable[[], Any] | None = ...,
        close_fds: bool = ...,
        cwd: StrOrBytesPath | None = ...,
        env: subprocess._ENV | None = ...,
        startupinfo: Any | None = ...,
        creationflags: int = ...,
        restore_signals: bool = ...,
        start_new_session: bool = ...,
        pass_fds: Any = ...,
    ) -> Process: ...
    # Exec variant: `program` plus its arguments, no shell involved.
    async def create_subprocess_exec(
        program: _ExecArg,
        *args: _ExecArg,
        stdin: int | IO[Any] | None = ...,
        stdout: int | IO[Any] | None = ...,
        stderr: int | IO[Any] | None = ...,
        limit: int = ...,
        # These parameters are forced to these values by BaseEventLoop.subprocess_exec
        universal_newlines: Literal[False] = ...,
        # subprocess_exec rejects a true `shell`, so only False is valid here
        # (the previous Literal[True] was copied from the shell variant).
        shell: Literal[False] = ...,
        bufsize: Literal[0] = ...,
        encoding: None = ...,
        errors: None = ...,
        # A true `text` is rejected as well, so it is typed like the shell variant.
        text: Literal[False, None] = ...,
        # These parameters are taken by subprocess.Popen, which this ultimately delegates to
        executable: StrOrBytesPath | None = ...,
        preexec_fn: Callable[[], Any] | None = ...,
        close_fds: bool = ...,
        cwd: StrOrBytesPath | None = ...,
        env: subprocess._ENV | None = ...,
        startupinfo: Any | None = ...,
        creationflags: int = ...,
        restore_signals: bool = ...,
        start_new_session: bool = ...,
        pass_fds: Any = ...,
    ) -> Process: ...
# Signatures for Python versions before 3.10, which still accept `loop`.
# Written as an explicit version check so the block is valid without relying on
# a preceding `if`.
# NOTE(review): the damaged source lacked the leading `cmd` / `program, *args`
# parameters, `limit`, the bare `*`, `errors`, `pass_fds` and the closing
# `) -> Process: ...` lines. They are restored from the documented asyncio
# signatures — confirm against upstream.
if sys.version_info < (3, 10):
    # Shell variant: `cmd` is a single command string.
    async def create_subprocess_shell(
        cmd: str | bytes,
        stdin: int | IO[Any] | None = ...,
        stdout: int | IO[Any] | None = ...,
        stderr: int | IO[Any] | None = ...,
        loop: events.AbstractEventLoop | None = ...,
        limit: int = ...,
        *,
        # These parameters are forced to these values by BaseEventLoop.subprocess_shell
        universal_newlines: Literal[False] = ...,
        shell: Literal[True] = ...,
        bufsize: Literal[0] = ...,
        encoding: None = ...,
        errors: None = ...,
        text: Literal[False, None] = ...,
        # These parameters are taken by subprocess.Popen, which this ultimately delegates to
        executable: StrOrBytesPath | None = ...,
        preexec_fn: Callable[[], Any] | None = ...,
        close_fds: bool = ...,
        cwd: StrOrBytesPath | None = ...,
        env: subprocess._ENV | None = ...,
        startupinfo: Any | None = ...,
        creationflags: int = ...,
        restore_signals: bool = ...,
        start_new_session: bool = ...,
        pass_fds: Any = ...,
    ) -> Process: ...
    # Exec variant: `program` plus its arguments, no shell involved.
    async def create_subprocess_exec(
        program: _ExecArg,
        *args: _ExecArg,
        stdin: int | IO[Any] | None = ...,
        stdout: int | IO[Any] | None = ...,
        stderr: int | IO[Any] | None = ...,
        loop: events.AbstractEventLoop | None = ...,
        limit: int = ...,
        # These parameters are forced to these values by BaseEventLoop.subprocess_exec
        universal_newlines: Literal[False] = ...,
        # subprocess_exec rejects a true `shell`, so only False is valid here
        # (the previous Literal[True] was copied from the shell variant).
        shell: Literal[False] = ...,
        bufsize: Literal[0] = ...,
        encoding: None = ...,
        errors: None = ...,
        # A true `text` is rejected as well, so it is typed like the shell variant.
        text: Literal[False, None] = ...,
        # These parameters are taken by subprocess.Popen, which this ultimately delegates to
        executable: StrOrBytesPath | None = ...,
        preexec_fn: Callable[[], Any] | None = ...,
        close_fds: bool = ...,
        cwd: StrOrBytesPath | None = ...,
        env: subprocess._ENV | None = ...,
        startupinfo: Any | None = ...,
        creationflags: int = ...,
        restore_signals: bool = ...,
        start_new_session: bool = ...,
        pass_fds: Any = ...,
    ) -> Process: ...