minimal adjustments
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-python / pythonFiles / visualstudio_py_testlauncher.py
1 # Python Tools for Visual Studio
2 # Copyright(c) Microsoft Corporation
3 # All rights reserved.
4 #
5 # Licensed under the Apache License, Version 2.0 (the License); you may not use
6 # this file except in compliance with the License. You may obtain a copy of the
7 # License at http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # THIS CODE IS PROVIDED ON AN  *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
10 # OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY
11 # IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
12 # MERCHANTABLITY OR NON-INFRINGEMENT.
13 #
14 # See the Apache Version 2.0 License for specific language governing
15 # permissions and limitations under the License.
16
17 __author__ = "Microsoft Corporation <ptvshelp@microsoft.com>"
18 __version__ = "3.0.0.0"
19
20 import os
21 import sys
22 import json
23 import unittest
24 import socket
25 import traceback
26 from types import CodeType, FunctionType
27 import signal
28 try:
29     import thread
30 except:
31     import _thread as thread
32
33 class _TestOutput(object):
34     """file like object which redirects output to the repl window."""
35     errors = 'strict'
36
37     def __init__(self, old_out, is_stdout):
38         self.is_stdout = is_stdout
39         self.old_out = old_out
40         if sys.version >= '3.' and hasattr(old_out, 'buffer'):
41             self.buffer = _TestOutputBuffer(old_out.buffer, is_stdout)
42
43     def flush(self):
44         if self.old_out:
45             self.old_out.flush()
46
47     def writelines(self, lines):
48         for line in lines:
49             self.write(line)
50
51     @property
52     def encoding(self):
53         return 'utf8'
54
55     def write(self, value):
56         _channel.send_event('stdout' if self.is_stdout else 'stderr', content=value)
57         if self.old_out:
58             self.old_out.write(value)
59             # flush immediately, else things go wonky and out of order
60             self.flush()
61
62     def isatty(self):
63         return True
64
65     def next(self):
66         pass
67
68     @property
69     def name(self):
70         if self.is_stdout:
71             return "<stdout>"
72         else:
73             return "<stderr>"
74
75     def __getattr__(self, name):
76         return getattr(self.old_out, name)
77
78 class _TestOutputBuffer(object):
79     def __init__(self, old_buffer, is_stdout):
80         self.buffer = old_buffer
81         self.is_stdout = is_stdout
82
83     def write(self, data):
84         _channel.send_event('stdout' if self.is_stdout else 'stderr', content=data)
85         self.buffer.write(data)
86
87     def flush(self):
88         self.buffer.flush()
89
90     def truncate(self, pos = None):
91         return self.buffer.truncate(pos)
92
93     def tell(self):
94         return self.buffer.tell()
95
96     def seek(self, pos, whence = 0):
97         return self.buffer.seek(pos, whence)
98
99 class _IpcChannel(object):
100     def __init__(self, socket, callback):
101         self.socket = socket
102         self.seq = 0
103         self.callback = callback
104         self.lock = thread.allocate_lock()
105         self._closed = False
106         # start the testing reader thread loop
107         self.test_thread_id = thread.start_new_thread(self.readSocket, ())
108
109     def close(self):
110         self._closed = True
111
112     def readSocket(self):
113         try:
114             data = self.socket.recv(1024)
115             self.callback()
116         except OSError:
117             if not self._closed:
118                 raise
119
120     def receive(self):
121         pass
122
123     def send_event(self, name, **args):
124         with self.lock:
125             body = {'type': 'event', 'seq': self.seq, 'event':name, 'body':args}
126             self.seq += 1
127             content = json.dumps(body).encode('utf8')
128             headers = ('Content-Length: %d\n\n' % (len(content), )).encode('utf8')
129             self.socket.send(headers)
130             self.socket.send(content)
131
132 _channel = None
133
134
135 class VsTestResult(unittest.TextTestResult):
136     def startTest(self, test):
137         super(VsTestResult, self).startTest(test)
138         if _channel is not None:
139             _channel.send_event(
140                 name='start',
141                 test = test.id()
142             )
143
144     def addError(self, test, err):
145         super(VsTestResult, self).addError(test, err)
146         self.sendResult(test, 'error', err)
147
148     def addFailure(self, test, err):
149         super(VsTestResult, self).addFailure(test, err)
150         self.sendResult(test, 'failed', err)
151
152     def addSuccess(self, test):
153         super(VsTestResult, self).addSuccess(test)
154         self.sendResult(test, 'passed')
155
156     def addSkip(self, test, reason):
157         super(VsTestResult, self).addSkip(test, reason)
158         self.sendResult(test, 'skipped')
159
160     def addExpectedFailure(self, test, err):
161         super(VsTestResult, self).addExpectedFailure(test, err)
162         self.sendResult(test, 'failed', err)
163
164     def addUnexpectedSuccess(self, test):
165         super(VsTestResult, self).addUnexpectedSuccess(test)
166         self.sendResult(test, 'passed')
167
168     def sendResult(self, test, outcome, trace = None):
169         if _channel is not None:
170             tb = None
171             message = None
172             if trace is not None:
173                 traceback.print_exc()
174                 formatted = traceback.format_exception(*trace)
175                 # Remove the 'Traceback (most recent call last)'
176                 formatted = formatted[1:]
177                 tb = ''.join(formatted)
178                 message = str(trace[1])
179             _channel.send_event(
180                 name='result',
181                 outcome=outcome,
182                 traceback = tb,
183                 message = message,
184                 test = test.id()
185             )
186
187 def stopTests():
188     try:
189         os.kill(os.getpid(), signal.SIGUSR1)
190     except:
191         try:
192             os.kill(os.getpid(), signal.SIGTERM)
193         except:
194             pass
195
196 class ExitCommand(Exception):
197     pass
198
199 def signal_handler(signal, frame):
200     raise ExitCommand()
201
202 def main():
203     import os
204     import sys
205     import unittest
206     from optparse import OptionParser
207     global _channel
208
209     parser = OptionParser(prog = 'visualstudio_py_testlauncher', usage = 'Usage: %prog [<option>] <test names>... ')
210     parser.add_option('--debug', action='store_true', help='Whether debugging the unit tests')
211     parser.add_option('-x', '--mixed-mode', action='store_true', help='wait for mixed-mode debugger to attach')
212     parser.add_option('-t', '--test', type='str', dest='tests', action='append', help='specifies a test to run')
213     parser.add_option('--testFile', type='str', help='Fully qualitified path to file name')
214     parser.add_option('-c', '--coverage', type='str', help='enable code coverage and specify filename')
215     parser.add_option('-r', '--result-port', type='int', help='connect to port on localhost and send test results')
216     parser.add_option('--us', type='str', help='Directory to start discovery')
217     parser.add_option('--up', type='str', help='Pattern to match test files (''test*.py'' default)')
218     parser.add_option('--ut', type='str', help='Top level directory of project (default to start directory)')
219     parser.add_option('--uvInt', '--verboseInt', type='int', help='Verbose output (0 none, 1 (no -v) simple, 2 (-v) full)')
220     parser.add_option('--uf', '--failfast', type='str', help='Stop on first failure')
221     parser.add_option('--uc', '--catch', type='str', help='Catch control-C and display results')
222     (opts, _) = parser.parse_args()
223
224     if opts.debug:
225         from ptvsd.visualstudio_py_debugger import DONT_DEBUG, DEBUG_ENTRYPOINTS, get_code
226
227     sys.path[0] = os.getcwd()
228     if opts.result_port:
229         try:
230             signal.signal(signal.SIGUSR1, signal_handler)
231         except:
232             try:
233                 signal.signal(signal.SIGTERM, signal_handler)
234             except:
235                 pass
236         _channel = _IpcChannel(socket.create_connection(('127.0.0.1', opts.result_port)), stopTests)
237         sys.stdout = _TestOutput(sys.stdout, is_stdout = True)
238         sys.stderr = _TestOutput(sys.stderr, is_stdout = False)
239
240     if opts.debug:
241         DONT_DEBUG.append(os.path.normcase(__file__))
242         DEBUG_ENTRYPOINTS.add(get_code(main))
243
244         pass
245     elif opts.mixed_mode:
246         # For mixed-mode attach, there's no ptvsd and hence no wait_for_attach(),
247         # so we have to use Win32 API in a loop to do the same thing.
248         from time import sleep
249         from ctypes import windll, c_char
250         while True:
251             if windll.kernel32.IsDebuggerPresent() != 0:
252                 break
253             sleep(0.1)
254         try:
255             debugger_helper = windll['Microsoft.PythonTools.Debugger.Helper.x86.dll']
256         except WindowsError:
257             debugger_helper = windll['Microsoft.PythonTools.Debugger.Helper.x64.dll']
258         isTracing = c_char.in_dll(debugger_helper, "isTracing")
259         while True:
260             if isTracing.value != 0:
261                 break
262             sleep(0.1)
263
264     cov = None
265     try:
266         if opts.coverage:
267             try:
268                 import coverage
269                 cov = coverage.coverage(opts.coverage)
270                 cov.load()
271                 cov.start()
272             except:
273                 pass
274         if opts.tests is None and opts.testFile is None:
275             if opts.us is None:
276                 opts.us = '.'
277             if opts.up is None:
278                 opts.up = 'test*.py'
279             tests = unittest.defaultTestLoader.discover(opts.us, opts.up)
280         else:
281             # loadTestsFromNames doesn't work well (with duplicate file names or class names)
282             # Easier approach is find the test suite and use that for running
283             loader = unittest.TestLoader()
284             # opts.us will be passed in
285             suites = loader.discover(opts.us, pattern=os.path.basename(opts.testFile))
286             suite = None
287             tests = None
288             if opts.tests is None:
289                 # Run everything in the test file
290                 tests = suites
291             else:
292                 # Run a specific test class or test method
293                 for test_suite in suites._tests:
294                     for cls in test_suite._tests:
295                         try:
296                             for m in cls._tests:
297                                 testId = m.id()
298                                 if testId.startswith(opts.tests[0]):
299                                     suite = cls
300                                 if testId == opts.tests[0]:
301                                     tests = unittest.TestSuite([m])
302                                     break
303                         except Exception as err:
304                             errorMessage = traceback.format_exception()
305                             pass
306                 if tests is None:
307                     tests = suite
308             if tests is None and suite is None:
309                 _channel.send_event(
310                     name='error',
311                     outcome='',
312                     traceback = '',
313                     message = 'Failed to identify the test',
314                     test = ''
315                 )
316         if opts.uvInt is None:
317             opts.uvInt = 0
318         if opts.uf is not None:
319             runner = unittest.TextTestRunner(verbosity=opts.uvInt, resultclass=VsTestResult, failfast=True)
320         else:
321             runner = unittest.TextTestRunner(verbosity=opts.uvInt, resultclass=VsTestResult)
322         result = runner.run(tests)
323         if _channel is not None:
324             _channel.close()
325         sys.exit(not result.wasSuccessful())
326     finally:
327         if cov is not None:
328             cov.stop()
329             cov.save()
330             cov.xml_report(outfile = opts.coverage + '.xml', omit=__file__)
331         if _channel is not None:
332             _channel.send_event(
333                 name='done'
334             )
335             _channel.socket.close()
336         # prevent generation of the error 'Error in sys.exitfunc:'
337         try:
338             sys.stdout.close()
339         except:
340             pass
341         try:
342             sys.stderr.close()
343         except:
344             pass
345
346 if __name__ == '__main__':
347     main()