1 """Synchronization primitives."""
3 __all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
10 from .coroutines import coroutine, From, Return
13 class _ContextManager:
16 This enables the following idiom for acquiring and releasing a
19 with (yield From(lock)):
22 while failing loudly when accidentally using:
28 def __init__(self, lock):
32 # We have no use for the "as ..." clause in the with
33 # statement for locks.
36 def __exit__(self, *args):
40 self._lock = None # Crudely prevent reuse.
43 class _ContextManagerMixin(object):
46 '"yield From" should be used as context manager expression')
48 def __exit__(self, *args):
49 # This must exist because __enter__ exists, even though that
50 # always raises; that's how the with-statement works.
53 # FIXME: support PEP 492?
56 # def __await__(self):
57 # # To make "with await lock" work.
58 # yield from self.acquire()
59 # return _ContextManager(self)
62 # def __aenter__(self):
63 # yield from self.acquire()
64 # # We have no use for the "as ..." clause in the with
65 # # statement for locks.
69 # def __aexit__(self, exc_type, exc, tb):
73 class Lock(_ContextManagerMixin):
74 """Primitive lock objects.
76 A primitive lock is a synchronization primitive that is not owned
77 by a particular coroutine when locked. A primitive lock is in one
78 of two states, 'locked' or 'unlocked'.
80 It is created in the unlocked state. It has two basic methods,
81 acquire() and release(). When the state is unlocked, acquire()
82 changes the state to locked and returns immediately. When the
83 state is locked, acquire() blocks until a call to release() in
84 another coroutine changes it to unlocked, then the acquire() call
85 resets it to locked and returns. The release() method should only
86 be called in the locked state; it changes the state to unlocked
87 and returns immediately. If an attempt is made to release an
88 unlocked lock, a RuntimeError will be raised.
90 When more than one coroutine is blocked in acquire() waiting for
91 the state to turn to unlocked, only one coroutine proceeds when a
92 release() call resets the state to unlocked; first coroutine which
93 is blocked in acquire() is being processed.
95 acquire() is a coroutine and should be called with 'yield From'.
97 Locks also support the context management protocol. '(yield From(lock))'
98 should be used as context manager expression.
110 Context manager usage:
114 with (yield From(lock)):
117 Lock objects can be tested for locking state:
119 if not lock.locked():
127 def __init__(self, loop=None):
128 self._waiters = collections.deque()
133 self._loop = events.get_event_loop()
136 res = super(Lock, self).__repr__()
137 extra = 'locked' if self._locked else 'unlocked'
139 extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
140 return '<{0} [{1}]>'.format(res[1:-1], extra)
143 """Return True if lock is acquired."""
150 This method blocks until the lock is unlocked, then sets it to
151 locked and returns True.
153 if not self._waiters and not self._locked:
157 fut = futures.Future(loop=self._loop)
158 self._waiters.append(fut)
164 self._waiters.remove(fut)
169 When the lock is locked, reset it to unlocked, and return.
170 If any other coroutines are blocked waiting for the lock to become
171 unlocked, allow exactly one of them to proceed.
173 When invoked on an unlocked lock, a RuntimeError is raised.
175 There is no return value.
179 # Wake up the first waiter who isn't cancelled.
180 for fut in self._waiters:
185 raise RuntimeError('Lock is not acquired.')
189 """Asynchronous equivalent to threading.Event.
191 Class implementing event objects. An event manages a flag that can be set
192 to true with the set() method and reset to false with the clear() method.
193 The wait() method blocks until the flag is true. The flag is initially
197 def __init__(self, loop=None):
198 self._waiters = collections.deque()
203 self._loop = events.get_event_loop()
206 res = super(Event, self).__repr__()
207 extra = 'set' if self._value else 'unset'
209 extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
210 return '<{0} [{1}]>'.format(res[1:-1], extra)
213 """Return True if and only if the internal flag is true."""
217 """Set the internal flag to true. All coroutines waiting for it to
218 become true are awakened. Coroutine that call wait() once the flag is
219 true will not block at all.
224 for fut in self._waiters:
229 """Reset the internal flag to false. Subsequently, coroutines calling
230 wait() will block until set() is called to set the internal flag
236 """Block until the internal flag is true.
238 If the internal flag is true on entry, return True
239 immediately. Otherwise, block until another coroutine calls
240 set() to set the flag to true, then return True.
245 fut = futures.Future(loop=self._loop)
246 self._waiters.append(fut)
251 self._waiters.remove(fut)
254 class Condition(_ContextManagerMixin):
255 """Asynchronous equivalent to threading.Condition.
257 This class implements condition variable objects. A condition variable
258 allows one or more coroutines to wait until they are notified by another
261 A new Lock object is created and used as the underlying lock.
264 def __init__(self, lock=None, loop=None):
268 self._loop = events.get_event_loop()
271 lock = Lock(loop=self._loop)
272 elif lock._loop is not self._loop:
273 raise ValueError("loop argument must agree with lock")
276 # Export the lock's locked(), acquire() and release() methods.
277 self.locked = lock.locked
278 self.acquire = lock.acquire
279 self.release = lock.release
281 self._waiters = collections.deque()
284 res = super(Condition, self).__repr__()
285 extra = 'locked' if self.locked() else 'unlocked'
287 extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
288 return '<{0} [{1}]>'.format(res[1:-1], extra)
292 """Wait until notified.
294 If the calling coroutine has not acquired the lock when this
295 method is called, a RuntimeError is raised.
297 This method releases the underlying lock, and then blocks
298 until it is awakened by a notify() or notify_all() call for
299 the same condition variable in another coroutine. Once
300 awakened, it re-acquires the lock and returns True.
302 if not self.locked():
303 raise RuntimeError('cannot wait on un-acquired lock')
307 fut = futures.Future(loop=self._loop)
308 self._waiters.append(fut)
313 self._waiters.remove(fut)
315 except Exception as exc:
316 # Workaround CPython bug #23353: using yield/yield-from in an
317 # except block of a generator doesn't clear properly
324 yield From(self.acquire())
327 yield From(self.acquire())
330 def wait_for(self, predicate):
331 """Wait until a predicate becomes true.
333 The predicate should be a callable which result will be
334 interpreted as a boolean value. The final predicate value is
339 yield From(self.wait())
343 def notify(self, n=1):
344 """By default, wake up one coroutine waiting on this condition, if any.
345 If the calling coroutine has not acquired the lock when this method
346 is called, a RuntimeError is raised.
348 This method wakes up at most n of the coroutines waiting for the
349 condition variable; it is a no-op if no coroutines are waiting.
351 Note: an awakened coroutine does not actually return from its
352 wait() call until it can reacquire the lock. Since notify() does
353 not release the lock, its caller should.
355 if not self.locked():
356 raise RuntimeError('cannot notify on un-acquired lock')
359 for fut in self._waiters:
365 fut.set_result(False)
367 def notify_all(self):
368 """Wake up all threads waiting on this condition. This method acts
369 like notify(), but wakes up all waiting threads instead of one. If the
370 calling thread has not acquired the lock when this method is called,
371 a RuntimeError is raised.
373 self.notify(len(self._waiters))
376 class Semaphore(_ContextManagerMixin):
377 """A Semaphore implementation.
379 A semaphore manages an internal counter which is decremented by each
380 acquire() call and incremented by each release() call. The counter
381 can never go below zero; when acquire() finds that it is zero, it blocks,
382 waiting until some other thread calls release().
384 Semaphores also support the context management protocol.
386 The optional argument gives the initial value for the internal
387 counter; it defaults to 1. If the value given is less than 0,
388 ValueError is raised.
391 def __init__(self, value=1, loop=None):
393 raise ValueError("Semaphore initial value must be >= 0")
395 self._waiters = collections.deque()
399 self._loop = events.get_event_loop()
402 res = super(Semaphore, self).__repr__()
403 extra = 'locked' if self.locked() else 'unlocked,value:{0}'.format(
406 extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
407 return '<{0} [{1}]>'.format(res[1:-1], extra)
410 """Returns True if semaphore can not be acquired immediately."""
411 return self._value == 0
415 """Acquire a semaphore.
417 If the internal counter is larger than zero on entry,
418 decrement it by one and return True immediately. If it is
419 zero on entry, block, waiting until some other coroutine has
420 called release() to make it larger than 0, and then return
423 if not self._waiters and self._value > 0:
427 fut = futures.Future(loop=self._loop)
428 self._waiters.append(fut)
434 self._waiters.remove(fut)
437 """Release a semaphore, incrementing the internal counter by one.
438 When it was zero on entry and another coroutine is waiting for it to
439 become larger than zero again, wake up that coroutine.
442 for waiter in self._waiters:
443 if not waiter.done():
444 waiter.set_result(True)
448 class BoundedSemaphore(Semaphore):
449 """A bounded semaphore implementation.
451 This raises ValueError in release() if it would increase the value
452 above the initial value.
455 def __init__(self, value=1, loop=None):
456 self._bound_value = value
457 super(BoundedSemaphore, self).__init__(value, loop=loop)
460 if self._value >= self._bound_value:
461 raise ValueError('BoundedSemaphore released too many times')
462 super(BoundedSemaphore, self).release()