1 # Python Tools for Visual Studio
2 # Copyright(c) Microsoft Corporation
5 # Licensed under the Apache License, Version 2.0 (the License); you may not use
6 # this file except in compliance with the License. You may obtain a copy of the
7 # License at http://www.apache.org/licenses/LICENSE-2.0
9 # THIS CODE IS PROVIDED ON AN *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
10 # OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY
11 # IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
12 # MERCHANTABLITY OR NON-INFRINGEMENT.
14 # See the Apache Version 2.0 License for specific language governing
15 # permissions and limitations under the License.
# Module metadata: author contact string and the launcher's version string.
17 __author__ = "Microsoft Corporation <ptvshelp@microsoft.com>"
18 __version__ = "3.0.0.0"
26 from types import CodeType, FunctionType
31 import _thread as thread
# Stream wrapper installed in place of sys.stdout / sys.stderr: write() reports
# each value to the module-level _channel as a 'stdout' or 'stderr' event and
# passes it on to the wrapped stream. Attribute lookups that fail on the
# wrapper are delegated to the wrapped stream by __getattr__.
# NOTE(review): some statements of this class are not visible in this view
# (for example the body of writelines), so they are left undocumented.
33 class _TestOutput(object):
34 """file like object which redirects output to the repl window."""
# old_out   -- the stream being wrapped.
# is_stdout -- True selects the 'stdout' event name in write(), False 'stderr'.
37 def __init__(self, old_out, is_stdout):
38 self.is_stdout = is_stdout
39 self.old_out = old_out
# When the wrapped stream exposes a `buffer` attribute, wrap that as well so
# writes made through `.buffer` are reported too.
# NOTE(review): `sys.version >= '3.'` compares strings; `sys.version_info[0] >= 3`
# would be the sturdier check -- confirm before changing.
40 if sys.version >= '3.' and hasattr(old_out, 'buffer'):
41 self.buffer = _TestOutputBuffer(old_out.buffer, is_stdout)
47 def writelines(self, lines):
# Report the value over the channel, then echo it to the wrapped stream.
# Any guard around the echo is not visible in this view.
55 def write(self, value):
56 _channel.send_event('stdout' if self.is_stdout else 'stderr', content=value)
58 self.old_out.write(value)
59 # flush immediately, else things go wonky and out of order
# Invoked only for attributes not found on the wrapper itself; forwards the
# lookup to the wrapped stream.
75 def __getattr__(self, name):
76 return getattr(self.old_out, name)
# Wrapper for a stream's underlying `buffer` object (see how _TestOutput
# constructs it): write() reports the data to the module-level _channel as a
# 'stdout' or 'stderr' event and forwards it to the wrapped buffer; truncate,
# tell and seek are forwarded unchanged.
78 class _TestOutputBuffer(object):
# old_buffer -- the buffer object being wrapped.
# is_stdout  -- True selects the 'stdout' event name in write(), False 'stderr'.
79 def __init__(self, old_buffer, is_stdout):
80 self.buffer = old_buffer
81 self.is_stdout = is_stdout
# NOTE(review): `data` is handed to send_event unchanged, and send_event
# serialises the event with json.dumps. Presumably `data` is bytes here (this
# wraps a `.buffer`), which json.dumps cannot encode -- confirm whether the
# data needs decoding before it is sent.
83 def write(self, data):
84 _channel.send_event('stdout' if self.is_stdout else 'stderr', content=data)
85 self.buffer.write(data)
# Forwarded to the wrapped buffer; returns whatever the buffer returns.
90 def truncate(self, pos = None):
91 return self.buffer.truncate(pos)
# (The `def` line that owns the next statement is not visible in this view.)
94 return self.buffer.tell()
# Forwarded to the wrapped buffer; `whence` defaults to 0.
96 def seek(self, pos, whence = 0):
97 return self.buffer.seek(pos, whence)
# Event channel over a socket. Construction starts a background thread running
# readSocket; send_event writes one message made of a 'Content-Length' header
# followed by a UTF-8 encoded JSON body.
# NOTE(review): the assignments of self.socket and self.seq are not visible in
# this view.
99 class _IpcChannel(object):
# socket   -- connected socket used for both reading and sending.
# callback -- stored on the instance; where it is invoked is not visible here.
100 def __init__(self, socket, callback):
103 self.callback = callback
# Lock allocated per channel; the place where it is acquired is not visible.
104 self.lock = thread.allocate_lock()
106 # start the testing reader thread loop
107 self.test_thread_id = thread.start_new_thread(self.readSocket, ())
# Target of the reader thread: receives up to 1024 bytes from the socket.
# What is done with the received data is not visible in this view.
112 def readSocket(self):
114 data = self.socket.recv(1024)
# Send one event. `name` becomes the 'event' field and the keyword arguments
# become the 'body' object of the JSON message.
123 def send_event(self, name, **args):
125 body = {'type': 'event', 'seq': self.seq, 'event':name, 'body':args}
# The header carries the byte length of the encoded JSON payload.
127 content = json.dumps(body).encode('utf8')
128 headers = ('Content-Length: %d\n\n' % (len(content), )).encode('utf8')
# NOTE(review): socket.send() may transmit fewer bytes than it is given;
# sendall() would guarantee the whole header/payload is written -- confirm and
# consider switching.
129 self.socket.send(headers)
130 self.socket.send(content)
# TextTestResult subclass that, besides the normal TextTestResult bookkeeping,
# reports every test outcome through sendResult. Outcome strings used by the
# visible methods: 'error', 'failed', 'passed', 'skipped'.
135 class VsTestResult(unittest.TextTestResult):
# Runs the base-class handling first; the statements executed when a channel
# exists are not visible in this view.
136 def startTest(self, test):
137 super(VsTestResult, self).startTest(test)
138 if _channel is not None:
# err is forwarded to sendResult as `trace`.
144 def addError(self, test, err):
145 super(VsTestResult, self).addError(test, err)
146 self.sendResult(test, 'error', err)
148 def addFailure(self, test, err):
149 super(VsTestResult, self).addFailure(test, err)
150 self.sendResult(test, 'failed', err)
152 def addSuccess(self, test):
153 super(VsTestResult, self).addSuccess(test)
154 self.sendResult(test, 'passed')
# NOTE(review): `reason` is recorded by the base class but is not forwarded to
# sendResult -- confirm whether the skip reason should be reported.
156 def addSkip(self, test, reason):
157 super(VsTestResult, self).addSkip(test, reason)
158 self.sendResult(test, 'skipped')
# NOTE(review): an expected failure is reported as 'failed' and an unexpected
# success (below) as 'passed'; unittest itself does not count expected failures
# against the run -- confirm this mapping is intended.
160 def addExpectedFailure(self, test, err):
161 super(VsTestResult, self).addExpectedFailure(test, err)
162 self.sendResult(test, 'failed', err)
164 def addUnexpectedSuccess(self, test):
165 super(VsTestResult, self).addUnexpectedSuccess(test)
166 self.sendResult(test, 'passed')
# Report one outcome; does nothing visible here when _channel is None.
# test    -- the test case the outcome belongs to.
# outcome -- one of the outcome strings passed by the add* methods.
# trace   -- optional; it is unpacked into traceback.format_exception and
#            trace[1] is stringified, so presumably an exc_info-style
#            (type, value, traceback) tuple.
# The statements that send the result are not visible in this view.
168 def sendResult(self, test, outcome, trace = None):
169 if _channel is not None:
172 if trace is not None:
# NOTE(review): print_exc() prints the exception currently being handled, not
# `trace`; traceback.print_exception(*trace) may be what was intended -- confirm.
173 traceback.print_exc()
174 formatted = traceback.format_exception(*trace)
175 # Remove the 'Traceback (most recent call last)'
176 formatted = formatted[1:]
# tb is the remaining formatted text; message is the exception value as text.
177 tb = ''.join(formatted)
178 message = str(trace[1])
# Fragment: both statements signal this very process, the first with SIGUSR1
# and the second with SIGTERM. The enclosing `def` and the control flow between
# them are not visible in this view; presumably this is the body of the
# `stopTests` callback passed to _IpcChannel, with SIGTERM as the fallback
# -- confirm against the full file.
189 os.kill(os.getpid(), signal.SIGUSR1)
192 os.kill(os.getpid(), signal.SIGTERM)
# Exception type declared by this module. Its body and the places where it is
# raised or caught are not visible in this view.
196 class ExitCommand(Exception):
# Signal handler taking the (signal number, frame) pair that signal.signal
# passes to handlers; this file registers it for SIGUSR1 and SIGTERM. The body
# is not visible in this view.
# Note: the first parameter shadows the `signal` module inside the handler.
199 def signal_handler(signal, frame):
# Launcher body: parses the command-line options, optionally connects the
# result channel and prepares debugging / coverage, locates the requested
# tests, runs them with VsTestResult and exits with the run's status.
# NOTE(review): the enclosing `def` and several statements (including the
# conditions guarding many of the lines below) are not visible in this view;
# the comments describe only the visible lines.
206 from optparse import OptionParser
209 parser = OptionParser(prog = 'visualstudio_py_testlauncher', usage = 'Usage: %prog [<option>] <test names>... ')
210 parser.add_option('--debug', action='store_true', help='Whether debugging the unit tests')
211 parser.add_option('-x', '--mixed-mode', action='store_true', help='wait for mixed-mode debugger to attach')
# -t may be given several times; the values accumulate in opts.tests.
212 parser.add_option('-t', '--test', type='str', dest='tests', action='append', help='specifies a test to run')
213 parser.add_option('--testFile', type='str', help='Fully qualitified path to file name')
214 parser.add_option('-c', '--coverage', type='str', help='enable code coverage and specify filename')
215 parser.add_option('-r', '--result-port', type='int', help='connect to port on localhost and send test results')
216 parser.add_option('--us', type='str', help='Directory to start discovery')
# NOTE(review): ''test*.py'' is adjacent-literal concatenation, so the help
# text reads "(test*.py default)" without quotes -- confirm that is intended.
217 parser.add_option('--up', type='str', help='Pattern to match test files (''test*.py'' default)')
218 parser.add_option('--ut', type='str', help='Top level directory of project (default to start directory)')
219 parser.add_option('--uvInt', '--verboseInt', type='int', help='Verbose output (0 none, 1 (no -v) simple, 2 (-v) full)')
# NOTE(review): --uf and --uc are declared as string options, so they require
# a value on the command line; below, only `opts.uf is not None` is tested.
220 parser.add_option('--uf', '--failfast', type='str', help='Stop on first failure')
221 parser.add_option('--uc', '--catch', type='str', help='Catch control-C and display results')
# Positional arguments are discarded.
222 (opts, _) = parser.parse_args()
225 from ptvsd.visualstudio_py_debugger import DONT_DEBUG, DEBUG_ENTRYPOINTS, get_code
# Replace the first sys.path entry with the current working directory.
227 sys.path[0] = os.getcwd()
# Route SIGUSR1 and SIGTERM to signal_handler.
# NOTE(review): signal.SIGUSR1 is not defined on every platform; presumably a
# guard around this registration exists in lines not visible here -- confirm.
230 signal.signal(signal.SIGUSR1, signal_handler)
233 signal.signal(signal.SIGTERM, signal_handler)
# Connect to the result port on localhost, hand stopTests to the channel as its
# callback, and wrap stdout / stderr so output is reported over the channel.
236 _channel = _IpcChannel(socket.create_connection(('127.0.0.1', opts.result_port)), stopTests)
237 sys.stdout = _TestOutput(sys.stdout, is_stdout = True)
238 sys.stderr = _TestOutput(sys.stderr, is_stdout = False)
# Register this file in DONT_DEBUG and main's code object in DEBUG_ENTRYPOINTS;
# presumably so the debugger skips launcher frames -- confirm against ptvsd.
241 DONT_DEBUG.append(os.path.normcase(__file__))
242 DEBUG_ENTRYPOINTS.add(get_code(main))
245 elif opts.mixed_mode:
246 # For mixed-mode attach, there's no ptvsd and hence no wait_for_attach(),
247 # so we have to use Win32 API in a loop to do the same thing.
248 from time import sleep
249 from ctypes import windll, c_char
251 if windll.kernel32.IsDebuggerPresent() != 0:
# Load the x86 or x64 debugger helper DLL (the selecting condition is not
# visible) and read its exported `isTracing` char.
255 debugger_helper = windll['Microsoft.PythonTools.Debugger.Helper.x86.dll']
257 debugger_helper = windll['Microsoft.PythonTools.Debugger.Helper.x64.dll']
258 isTracing = c_char.in_dll(debugger_helper, "isTracing")
# NOTE(review): on Python 3, c_char.value is a length-1 bytes object, so
# comparing it with the int 0 is always unequal -- confirm this check works.
260 if isTracing.value != 0:
# Create the coverage object with opts.coverage as its first argument.
269 cov = coverage.coverage(opts.coverage)
# With neither -t nor --testFile given, discover tests from opts.us using the
# opts.up pattern.
274 if opts.tests is None and opts.testFile is None:
279 tests = unittest.defaultTestLoader.discover(opts.us, opts.up)
281 # loadTestsFromNames doesn't work well (with duplicate file names or class names)
282 # Easier approach is find the test suite and use that for running
283 loader = unittest.TestLoader()
284 # opts.us will be passed in
# Discover only files matching the base name of --testFile.
285 suites = loader.discover(opts.us, pattern=os.path.basename(opts.testFile))
288 if opts.tests is None:
289 # Run everything in the test file
292 # Run a specific test class or test method
# Walks the private `_tests` lists of the discovered suites. Only the first -t
# value (opts.tests[0]) is compared, by prefix and by exact id; an exact id
# match yields a suite containing just that test.
293 for test_suite in suites._tests:
294 for cls in test_suite._tests:
298 if testId.startswith(opts.tests[0]):
300 if testId == opts.tests[0]:
301 tests = unittest.TestSuite([m])
303 except Exception as err:
# NOTE(review): traceback.format_exception() is called without arguments, but
# it requires the exception to format; traceback.format_exc() looks like the
# intended call -- confirm.
304 errorMessage = traceback.format_exception()
# NOTE(review): `suite` is not assigned in any line visible in this view.
308 if tests is None and suite is None:
313 message = 'Failed to identify the test',
# A default verbosity is presumably assigned here when --uvInt was not given;
# the assignment is not visible.
316 if opts.uvInt is None:
# failfast is enabled whenever --uf was supplied, whatever its value.
318 if opts.uf is not None:
319 runner = unittest.TextTestRunner(verbosity=opts.uvInt, resultclass=VsTestResult, failfast=True)
321 runner = unittest.TextTestRunner(verbosity=opts.uvInt, resultclass=VsTestResult)
322 result = runner.run(tests)
323 if _channel is not None:
# Exit status is False (0) when the run was successful and True (1) otherwise.
325 sys.exit(not result.wasSuccessful())
# Write the coverage XML report next to the coverage file name, leaving this
# launcher file out of the report.
330 cov.xml_report(outfile = opts.coverage + '.xml', omit=__file__)
331 if _channel is not None:
335 _channel.socket.close()
336 # prevent generation of the error 'Error in sys.exitfunc:'
346 if __name__ == '__main__':