1 """Stream-related things."""
3 __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
4 'open_connection', 'start_server',
10 if hasattr(socket, 'AF_UNIX'):
11 __all__.extend(['open_unix_connection', 'start_unix_server'])
13 from . import coroutines
17 from . import protocols
18 from .coroutines import coroutine, From, Return
19 from .py33_exceptions import ConnectionResetError
20 from .log import logger
23 _DEFAULT_LIMIT = 2**16
26 class IncompleteReadError(EOFError):
28 Incomplete read error. Attributes:
30 - partial: read bytes string before the end of stream was reached
31 - expected: total number of expected bytes
33 def __init__(self, partial, expected):
34 EOFError.__init__(self, "%s bytes read on a total of %s expected bytes"
35 % (len(partial), expected))
36 self.partial = partial
37 self.expected = expected
41 def open_connection(host=None, port=None,
42 loop=None, limit=_DEFAULT_LIMIT, **kwds):
43 """A wrapper for create_connection() returning a (reader, writer) pair.
45 The reader returned is a StreamReader instance; the writer is a
46 StreamWriter instance.
48 The arguments are all the usual arguments to create_connection()
49 except protocol_factory; most common are positional host and port,
50 with various optional keyword arguments following.
52 Additional optional keyword arguments are loop (to set the event loop
53 instance to use) and limit (to set the buffer limit passed to the
56 (If you want to customize the StreamReader and/or
57 StreamReaderProtocol classes, just copy the code -- there's
58 really nothing special here except some convenience.)
61 loop = events.get_event_loop()
62 reader = StreamReader(limit=limit, loop=loop)
63 protocol = StreamReaderProtocol(reader, loop=loop)
64 transport, _ = yield From(loop.create_connection(
65 lambda: protocol, host, port, **kwds))
66 writer = StreamWriter(transport, protocol, reader, loop)
67 raise Return(reader, writer)
71 def start_server(client_connected_cb, host=None, port=None,
72 loop=None, limit=_DEFAULT_LIMIT, **kwds):
73 """Start a socket server, call back for each client connected.
75 The first parameter, `client_connected_cb`, takes two parameters:
76 client_reader, client_writer. client_reader is a StreamReader
77 object, while client_writer is a StreamWriter object. This
78 parameter can either be a plain callback function or a coroutine;
79 if it is a coroutine, it will be automatically converted into a
82 The rest of the arguments are all the usual arguments to
83 loop.create_server() except protocol_factory; most common are
84 positional host and port, with various optional keyword arguments
85 following. The return value is the same as loop.create_server().
87 Additional optional keyword arguments are loop (to set the event loop
88 instance to use) and limit (to set the buffer limit passed to the
91 The return value is the same as loop.create_server(), i.e. a
92 Server object which can be used to stop the service.
95 loop = events.get_event_loop()
98 reader = StreamReader(limit=limit, loop=loop)
99 protocol = StreamReaderProtocol(reader, client_connected_cb,
103 server = yield From(loop.create_server(factory, host, port, **kwds))
107 if hasattr(socket, 'AF_UNIX'):
108 # UNIX Domain Sockets are supported on this platform
111 def open_unix_connection(path=None,
112 loop=None, limit=_DEFAULT_LIMIT, **kwds):
113 """Similar to `open_connection` but works with UNIX Domain Sockets."""
115 loop = events.get_event_loop()
116 reader = StreamReader(limit=limit, loop=loop)
117 protocol = StreamReaderProtocol(reader, loop=loop)
118 transport, _ = yield From(loop.create_unix_connection(
119 lambda: protocol, path, **kwds))
120 writer = StreamWriter(transport, protocol, reader, loop)
121 raise Return(reader, writer)
125 def start_unix_server(client_connected_cb, path=None,
126 loop=None, limit=_DEFAULT_LIMIT, **kwds):
127 """Similar to `start_server` but works with UNIX Domain Sockets."""
129 loop = events.get_event_loop()
132 reader = StreamReader(limit=limit, loop=loop)
133 protocol = StreamReaderProtocol(reader, client_connected_cb,
137 server = (yield From(loop.create_unix_server(factory, path, **kwds)))
141 class FlowControlMixin(protocols.Protocol):
142 """Reusable flow control logic for StreamWriter.drain().
144 This implements the protocol methods pause_writing(),
145 resume_reading() and connection_lost(). If the subclass overrides
146 these it must call the super methods.
148 StreamWriter.drain() must wait for _drain_helper() coroutine.
151 def __init__(self, loop=None):
153 self._loop = events.get_event_loop()
157 self._drain_waiter = None
158 self._connection_lost = False
160 def pause_writing(self):
161 assert not self._paused
163 if self._loop.get_debug():
164 logger.debug("%r pauses writing", self)
166 def resume_writing(self):
169 if self._loop.get_debug():
170 logger.debug("%r resumes writing", self)
172 waiter = self._drain_waiter
173 if waiter is not None:
174 self._drain_waiter = None
175 if not waiter.done():
176 waiter.set_result(None)
178 def connection_lost(self, exc):
179 self._connection_lost = True
180 # Wake up the writer if currently paused.
183 waiter = self._drain_waiter
186 self._drain_waiter = None
190 waiter.set_result(None)
192 waiter.set_exception(exc)
195 def _drain_helper(self):
196 if self._connection_lost:
197 raise ConnectionResetError('Connection lost')
200 waiter = self._drain_waiter
201 assert waiter is None or waiter.cancelled()
202 waiter = futures.Future(loop=self._loop)
203 self._drain_waiter = waiter
207 class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
208 """Helper class to adapt between Protocol and StreamReader.
210 (This is a helper class instead of making StreamReader itself a
211 Protocol subclass, because the StreamReader has other potential
212 uses, and to prevent the user of the StreamReader to accidentally
213 call inappropriate methods of the protocol.)
216 def __init__(self, stream_reader, client_connected_cb=None, loop=None):
217 super(StreamReaderProtocol, self).__init__(loop=loop)
218 self._stream_reader = stream_reader
219 self._stream_writer = None
220 self._client_connected_cb = client_connected_cb
222 def connection_made(self, transport):
223 self._stream_reader.set_transport(transport)
224 if self._client_connected_cb is not None:
225 self._stream_writer = StreamWriter(transport, self,
228 res = self._client_connected_cb(self._stream_reader,
230 if coroutines.iscoroutine(res):
231 self._loop.create_task(res)
233 def connection_lost(self, exc):
235 self._stream_reader.feed_eof()
237 self._stream_reader.set_exception(exc)
238 super(StreamReaderProtocol, self).connection_lost(exc)
240 def data_received(self, data):
241 self._stream_reader.feed_data(data)
243 def eof_received(self):
244 self._stream_reader.feed_eof()
248 class StreamWriter(object):
249 """Wraps a Transport.
251 This exposes write(), writelines(), [can_]write_eof(),
252 get_extra_info() and close(). It adds drain() which returns an
253 optional Future on which you can wait for flow control. It also
254 adds a transport property which references the Transport
258 def __init__(self, transport, protocol, reader, loop):
259 self._transport = transport
260 self._protocol = protocol
261 # drain() expects that the reader has a exception() method
262 assert reader is None or isinstance(reader, StreamReader)
263 self._reader = reader
267 info = [self.__class__.__name__, 'transport=%r' % self._transport]
268 if self._reader is not None:
269 info.append('reader=%r' % self._reader)
270 return '<%s>' % ' '.join(info)
274 return self._transport
276 def write(self, data):
277 self._transport.write(data)
279 def writelines(self, data):
280 self._transport.writelines(data)
283 return self._transport.write_eof()
285 def can_write_eof(self):
286 return self._transport.can_write_eof()
289 return self._transport.close()
291 def get_extra_info(self, name, default=None):
292 return self._transport.get_extra_info(name, default)
296 """Flush the write buffer.
298 The intended use is to write
301 yield From(w.drain())
303 if self._reader is not None:
304 exc = self._reader.exception()
307 yield From(self._protocol._drain_helper())
310 class StreamReader(object):
312 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
313 # The line length limit is a security feature;
314 # it also doubles as half the buffer limit.
317 self._loop = events.get_event_loop()
320 self._buffer = bytearray()
321 self._eof = False # Whether we're done.
322 self._waiter = None # A future used by _wait_for_data()
323 self._exception = None
324 self._transport = None
328 info = ['StreamReader']
330 info.append('%d bytes' % len(info))
333 if self._limit != _DEFAULT_LIMIT:
334 info.append('l=%d' % self._limit)
336 info.append('w=%r' % self._waiter)
338 info.append('e=%r' % self._exception)
340 info.append('t=%r' % self._transport)
342 info.append('paused')
343 return '<%s>' % ' '.join(info)
346 return self._exception
348 def set_exception(self, exc):
349 self._exception = exc
351 waiter = self._waiter
352 if waiter is not None:
354 if not waiter.cancelled():
355 waiter.set_exception(exc)
357 def _wakeup_waiter(self):
358 """Wakeup read() or readline() function waiting for data or EOF."""
359 waiter = self._waiter
360 if waiter is not None:
362 if not waiter.cancelled():
363 waiter.set_result(None)
365 def set_transport(self, transport):
366 assert self._transport is None, 'Transport already set'
367 self._transport = transport
369 def _maybe_resume_transport(self):
370 if self._paused and len(self._buffer) <= self._limit:
372 self._transport.resume_reading()
376 self._wakeup_waiter()
379 """Return True if the buffer is empty and 'feed_eof' was called."""
380 return self._eof and not self._buffer
382 def feed_data(self, data):
383 assert not self._eof, 'feed_data after feed_eof'
388 self._buffer.extend(data)
389 self._wakeup_waiter()
391 if (self._transport is not None and
393 len(self._buffer) > 2*self._limit):
395 self._transport.pause_reading()
396 except NotImplementedError:
397 # The transport can't be paused.
398 # We'll just have to buffer all data.
399 # Forget the transport so we don't keep trying.
400 self._transport = None
405 def _wait_for_data(self, func_name):
406 """Wait until feed_data() or feed_eof() is called."""
407 # StreamReader uses a future to link the protocol feed_data() method
408 # to a read coroutine. Running two read coroutines at the same time
409 # would have an unexpected behaviour. It would not possible to know
410 # which coroutine would get the next data.
411 if self._waiter is not None:
412 raise RuntimeError('%s() called while another coroutine is '
413 'already waiting for incoming data' % func_name)
415 # In asyncio, there is no need to recheck if we got data or EOF thanks
416 # to "yield from". In trollius, a StreamReader method can be called
417 # after the _wait_for_data() coroutine is scheduled and before it is
419 if self._buffer or self._eof:
422 self._waiter = futures.Future(loop=self._loop)
424 yield From(self._waiter)
430 if self._exception is not None:
431 raise self._exception
437 while self._buffer and not_enough:
438 ichar = self._buffer.find(b'\n')
440 line.extend(self._buffer)
444 line.extend(self._buffer[:ichar])
445 del self._buffer[:ichar]
448 if len(line) > self._limit:
449 self._maybe_resume_transport()
450 raise ValueError('Line is too long')
456 yield From(self._wait_for_data('readline'))
458 self._maybe_resume_transport()
459 raise Return(bytes(line))
462 def read(self, n=-1):
463 if self._exception is not None:
464 raise self._exception
470 # This used to just loop creating a new waiter hoping to
471 # collect everything in self._buffer, but that would
472 # deadlock if the subprocess sends more than self.limit
473 # bytes. So just call self.read(self._limit) until EOF.
476 block = yield From(self.read(self._limit))
480 raise Return(b''.join(blocks))
482 if not self._buffer and not self._eof:
483 yield From(self._wait_for_data('read'))
485 if n < 0 or len(self._buffer) <= n:
486 data = bytes(self._buffer)
489 # n > 0 and len(self._buffer) > n
490 data = bytes(self._buffer[:n])
493 self._maybe_resume_transport()
497 def readexactly(self, n):
498 if self._exception is not None:
499 raise self._exception
501 # There used to be "optimized" code here. It created its own
502 # Future and waited until self._buffer had at least the n
503 # bytes, then called read(n). Unfortunately, this could pause
504 # the transport if the argument was larger than the pause
505 # limit (which is twice self._limit). So now we just read()
506 # into a local buffer.
510 block = yield From(self.read(n))
512 partial = b''.join(blocks)
513 raise IncompleteReadError(partial, len(partial) + n)
517 raise Return(b''.join(blocks))
519 # FIXME: should we support __aiter__ and __anext__ in Trollius?
522 # def __aiter__(self):
526 # def __anext__(self):
527 # val = yield from self.readline()
529 # raise StopAsyncIteration