1# Copyright (c) 2009, Tim Cuthbertson # All rights reserved.
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions
5# are met:
6#
7#     * Redistributions of source code must retain the above copyright
8#       notice, this list of conditions and the following disclaimer.
9#     * Redistributions in binary form must reproduce the above
10#       copyright notice, this list of conditions and the following
11#       disclaimer in the documentation and/or other materials provided
12#       with the distribution.
13#     * Neither the name of the organisation nor the names of its
14#       contributors may be used to endorse or promote products derived
15#       from this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
20# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
21# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
22# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
23# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
24# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
27# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
29
30from __future__ import print_function
31import os
32import sys
33import linecache
34import re
35import time
36
37import nose
38
39import termstyle
40
41failure = 'FAILED'
42error = 'ERROR'
43success = 'passed'
44skip = 'skipped'
45line_length = 77
46
47PY3 = sys.version_info[0] >= 3
48if PY3:
49	to_unicode = str
50else:
51	def to_unicode(s):
52		try:
53			return unicode(s)
54		except UnicodeDecodeError:
55			return unicode(repr(str(s)))
56
57BLACKLISTED_WRITERS = [
58	'nose[\\/]result\\.pyc?$',
59	'unittest[\\/]runner\\.pyc?$'
60]
61REDNOSE_DEBUG = False
62
63
64class RedNose(nose.plugins.Plugin):
65	env_opt = 'NOSE_REDNOSE'
66	env_opt_color = 'NOSE_REDNOSE_COLOR'
67	score = 199  # just under the `coverage` module
68
69	def __init__(self, *args):
70		super(RedNose, self).__init__(*args)
71		self.reports = []
72		self.error = self.success = self.failure = self.skip = 0
73		self.total = 0
74		self.stream = None
75		self.verbose = False
76		self.enabled = False
77		self.tree = False
78
79	def options(self, parser, env=os.environ):
80		global REDNOSE_DEBUG
81		rednose_on = bool(env.get(self.env_opt, False))
82		rednose_color = env.get(self.env_opt_color, 'auto')
83		REDNOSE_DEBUG = bool(env.get('REDNOSE_DEBUG', False))
84
85		parser.add_option(
86			"--rednose",
87			action="store_true",
88			default=rednose_on,
89			dest="rednose",
90			help="enable colour output (alternatively, set $%s=1)" % (self.env_opt,)
91		)
92		parser.add_option(
93			"--no-color",
94			action="store_false",
95			dest="rednose",
96			help="disable colour output"
97		)
98		parser.add_option(
99			"--force-color",
100			action="store_const",
101			dest='rednose_color',
102			default=rednose_color,
103			const='force',
104			help="force colour output when not using a TTY (alternatively, set $%s=force)" % (self.env_opt_color,)
105		)
106		parser.add_option(
107			"--immediate",
108			action="store_true",
109			default=False,
110			help="print errors and failures as they happen, as well as at the end"
111		)
112
113	def configure(self, options, conf):
114		if options.rednose:
115			self.enabled = True
116			termstyle_init = {
117				'force': termstyle.enable,
118				'off': termstyle.disable
119			}.get(options.rednose_color, termstyle.auto)
120			termstyle_init()
121
122			self.immediate = options.immediate
123			self.verbose = options.verbosity >= 2
124
125	def begin(self):
126		self.start_time = time.time()
127		self._in_test = False
128
129	def _format_test_name(self, test):
130		return test.shortDescription() or to_unicode(test)
131
132	def prepareTestResult(self, result):
133		result.stream = FilteringStream(self.stream, BLACKLISTED_WRITERS)
134
135	def beforeTest(self, test):
136		self._in_test = True
137		if self.verbose:
138			self._out(self._format_test_name(test) + ' ... ')
139
140	def afterTest(self, test):
141		if self._in_test:
142			self.addSkip()
143
144	def _print_test(self, type_, color):
145		self.total += 1
146		if self.verbose:
147			self._outln(color(type_))
148		else:
149			if type_ == failure:
150				short_ = 'F'
151			elif type_ == error:
152				short_ = 'X'
153			elif type_ == skip:
154				short_ = '-'
155			else:
156				short_ = '.'
157			self._out(color(short_))
158			if self.total % line_length == 0:
159				self._outln()
160		self._in_test = False
161
162	def _add_report(self, report):
163		failure_type, test, err = report
164		self.reports.append(report)
165		if self.immediate:
166			self._outln()
167			self._report_test(len(self.reports), *report)
168
169	def addFailure(self, test, err):
170		self.failure += 1
171		self._add_report((failure, test, err))
172		self._print_test(failure, termstyle.red)
173
174	def addError(self, test, err):
175		if err[0].__name__ == 'SkipTest':
176			self.addSkip(test, err)
177			return
178		self.error += 1
179		self._add_report((error, test, err))
180		self._print_test(error, termstyle.yellow)
181
182	def addSuccess(self, test):
183		self.success += 1
184		self._print_test(success, termstyle.green)
185
186	def addSkip(self, test=None, err=None):
187		self.skip += 1
188		self._print_test(skip, termstyle.blue)
189
190	def setOutputStream(self, stream):
191		self.stream = stream
192
193	def report(self, stream):
194		"""report on all registered failures and errors"""
195		self._outln()
196		if self.immediate:
197			for x in range(0, 5):
198				self._outln()
199		report_num = 0
200		if len(self.reports) > 0:
201			for report_num, report in enumerate(self.reports):
202				self._report_test(report_num + 1, *report)
203			self._outln()
204
205		self._summarize()
206
207	def _summarize(self):
208		"""summarize all tests - the number of failures, errors and successes"""
209		self._line(termstyle.black)
210		self._out("%s test%s run in %0.1f seconds" % (
211			self.total,
212			self._plural(self.total),
213			time.time() - self.start_time))
214		if self.total > self.success:
215			self._outln(". ")
216			additionals = []
217			if self.failure > 0:
218				additionals.append(termstyle.red("%s FAILED" % (
219					self.failure,)))
220			if self.error > 0:
221				additionals.append(termstyle.yellow("%s error%s" % (
222					self.error,
223					self._plural(self.error) )))
224			if self.skip > 0:
225				additionals.append(termstyle.blue("%s skipped" % (
226					self.skip)))
227			self._out(', '.join(additionals))
228
229		self._out(termstyle.green(" (%s test%s passed)" % (
230			self.success,
231			self._plural(self.success) )))
232		self._outln()
233
234	def _report_test(self, report_num, type_, test, err):
235		"""report the results of a single (failing or errored) test"""
236		self._line(termstyle.black)
237		self._out("%s) " % (report_num))
238		if type_ == failure:
239			color = termstyle.red
240			self._outln(color('FAIL: %s' % (self._format_test_name(test),)))
241		else:
242			color = termstyle.yellow
243			self._outln(color('ERROR: %s' % (self._format_test_name(test),)))
244
245		exc_type, exc_instance, exc_trace = err
246
247		self._outln()
248		self._outln(self._fmt_traceback(exc_trace))
249		self._out(color('   ', termstyle.bold(color(exc_type.__name__)), ": "))
250		self._outln(self._fmt_message(exc_instance, color))
251		self._outln()
252
253	def _relative_path(self, path):
254		"""
255		If path is a child of the current working directory, the relative
256		path is returned surrounded by bold xterm escape sequences.
257		If path is not a child of the working directory, path is returned
258		"""
259		try:
260			here = os.path.abspath(os.path.realpath(os.getcwd()))
261			fullpath = os.path.abspath(os.path.realpath(path))
262		except OSError:
263			return path
264		if fullpath.startswith(here):
265			return termstyle.bold(fullpath[len(here)+1:])
266		return path
267
268	def _file_line(self, tb):
269		"""formats the file / lineno / function line of a traceback element"""
270		prefix = "file://"
271		prefix = ""
272
273		f = tb.tb_frame
274		if '__unittest' in f.f_globals:
275			# this is the magical flag that prevents unittest internal
276			# code from junking up the stacktrace
277			return None
278
279		filename = f.f_code.co_filename
280		lineno = tb.tb_lineno
281		linecache.checkcache(filename)
282		function_name = f.f_code.co_name
283
284		line_contents = linecache.getline(filename, lineno, f.f_globals).strip()
285
286		return "    %s line %s in %s\n      %s" % (
287			termstyle.blue(prefix, self._relative_path(filename)),
288			lineno,
289			termstyle.cyan(function_name),
290			line_contents)
291
292	def _fmt_traceback(self, trace):
293		"""format a traceback"""
294		ret = []
295		ret.append(termstyle.default("   Traceback (most recent call last):"))
296		current_trace = trace
297		while current_trace is not None:
298			line = self._file_line(current_trace)
299			if line is not None:
300				ret.append(line)
301			current_trace = current_trace.tb_next
302		return '\n'.join(ret)
303
304	def _fmt_message(self, exception, color):
305		orig_message_lines = to_unicode(exception).splitlines()
306
307		if len(orig_message_lines) == 0:
308			return ''
309		message_lines = [color(orig_message_lines[0])]
310		for line in orig_message_lines[1:]:
311			match = re.match('^---.* begin captured stdout.*----$', line)
312			if match:
313				color = None
314				message_lines.append('')
315			line = '   ' + line
316			message_lines.append(color(line) if color is not None else line)
317		return '\n'.join(message_lines)
318
319	def _out(self, msg='', newline=False):
320		self.stream.write(msg)
321		if newline:
322			self.stream.write('\n')
323
324	def _outln(self, msg=''):
325		self._out(msg, True)
326
327	def _plural(self, num):
328		return '' if num == 1 else 's'
329
330	def _line(self, color=termstyle.reset, char='-'):
331		"""
332		print a line of separator characters (default '-')
333		in the given colour (default black)
334		"""
335		self._outln(color(char * line_length))
336
337
338import traceback
339import sys
340
341
342class FilteringStream(object):
343	"""
344	A wrapper for a stream that will filter
345	calls to `write` and `writeln` to ignore calls
346	from blacklisted callers
347	(implemented as a regex on their filename, according
348	to traceback.extract_stack())
349
350	It's super hacky, but there seems to be no other way
351	to suppress nose's default output
352	"""
353	def __init__(self, stream, excludes):
354		self.__stream = stream
355		self.__excludes = list(map(re.compile, excludes))
356
357	def __should_filter(self):
358		try:
359			stack = traceback.extract_stack(limit=3)[0]
360			filename = stack[0]
361			pattern_matches_filename = lambda pattern: pattern.search(filename)
362			should_filter = any(map(pattern_matches_filename, self.__excludes))
363			if REDNOSE_DEBUG:
364				print >> sys.stderr, "REDNOSE_DEBUG: got write call from %s, should_filter = %s" % (
365						filename, should_filter)
366			return should_filter
367		except StandardError as e:
368			if REDNOSE_DEBUG:
369				print("\nError in rednose filtering: %s" % (e,), file=sys.stderr)
370				traceback.print_exc(sys.stderr)
371			return False
372
373	def write(self, *a):
374		if self.__should_filter():
375			return
376		return self.__stream.write(*a)
377
378	def writeln(self, *a):
379		if self.__should_filter():
380			return
381		return self.__stream.writeln(*a)
382
383	# pass non-known methods through to self.__stream
384	def __getattr__(self, name):
385		if REDNOSE_DEBUG:
386			print("REDNOSE_DEBUG: getting attr %s" % (name,), file=sys.stderr)
387		return getattr(self.__stream, name)
388