1 # Stubs for multiprocessing.context
\r
3 from logging import Logger
\r
4 import multiprocessing
\r
5 from multiprocessing import synchronize
\r
6 from multiprocessing import queues
\r
9 Any, Callable, Iterable, Optional, List, Mapping, Sequence, Tuple, Type,
\r
# Alias meaning "either kind of multiprocessing lock". It is the type of the
# optional `lock` parameters declared further down in this stub.
_LockLike = Union[synchronize.Lock, synchronize.RLock]
\r
# Exception hierarchy declared by this stub. ProcessError is the root, and the
# three classes after it derive directly from it, so `except ProcessError`
# catches any of them.
class ProcessError(Exception): ...

class BufferTooShort(ProcessError): ...

# NOTE: this name shadows the builtin TimeoutError inside this module. It is
# kept as-is because the stub has to expose the same names as the module it
# describes; do not rename it.
class TimeoutError(ProcessError): ...

class AuthenticationError(ProcessError): ...
\r
# Shared interface of the start-method contexts: ForkContext, SpawnContext and
# ForkServerContext in this stub all derive from BaseContext.
# NOTE(review): this listing appears to be missing lines (several parameter
# lists below have no visible `def` line, and no decorators are shown) --
# confirm against the complete stub before relying on any signature here.
23 class BaseContext(object):
\r
# Attributes named after the module-level exception classes, each typed as
# Type[Exception].
24 ProcessError = ... # type: Type[Exception]
\r
25 BufferTooShort = ... # type: Type[Exception]
\r
26 TimeoutError = ... # type: Type[Exception]
\r
27 AuthenticationError = ... # type: Type[Exception]
\r
29 # N.B. The methods below are applied at runtime to generate
\r
30 # multiprocessing.*, so the signatures should be identical (modulo self).
\r
# NOTE(review): the next two declarations take no `self` -- presumably static
# methods whose decorator lines are not shown in this listing; confirm.
33 def current_process() -> multiprocessing.Process: ...
\r
35 def active_children() -> List[multiprocessing.Process]: ...
\r
36 def cpu_count(self) -> int: ...
\r
37 # TODO: change return to SyncManager once a stub exists in multiprocessing.managers
\r
38 def Manager(self) -> Any: ...
\r
39 # TODO: change return to Pipe once a stub exists in multiprocessing.connection
\r
40 def Pipe(self, duplex: bool) -> Any: ...
\r
# NOTE(review): tail of a signature returning synchronize.Barrier; its `def`
# line and any earlier parameters are not visible here.
44 action: Optional[Callable] = ...,
\r
45 timeout: Optional[float] = ...) -> synchronize.Barrier: ...
\r
46 def BoundedSemaphore(self,
\r
47 value: int = ...) -> synchronize.BoundedSemaphore: ...
\r
# NOTE(review): tail of a signature returning synchronize.Condition; its `def`
# line is not visible here.
49 lock: Optional[_LockLike] = ...) -> synchronize.Condition: ...
\r
50 def Event(self, lock: Optional[_LockLike] = ...) -> synchronize.Event: ...
\r
51 def Lock(self) -> synchronize.Lock: ...
\r
52 def RLock(self) -> synchronize.RLock: ...
\r
53 def Semaphore(self, value: int = ...) -> synchronize.Semaphore: ...
\r
55 def Queue(self, maxsize: int = ...) -> queues.Queue: ...
\r
56 def JoinableQueue(self, maxsize: int = ...) -> queues.JoinableQueue: ...
\r
57 def SimpleQueue(self) -> queues.SimpleQueue: ...
\r
# NOTE(review): parameter list of a signature returning
# multiprocessing.pool.Pool; its `def` line is not visible here.
60 processes: Optional[int] = ...,
\r
61 initializer: Optional[Callable[..., Any]] = ...,
\r
62 initargs: Iterable[Any] = ...,
\r
63 maxtasksperchild: Optional[int] = ...
\r
64 ) -> multiprocessing.pool.Pool: ...
\r
# NOTE(review): parameter list of a signature returning
# multiprocessing.Process; its `def` line (and possibly some parameters) is
# not visible here.
68 target: Optional[Callable] = ...,
\r
69 name: Optional[str] = ...,
\r
70 args: Iterable[Any] = ...,
\r
71 kwargs: Mapping[Any, Any] = ...,
\r
73 daemon: Optional[bool] = ...
\r
74 ) -> multiprocessing.Process: ...
\r
75 # TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode. Need to figure out
\r
76 # how to handle the ctype
\r
77 # TODO: change return to RawValue once a stub exists in multiprocessing.sharedctypes
\r
78 def RawValue(self, typecode_or_type: Any, *args: Any) -> Any: ...
\r
79 # TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode. Need to figure out
\r
80 # how to handle the ctype
\r
81 # TODO: change return to RawArray once a stub exists in multiprocessing.sharedctypes
\r
82 def RawArray(self, typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]]) -> Any: ...
\r
83 # TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode. Need to figure out
\r
84 # how to handle the ctype
\r
85 # TODO: change return to Value once a stub exists in multiprocessing.sharedctypes
\r
# NOTE(review): only one parameter of this signature is visible; per the TODO
# above it presumably belongs to `Value` -- confirm against the full stub.
88 typecode_or_type: Any,
\r
92 # TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode. Need to figure out
\r
93 # how to handle the ctype
\r
94 # TODO: change return to Array once a stub exists in multiprocessing.sharedctypes
\r
# NOTE(review): only two parameters of this signature are visible; per the
# TODO above it presumably belongs to `Array` -- confirm against the full stub.
97 typecode_or_type: Any,
\r
98 size_or_initializer: Union[int, Sequence[Any]],
\r
102 def freeze_support(self) -> None: ...
\r
103 def get_logger(self) -> Logger: ...
\r
104 def log_to_stderr(self, level: Optional[str] = ...) -> Logger: ...
\r
105 def allow_connection_pickling(self) -> None: ...
\r
106 def set_executable(self, executable: str) -> None: ...
\r
107 def set_forkserver_preload(self, module_names: List[str]) -> None: ...
\r
108 def get_context(self, method: Optional[str] = ...) -> BaseContext: ...
\r
109 def get_start_method(self, allow_none: bool = ...) -> str: ...
\r
110 def set_start_method(self, method: Optional[str] = ...) -> None: ...
\r
# NOTE(review): `reducer` is declared twice (a no-argument form returning str,
# then a one-argument form returning None) -- presumably a property getter and
# setter whose decorator lines are not shown; confirm.
112 def reducer(self) -> str: ...
\r
114 def reducer(self, reduction: str) -> None: ...
\r
115 def _check_available(self) -> None: ...
\r
# Process type declared by this stub: an Optional[str] `_start_method`
# attribute plus a `_Popen` hook.
117 class Process(object):
\r
118 _start_method: Optional[str]
\r
120 # TODO: type should be BaseProcess once a stub in multiprocessing.process exists
\r
# NOTE(review): takes no `self` -- presumably a static method whose decorator
# line is not shown in this listing; confirm.
121 def _Popen(process_obj: Any) -> DefaultContext: ...
\r
# Context built around another BaseContext (see __init__). It declares its
# own start-method accessors and a `Process` class attribute.
class DefaultContext(object):
    Process = ...  # type: Type[multiprocessing.Process]

    def __init__(self, context: BaseContext) -> None: ...
    def get_context(self, method: Optional[str] = ...) -> BaseContext: ...
    # Unlike BaseContext.set_start_method, `method` is required here and a
    # `force` flag is accepted.
    def set_start_method(self, method: str, force: bool = ...) -> None: ...
    # NOTE(review): the `allow_none` flag suggests the runtime may return None,
    # yet the declared return type is plain `str` -- confirm whether
    # Optional[str] is intended before changing it (callers may rely on `str`).
    def get_start_method(self, allow_none: bool = ...) -> str: ...
    def get_all_start_methods(self) -> List[str]: ...
\r
# Platform-dependent process and context classes. The branch below applies
# when sys.platform is not 'win32'.
# NOTE(review): SpawnProcess and SpawnContext are declared twice in this span
# with no visible `else:` between them -- presumably the second pair belongs to
# the win32 branch whose `else:` line is not shown; confirm. Some class-body
# lines also appear to be missing from this listing.
132 if sys.platform != 'win32':
\r
133 # TODO: type should be BaseProcess once a stub in multiprocessing.process exists
\r
134 class ForkProcess(Any): # type: ignore
\r
# NOTE(review): the `_Popen` declarations in this span take no `self` --
# presumably static methods whose decorator lines are not shown; confirm.
137 def _Popen(process_obj: Any) -> Any: ...
\r
139 # TODO: type should be BaseProcess once a stub in multiprocessing.process exists
\r
140 class SpawnProcess(Any): # type: ignore
\r
143 def _Popen(process_obj: Any) -> SpawnProcess: ...
\r
145 # TODO: type should be BaseProcess once a stub in multiprocessing.process exists
\r
146 class ForkServerProcess(Any): # type: ignore
\r
149 def _Popen(process_obj: Any) -> Any: ...
\r
# Each context class below pairs BaseContext with the matching process class
# through its `Process` attribute.
151 class ForkContext(BaseContext):
\r
153 Process = ... # type: Type[ForkProcess]
\r
155 class SpawnContext(BaseContext):
\r
157 Process = ... # type: Type[SpawnProcess]
\r
159 class ForkServerContext(BaseContext):
\r
161 Process = ... # type: Type[ForkServerProcess]
\r
163 # TODO: type should be BaseProcess once a stub in multiprocessing.process exists
\r
164 class SpawnProcess(Any): # type: ignore
\r
167 # TODO: type should be BaseProcess once a stub in multiprocessing.process exists
\r
168 def _Popen(process_obj: Process) -> Any: ...
\r
170 class SpawnContext(BaseContext):
\r
172 Process = ... # type: Type[SpawnProcess]
\r
# Module-level helpers declared by this stub. Each signature is a bare
# declaration (`...` body); the runtime behaviour lives in the real module.
def _force_start_method(method: str) -> None: ...
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
def get_spawning_popen() -> Optional[Any]: ...
# TODO: type should be BaseProcess once a stub in multiprocessing.process exists
def set_spawning_popen(popen: Any) -> None: ...
def assert_spawning(obj: Any) -> None: ...
\r