massive update, probably broken
[dotfiles/.git] / .config / coc / extensions / coc-python-data / languageServer.0.5.59 / Typeshed / stdlib / 3 / multiprocessing / context.pyi
1 # Stubs for multiprocessing.context\r
2 \r
3 from logging import Logger\r
4 import multiprocessing\r
5 from multiprocessing import synchronize\r
6 from multiprocessing import queues\r
7 import sys\r
8 from typing import (\r
9     Any, Callable, Iterable, Optional, List, Mapping, Sequence, Tuple, Type,\r
10     Union,\r
11 )\r
12 \r
13 _LockLike = Union[synchronize.Lock, synchronize.RLock]\r
14 \r
15 class ProcessError(Exception): ...\r
16 \r
17 class BufferTooShort(ProcessError): ...\r
18 \r
19 class TimeoutError(ProcessError): ...\r
20 \r
21 class AuthenticationError(ProcessError): ...\r
22 \r
23 class BaseContext(object):\r
24     ProcessError = ...  # type: Type[Exception]\r
25     BufferTooShort = ...  # type: Type[Exception]\r
26     TimeoutError = ...  # type: Type[Exception]\r
27     AuthenticationError = ...  # type: Type[Exception]\r
28 \r
29     # N.B. The methods below are applied at runtime to generate\r
30     # multiprocessing.*, so the signatures should be identical (modulo self).\r
31 \r
32     @staticmethod\r
33     def current_process() -> multiprocessing.Process: ...\r
34     @staticmethod\r
35     def active_children() -> List[multiprocessing.Process]: ...\r
36     def cpu_count(self) -> int: ...\r
37     # TODO: change return to SyncManager once a stub exists in multiprocessing.managers\r
38     def Manager(self) -> Any: ...\r
39     # TODO: change return to Pipe once a stub exists in multiprocessing.connection\r
40     def Pipe(self, duplex: bool) -> Any: ...\r
41 \r
42     def Barrier(self,\r
43                 parties: int,\r
44                 action: Optional[Callable] = ...,\r
45                 timeout: Optional[float] = ...) -> synchronize.Barrier: ...\r
46     def BoundedSemaphore(self,\r
47                          value: int = ...) -> synchronize.BoundedSemaphore: ...\r
48     def Condition(self,\r
49                   lock: Optional[_LockLike] = ...) -> synchronize.Condition: ...\r
50     def Event(self, lock: Optional[_LockLike] = ...) -> synchronize.Event: ...\r
51     def Lock(self) -> synchronize.Lock: ...\r
52     def RLock(self) -> synchronize.RLock: ...\r
53     def Semaphore(self, value: int = ...) -> synchronize.Semaphore: ...\r
54 \r
55     def Queue(self, maxsize: int = ...) -> queues.Queue: ...\r
56     def JoinableQueue(self, maxsize: int = ...) -> queues.JoinableQueue: ...\r
57     def SimpleQueue(self) -> queues.SimpleQueue: ...\r
58     def Pool(\r
59         self,\r
60         processes: Optional[int] = ...,\r
61         initializer: Optional[Callable[..., Any]] = ...,\r
62         initargs: Iterable[Any] = ...,\r
63         maxtasksperchild: Optional[int] = ...\r
64     ) -> multiprocessing.pool.Pool: ...\r
65     def Process(\r
66         self,\r
67         group: Any = ...,\r
68         target: Optional[Callable] = ...,\r
69         name: Optional[str] = ...,\r
70         args: Iterable[Any] = ...,\r
71         kwargs: Mapping[Any, Any] = ...,\r
72         *,\r
73         daemon: Optional[bool] = ...\r
74     ) -> multiprocessing.Process: ...\r
75     # TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out\r
76     # how to handle the ctype\r
77     # TODO: change return to RawValue once a stub exists in multiprocessing.sharedctypes\r
78     def RawValue(self, typecode_or_type: Any, *args: Any) -> Any: ...\r
79     # TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out\r
80     # how to handle the ctype\r
81     # TODO: change return to RawArray once a stub exists in multiprocessing.sharedctypes\r
82     def RawArray(self, typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]]) -> Any: ...\r
83     # TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out\r
84     # how to handle the ctype\r
85     # TODO: change return to Value once a stub exists in multiprocessing.sharedctypes\r
86     def Value(\r
87         self,\r
88         typecode_or_type: Any,\r
89         *args: Any,\r
90         lock: bool = ...\r
91     ) -> Any: ...\r
92     # TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out\r
93     # how to handle the ctype\r
94     # TODO: change return to Array once a stub exists in multiprocessing.sharedctypes\r
95     def Array(\r
96         self,\r
97         typecode_or_type: Any,\r
98         size_or_initializer: Union[int, Sequence[Any]],\r
99         *,\r
100         lock: bool = ...\r
101     ) -> Any: ...\r
102     def freeze_support(self) -> None: ...\r
103     def get_logger(self) -> Logger: ...\r
104     def log_to_stderr(self, level: Optional[str] = ...) -> Logger: ...\r
105     def allow_connection_pickling(self) -> None: ...\r
106     def set_executable(self, executable: str) -> None: ...\r
107     def set_forkserver_preload(self, module_names: List[str]) -> None: ...\r
108     def get_context(self, method: Optional[str] = ...) -> BaseContext: ...\r
109     def get_start_method(self, allow_none: bool = ...) -> str: ...\r
110     def set_start_method(self, method: Optional[str] = ...) -> None: ...\r
111     @property\r
112     def reducer(self) -> str: ...\r
113     @reducer.setter\r
114     def reducer(self, reduction: str) -> None: ...\r
115     def _check_available(self) -> None: ...\r
116 \r
117 class Process(object):\r
118     _start_method: Optional[str]\r
119     @staticmethod\r
120     # TODO: type should be BaseProcess once a stub in multiprocessing.process exists\r
121     def _Popen(process_obj: Any) -> DefaultContext: ...\r
122 \r
123 class DefaultContext(object):\r
124     Process = ...  # type: Type[multiprocessing.Process]\r
125 \r
126     def __init__(self, context: BaseContext) -> None: ...\r
127     def get_context(self, method: Optional[str] = ...) -> BaseContext: ...\r
128     def set_start_method(self, method: str, force: bool = ...) -> None: ...\r
129     def get_start_method(self, allow_none: bool = ...) -> str: ...\r
130     def get_all_start_methods(self) -> List[str]: ...\r
131 \r
132 if sys.platform != 'win32':\r
133     # TODO: type should be BaseProcess once a stub in multiprocessing.process exists\r
134     class ForkProcess(Any):  # type: ignore\r
135         _start_method: str\r
136         @staticmethod\r
137         def _Popen(process_obj: Any) -> Any: ...\r
138 \r
139     # TODO: type should be BaseProcess once a stub in multiprocessing.process exists\r
140     class SpawnProcess(Any):  # type: ignore\r
141         _start_method: str\r
142         @staticmethod\r
143         def _Popen(process_obj: Any) -> SpawnProcess: ...\r
144 \r
145     # TODO: type should be BaseProcess once a stub in multiprocessing.process exists\r
146     class ForkServerProcess(Any):  # type: ignore\r
147         _start_method: str\r
148         @staticmethod\r
149         def _Popen(process_obj: Any) -> Any: ...\r
150 \r
151     class ForkContext(BaseContext):\r
152         _name: str\r
153         Process = ...  # type: Type[ForkProcess]\r
154 \r
155     class SpawnContext(BaseContext):\r
156         _name: str\r
157         Process = ...  # type: Type[SpawnProcess]\r
158 \r
159     class ForkServerContext(BaseContext):\r
160         _name: str\r
161         Process = ...  # type: Type[ForkServerProcess]\r
162 else:\r
163     # TODO: type should be BaseProcess once a stub in multiprocessing.process exists\r
164     class SpawnProcess(Any):  # type: ignore\r
165         _start_method: str\r
166         @staticmethod\r
167         # TODO: type should be BaseProcess once a stub in multiprocessing.process exists\r
168         def _Popen(process_obj: Process) -> Any: ...\r
169 \r
170     class SpawnContext(BaseContext):\r
171         _name: str\r
172         Process = ...  # type: Type[SpawnProcess]\r
173 \r
174 def _force_start_method(method: str) -> None: ...\r
175 # TODO: type should be BaseProcess once a stub in multiprocessing.process exists\r
176 def get_spawning_popen() -> Optional[Any]: ...\r
177 # TODO: type should be BaseProcess once a stub in multiprocessing.process exists\r
178 def set_spawning_popen(popen: Any) -> None: ...\r
179 def assert_spawning(obj: Any) -> None: ...\r