efficient vim config
[dotfiles/.git] / .local / lib / python2.7 / site-packages / trollius / streams.py
1 """Stream-related things."""
2
3 __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
4            'open_connection', 'start_server',
5            'IncompleteReadError',
6            ]
7
8 import socket
9
10 if hasattr(socket, 'AF_UNIX'):
11     __all__.extend(['open_unix_connection', 'start_unix_server'])
12
13 from . import coroutines
14 from . import compat
15 from . import events
16 from . import futures
17 from . import protocols
18 from .coroutines import coroutine, From, Return
19 from .py33_exceptions import ConnectionResetError
20 from .log import logger
21
22
23 _DEFAULT_LIMIT = 2**16
24
25
26 class IncompleteReadError(EOFError):
27     """
28     Incomplete read error. Attributes:
29
30     - partial: read bytes string before the end of stream was reached
31     - expected: total number of expected bytes
32     """
33     def __init__(self, partial, expected):
34         EOFError.__init__(self, "%s bytes read on a total of %s expected bytes"
35                                 % (len(partial), expected))
36         self.partial = partial
37         self.expected = expected
38
39
40 @coroutine
41 def open_connection(host=None, port=None,
42                     loop=None, limit=_DEFAULT_LIMIT, **kwds):
43     """A wrapper for create_connection() returning a (reader, writer) pair.
44
45     The reader returned is a StreamReader instance; the writer is a
46     StreamWriter instance.
47
48     The arguments are all the usual arguments to create_connection()
49     except protocol_factory; most common are positional host and port,
50     with various optional keyword arguments following.
51
52     Additional optional keyword arguments are loop (to set the event loop
53     instance to use) and limit (to set the buffer limit passed to the
54     StreamReader).
55
56     (If you want to customize the StreamReader and/or
57     StreamReaderProtocol classes, just copy the code -- there's
58     really nothing special here except some convenience.)
59     """
60     if loop is None:
61         loop = events.get_event_loop()
62     reader = StreamReader(limit=limit, loop=loop)
63     protocol = StreamReaderProtocol(reader, loop=loop)
64     transport, _ = yield From(loop.create_connection(
65         lambda: protocol, host, port, **kwds))
66     writer = StreamWriter(transport, protocol, reader, loop)
67     raise Return(reader, writer)
68
69
70 @coroutine
71 def start_server(client_connected_cb, host=None, port=None,
72                  loop=None, limit=_DEFAULT_LIMIT, **kwds):
73     """Start a socket server, call back for each client connected.
74
75     The first parameter, `client_connected_cb`, takes two parameters:
76     client_reader, client_writer.  client_reader is a StreamReader
77     object, while client_writer is a StreamWriter object.  This
78     parameter can either be a plain callback function or a coroutine;
79     if it is a coroutine, it will be automatically converted into a
80     Task.
81
82     The rest of the arguments are all the usual arguments to
83     loop.create_server() except protocol_factory; most common are
84     positional host and port, with various optional keyword arguments
85     following.  The return value is the same as loop.create_server().
86
87     Additional optional keyword arguments are loop (to set the event loop
88     instance to use) and limit (to set the buffer limit passed to the
89     StreamReader).
90
91     The return value is the same as loop.create_server(), i.e. a
92     Server object which can be used to stop the service.
93     """
94     if loop is None:
95         loop = events.get_event_loop()
96
97     def factory():
98         reader = StreamReader(limit=limit, loop=loop)
99         protocol = StreamReaderProtocol(reader, client_connected_cb,
100                                         loop=loop)
101         return protocol
102
103     server = yield From(loop.create_server(factory, host, port, **kwds))
104     raise Return(server)
105
106
107 if hasattr(socket, 'AF_UNIX'):
108     # UNIX Domain Sockets are supported on this platform
109
110     @coroutine
111     def open_unix_connection(path=None,
112                              loop=None, limit=_DEFAULT_LIMIT, **kwds):
113         """Similar to `open_connection` but works with UNIX Domain Sockets."""
114         if loop is None:
115             loop = events.get_event_loop()
116         reader = StreamReader(limit=limit, loop=loop)
117         protocol = StreamReaderProtocol(reader, loop=loop)
118         transport, _ = yield From(loop.create_unix_connection(
119             lambda: protocol, path, **kwds))
120         writer = StreamWriter(transport, protocol, reader, loop)
121         raise Return(reader, writer)
122
123
124     @coroutine
125     def start_unix_server(client_connected_cb, path=None,
126                           loop=None, limit=_DEFAULT_LIMIT, **kwds):
127         """Similar to `start_server` but works with UNIX Domain Sockets."""
128         if loop is None:
129             loop = events.get_event_loop()
130
131         def factory():
132             reader = StreamReader(limit=limit, loop=loop)
133             protocol = StreamReaderProtocol(reader, client_connected_cb,
134                                             loop=loop)
135             return protocol
136
137         server = (yield From(loop.create_unix_server(factory, path, **kwds)))
138         raise Return(server)
139
140
141 class FlowControlMixin(protocols.Protocol):
142     """Reusable flow control logic for StreamWriter.drain().
143
144     This implements the protocol methods pause_writing(),
145     resume_reading() and connection_lost().  If the subclass overrides
146     these it must call the super methods.
147
148     StreamWriter.drain() must wait for _drain_helper() coroutine.
149     """
150
151     def __init__(self, loop=None):
152         if loop is None:
153             self._loop = events.get_event_loop()
154         else:
155             self._loop = loop
156         self._paused = False
157         self._drain_waiter = None
158         self._connection_lost = False
159
160     def pause_writing(self):
161         assert not self._paused
162         self._paused = True
163         if self._loop.get_debug():
164             logger.debug("%r pauses writing", self)
165
166     def resume_writing(self):
167         assert self._paused
168         self._paused = False
169         if self._loop.get_debug():
170             logger.debug("%r resumes writing", self)
171
172         waiter = self._drain_waiter
173         if waiter is not None:
174             self._drain_waiter = None
175             if not waiter.done():
176                 waiter.set_result(None)
177
178     def connection_lost(self, exc):
179         self._connection_lost = True
180         # Wake up the writer if currently paused.
181         if not self._paused:
182             return
183         waiter = self._drain_waiter
184         if waiter is None:
185             return
186         self._drain_waiter = None
187         if waiter.done():
188             return
189         if exc is None:
190             waiter.set_result(None)
191         else:
192             waiter.set_exception(exc)
193
194     @coroutine
195     def _drain_helper(self):
196         if self._connection_lost:
197             raise ConnectionResetError('Connection lost')
198         if not self._paused:
199             return
200         waiter = self._drain_waiter
201         assert waiter is None or waiter.cancelled()
202         waiter = futures.Future(loop=self._loop)
203         self._drain_waiter = waiter
204         yield From(waiter)
205
206
207 class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
208     """Helper class to adapt between Protocol and StreamReader.
209
210     (This is a helper class instead of making StreamReader itself a
211     Protocol subclass, because the StreamReader has other potential
212     uses, and to prevent the user of the StreamReader to accidentally
213     call inappropriate methods of the protocol.)
214     """
215
216     def __init__(self, stream_reader, client_connected_cb=None, loop=None):
217         super(StreamReaderProtocol, self).__init__(loop=loop)
218         self._stream_reader = stream_reader
219         self._stream_writer = None
220         self._client_connected_cb = client_connected_cb
221
222     def connection_made(self, transport):
223         self._stream_reader.set_transport(transport)
224         if self._client_connected_cb is not None:
225             self._stream_writer = StreamWriter(transport, self,
226                                                self._stream_reader,
227                                                self._loop)
228             res = self._client_connected_cb(self._stream_reader,
229                                             self._stream_writer)
230             if coroutines.iscoroutine(res):
231                 self._loop.create_task(res)
232
233     def connection_lost(self, exc):
234         if exc is None:
235             self._stream_reader.feed_eof()
236         else:
237             self._stream_reader.set_exception(exc)
238         super(StreamReaderProtocol, self).connection_lost(exc)
239
240     def data_received(self, data):
241         self._stream_reader.feed_data(data)
242
243     def eof_received(self):
244         self._stream_reader.feed_eof()
245         return True
246
247
248 class StreamWriter(object):
249     """Wraps a Transport.
250
251     This exposes write(), writelines(), [can_]write_eof(),
252     get_extra_info() and close().  It adds drain() which returns an
253     optional Future on which you can wait for flow control.  It also
254     adds a transport property which references the Transport
255     directly.
256     """
257
258     def __init__(self, transport, protocol, reader, loop):
259         self._transport = transport
260         self._protocol = protocol
261         # drain() expects that the reader has a exception() method
262         assert reader is None or isinstance(reader, StreamReader)
263         self._reader = reader
264         self._loop = loop
265
266     def __repr__(self):
267         info = [self.__class__.__name__, 'transport=%r' % self._transport]
268         if self._reader is not None:
269             info.append('reader=%r' % self._reader)
270         return '<%s>' % ' '.join(info)
271
272     @property
273     def transport(self):
274         return self._transport
275
276     def write(self, data):
277         self._transport.write(data)
278
279     def writelines(self, data):
280         self._transport.writelines(data)
281
282     def write_eof(self):
283         return self._transport.write_eof()
284
285     def can_write_eof(self):
286         return self._transport.can_write_eof()
287
288     def close(self):
289         return self._transport.close()
290
291     def get_extra_info(self, name, default=None):
292         return self._transport.get_extra_info(name, default)
293
294     @coroutine
295     def drain(self):
296         """Flush the write buffer.
297
298         The intended use is to write
299
300           w.write(data)
301           yield From(w.drain())
302         """
303         if self._reader is not None:
304             exc = self._reader.exception()
305             if exc is not None:
306                 raise exc
307         yield From(self._protocol._drain_helper())
308
309
310 class StreamReader(object):
311
312     def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
313         # The line length limit is  a security feature;
314         # it also doubles as half the buffer limit.
315         self._limit = limit
316         if loop is None:
317             self._loop = events.get_event_loop()
318         else:
319             self._loop = loop
320         self._buffer = bytearray()
321         self._eof = False    # Whether we're done.
322         self._waiter = None  # A future used by _wait_for_data()
323         self._exception = None
324         self._transport = None
325         self._paused = False
326
327     def __repr__(self):
328         info = ['StreamReader']
329         if self._buffer:
330             info.append('%d bytes' % len(info))
331         if self._eof:
332             info.append('eof')
333         if self._limit != _DEFAULT_LIMIT:
334             info.append('l=%d' % self._limit)
335         if self._waiter:
336             info.append('w=%r' % self._waiter)
337         if self._exception:
338             info.append('e=%r' % self._exception)
339         if self._transport:
340             info.append('t=%r' % self._transport)
341         if self._paused:
342             info.append('paused')
343         return '<%s>' % ' '.join(info)
344
345     def exception(self):
346         return self._exception
347
348     def set_exception(self, exc):
349         self._exception = exc
350
351         waiter = self._waiter
352         if waiter is not None:
353             self._waiter = None
354             if not waiter.cancelled():
355                 waiter.set_exception(exc)
356
357     def _wakeup_waiter(self):
358         """Wakeup read() or readline() function waiting for data or EOF."""
359         waiter = self._waiter
360         if waiter is not None:
361             self._waiter = None
362             if not waiter.cancelled():
363                 waiter.set_result(None)
364
365     def set_transport(self, transport):
366         assert self._transport is None, 'Transport already set'
367         self._transport = transport
368
369     def _maybe_resume_transport(self):
370         if self._paused and len(self._buffer) <= self._limit:
371             self._paused = False
372             self._transport.resume_reading()
373
374     def feed_eof(self):
375         self._eof = True
376         self._wakeup_waiter()
377
378     def at_eof(self):
379         """Return True if the buffer is empty and 'feed_eof' was called."""
380         return self._eof and not self._buffer
381
382     def feed_data(self, data):
383         assert not self._eof, 'feed_data after feed_eof'
384
385         if not data:
386             return
387
388         self._buffer.extend(data)
389         self._wakeup_waiter()
390
391         if (self._transport is not None and
392             not self._paused and
393             len(self._buffer) > 2*self._limit):
394             try:
395                 self._transport.pause_reading()
396             except NotImplementedError:
397                 # The transport can't be paused.
398                 # We'll just have to buffer all data.
399                 # Forget the transport so we don't keep trying.
400                 self._transport = None
401             else:
402                 self._paused = True
403
404     @coroutine
405     def _wait_for_data(self, func_name):
406         """Wait until feed_data() or feed_eof() is called."""
407         # StreamReader uses a future to link the protocol feed_data() method
408         # to a read coroutine. Running two read coroutines at the same time
409         # would have an unexpected behaviour. It would not possible to know
410         # which coroutine would get the next data.
411         if self._waiter is not None:
412             raise RuntimeError('%s() called while another coroutine is '
413                                'already waiting for incoming data' % func_name)
414
415         # In asyncio, there is no need to recheck if we got data or EOF thanks
416         # to "yield from". In trollius, a StreamReader method can be called
417         # after the _wait_for_data() coroutine is scheduled and before it is
418         # really executed.
419         if self._buffer or self._eof:
420             return
421
422         self._waiter = futures.Future(loop=self._loop)
423         try:
424             yield From(self._waiter)
425         finally:
426             self._waiter = None
427
428     @coroutine
429     def readline(self):
430         if self._exception is not None:
431             raise self._exception
432
433         line = bytearray()
434         not_enough = True
435
436         while not_enough:
437             while self._buffer and not_enough:
438                 ichar = self._buffer.find(b'\n')
439                 if ichar < 0:
440                     line.extend(self._buffer)
441                     del self._buffer[:]
442                 else:
443                     ichar += 1
444                     line.extend(self._buffer[:ichar])
445                     del self._buffer[:ichar]
446                     not_enough = False
447
448                 if len(line) > self._limit:
449                     self._maybe_resume_transport()
450                     raise ValueError('Line is too long')
451
452             if self._eof:
453                 break
454
455             if not_enough:
456                 yield From(self._wait_for_data('readline'))
457
458         self._maybe_resume_transport()
459         raise Return(bytes(line))
460
461     @coroutine
462     def read(self, n=-1):
463         if self._exception is not None:
464             raise self._exception
465
466         if not n:
467             raise Return(b'')
468
469         if n < 0:
470             # This used to just loop creating a new waiter hoping to
471             # collect everything in self._buffer, but that would
472             # deadlock if the subprocess sends more than self.limit
473             # bytes.  So just call self.read(self._limit) until EOF.
474             blocks = []
475             while True:
476                 block = yield From(self.read(self._limit))
477                 if not block:
478                     break
479                 blocks.append(block)
480             raise Return(b''.join(blocks))
481         else:
482             if not self._buffer and not self._eof:
483                 yield From(self._wait_for_data('read'))
484
485         if n < 0 or len(self._buffer) <= n:
486             data = bytes(self._buffer)
487             del self._buffer[:]
488         else:
489             # n > 0 and len(self._buffer) > n
490             data = bytes(self._buffer[:n])
491             del self._buffer[:n]
492
493         self._maybe_resume_transport()
494         raise Return(data)
495
496     @coroutine
497     def readexactly(self, n):
498         if self._exception is not None:
499             raise self._exception
500
501         # There used to be "optimized" code here.  It created its own
502         # Future and waited until self._buffer had at least the n
503         # bytes, then called read(n).  Unfortunately, this could pause
504         # the transport if the argument was larger than the pause
505         # limit (which is twice self._limit).  So now we just read()
506         # into a local buffer.
507
508         blocks = []
509         while n > 0:
510             block = yield From(self.read(n))
511             if not block:
512                 partial = b''.join(blocks)
513                 raise IncompleteReadError(partial, len(partial) + n)
514             blocks.append(block)
515             n -= len(block)
516
517         raise Return(b''.join(blocks))
518
519     # FIXME: should we support __aiter__ and __anext__ in Trollius?
520     #if compat.PY35:
521     #    @coroutine
522     #    def __aiter__(self):
523     #        return self
524     #
525     #    @coroutine
526     #    def __anext__(self):
527     #        val = yield from self.readline()
528     #        if val == b'':
529     #            raise StopAsyncIteration
530     #        return val