2 from collections import deque
3 from types import TracebackType
4 from typing import Any, Awaitable, Callable, Generator, Type, TypeVar
6 from .events import AbstractEventLoop
7 from .futures import Future
if sys.version_info >= (3, 9):
    # From 3.9 on, the mixin declares only the async context-manager
    # protocol (__aenter__ / __aexit__).
    class _ContextManagerMixin:
        def __init__(self, lock: Lock | Semaphore) -> None: ...
        def __aenter__(self) -> Awaitable[None]: ...
        def __aexit__(
            self, exc_type: Type[BaseException] | None, exc: BaseException | None, tb: TracebackType | None
        ) -> Awaitable[None]: ...

else:
    # Object type named as the generator return value of the pre-3.9
    # mixin's __iter__ / __await__ declared below.
    class _ContextManager:
        def __init__(self, lock: Lock | Semaphore) -> None: ...
        def __enter__(self) -> object: ...
        def __exit__(self, *args: Any) -> None: ...

    # Pre-3.9 mixin: declares the synchronous, generator-based and async
    # protocols side by side.
    class _ContextManagerMixin:
        def __init__(self, lock: Lock | Semaphore) -> None: ...
        # Apparently this exists to *prohibit* use as a context manager.
        def __enter__(self) -> object: ...
        def __exit__(self, *args: Any) -> None: ...
        def __iter__(self) -> Generator[Any, None, _ContextManager]: ...
        def __await__(self) -> Generator[Any, None, _ContextManager]: ...
        def __aenter__(self) -> Awaitable[None]: ...
        def __aexit__(
            self, exc_type: Type[BaseException] | None, exc: BaseException | None, tb: TracebackType | None
        ) -> Awaitable[None]: ...
class Lock(_ContextManagerMixin):
    # Stub for the lock primitive; context-manager support is inherited
    # from _ContextManagerMixin.

    # `loop` is keyword-only and optional.
    def __init__(
        self,
        *,
        loop: AbstractEventLoop | None = ...,
    ) -> None: ...

    # Synchronous state query.
    def locked(self) -> bool: ...

    # Coroutine; its awaited result is typed as bool.
    async def acquire(self) -> bool: ...

    # Synchronous; returns nothing.
    def release(self) -> None: ...
# NOTE(review): the class header was reconstructed. In the visible source
# these five methods followed Lock directly, giving it a second __init__.
# The name and the absence of a base class are inferred from the public
# asyncio API -- confirm against the upstream stub.
class Event:
    # `loop` is keyword-only and optional.
    def __init__(self, *, loop: AbstractEventLoop | None = ...) -> None: ...

    # Synchronous query of the flag; typed as bool.
    def is_set(self) -> bool: ...

    # Synchronous mutators; both return nothing.
    def set(self) -> None: ...
    def clear(self) -> None: ...

    # Coroutine; its awaited result is typed as bool.
    async def wait(self) -> bool: ...
class Condition(_ContextManagerMixin):
    # Stub for the condition primitive; context-manager support is
    # inherited from _ContextManagerMixin.

    # `lock` may be passed positionally; `loop` is keyword-only.
    # Both are optional.
    def __init__(
        self,
        lock: Lock | None = ...,
        *,
        loop: AbstractEventLoop | None = ...,
    ) -> None: ...

    # Same three signatures that Lock declares.
    def locked(self) -> bool: ...
    async def acquire(self) -> bool: ...
    def release(self) -> None: ...

    # Coroutine; its awaited result is typed as bool.
    async def wait(self) -> bool: ...

    # Coroutine; its awaited result has the type returned by the
    # zero-argument `predicate`.
    # NOTE(review): `_T` is not bound anywhere in the visible source (only
    # `TypeVar` is imported) -- confirm it is declared at module level.
    async def wait_for(
        self,
        predicate: Callable[[], _T],
    ) -> _T: ...

    # `n` is an optional int; both notifiers are synchronous and return
    # nothing.
    def notify(self, n: int = ...) -> None: ...
    def notify_all(self) -> None: ...
class Semaphore(_ContextManagerMixin):
    # Stub for the semaphore primitive; context-manager support is
    # inherited from _ContextManagerMixin.

    # Private attribute: a deque of futures.
    _waiters: deque[Future[Any]]

    # `value` may be passed positionally; `loop` is keyword-only.
    # Both are optional.
    def __init__(
        self,
        value: int = ...,
        *,
        loop: AbstractEventLoop | None = ...,
    ) -> None: ...

    # Synchronous state query.
    def locked(self) -> bool: ...

    # Coroutine; its awaited result is typed as bool.
    async def acquire(self) -> bool: ...

    # Synchronous; returns nothing.
    def release(self) -> None: ...

    # Private helper; takes no arguments and returns nothing.
    def _wake_up_next(self) -> None: ...
class BoundedSemaphore(Semaphore):
    # Subclass of Semaphore; the stub redeclares only the constructor,
    # with the same parameter list as the parent's.
    def __init__(
        self,
        value: int = ...,
        *,
        loop: AbstractEventLoop | None = ...,
    ) -> None: ...