1"""This plugin provides test results in the standard XUnit XML format.
2
3It's designed for the `Jenkins`_ (previously Hudson) continuous build
4system, but will probably work for anything else that understands an
5XUnit-formatted XML representation of test results.
6
7Add this shell command to your builder ::
8
9    nosetests --with-xunit
10
By default, a file named nosetests.xml will be written to the
working directory.
13
14In a Jenkins builder, tick the box named "Publish JUnit test result report"
15under the Post-build Actions and enter this value for Test report XMLs::
16
17    **/nosetests.xml
18
19If you need to change the name or location of the file, you can set the
20``--xunit-file`` option.
21
22Here is an abbreviated version of what an XML test report might look like::
23
24    <?xml version="1.0" encoding="UTF-8"?>
25    <testsuite name="nosetests" tests="1" errors="1" failures="0" skip="0">
26        <testcase classname="path_to_test_suite.TestSomething"
27                  name="test_it" time="0">
28            <error type="exceptions.TypeError" message="oops, wrong type">
29            Traceback (most recent call last):
30            ...
31            TypeError: oops, wrong type
32            </error>
33        </testcase>
34    </testsuite>
35
36.. _Jenkins: http://jenkins-ci.org/
37
38"""
39import codecs
40import doctest
41import os
42import sys
43import traceback
44import re
45import inspect
46from io import StringIO
47from time import time
48from xml.sax import saxutils
49
50from nose.plugins.base import Plugin
51from nose.exc import SkipTest
52from nose.pyversion import force_unicode, format_exception
53
# Invalid XML characters: the control characters 0-31, sans \t (9), \n (10)
# and \r (13).  The character class below is written with octal escapes:
# \000-\010 covers 0-8, \013 and \014 are 11 and 12, \016-\037 covers 14-31.
CONTROL_CHARACTERS = re.compile(r"[\000-\010\013\014\016-\037]")

# Matches a test id that ends in a parenthesised argument list.
# Group 1 is the (lazily matched) text before the first "(" from which the
# remainder of the string still ends in ")"; group 2 is that "(...)" suffix.
TEST_ID = re.compile(r'^(.*?)(\(.*\))$')
58
def xml_safe(value):
    """Return *value* with each XML-invalid control character turned into '?'.

    The characters considered invalid are those matched by the module-level
    ``CONTROL_CHARACTERS`` pattern; everything else is passed through
    untouched.
    """
    sanitized = CONTROL_CHARACTERS.sub('?', value)
    return sanitized
62
def escape_cdata(cdata):
    """Make a string safe to embed between ``<![CDATA[`` and ``]]>``.

    The text is first cleaned with ``xml_safe``.  Any ``]]>`` it contains
    would end the section early, so each occurrence is rewritten to close
    the section, emit ``]]&gt;`` as ordinary escaped text, and open a new
    CDATA section for whatever follows.
    """
    cleaned = xml_safe(cdata)
    terminator = ']]>'
    return cleaned.replace(terminator, ']]>]]&gt;<![CDATA[')
66
def id_split(idval):
    """Split a test id into its container path and its test name.

    A plain id such as ``"pkg.mod.Class.test"`` is split on its last dot,
    giving ``["pkg.mod.Class", "test"]``.  When the id ends in a
    parenthesised argument list (as matched by ``TEST_ID``), the split is
    made on the last dot *before* the arguments, so dots that appear inside
    the arguments are left alone and the arguments stay attached to the
    test name.

    :param idval: the test id string.
    :returns: a two-element list ``[container, name]``, or a one-element
        list holding the whole name when there is no dot to split on.
        Callers that index the result with ``[0]`` and ``[-1]`` therefore
        work in both cases.
    """
    m = TEST_ID.match(idval)
    if not m:
        return idval.rsplit(".", 1)

    name, fargs = m.groups()
    head, sep, tail = name.rpartition(".")
    if not sep:
        # No dot before the argument list: unpacking ``rsplit`` into two
        # names would raise ValueError here.  Mirror what the plain-id
        # branch does for a dot-less id and return a single element.
        return [name + fargs]
    return [head, tail + fargs]
75
76def nice_classname(obj):
77    """Returns a nice name for class object or class instance.
78
79        >>> nice_classname(Exception()) # doctest: +ELLIPSIS
80        '...Exception'
81        >>> nice_classname(Exception) # doctest: +ELLIPSIS
82        '...Exception'
83
84    """
85    if inspect.isclass(obj):
86        cls_name = obj.__name__
87    else:
88        cls_name = obj.__class__.__name__
89    mod = inspect.getmodule(obj)
90    if mod:
91        name = mod.__name__
92        # jython
93        if name.startswith('org.python.core.'):
94            name = name[len('org.python.core.'):]
95        return "%s.%s" % (name, cls_name)
96    else:
97        return cls_name
98
def exc_message(exc_info):
    """Return the exception's message as XML-safe text.

    :param exc_info: a sequence whose first item is the exception type and
        whose second item is the exception value (or ``None``).
    :returns: the message, passed through ``force_unicode`` and then
        ``xml_safe``.

    If the value is ``None`` the first item is used as the message source.
    If ``str()`` on the exception fails with a Unicode error, the first
    element of ``exc.args`` is used instead, or an empty string when there
    are no args, so that a badly behaved ``__str__`` cannot abort the
    report.
    """
    exc = exc_info[1]
    if exc is None:
        # str exception
        result = exc_info[0]
    else:
        try:
            result = str(exc)
        except UnicodeError:
            # Retrying the very same str() call cannot succeed, so fall back
            # to the raw first argument right away, guarding against an
            # exception that carries no arguments at all.
            args = getattr(exc, 'args', ())
            result = args[0] if args else ''
    result = force_unicode(result, 'UTF-8')
    return xml_safe(result)
117
class Tee(object):
    """Write-only file-like object that fans output out to several streams."""

    def __init__(self, encoding, *args):
        """Store the target streams (*args*) and the encoding used by write()."""
        self._streams = args
        self._encoding = encoding

    def write(self, data):
        """Pass *data* through ``force_unicode`` and write it to every stream."""
        text = force_unicode(data, self._encoding)
        for stream in self._streams:
            stream.write(text)

    def writelines(self, lines):
        """Write each item of *lines* in turn via write()."""
        for chunk in lines:
            self.write(chunk)

    def flush(self):
        """Flush every underlying stream."""
        for stream in self._streams:
            stream.flush()

    def isatty(self):
        """Always report that this object is not a terminal."""
        return False
138
139
class Xunit(Plugin):
    """This plugin provides test results in the standard XUnit XML format."""
    # NOTE: the class docstring above is left untouched on purpose; it may be
    # surfaced to users as the plugin's help text.
    name = 'xunit'
    score = 1500
    encoding = 'UTF-8'
    # File object of the most recently written report (set by report()).
    error_report_file = None

    def __init__(self):
        super(Xunit, self).__init__()
        # Stack of (stdout, stderr) pairs saved by _startCapture() and
        # restored, innermost first, by _endCapture().
        self._capture_stack = []
        # Buffers receiving a copy of what is written while capturing.
        self._currentStdout = None
        self._currentStderr = None

    def _timeTaken(self):
        """Return the seconds elapsed since beforeTest() last set the timer.

        Returns 0.0 when no timer has been set yet.
        """
        if hasattr(self, '_timer'):
            taken = time() - self._timer
        else:
            # test died before it ran (probably error in setup())
            # or success/failure added before test started probably
            # due to custom TestResult munging
            taken = 0.0
        # NOTE(review): _timer is never cleared once set, so after the first
        # test the branch above is always taken, even for results reported
        # without a preceding beforeTest() - confirm this is intended.
        return taken

    def _quoteattr(self, attr):
        """Escape an XML attribute. Value can be unicode."""
        attr = xml_safe(attr)
        return saxutils.quoteattr(attr)

    def options(self, parser, env):
        """Sets additional command line options.

        Adds ``--xunit-file``; its default comes from the ``NOSE_XUNIT_FILE``
        entry of *env*, falling back to ``nosetests.xml``.
        """
        Plugin.options(self, parser, env)
        parser.add_option(
            '--xunit-file', action='store',
            dest='xunit_file', metavar="FILE",
            default=env.get('NOSE_XUNIT_FILE', 'nosetests.xml'),
            help=("Path to xml file to store the xunit report in. "
                  "Default is nosetests.xml in the working directory "
                  "[NOSE_XUNIT_FILE]"))

    def configure(self, options, config):
        """Configures the xunit plugin.

        When the plugin is enabled this resets the counters and the list of
        pending ``<testcase>`` fragments, and resolves the report path.
        """
        Plugin.configure(self, options, config)
        self.config = config
        if self.enabled:
            self.stats = {'errors': 0,
                          'failures': 0,
                          'passes': 0,
                          'skipped': 0
                          }
            self.errorlist = []
            self.error_report_file_name = os.path.realpath(options.xunit_file)

    def report(self, stream):
        """Writes an Xunit-formatted XML file

        The file includes a report of test errors and failures.

        :param stream: output stream; when verbosity is above 1 the path of
            the written XML file is announced on it via ``writeln``.
        """
        self.stats['encoding'] = self.encoding
        self.stats['total'] = (self.stats['errors'] + self.stats['failures']
                               + self.stats['passes'] + self.stats['skipped'])
        # The ``with`` block guarantees the file is closed even if one of the
        # writes raises, instead of leaking the open handle.
        with codecs.open(self.error_report_file_name, 'w',
                         self.encoding, 'replace') as report_file:
            # Kept on the instance: the verbose message below reads its name.
            self.error_report_file = report_file
            report_file.write(
                '<?xml version="1.0" encoding="%(encoding)s"?>'
                '<testsuite name="nosetests" tests="%(total)d" '
                'errors="%(errors)d" failures="%(failures)d" '
                'skip="%(skipped)d">' % self.stats)
            report_file.write(''.join([force_unicode(e, self.encoding)
                                       for e in self.errorlist]))
            report_file.write('</testsuite>')
        if self.config.verbosity > 1:
            stream.writeln("-" * 70)
            stream.writeln("XML: %s" % self.error_report_file.name)

    def _startCapture(self):
        """Save the current stdout/stderr and tee both into fresh buffers."""
        self._capture_stack.append((sys.stdout, sys.stderr))
        self._currentStdout = StringIO()
        self._currentStderr = StringIO()
        sys.stdout = Tee(self.encoding, self._currentStdout, sys.stdout)
        sys.stderr = Tee(self.encoding, self._currentStderr, sys.stderr)

    def startContext(self, context):
        """Begin capturing output when a context starts."""
        self._startCapture()

    def stopContext(self, context):
        """Restore the streams that were saved when the context started."""
        self._endCapture()

    def beforeTest(self, test):
        """Initializes a timer before starting a test."""
        self._timer = time()
        self._startCapture()

    def _endCapture(self):
        """Restore the most recently saved stdout/stderr pair, if any."""
        if self._capture_stack:
            sys.stdout, sys.stderr = self._capture_stack.pop()

    def afterTest(self, test):
        """Stop capturing for the test and drop its buffers."""
        self._endCapture()
        self._currentStdout = None
        self._currentStderr = None

    def finalize(self, test):
        """Unwind any captures still on the stack."""
        while self._capture_stack:
            self._endCapture()

    @staticmethod
    def _wrapCaptured(buffer, template):
        """Render captured text with *template*, or '' when nothing was captured.

        *buffer* is one of the capture buffers (or ``None``); *template* is a
        ``%s`` format string receiving the CDATA-escaped text.
        """
        if buffer:
            value = buffer.getvalue()
            if value:
                return template % escape_cdata(value)
        return ''

    def _getCapturedStdout(self):
        """Return the ``<system-out>`` element for the current capture, or ''."""
        return self._wrapCaptured(
            self._currentStdout,
            '<system-out><![CDATA[%s]]></system-out>')

    def _getCapturedStderr(self):
        """Return the ``<system-err>`` element for the current capture, or ''."""
        return self._wrapCaptured(
            self._currentStderr,
            '<system-err><![CDATA[%s]]></system-err>')

    def _outcomeXml(self, tag, err):
        """Render the ``<tag type=... message=...>`` element describing *err*.

        *tag* is the element name ('error', 'skipped' or 'failure'); *err* is
        the exc_info-style tuple handed to addError()/addFailure().
        """
        return (
            '<%(type)s type=%(errtype)s message=%(message)s><![CDATA[%(tb)s]]>'
            '</%(type)s>' %
            {'type': tag,
             'errtype': self._quoteattr(nice_classname(err[0])),
             'message': self._quoteattr(exc_message(err)),
             'tb': escape_cdata(format_exception(err, self.encoding)),
             })

    def _addTestcase(self, test, taken, detail=''):
        """Append one ``<testcase>`` element for *test* to the pending report.

        *detail* is the already-rendered outcome element (empty for a pass).
        """
        # Split the id once; indexing with [0] and [-1] also copes with a
        # one-element result.
        parts = id_split(test.id())
        self.errorlist.append(
            '<testcase classname=%(cls)s name=%(name)s time="%(taken).3f">'
            '%(detail)s%(systemout)s%(systemerr)s</testcase>' %
            {'cls': self._quoteattr(parts[0]),
             'name': self._quoteattr(parts[-1]),
             'taken': taken,
             'detail': detail,
             'systemout': self._getCapturedStdout(),
             'systemerr': self._getCapturedStderr(),
             })

    def addError(self, test, err, capt=None):
        """Add error output to Xunit report.

        An error whose type is a ``SkipTest`` subclass is counted and
        rendered as 'skipped' rather than 'error'.
        """
        taken = self._timeTaken()

        if issubclass(err[0], SkipTest):
            tag = 'skipped'
            self.stats['skipped'] += 1
        else:
            tag = 'error'
            self.stats['errors'] += 1

        self._addTestcase(test, taken, self._outcomeXml(tag, err))

    def addFailure(self, test, err, capt=None, tb_info=None):
        """Add failure output to Xunit report.
        """
        taken = self._timeTaken()
        detail = self._outcomeXml('failure', err)
        self.stats['failures'] += 1
        self._addTestcase(test, taken, detail)

    def addSuccess(self, test, capt=None):
        """Add success output to Xunit report.
        """
        taken = self._timeTaken()
        self.stats['passes'] += 1
        self._addTestcase(test, taken)
330