efficient vim config
[dotfiles/.git] / .local / lib / python3.9 / site-packages / pynvim / msgpack_rpc / event_loop / uv.py
1 """Event loop implementation that uses pyuv(libuv-python bindings)."""
2 import sys
3 from collections import deque
4
5 import pyuv
6
7 from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop
8
9
class UvEventLoop(BaseEventLoop):

    """`BaseEventLoop` subclass that uses `pyuv` as a backend."""

    def _init(self):
        """Create the pyuv loop, its async wakeup handle and initial state."""
        self._loop = pyuv.Loop()
        # The async handle is what `_threadsafe_call` signals in order to
        # get `_on_async` invoked.
        self._async = pyuv.Async(self._loop, self._on_async)
        self._connection_error = None
        self._error_stream = None
        self._callbacks = deque()

    def _on_connect(self, stream, error):
        """Connect callback used by `_connect_tcp` and `_connect_socket`.

        Stops the loop in every case. On success `stream` becomes both the
        read and the write stream; on failure an `OSError` describing the
        problem is stored in `self._connection_error`.
        """
        self.stop()
        if not error:
            self._read_stream = self._write_stream = stream
            return
        address = self._connect_address
        reason = pyuv.errno.strerror(error)
        self._connection_error = OSError(
            'Cannot connect to {}: {}'.format(address, reason))

    def _on_read(self, handle, data, error):
        """Read callback registered on the read and error streams.

        An error, or an empty read (reported as 'EOF'), is forwarded to
        `self._on_error`. Data read from `self._error_stream` is dropped;
        any other data is handed to `self._on_data`.
        """
        if error:
            self._on_error(pyuv.errno.strerror(error))
            return
        if not data:
            self._on_error('EOF')
            return
        if handle == self._error_stream:
            # Output on the child's stderr pipe is read but not processed.
            return
        self._on_data(data)

    def _on_write(self, handle, error):
        """Write callback: forward a failed write to `self._on_error`."""
        if not error:
            return
        self._on_error(pyuv.errno.strerror(error))

    def _on_exit(self, handle, exit_status, term_signal):
        """Exit callback of the spawned child: report it as 'EOF'."""
        self._on_error('EOF')

    def _disconnected(self, *args):
        """Stand-in for `run`/`send` after a failed connect; always raises."""
        raise OSError('Not connected to Nvim')

    def _connect_tcp(self, address, port):
        """Begin a TCP connect to `address`:`port`.

        The result is delivered to `_on_connect`.
        """
        tcp = pyuv.TCP(self._loop)
        endpoint = (address, port)
        # Kept so that `_on_connect` can name the target in its error.
        self._connect_address = '{}:{}'.format(*endpoint)
        tcp.connect(endpoint, self._on_connect)

    def _connect_socket(self, path):
        """Begin a pipe connect to `path`.

        The result is delivered to `_on_connect`.
        """
        pipe = pyuv.Pipe(self._loop)
        # Kept so that `_on_connect` can name the target in its error.
        self._connect_address = path
        pipe.connect(path, self._on_connect)

    def _connect_stdio(self):
        """Wrap this process's stdin and stdout in pyuv pipes."""
        reader = self._read_stream = pyuv.Pipe(self._loop)
        reader.open(sys.stdin.fileno())
        writer = self._write_stream = pyuv.Pipe(self._loop)
        writer.open(sys.stdout.fileno())

    def _connect_child(self, argv):
        """Spawn `argv` as a child process connected through three pipes.

        The child's stdin is fed from `self._write_stream`, its stdout
        arrives on `self._read_stream` and its stderr on
        `self._error_stream`. Reading of the error stream starts right
        away; `_on_exit` is called when the child exits.
        """
        self._write_stream = pyuv.Pipe(self._loop)
        self._read_stream = pyuv.Pipe(self._loop)
        self._error_stream = pyuv.Pipe(self._loop)
        child_stdin = pyuv.StdIO(
            self._write_stream,
            flags=pyuv.UV_CREATE_PIPE + pyuv.UV_READABLE_PIPE)
        child_stdout = pyuv.StdIO(
            self._read_stream,
            flags=pyuv.UV_CREATE_PIPE + pyuv.UV_WRITABLE_PIPE)
        child_stderr = pyuv.StdIO(
            self._error_stream,
            flags=pyuv.UV_CREATE_PIPE + pyuv.UV_WRITABLE_PIPE)
        pyuv.Process.spawn(
            self._loop,
            args=argv,
            exit_callback=self._on_exit,
            flags=pyuv.UV_PROCESS_WINDOWS_HIDE,
            stdio=(child_stdin, child_stdout, child_stderr))
        self._error_stream.start_read(self._on_read)

    def _start_reading(self):
        """Start delivering data from the read stream to `_on_read`.

        For the 'tcp' and 'socket' transports the loop is run first;
        `_on_connect` calls `self.stop()` once the connect attempt has
        resolved. If the attempt failed, `run` and `send` are replaced
        with `_disconnected` and the stored error is raised.
        """
        if self._transport_type in ('tcp', 'socket'):
            self._loop.run()
            if self._connection_error:
                self.run = self.send = self._disconnected
                raise self._connection_error
        self._read_stream.start_read(self._on_read)

    def _send(self, data):
        """Write `data` to the write stream; `_on_write` gets the result."""
        self._write_stream.write(data, self._on_write)

    def _run(self):
        """Run the pyuv loop in `UV_RUN_DEFAULT` mode."""
        self._loop.run(pyuv.UV_RUN_DEFAULT)

    def _stop(self):
        """Stop the pyuv loop."""
        self._loop.stop()

    def _close(self):
        """Do nothing; this backend performs no work on close."""

    def _threadsafe_call(self, fn):
        """Queue `fn` and signal the async handle so `_on_async` runs it."""
        self._callbacks.append(fn)
        self._async.send()

    def _on_async(self, handle):
        """Async callback: run queued callbacks, oldest first, until none."""
        pending = self._callbacks
        while pending:
            callback = pending.popleft()
            callback()

    def _setup_signals(self, signals):
        """Start one pyuv signal handle per entry in `signals`.

        Each handle forwards the received signal number to
        `self._on_signal`. The handles are kept in `self._signal_handles`
        so that `_teardown_signals` can stop them.
        """
        self._signal_handles = []

        def forward(signal_handle, signum):
            self._on_signal(signum)

        for number in signals:
            watcher = pyuv.Signal(self._loop)
            watcher.start(forward, number)
            self._signal_handles.append(watcher)

    def _teardown_signals(self):
        """Stop every signal handle started by `_setup_signals`."""
        for watcher in self._signal_handles:
            watcher.stop()