"""Event loop implementation that uses the `asyncio` standard module.

The `asyncio` module was added to the Python standard library in 3.4, and it
provides a pure-Python implementation of an event loop library. It is used
as a fallback in case pyuv is not available (on Python implementations other
than CPython).

Earlier Python versions are supported through the `trollius` package, which
is a backport of `asyncio` that works on Python 2.6+.
"""
from __future__ import absolute_import

import logging
import os
import sys
from collections import deque

try:
    # For python 3.4+, use the standard library module
    import asyncio
except (ImportError, SyntaxError):
    # Fallback to trollius
    import trollius as asyncio

from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop
logger = logging.getLogger(__name__)
debug, info, warn = (logger.debug, logger.info, logger.warning,)

# Event loop class instantiated by `AsyncioEventLoop`; the selector-based
# loop is the default and is replaced below when running on Windows.
loop_cls = asyncio.SelectorEventLoop
if os.name == 'nt':
    # `msvcrt` and `asyncio.windows_utils` exist only on Windows, so they
    # must not be imported unconditionally.
    import msvcrt
    from asyncio.windows_utils import PipeHandle

    # On windows use ProactorEventLoop which support pipes and is backed by the
    # more powerful IOCP facility
    # NOTE: we override in the stdio case, because it doesn't work.
    loop_cls = asyncio.ProactorEventLoop
# The instance acts as its own asyncio protocol: the connection helpers in
# this class hand the loop a factory (`self._fact`) that returns `self`, so
# the `asyncio.Protocol` / `asyncio.SubprocessProtocol` callbacks defined
# here are invoked on this same object.
41 class AsyncioEventLoop(BaseEventLoop, asyncio.Protocol,
42 asyncio.SubprocessProtocol):
44 """`BaseEventLoop` subclass that uses `asyncio` as a backend."""
46 def connection_made(self, transport):
47 """Used to signal `asyncio.Protocol` of a successful connection."""
48 self._transport = transport
49 self._raw_transport = transport
50 if isinstance(transport, asyncio.SubprocessTransport):
51 self._transport = transport.get_pipe_transport(0)
53 def connection_lost(self, exc):
54 """Used to signal `asyncio.Protocol` of a lost connection."""
55 self._on_error(exc.args[0] if exc else 'EOF')
# Protocol callback for incoming bytes. The visible statement buffers each
# chunk in `self._queued_data`; that queue is drained through
# `self._on_data` elsewhere in this class.
# NOTE(review): the numbering embedded in this listing jumps from 58 to 62,
# so statements between the docstring and the append may be missing from
# this view -- confirm against the complete file before relying on this.
57 def data_received(self, data):
58 """Used to signal `asyncio.Protocol` of incoming data."""
62 self._queued_data.append(data)
64 def pipe_connection_lost(self, fd, exc):
65 """Used to signal `asyncio.SubprocessProtocol` of a lost connection."""
66 self._on_error(exc.args[0] if exc else 'EOF')
# Subprocess protocol callback for bytes arriving on one of the child's
# pipes; `fd` is the child's file descriptor number (2 means stderr, per the
# inline comment).
68 def pipe_data_received(self, fd, data):
69 """Used to signal `asyncio.SubprocessProtocol` of incoming data."""
70 if fd == 2: # stderr fd number
# NOTE(review): the numbering embedded in this listing jumps from 70 to 75,
# so the body of the stderr branch and any other branches are not visible
# here. Do not assume the append below belongs to the `fd == 2` case --
# confirm against the complete file.
75 self._queued_data.append(data)
# Subprocess protocol callback invoked when the child process exits.
77 def process_exited(self):
78 """Used to signal `asyncio.SubprocessProtocol` when the child exits."""
# NOTE(review): the numbering embedded in this listing jumps from 78 to 82.
# The body of `process_exited` is therefore not visible, and the statements
# below presumably belong to a separate definition whose `def` line is
# missing from this view -- confirm against the complete file.
# What they do: create a fresh loop from `loop_cls`, start with an empty
# receive queue, define a protocol factory that returns this very object,
# and record that no raw transport exists yet.
82 self._loop = loop_cls()
83 self._queued_data = deque()
84 self._fact = lambda: self
85 self._raw_transport = None
87 def _connect_tcp(self, address, port):
88 coroutine = self._loop.create_connection(self._fact, address, port)
89 self._loop.run_until_complete(coroutine)
91 def _connect_socket(self, path):
93 coroutine = self._loop.create_pipe_connection(self._fact, path)
95 coroutine = self._loop.create_unix_connection(self._fact, path)
96 self._loop.run_until_complete(coroutine)
98 def _connect_stdio(self):
100 pipe = PipeHandle(msvcrt.get_osfhandle(sys.stdin.fileno()))
103 coroutine = self._loop.connect_read_pipe(self._fact, pipe)
104 self._loop.run_until_complete(coroutine)
105 pass # replaces next logging statement
106 #debug("native stdin connection successful")
108 # Make sure subprocesses don't clobber stdout,
109 # send the output to stderr instead.
110 rename_stdout = os.dup(sys.stdout.fileno())
111 os.dup2(sys.stderr.fileno(), sys.stdout.fileno())
114 pipe = PipeHandle(msvcrt.get_osfhandle(rename_stdout))
116 pipe = os.fdopen(rename_stdout, 'wb')
117 coroutine = self._loop.connect_write_pipe(self._fact, pipe)
118 self._loop.run_until_complete(coroutine)
119 pass # replaces next logging statement
120 #debug("native stdout connection successful")
122 def _connect_child(self, argv):
124 self._child_watcher = asyncio.get_child_watcher()
125 self._child_watcher.attach_loop(self._loop)
126 coroutine = self._loop.subprocess_exec(self._fact, *argv)
127 self._loop.run_until_complete(coroutine)
# NOTE(review): only the signature is visible; the numbering embedded in
# this listing skips 130-131, so the body is missing from this view.
129 def _start_reading(self):
132 def _send(self, data):
133 self._transport.write(data)
# NOTE(review): no `def` line is visible for the statements below; the
# numbering embedded in this listing skips 134-135 and 139-143, so they
# presumably belong to two separate definitions whose headers are missing
# from this view -- confirm against the complete file.
# First group: hand every chunk buffered in `self._queued_data` to
# `self._on_data`, oldest first, then run the loop with `run_forever()`.
136 while self._queued_data:
137 self._on_data(self._queued_data.popleft())
138 self._loop.run_forever()
# Second group: close the raw transport if one has been recorded.
144 if self._raw_transport is not None:
145 self._raw_transport.close()
148 def _threadsafe_call(self, fn):
149 self._loop.call_soon_threadsafe(fn)
151 def _setup_signals(self, signals):
153 # add_signal_handler is not supported in win32
157 self._signals = list(signals)
158 for signum in self._signals:
159 self._loop.add_signal_handler(signum, self._on_signal, signum)
161 def _teardown_signals(self):
162 for signum in self._signals:
163 self._loop.remove_signal_handler(signum)