3 __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
12 from .coroutines import coroutine, From, Return
15 class QueueEmpty(Exception):
16 """Exception raised when Queue.get_nowait() is called on a Queue object
22 class QueueFull(Exception):
23 """Exception raised when the Queue.put_nowait() method is called on a Queue
30 """A queue, useful for coordinating producer and consumer coroutines.
32 If maxsize is less than or equal to zero, the queue size is infinite. If it
33 is an integer greater than 0, then "yield from put()" will block when the
34 queue reaches maxsize, until an item is removed by get().
36 Unlike the standard library Queue, you can reliably know this Queue's size
37 with qsize(), since your single-threaded asyncio application won't be
38 interrupted between calling qsize() and doing an operation on the Queue.
41 def __init__(self, maxsize=0, loop=None):
43 self._loop = events.get_event_loop()
46 self._maxsize = maxsize
49 self._getters = collections.deque()
51 self._putters = collections.deque()
52 self._unfinished_tasks = 0
53 self._finished = locks.Event(loop=self._loop)
57 # These three are overridable in subclasses.
def _init(self, maxsize):
    """Create the container that holds the queued items.

    This implementation stores items in a ``collections.deque`` bound to
    ``self._queue``.  ``maxsize`` is accepted as part of the hook's
    signature but is not used by this implementation.
    """
    self._queue = collections.deque()
63 return self._queue.popleft()
66 self._queue.append(item)
68 # End of the overridable methods.
70 def __put_internal(self, item):
72 self._unfinished_tasks += 1
73 self._finished.clear()
76 return '<{0} at {1:#x} {2}>'.format(
77 type(self).__name__, id(self), self._format())
80 return '<{0} {1}>'.format(type(self).__name__, self._format())
83 result = 'maxsize={0!r}'.format(self._maxsize)
84 if getattr(self, '_queue', None):
85 result += ' _queue={0!r}'.format(list(self._queue))
87 result += ' _getters[{0}]'.format(len(self._getters))
89 result += ' _putters[{0}]'.format(len(self._putters))
90 if self._unfinished_tasks:
91 result += ' tasks={0}'.format(self._unfinished_tasks)
def _consume_done_getters(self):
    """Discard finished waiters from the front of the get() waiter queue.

    Entries are popped off the left end of ``self._getters`` for as long
    as the head entry reports ``done()`` (for example a get() waiter that
    timed out).  The loop stops at the first entry that is not done, or
    when the deque is empty; done entries sitting behind a pending one
    are left in place.
    """
    while self._getters and self._getters[0].done():
        self._getters.popleft()
def _consume_done_putters(self):
    """Discard finished waiters from the front of the put() waiter queue.

    Entries are popped off the left end of ``self._putters`` for as long
    as the head entry reports ``done()`` (for example a put() waiter that
    timed out).  The loop stops at the first entry that is not done, or
    when the deque is empty; done entries sitting behind a pending one
    are left in place.
    """
    while self._putters and self._putters[0].done():
        self._putters.popleft()
105 """Number of items in the queue."""
106 return len(self._queue)
110 """Number of items allowed in the queue."""
114 """Return True if the queue is empty, False otherwise."""
115 return not self._queue
118 """Return True if there are maxsize items in the queue.
120 Note: if the Queue was initialized with maxsize=0 (the default),
121 then full() is never True.
123 if self._maxsize <= 0:
126 return self.qsize() >= self._maxsize
130 """Put an item into the queue.
132 Put an item into the queue. If the queue is full, wait until a free
133 slot is available before adding item.
135 This method is a coroutine.
137 self._consume_done_getters()
139 assert not self._queue, (
140 'queue non-empty, why are getters waiting?')
142 getter = self._getters.popleft()
143 self.__put_internal(item)
145 # getter cannot be cancelled, we just removed done getters
146 getter.set_result(self._get())
148 elif self._maxsize > 0 and self._maxsize <= self.qsize():
149 waiter = futures.Future(loop=self._loop)
151 self._putters.append(waiter)
156 self.__put_internal(item)
158 def put_nowait(self, item):
159 """Put an item into the queue without blocking.
161 If no free slot is immediately available, raise QueueFull.
163 self._consume_done_getters()
165 assert not self._queue, (
166 'queue non-empty, why are getters waiting?')
168 getter = self._getters.popleft()
169 self.__put_internal(item)
171 # getter cannot be cancelled, we just removed done getters
172 getter.set_result(self._get())
174 elif self._maxsize > 0 and self._maxsize <= self.qsize():
177 self.__put_internal(item)
181 """Remove and return an item from the queue.
183 If queue is empty, wait until an item is available.
185 This method is a coroutine.
187 self._consume_done_putters()
189 assert self.full(), 'queue not full, why are putters waiting?'
190 putter = self._putters.popleft()
192 # When a getter runs and frees up a slot so this putter can
193 # run, we need to defer the put for a tick to ensure that
194 # getters and putters alternate perfectly. See
195 # ChannelTest.test_wait.
196 self._loop.call_soon(putter._set_result_unless_cancelled, None)
198 raise Return(self._get())
201 raise Return(self._get())
203 waiter = futures.Future(loop=self._loop)
204 self._getters.append(waiter)
206 value = (yield From(waiter))
208 except futures.CancelledError:
209 # if we get CancelledError, it means someone cancelled this
210 # get() coroutine. But there is a chance that the waiter
211 # already is ready and contains an item that has just been
212 # removed from the queue. In this case, we need to put the item
213 # back into the front of the queue. This get() must either
214 # succeed without fault or, if it gets cancelled, it must be as
215 # if it never happened.
217 self._put_it_back(waiter.result())
220 def _put_it_back(self, item):
222 This is called when we have a waiter to get() an item and this waiter
223 gets cancelled. In this case, we put the item back: wake up another
224 waiter or put it in the _queue.
226 self._consume_done_getters()
228 assert not self._queue, (
229 'queue non-empty, why are getters waiting?')
231 getter = self._getters.popleft()
232 self.__put_internal(item)
234 # getter cannot be cancelled, we just removed done getters
235 getter.set_result(item)
237 self._queue.appendleft(item)
239 def get_nowait(self):
240 """Remove and return an item from the queue.
242 Return an item if one is immediately available, else raise QueueEmpty.
244 self._consume_done_putters()
246 assert self.full(), 'queue not full, why are putters waiting?'
247 putter = self._putters.popleft()
248 # Wake putter on next tick.
250 # putter cannot be cancelled, we just removed done putters
251 putter.set_result(None)
261 """Indicate that a formerly enqueued task is complete.
263 Used by queue consumers. For each get() used to fetch a task,
264 a subsequent call to task_done() tells the queue that the processing
265 on the task is complete.
267 If a join() is currently blocking, it will resume when all items have
268 been processed (meaning that a task_done() call was received for every
269 item that had been put() into the queue).
271 Raises ValueError if called more times than there were items placed in
274 if self._unfinished_tasks <= 0:
275 raise ValueError('task_done() called too many times')
276 self._unfinished_tasks -= 1
277 if self._unfinished_tasks == 0:
282 """Block until all items in the queue have been gotten and processed.
284 The count of unfinished tasks goes up whenever an item is added to the
285 queue. The count goes down whenever a consumer calls task_done() to
286 indicate that the item was retrieved and all work on it is complete.
287 When the count of unfinished tasks drops to zero, join() unblocks.
289 if self._unfinished_tasks > 0:
290 yield From(self._finished.wait())
293 class PriorityQueue(Queue):
294 """A subclass of Queue; retrieves entries in priority order (lowest first).
296 Entries are typically tuples of the form: (priority number, data).
299 def _init(self, maxsize):
def _put(self, item, heappush=heapq.heappush):
    """Push ``item`` onto the heap kept in ``self._queue``.

    ``heappush`` defaults to ``heapq.heappush``; the default is bound
    once, when the function is defined.
    """
    heappush(self._queue, item)
def _get(self, heappop=heapq.heappop):
    """Pop and return the smallest item from the heap in ``self._queue``.

    ``heappop`` defaults to ``heapq.heappop``; the default is bound once,
    when the function is defined.
    """
    return heappop(self._queue)
309 class LifoQueue(Queue):
310 """A subclass of Queue that retrieves most recently added entries first."""
312 def _init(self, maxsize):
def _put(self, item):
    """Append ``item`` to the end of ``self._queue``."""
    self._queue.append(item)
319 return self._queue.pop()
# JoinableQueue is bound to the very same class object as Queue (it is an
# alias, not a subclass) and is also added to the module's public names.
JoinableQueue = Queue
"""Deprecated alias for Queue."""
__all__.append('JoinableQueue')