--- /dev/null
+# Python Tools for Visual Studio
+# Copyright(c) Microsoft Corporation
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the License); you may not use
+# this file except in compliance with the License. You may obtain a copy of the
+# License at http://www.apache.org/licenses/LICENSE-2.0
+#
+# THIS CODE IS PROVIDED ON AN *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
+# OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY
+# IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
+# MERCHANTABLITY OR NON-INFRINGEMENT.
+#
+# See the Apache Version 2.0 License for specific language governing
+# permissions and limitations under the License.
+
+__author__ = "Microsoft Corporation <ptvshelp@microsoft.com>"
+__version__ = "3.0.0.0"
+
+import os
+import sys
+import json
+import unittest
+import socket
+import traceback
+from types import CodeType, FunctionType
+import signal
+try:
+ import thread
+except:
+ import _thread as thread
+
+class _TestOutput(object):
+ """file like object which redirects output to the repl window."""
+ errors = 'strict'
+
+ def __init__(self, old_out, is_stdout):
+ self.is_stdout = is_stdout
+ self.old_out = old_out
+ if sys.version >= '3.' and hasattr(old_out, 'buffer'):
+ self.buffer = _TestOutputBuffer(old_out.buffer, is_stdout)
+
+ def flush(self):
+ if self.old_out:
+ self.old_out.flush()
+
+ def writelines(self, lines):
+ for line in lines:
+ self.write(line)
+
+ @property
+ def encoding(self):
+ return 'utf8'
+
+ def write(self, value):
+ _channel.send_event('stdout' if self.is_stdout else 'stderr', content=value)
+ if self.old_out:
+ self.old_out.write(value)
+ # flush immediately, else things go wonky and out of order
+ self.flush()
+
+ def isatty(self):
+ return True
+
+ def next(self):
+ pass
+
+ @property
+ def name(self):
+ if self.is_stdout:
+ return "<stdout>"
+ else:
+ return "<stderr>"
+
+ def __getattr__(self, name):
+ return getattr(self.old_out, name)
+
+class _TestOutputBuffer(object):
+ def __init__(self, old_buffer, is_stdout):
+ self.buffer = old_buffer
+ self.is_stdout = is_stdout
+
+ def write(self, data):
+ _channel.send_event('stdout' if self.is_stdout else 'stderr', content=data)
+ self.buffer.write(data)
+
+ def flush(self):
+ self.buffer.flush()
+
+ def truncate(self, pos = None):
+ return self.buffer.truncate(pos)
+
+ def tell(self):
+ return self.buffer.tell()
+
+ def seek(self, pos, whence = 0):
+ return self.buffer.seek(pos, whence)
+
+class _IpcChannel(object):
+ def __init__(self, socket, callback):
+ self.socket = socket
+ self.seq = 0
+ self.callback = callback
+ self.lock = thread.allocate_lock()
+ self._closed = False
+ # start the testing reader thread loop
+ self.test_thread_id = thread.start_new_thread(self.readSocket, ())
+
+ def close(self):
+ self._closed = True
+
+ def readSocket(self):
+ try:
+ data = self.socket.recv(1024)
+ self.callback()
+ except OSError:
+ if not self._closed:
+ raise
+
+ def receive(self):
+ pass
+
+ def send_event(self, name, **args):
+ with self.lock:
+ body = {'type': 'event', 'seq': self.seq, 'event':name, 'body':args}
+ self.seq += 1
+ content = json.dumps(body).encode('utf8')
+ headers = ('Content-Length: %d\n\n' % (len(content), )).encode('utf8')
+ self.socket.send(headers)
+ self.socket.send(content)
+
+_channel = None
+
+
+class VsTestResult(unittest.TextTestResult):
+ def startTest(self, test):
+ super(VsTestResult, self).startTest(test)
+ if _channel is not None:
+ _channel.send_event(
+ name='start',
+ test = test.id()
+ )
+
+ def addError(self, test, err):
+ super(VsTestResult, self).addError(test, err)
+ self.sendResult(test, 'error', err)
+
+ def addFailure(self, test, err):
+ super(VsTestResult, self).addFailure(test, err)
+ self.sendResult(test, 'failed', err)
+
+ def addSuccess(self, test):
+ super(VsTestResult, self).addSuccess(test)
+ self.sendResult(test, 'passed')
+
+ def addSkip(self, test, reason):
+ super(VsTestResult, self).addSkip(test, reason)
+ self.sendResult(test, 'skipped')
+
+ def addExpectedFailure(self, test, err):
+ super(VsTestResult, self).addExpectedFailure(test, err)
+ self.sendResult(test, 'failed', err)
+
+ def addUnexpectedSuccess(self, test):
+ super(VsTestResult, self).addUnexpectedSuccess(test)
+ self.sendResult(test, 'passed')
+
+ def sendResult(self, test, outcome, trace = None):
+ if _channel is not None:
+ tb = None
+ message = None
+ if trace is not None:
+ traceback.print_exc()
+ formatted = traceback.format_exception(*trace)
+ # Remove the 'Traceback (most recent call last)'
+ formatted = formatted[1:]
+ tb = ''.join(formatted)
+ message = str(trace[1])
+ _channel.send_event(
+ name='result',
+ outcome=outcome,
+ traceback = tb,
+ message = message,
+ test = test.id()
+ )
+
+def stopTests():
+ try:
+ os.kill(os.getpid(), signal.SIGUSR1)
+ except:
+ try:
+ os.kill(os.getpid(), signal.SIGTERM)
+ except:
+ pass
+
+class ExitCommand(Exception):
+ pass
+
+def signal_handler(signal, frame):
+ raise ExitCommand()
+
+def main():
+ import os
+ import sys
+ import unittest
+ from optparse import OptionParser
+ global _channel
+
+ parser = OptionParser(prog = 'visualstudio_py_testlauncher', usage = 'Usage: %prog [<option>] <test names>... ')
+ parser.add_option('--debug', action='store_true', help='Whether debugging the unit tests')
+ parser.add_option('-x', '--mixed-mode', action='store_true', help='wait for mixed-mode debugger to attach')
+ parser.add_option('-t', '--test', type='str', dest='tests', action='append', help='specifies a test to run')
+ parser.add_option('--testFile', type='str', help='Fully qualitified path to file name')
+ parser.add_option('-c', '--coverage', type='str', help='enable code coverage and specify filename')
+ parser.add_option('-r', '--result-port', type='int', help='connect to port on localhost and send test results')
+ parser.add_option('--us', type='str', help='Directory to start discovery')
+ parser.add_option('--up', type='str', help='Pattern to match test files (''test*.py'' default)')
+ parser.add_option('--ut', type='str', help='Top level directory of project (default to start directory)')
+ parser.add_option('--uvInt', '--verboseInt', type='int', help='Verbose output (0 none, 1 (no -v) simple, 2 (-v) full)')
+ parser.add_option('--uf', '--failfast', type='str', help='Stop on first failure')
+ parser.add_option('--uc', '--catch', type='str', help='Catch control-C and display results')
+ (opts, _) = parser.parse_args()
+
+ if opts.debug:
+ from ptvsd.visualstudio_py_debugger import DONT_DEBUG, DEBUG_ENTRYPOINTS, get_code
+
+ sys.path[0] = os.getcwd()
+ if opts.result_port:
+ try:
+ signal.signal(signal.SIGUSR1, signal_handler)
+ except:
+ try:
+ signal.signal(signal.SIGTERM, signal_handler)
+ except:
+ pass
+ _channel = _IpcChannel(socket.create_connection(('127.0.0.1', opts.result_port)), stopTests)
+ sys.stdout = _TestOutput(sys.stdout, is_stdout = True)
+ sys.stderr = _TestOutput(sys.stderr, is_stdout = False)
+
+ if opts.debug:
+ DONT_DEBUG.append(os.path.normcase(__file__))
+ DEBUG_ENTRYPOINTS.add(get_code(main))
+
+ pass
+ elif opts.mixed_mode:
+ # For mixed-mode attach, there's no ptvsd and hence no wait_for_attach(),
+ # so we have to use Win32 API in a loop to do the same thing.
+ from time import sleep
+ from ctypes import windll, c_char
+ while True:
+ if windll.kernel32.IsDebuggerPresent() != 0:
+ break
+ sleep(0.1)
+ try:
+ debugger_helper = windll['Microsoft.PythonTools.Debugger.Helper.x86.dll']
+ except WindowsError:
+ debugger_helper = windll['Microsoft.PythonTools.Debugger.Helper.x64.dll']
+ isTracing = c_char.in_dll(debugger_helper, "isTracing")
+ while True:
+ if isTracing.value != 0:
+ break
+ sleep(0.1)
+
+ cov = None
+ try:
+ if opts.coverage:
+ try:
+ import coverage
+ cov = coverage.coverage(opts.coverage)
+ cov.load()
+ cov.start()
+ except:
+ pass
+ if opts.tests is None and opts.testFile is None:
+ if opts.us is None:
+ opts.us = '.'
+ if opts.up is None:
+ opts.up = 'test*.py'
+ tests = unittest.defaultTestLoader.discover(opts.us, opts.up)
+ else:
+ # loadTestsFromNames doesn't work well (with duplicate file names or class names)
+ # Easier approach is find the test suite and use that for running
+ loader = unittest.TestLoader()
+ # opts.us will be passed in
+ suites = loader.discover(opts.us, pattern=os.path.basename(opts.testFile))
+ suite = None
+ tests = None
+ if opts.tests is None:
+ # Run everything in the test file
+ tests = suites
+ else:
+ # Run a specific test class or test method
+ for test_suite in suites._tests:
+ for cls in test_suite._tests:
+ try:
+ for m in cls._tests:
+ testId = m.id()
+ if testId.startswith(opts.tests[0]):
+ suite = cls
+ if testId == opts.tests[0]:
+ tests = unittest.TestSuite([m])
+ break
+ except Exception as err:
+ errorMessage = traceback.format_exception()
+ pass
+ if tests is None:
+ tests = suite
+ if tests is None and suite is None:
+ _channel.send_event(
+ name='error',
+ outcome='',
+ traceback = '',
+ message = 'Failed to identify the test',
+ test = ''
+ )
+ if opts.uvInt is None:
+ opts.uvInt = 0
+ if opts.uf is not None:
+ runner = unittest.TextTestRunner(verbosity=opts.uvInt, resultclass=VsTestResult, failfast=True)
+ else:
+ runner = unittest.TextTestRunner(verbosity=opts.uvInt, resultclass=VsTestResult)
+ result = runner.run(tests)
+ if _channel is not None:
+ _channel.close()
+ sys.exit(not result.wasSuccessful())
+ finally:
+ if cov is not None:
+ cov.stop()
+ cov.save()
+ cov.xml_report(outfile = opts.coverage + '.xml', omit=__file__)
+ if _channel is not None:
+ _channel.send_event(
+ name='done'
+ )
+ _channel.socket.close()
+ # prevent generation of the error 'Error in sys.exitfunc:'
+ try:
+ sys.stdout.close()
+ except:
+ pass
+ try:
+ sys.stderr.close()
+ except:
+ pass
+
+if __name__ == '__main__':
+ main()