1 from typing import Callable, ContextManager, Optional, Union
\r
3 from multiprocessing.context import BaseContext
\r
7 _LockLike = Union[Lock, RLock]
\r
9 class Barrier(threading.Barrier):
\r
12 action: Optional[Callable] = ...,
\r
13 timeout: Optional[float] = ...,
\r
15 ctx: BaseContext) -> None: ...
\r
17 class BoundedSemaphore(Semaphore):
\r
18 def __init__(self, value: int = ..., *, ctx: BaseContext) -> None: ...
\r
20 class Condition(ContextManager[bool]):
\r
22 lock: Optional[_LockLike] = ...,
\r
24 ctx: BaseContext) -> None: ...
\r
25 if sys.version_info >= (3, 7):
\r
26 def notify(self, n: int = ...) -> None: ...
\r
28 def notify(self) -> None: ...
\r
29 def notify_all(self) -> None: ...
\r
30 def wait(self, timeout: Optional[float] = ...) -> bool: ...
\r
32 predicate: Callable[[], bool],
\r
33 timeout: Optional[float] = ...) -> bool: ...
\r
36 timeout: Optional[float] = ...) -> bool: ...
\r
37 def release(self) -> None: ...
\r
39 class Event(ContextManager[bool]):
\r
41 lock: Optional[_LockLike] = ...,
\r
43 ctx: BaseContext) -> None: ...
\r
44 def is_set(self) -> bool: ...
\r
45 def set(self) -> None: ...
\r
46 def clear(self) -> None: ...
\r
47 def wait(self, timeout: Optional[float] = ...) -> bool: ...
\r
49 class Lock(SemLock):
\r
50 def __init__(self, *, ctx: BaseContext) -> None: ...
\r
52 class RLock(SemLock):
\r
53 def __init__(self, *, ctx: BaseContext) -> None: ...
\r
55 class Semaphore(SemLock):
\r
56 def __init__(self, value: int = ..., *, ctx: BaseContext) -> None: ...
\r
58 # Not part of public API
\r
59 class SemLock(ContextManager[bool]):
\r
62 timeout: Optional[float] = ...) -> bool: ...
\r
63 def release(self) -> None: ...
\r