| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | """Stream-related things.""" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-25 15:07:18 -08:00
										 |  |  | __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol', | 
					
						
							| 
									
										
										
										
											2014-02-18 12:15:06 -05:00
										 |  |  |            'open_connection', 'start_server', | 
					
						
							|  |  |  |            'IncompleteReadError', | 
					
						
							| 
									
										
										
										
											2013-11-19 11:43:38 -08:00
										 |  |  |            ] | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-18 12:15:06 -05:00
										 |  |  | import socket | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-18 10:24:30 -08:00
										 |  |  | if hasattr(socket, 'AF_UNIX'): | 
					
						
							|  |  |  |     __all__.extend(['open_unix_connection', 'start_unix_server']) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  | from . import coroutines | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | from . import events | 
					
						
							|  |  |  | from . import futures | 
					
						
							|  |  |  | from . import protocols | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  | from .coroutines import coroutine | 
					
						
							| 
									
										
										
										
											2014-07-14 18:33:40 +02:00
										 |  |  | from .log import logger | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | _DEFAULT_LIMIT = 2**16 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-30 16:05:28 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-25 15:32:06 +01:00
										 |  |  | class IncompleteReadError(EOFError): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Incomplete read error. Attributes: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     - partial: read bytes string before the end of stream was reached | 
					
						
							|  |  |  |     - expected: total number of expected bytes | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     def __init__(self, partial, expected): | 
					
						
							|  |  |  |         EOFError.__init__(self, "%s bytes read on a total of %s expected bytes" | 
					
						
							|  |  |  |                                 % (len(partial), expected)) | 
					
						
							|  |  |  |         self.partial = partial | 
					
						
							|  |  |  |         self.expected = expected | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  | @coroutine | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | def open_connection(host=None, port=None, *, | 
					
						
							|  |  |  |                     loop=None, limit=_DEFAULT_LIMIT, **kwds): | 
					
						
							|  |  |  |     """A wrapper for create_connection() returning a (reader, writer) pair.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The reader returned is a StreamReader instance; the writer is a | 
					
						
							| 
									
										
										
										
											2014-01-23 17:40:03 +01:00
										 |  |  |     StreamWriter instance. | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     The arguments are all the usual arguments to create_connection() | 
					
						
							|  |  |  |     except protocol_factory; most common are positional host and port, | 
					
						
							|  |  |  |     with various optional keyword arguments following. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Additional optional keyword arguments are loop (to set the event loop | 
					
						
							|  |  |  |     instance to use) and limit (to set the buffer limit passed to the | 
					
						
							|  |  |  |     StreamReader). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     (If you want to customize the StreamReader and/or | 
					
						
							|  |  |  |     StreamReaderProtocol classes, just copy the code -- there's | 
					
						
							|  |  |  |     really nothing special here except some convenience.) | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     if loop is None: | 
					
						
							|  |  |  |         loop = events.get_event_loop() | 
					
						
							|  |  |  |     reader = StreamReader(limit=limit, loop=loop) | 
					
						
							| 
									
										
										
										
											2014-01-10 13:26:38 -08:00
										 |  |  |     protocol = StreamReaderProtocol(reader, loop=loop) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     transport, _ = yield from loop.create_connection( | 
					
						
							|  |  |  |         lambda: protocol, host, port, **kwds) | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |     writer = StreamWriter(transport, protocol, reader, loop) | 
					
						
							|  |  |  |     return reader, writer | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  | @coroutine | 
					
						
							| 
									
										
										
										
											2013-11-19 11:43:38 -08:00
										 |  |  | def start_server(client_connected_cb, host=None, port=None, *, | 
					
						
							|  |  |  |                  loop=None, limit=_DEFAULT_LIMIT, **kwds): | 
					
						
							|  |  |  |     """Start a socket server, call back for each client connected.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The first parameter, `client_connected_cb`, takes two parameters: | 
					
						
							|  |  |  |     client_reader, client_writer.  client_reader is a StreamReader | 
					
						
							|  |  |  |     object, while client_writer is a StreamWriter object.  This | 
					
						
							|  |  |  |     parameter can either be a plain callback function or a coroutine; | 
					
						
							|  |  |  |     if it is a coroutine, it will be automatically converted into a | 
					
						
							|  |  |  |     Task. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The rest of the arguments are all the usual arguments to | 
					
						
							|  |  |  |     loop.create_server() except protocol_factory; most common are | 
					
						
							|  |  |  |     positional host and port, with various optional keyword arguments | 
					
						
							|  |  |  |     following.  The return value is the same as loop.create_server(). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Additional optional keyword arguments are loop (to set the event loop | 
					
						
							|  |  |  |     instance to use) and limit (to set the buffer limit passed to the | 
					
						
							|  |  |  |     StreamReader). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     The return value is the same as loop.create_server(), i.e. a | 
					
						
							|  |  |  |     Server object which can be used to stop the service. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     if loop is None: | 
					
						
							|  |  |  |         loop = events.get_event_loop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def factory(): | 
					
						
							|  |  |  |         reader = StreamReader(limit=limit, loop=loop) | 
					
						
							|  |  |  |         protocol = StreamReaderProtocol(reader, client_connected_cb, | 
					
						
							|  |  |  |                                         loop=loop) | 
					
						
							|  |  |  |         return protocol | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return (yield from loop.create_server(factory, host, port, **kwds)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-18 12:15:06 -05:00
										 |  |  | if hasattr(socket, 'AF_UNIX'): | 
					
						
							|  |  |  |     # UNIX Domain Sockets are supported on this platform | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  |     @coroutine | 
					
						
							| 
									
										
										
										
											2014-02-18 12:15:06 -05:00
										 |  |  |     def open_unix_connection(path=None, *, | 
					
						
							|  |  |  |                              loop=None, limit=_DEFAULT_LIMIT, **kwds): | 
					
						
							|  |  |  |         """Similar to `open_connection` but works with UNIX Domain Sockets.""" | 
					
						
							|  |  |  |         if loop is None: | 
					
						
							|  |  |  |             loop = events.get_event_loop() | 
					
						
							|  |  |  |         reader = StreamReader(limit=limit, loop=loop) | 
					
						
							|  |  |  |         protocol = StreamReaderProtocol(reader, loop=loop) | 
					
						
							|  |  |  |         transport, _ = yield from loop.create_unix_connection( | 
					
						
							|  |  |  |             lambda: protocol, path, **kwds) | 
					
						
							|  |  |  |         writer = StreamWriter(transport, protocol, reader, loop) | 
					
						
							|  |  |  |         return reader, writer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  |     @coroutine | 
					
						
							| 
									
										
										
										
											2014-02-18 12:15:06 -05:00
										 |  |  |     def start_unix_server(client_connected_cb, path=None, *, | 
					
						
							|  |  |  |                           loop=None, limit=_DEFAULT_LIMIT, **kwds): | 
					
						
							|  |  |  |         """Similar to `start_server` but works with UNIX Domain Sockets.""" | 
					
						
							|  |  |  |         if loop is None: | 
					
						
							|  |  |  |             loop = events.get_event_loop() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         def factory(): | 
					
						
							|  |  |  |             reader = StreamReader(limit=limit, loop=loop) | 
					
						
							|  |  |  |             protocol = StreamReaderProtocol(reader, client_connected_cb, | 
					
						
							|  |  |  |                                             loop=loop) | 
					
						
							|  |  |  |             return protocol | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return (yield from loop.create_unix_server(factory, path, **kwds)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  | class FlowControlMixin(protocols.Protocol): | 
					
						
							|  |  |  |     """Reusable flow control logic for StreamWriter.drain().
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This implements the protocol methods pause_writing(), | 
					
						
							|  |  |  |     resume_reading() and connection_lost().  If the subclass overrides | 
					
						
							|  |  |  |     these it must call the super methods. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |     StreamWriter.drain() must wait for _drain_helper() coroutine. | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, loop=None): | 
					
						
							| 
									
										
										
										
											2015-01-09 21:32:05 +01:00
										 |  |  |         if loop is None: | 
					
						
							|  |  |  |             self._loop = events.get_event_loop() | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self._loop = loop | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  |         self._paused = False | 
					
						
							|  |  |  |         self._drain_waiter = None | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |         self._connection_lost = False | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def pause_writing(self): | 
					
						
							|  |  |  |         assert not self._paused | 
					
						
							|  |  |  |         self._paused = True | 
					
						
							| 
									
										
										
										
											2014-07-14 18:33:40 +02:00
										 |  |  |         if self._loop.get_debug(): | 
					
						
							|  |  |  |             logger.debug("%r pauses writing", self) | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def resume_writing(self): | 
					
						
							|  |  |  |         assert self._paused | 
					
						
							|  |  |  |         self._paused = False | 
					
						
							| 
									
										
										
										
											2014-07-14 18:33:40 +02:00
										 |  |  |         if self._loop.get_debug(): | 
					
						
							|  |  |  |             logger.debug("%r resumes writing", self) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  |         waiter = self._drain_waiter | 
					
						
							|  |  |  |         if waiter is not None: | 
					
						
							|  |  |  |             self._drain_waiter = None | 
					
						
							|  |  |  |             if not waiter.done(): | 
					
						
							|  |  |  |                 waiter.set_result(None) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def connection_lost(self, exc): | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |         self._connection_lost = True | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  |         # Wake up the writer if currently paused. | 
					
						
							|  |  |  |         if not self._paused: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         waiter = self._drain_waiter | 
					
						
							|  |  |  |         if waiter is None: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         self._drain_waiter = None | 
					
						
							|  |  |  |         if waiter.done(): | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         if exc is None: | 
					
						
							|  |  |  |             waiter.set_result(None) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             waiter.set_exception(exc) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |     @coroutine | 
					
						
							|  |  |  |     def _drain_helper(self): | 
					
						
							|  |  |  |         if self._connection_lost: | 
					
						
							|  |  |  |             raise ConnectionResetError('Connection lost') | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  |         if not self._paused: | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |             return | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  |         waiter = self._drain_waiter | 
					
						
							|  |  |  |         assert waiter is None or waiter.cancelled() | 
					
						
							|  |  |  |         waiter = futures.Future(loop=self._loop) | 
					
						
							|  |  |  |         self._drain_waiter = waiter | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |         yield from waiter | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): | 
					
						
							|  |  |  |     """Helper class to adapt between Protocol and StreamReader.
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     (This is a helper class instead of making StreamReader itself a | 
					
						
							|  |  |  |     Protocol subclass, because the StreamReader has other potential | 
					
						
							|  |  |  |     uses, and to prevent the user of the StreamReader to accidentally | 
					
						
							|  |  |  |     call inappropriate methods of the protocol.) | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-19 11:43:38 -08:00
										 |  |  |     def __init__(self, stream_reader, client_connected_cb=None, loop=None): | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  |         super().__init__(loop=loop) | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |         self._stream_reader = stream_reader | 
					
						
							| 
									
										
										
										
											2013-11-19 11:43:38 -08:00
										 |  |  |         self._stream_writer = None | 
					
						
							|  |  |  |         self._client_connected_cb = client_connected_cb | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def connection_made(self, transport): | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |         self._stream_reader.set_transport(transport) | 
					
						
							| 
									
										
										
										
											2013-11-19 11:43:38 -08:00
										 |  |  |         if self._client_connected_cb is not None: | 
					
						
							|  |  |  |             self._stream_writer = StreamWriter(transport, self, | 
					
						
							|  |  |  |                                                self._stream_reader, | 
					
						
							|  |  |  |                                                self._loop) | 
					
						
							|  |  |  |             res = self._client_connected_cb(self._stream_reader, | 
					
						
							|  |  |  |                                             self._stream_writer) | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  |             if coroutines.iscoroutine(res): | 
					
						
							| 
									
										
										
										
											2014-07-08 11:29:25 +02:00
										 |  |  |                 self._loop.create_task(res) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def connection_lost(self, exc): | 
					
						
							|  |  |  |         if exc is None: | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |             self._stream_reader.feed_eof() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         else: | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |             self._stream_reader.set_exception(exc) | 
					
						
							| 
									
										
										
										
											2014-01-29 14:24:45 -08:00
										 |  |  |         super().connection_lost(exc) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def data_received(self, data): | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |         self._stream_reader.feed_data(data) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def eof_received(self): | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |         self._stream_reader.feed_eof() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class StreamWriter: | 
					
						
							|  |  |  |     """Wraps a Transport.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This exposes write(), writelines(), [can_]write_eof(), | 
					
						
							|  |  |  |     get_extra_info() and close().  It adds drain() which returns an | 
					
						
							|  |  |  |     optional Future on which you can wait for flow control.  It also | 
					
						
							| 
									
										
										
										
											2014-01-10 13:26:38 -08:00
										 |  |  |     adds a transport property which references the Transport | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |     directly. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, transport, protocol, reader, loop): | 
					
						
							|  |  |  |         self._transport = transport | 
					
						
							|  |  |  |         self._protocol = protocol | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |         # drain() expects that the reader has a exception() method | 
					
						
							|  |  |  |         assert reader is None or isinstance(reader, StreamReader) | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |         self._reader = reader | 
					
						
							|  |  |  |         self._loop = loop | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-07-14 18:33:40 +02:00
										 |  |  |     def __repr__(self): | 
					
						
							| 
									
										
										
										
											2015-01-15 21:50:19 +01:00
										 |  |  |         info = [self.__class__.__name__, 'transport=%r' % self._transport] | 
					
						
							| 
									
										
										
										
											2014-07-14 18:33:40 +02:00
										 |  |  |         if self._reader is not None: | 
					
						
							|  |  |  |             info.append('reader=%r' % self._reader) | 
					
						
							|  |  |  |         return '<%s>' % ' '.join(info) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |     @property | 
					
						
							|  |  |  |     def transport(self): | 
					
						
							|  |  |  |         return self._transport | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def write(self, data): | 
					
						
							|  |  |  |         self._transport.write(data) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def writelines(self, data): | 
					
						
							|  |  |  |         self._transport.writelines(data) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def write_eof(self): | 
					
						
							|  |  |  |         return self._transport.write_eof() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def can_write_eof(self): | 
					
						
							|  |  |  |         return self._transport.can_write_eof() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-15 21:50:19 +01:00
										 |  |  |     def close(self): | 
					
						
							|  |  |  |         return self._transport.close() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |     def get_extra_info(self, name, default=None): | 
					
						
							|  |  |  |         return self._transport.get_extra_info(name, default) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |     @coroutine | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |     def drain(self): | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |         """Flush the write buffer.
 | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |         The intended use is to write | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |           w.write(data) | 
					
						
							|  |  |  |           yield from w.drain() | 
					
						
							|  |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2014-07-22 12:03:40 +02:00
										 |  |  |         if self._reader is not None: | 
					
						
							|  |  |  |             exc = self._reader.exception() | 
					
						
							|  |  |  |             if exc is not None: | 
					
						
							|  |  |  |                 raise exc | 
					
						
							|  |  |  |         yield from self._protocol._drain_helper() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class StreamReader: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, limit=_DEFAULT_LIMIT, loop=None): | 
					
						
							|  |  |  |         # The line length limit is  a security feature; | 
					
						
							|  |  |  |         # it also doubles as half the buffer limit. | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |         self._limit = limit | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         if loop is None: | 
					
						
							| 
									
										
										
										
											2015-01-09 21:32:05 +01:00
										 |  |  |             self._loop = events.get_event_loop() | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self._loop = loop | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |         self._buffer = bytearray() | 
					
						
							| 
									
										
										
										
											2015-01-14 00:53:37 +01:00
										 |  |  |         self._eof = False    # Whether we're done. | 
					
						
							|  |  |  |         self._waiter = None  # A future used by _wait_for_data() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         self._exception = None | 
					
						
							|  |  |  |         self._transport = None | 
					
						
							|  |  |  |         self._paused = False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def exception(self): | 
					
						
							|  |  |  |         return self._exception | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def set_exception(self, exc): | 
					
						
							|  |  |  |         self._exception = exc | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |         waiter = self._waiter | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         if waiter is not None: | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |             self._waiter = None | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |             if not waiter.cancelled(): | 
					
						
							|  |  |  |                 waiter.set_exception(exc) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-01-14 00:53:37 +01:00
										 |  |  |     def _wakeup_waiter(self): | 
					
						
							|  |  |  |         """Wakeup read() or readline() function waiting for data or EOF.""" | 
					
						
							|  |  |  |         waiter = self._waiter | 
					
						
							|  |  |  |         if waiter is not None: | 
					
						
							|  |  |  |             self._waiter = None | 
					
						
							|  |  |  |             if not waiter.cancelled(): | 
					
						
							|  |  |  |                 waiter.set_result(None) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     def set_transport(self, transport): | 
					
						
							|  |  |  |         assert self._transport is None, 'Transport already set' | 
					
						
							|  |  |  |         self._transport = transport | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _maybe_resume_transport(self): | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |         if self._paused and len(self._buffer) <= self._limit: | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |             self._paused = False | 
					
						
							| 
									
										
										
										
											2013-10-18 07:58:20 -07:00
										 |  |  |             self._transport.resume_reading() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def feed_eof(self): | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |         self._eof = True | 
					
						
							| 
									
										
										
										
											2015-01-14 00:53:37 +01:00
										 |  |  |         self._wakeup_waiter() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-06 00:14:30 -05:00
										 |  |  |     def at_eof(self): | 
					
						
							|  |  |  |         """Return True if the buffer is empty and 'feed_eof' was called.""" | 
					
						
							|  |  |  |         return self._eof and not self._buffer | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     def feed_data(self, data): | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |         assert not self._eof, 'feed_data after feed_eof' | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         if not data: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |         self._buffer.extend(data) | 
					
						
							| 
									
										
										
										
											2015-01-14 00:53:37 +01:00
										 |  |  |         self._wakeup_waiter() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |         if (self._transport is not None and | 
					
						
							|  |  |  |             not self._paused and | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |             len(self._buffer) > 2*self._limit): | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |             try: | 
					
						
							| 
									
										
										
										
											2013-10-18 07:58:20 -07:00
										 |  |  |                 self._transport.pause_reading() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |             except NotImplementedError: | 
					
						
							|  |  |  |                 # The transport can't be paused. | 
					
						
							|  |  |  |                 # We'll just have to buffer all data. | 
					
						
							|  |  |  |                 # Forget the transport so we don't keep trying. | 
					
						
							|  |  |  |                 self._transport = None | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 self._paused = True | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-03-18 11:37:42 +01:00
										 |  |  |     @coroutine | 
					
						
							| 
									
										
										
										
											2015-01-14 00:53:37 +01:00
										 |  |  |     def _wait_for_data(self, func_name): | 
					
						
							|  |  |  |         """Wait until feed_data() or feed_eof() is called.""" | 
					
						
							| 
									
										
										
										
											2014-01-23 17:40:03 +01:00
										 |  |  |         # StreamReader uses a future to link the protocol feed_data() method | 
					
						
							|  |  |  |         # to a read coroutine. Running two read coroutines at the same time | 
					
						
							|  |  |  |         # would have an unexpected behaviour. It would not possible to know | 
					
						
							|  |  |  |         # which coroutine would get the next data. | 
					
						
							|  |  |  |         if self._waiter is not None: | 
					
						
							|  |  |  |             raise RuntimeError('%s() called while another coroutine is ' | 
					
						
							|  |  |  |                                'already waiting for incoming data' % func_name) | 
					
						
							| 
									
										
										
										
											2015-01-14 00:53:37 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         self._waiter = futures.Future(loop=self._loop) | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             yield from self._waiter | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             self._waiter = None | 
					
						
							| 
									
										
										
										
											2014-01-23 17:40:03 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  |     @coroutine | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     def readline(self): | 
					
						
							|  |  |  |         if self._exception is not None: | 
					
						
							|  |  |  |             raise self._exception | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |         line = bytearray() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         not_enough = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         while not_enough: | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |             while self._buffer and not_enough: | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |                 ichar = self._buffer.find(b'\n') | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |                 if ichar < 0: | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |                     line.extend(self._buffer) | 
					
						
							|  |  |  |                     self._buffer.clear() | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |                 else: | 
					
						
							|  |  |  |                     ichar += 1 | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |                     line.extend(self._buffer[:ichar]) | 
					
						
							|  |  |  |                     del self._buffer[:ichar] | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |                     not_enough = False | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |                 if len(line) > self._limit: | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |                     self._maybe_resume_transport() | 
					
						
							|  |  |  |                     raise ValueError('Line is too long') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |             if self._eof: | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |                 break | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if not_enough: | 
					
						
							| 
									
										
										
										
											2015-01-14 00:53:37 +01:00
										 |  |  |                 yield from self._wait_for_data('readline') | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |         self._maybe_resume_transport() | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |         return bytes(line) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  |     @coroutine | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     def read(self, n=-1): | 
					
						
							|  |  |  |         if self._exception is not None: | 
					
						
							|  |  |  |             raise self._exception | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if not n: | 
					
						
							|  |  |  |             return b'' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if n < 0: | 
					
						
							| 
									
										
										
										
											2014-05-12 10:04:37 -07:00
										 |  |  |             # This used to just loop creating a new waiter hoping to | 
					
						
							|  |  |  |             # collect everything in self._buffer, but that would | 
					
						
							|  |  |  |             # deadlock if the subprocess sends more than self.limit | 
					
						
							|  |  |  |             # bytes.  So just call self.read(self._limit) until EOF. | 
					
						
							|  |  |  |             blocks = [] | 
					
						
							|  |  |  |             while True: | 
					
						
							|  |  |  |                 block = yield from self.read(self._limit) | 
					
						
							|  |  |  |                 if not block: | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |                 blocks.append(block) | 
					
						
							|  |  |  |             return b''.join(blocks) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |         else: | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |             if not self._buffer and not self._eof: | 
					
						
							| 
									
										
										
										
											2015-01-14 00:53:37 +01:00
										 |  |  |                 yield from self._wait_for_data('read') | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |         if n < 0 or len(self._buffer) <= n: | 
					
						
							|  |  |  |             data = bytes(self._buffer) | 
					
						
							| 
									
										
										
										
											2013-10-18 15:17:11 -07:00
										 |  |  |             self._buffer.clear() | 
					
						
							| 
									
										
										
										
											2014-02-05 18:11:13 -05:00
										 |  |  |         else: | 
					
						
							|  |  |  |             # n > 0 and len(self._buffer) > n | 
					
						
							|  |  |  |             data = bytes(self._buffer[:n]) | 
					
						
							|  |  |  |             del self._buffer[:n] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self._maybe_resume_transport() | 
					
						
							|  |  |  |         return data | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-06-29 00:46:45 +02:00
										 |  |  |     @coroutine | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  |     def readexactly(self, n): | 
					
						
							|  |  |  |         if self._exception is not None: | 
					
						
							|  |  |  |             raise self._exception | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-06 16:09:18 -08:00
										 |  |  |         # There used to be "optimized" code here.  It created its own | 
					
						
							|  |  |  |         # Future and waited until self._buffer had at least the n | 
					
						
							|  |  |  |         # bytes, then called read(n).  Unfortunately, this could pause | 
					
						
							|  |  |  |         # the transport if the argument was larger than the pause | 
					
						
							|  |  |  |         # limit (which is twice self._limit).  So now we just read() | 
					
						
							|  |  |  |         # into a local buffer. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         blocks = [] | 
					
						
							|  |  |  |         while n > 0: | 
					
						
							|  |  |  |             block = yield from self.read(n) | 
					
						
							|  |  |  |             if not block: | 
					
						
							| 
									
										
										
										
											2014-01-25 15:32:06 +01:00
										 |  |  |                 partial = b''.join(blocks) | 
					
						
							|  |  |  |                 raise IncompleteReadError(partial, len(partial) + n) | 
					
						
							| 
									
										
										
										
											2014-01-06 16:09:18 -08:00
										 |  |  |             blocks.append(block) | 
					
						
							|  |  |  |             n -= len(block) | 
					
						
							| 
									
										
										
										
											2013-10-17 13:40:50 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-06 16:09:18 -08:00
										 |  |  |         return b''.join(blocks) |