efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / queues.py
1 """Queues"""
2
3 __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
4
5 import collections
6 import heapq
7
8 from . import compat
9 from . import events
10 from . import futures
11 from . import locks
12 from .coroutines import coroutine, From, Return
13
14
15 class QueueEmpty(Exception):
16     """Exception raised when Queue.get_nowait() is called on a Queue object
17     which is empty.
18     """
19     pass
20
21
22 class QueueFull(Exception):
23     """Exception raised when the Queue.put_nowait() method is called on a Queue
24     object which is full.
25     """
26     pass
27
28
29 class Queue(object):
30     """A queue, useful for coordinating producer and consumer coroutines.
31
32     If maxsize is less than or equal to zero, the queue size is infinite. If it
33     is an integer greater than 0, then "yield from put()" will block when the
34     queue reaches maxsize, until an item is removed by get().
35
36     Unlike the standard library Queue, you can reliably know this Queue's size
37     with qsize(), since your single-threaded asyncio application won't be
38     interrupted between calling qsize() and doing an operation on the Queue.
39     """
40
41     def __init__(self, maxsize=0, loop=None):
42         if loop is None:
43             self._loop = events.get_event_loop()
44         else:
45             self._loop = loop
46         self._maxsize = maxsize
47
48         # Futures.
49         self._getters = collections.deque()
50         # Futures
51         self._putters = collections.deque()
52         self._unfinished_tasks = 0
53         self._finished = locks.Event(loop=self._loop)
54         self._finished.set()
55         self._init(maxsize)
56
57     # These three are overridable in subclasses.
58
59     def _init(self, maxsize):
60         self._queue = collections.deque()
61
62     def _get(self):
63         return self._queue.popleft()
64
65     def _put(self, item):
66         self._queue.append(item)
67
68     # End of the overridable methods.
69
70     def __put_internal(self, item):
71         self._put(item)
72         self._unfinished_tasks += 1
73         self._finished.clear()
74
75     def __repr__(self):
76         return '<{0} at {1:#x} {2}>'.format(
77             type(self).__name__, id(self), self._format())
78
79     def __str__(self):
80         return '<{0} {1}>'.format(type(self).__name__, self._format())
81
82     def _format(self):
83         result = 'maxsize={0!r}'.format(self._maxsize)
84         if getattr(self, '_queue', None):
85             result += ' _queue={0!r}'.format(list(self._queue))
86         if self._getters:
87             result += ' _getters[{0}]'.format(len(self._getters))
88         if self._putters:
89             result += ' _putters[{0}]'.format(len(self._putters))
90         if self._unfinished_tasks:
91             result += ' tasks={0}'.format(self._unfinished_tasks)
92         return result
93
94     def _consume_done_getters(self):
95         # Delete waiters at the head of the get() queue who've timed out.
96         while self._getters and self._getters[0].done():
97             self._getters.popleft()
98
99     def _consume_done_putters(self):
100         # Delete waiters at the head of the put() queue who've timed out.
101         while self._putters and self._putters[0].done():
102             self._putters.popleft()
103
104     def qsize(self):
105         """Number of items in the queue."""
106         return len(self._queue)
107
108     @property
109     def maxsize(self):
110         """Number of items allowed in the queue."""
111         return self._maxsize
112
113     def empty(self):
114         """Return True if the queue is empty, False otherwise."""
115         return not self._queue
116
117     def full(self):
118         """Return True if there are maxsize items in the queue.
119
120         Note: if the Queue was initialized with maxsize=0 (the default),
121         then full() is never True.
122         """
123         if self._maxsize <= 0:
124             return False
125         else:
126             return self.qsize() >= self._maxsize
127
128     @coroutine
129     def put(self, item):
130         """Put an item into the queue.
131
132         Put an item into the queue. If the queue is full, wait until a free
133         slot is available before adding item.
134
135         This method is a coroutine.
136         """
137         self._consume_done_getters()
138         if self._getters:
139             assert not self._queue, (
140                 'queue non-empty, why are getters waiting?')
141
142             getter = self._getters.popleft()
143             self.__put_internal(item)
144
145             # getter cannot be cancelled, we just removed done getters
146             getter.set_result(self._get())
147
148         elif self._maxsize > 0 and self._maxsize <= self.qsize():
149             waiter = futures.Future(loop=self._loop)
150
151             self._putters.append(waiter)
152             yield From(waiter)
153             self._put(item)
154
155         else:
156             self.__put_internal(item)
157
158     def put_nowait(self, item):
159         """Put an item into the queue without blocking.
160
161         If no free slot is immediately available, raise QueueFull.
162         """
163         self._consume_done_getters()
164         if self._getters:
165             assert not self._queue, (
166                 'queue non-empty, why are getters waiting?')
167
168             getter = self._getters.popleft()
169             self.__put_internal(item)
170
171             # getter cannot be cancelled, we just removed done getters
172             getter.set_result(self._get())
173
174         elif self._maxsize > 0 and self._maxsize <= self.qsize():
175             raise QueueFull
176         else:
177             self.__put_internal(item)
178
179     @coroutine
180     def get(self):
181         """Remove and return an item from the queue.
182
183         If queue is empty, wait until an item is available.
184
185         This method is a coroutine.
186         """
187         self._consume_done_putters()
188         if self._putters:
189             assert self.full(), 'queue not full, why are putters waiting?'
190             putter = self._putters.popleft()
191
192             # When a getter runs and frees up a slot so this putter can
193             # run, we need to defer the put for a tick to ensure that
194             # getters and putters alternate perfectly. See
195             # ChannelTest.test_wait.
196             self._loop.call_soon(putter._set_result_unless_cancelled, None)
197
198             raise Return(self._get())
199
200         elif self.qsize():
201             raise Return(self._get())
202         else:
203             waiter = futures.Future(loop=self._loop)
204             self._getters.append(waiter)
205             try:
206                 value = (yield From(waiter))
207                 raise Return(value)
208             except futures.CancelledError:
209                 # if we get CancelledError, it means someone cancelled this
210                 # get() coroutine.  But there is a chance that the waiter
211                 # already is ready and contains an item that has just been
212                 # removed from the queue.  In this case, we need to put the item
213                 # back into the front of the queue.  This get() must either
214                 # succeed without fault or, if it gets cancelled, it must be as
215                 # if it never happened.
216                 if waiter.done():
217                     self._put_it_back(waiter.result())
218                 raise
219
220     def _put_it_back(self, item):
221         """
222         This is called when we have a waiter to get() an item and this waiter
223         gets cancelled.  In this case, we put the item back: wake up another
224         waiter or put it in the _queue.
225         """
226         self._consume_done_getters()
227         if self._getters:
228             assert not self._queue, (
229                 'queue non-empty, why are getters waiting?')
230
231             getter = self._getters.popleft()
232             self.__put_internal(item)
233
234             # getter cannot be cancelled, we just removed done getters
235             getter.set_result(item)
236         else:
237             self._queue.appendleft(item)
238
239     def get_nowait(self):
240         """Remove and return an item from the queue.
241
242         Return an item if one is immediately available, else raise QueueEmpty.
243         """
244         self._consume_done_putters()
245         if self._putters:
246             assert self.full(), 'queue not full, why are putters waiting?'
247             putter = self._putters.popleft()
248             # Wake putter on next tick.
249
250             # getter cannot be cancelled, we just removed done putters
251             putter.set_result(None)
252
253             return self._get()
254
255         elif self.qsize():
256             return self._get()
257         else:
258             raise QueueEmpty
259
260     def task_done(self):
261         """Indicate that a formerly enqueued task is complete.
262
263         Used by queue consumers. For each get() used to fetch a task,
264         a subsequent call to task_done() tells the queue that the processing
265         on the task is complete.
266
267         If a join() is currently blocking, it will resume when all items have
268         been processed (meaning that a task_done() call was received for every
269         item that had been put() into the queue).
270
271         Raises ValueError if called more times than there were items placed in
272         the queue.
273         """
274         if self._unfinished_tasks <= 0:
275             raise ValueError('task_done() called too many times')
276         self._unfinished_tasks -= 1
277         if self._unfinished_tasks == 0:
278             self._finished.set()
279
280     @coroutine
281     def join(self):
282         """Block until all items in the queue have been gotten and processed.
283
284         The count of unfinished tasks goes up whenever an item is added to the
285         queue. The count goes down whenever a consumer calls task_done() to
286         indicate that the item was retrieved and all work on it is complete.
287         When the count of unfinished tasks drops to zero, join() unblocks.
288         """
289         if self._unfinished_tasks > 0:
290             yield From(self._finished.wait())
291
292
293 class PriorityQueue(Queue):
294     """A subclass of Queue; retrieves entries in priority order (lowest first).
295
296     Entries are typically tuples of the form: (priority number, data).
297     """
298
299     def _init(self, maxsize):
300         self._queue = []
301
302     def _put(self, item, heappush=heapq.heappush):
303         heappush(self._queue, item)
304
305     def _get(self, heappop=heapq.heappop):
306         return heappop(self._queue)
307
308
309 class LifoQueue(Queue):
310     """A subclass of Queue that retrieves most recently added entries first."""
311
312     def _init(self, maxsize):
313         self._queue = []
314
315     def _put(self, item):
316         self._queue.append(item)
317
318     def _get(self):
319         return self._queue.pop()
320
321
322 if not compat.PY35:
323     JoinableQueue = Queue
324     """Deprecated alias for Queue."""
325     __all__.append('JoinableQueue')