1a380bf10Simarom"""
2a380bf10SimaromTesting Plugins
3a380bf10Simarom===============
4a380bf10Simarom
5a380bf10SimaromThe plugin interface is well-tested enough to safely unit test your
6a380bf10Simaromuse of its hooks with some level of confidence. However, there is also
7a380bf10Simaroma mixin for unittest.TestCase called PluginTester that's designed to
8a380bf10Simaromtest plugins in their native runtime environment.
9a380bf10Simarom
10a380bf10SimaromHere's a simple example with a do-nothing plugin and a composed suite.
11a380bf10Simarom
12a380bf10Simarom    >>> import unittest
13a380bf10Simarom    >>> from nose.plugins import Plugin, PluginTester
14a380bf10Simarom    >>> class FooPlugin(Plugin):
15a380bf10Simarom    ...     pass
16a380bf10Simarom    >>> class TestPluginFoo(PluginTester, unittest.TestCase):
17a380bf10Simarom    ...     activate = '--with-foo'
18a380bf10Simarom    ...     plugins = [FooPlugin()]
19a380bf10Simarom    ...     def test_foo(self):
20a380bf10Simarom    ...         for line in self.output:
21a380bf10Simarom    ...             # i.e. check for patterns
22a380bf10Simarom    ...             pass
23a380bf10Simarom    ...
24a380bf10Simarom    ...         # or check for a line containing ...
25a380bf10Simarom    ...         assert "ValueError" in self.output
26a380bf10Simarom    ...     def makeSuite(self):
27a380bf10Simarom    ...         class TC(unittest.TestCase):
28a380bf10Simarom    ...             def runTest(self):
29a380bf10Simarom    ...                 raise ValueError("I hate foo")
30a380bf10Simarom    ...         return [TC('runTest')]
31a380bf10Simarom    ...
32a380bf10Simarom    >>> res = unittest.TestResult()
33a380bf10Simarom    >>> case = TestPluginFoo('test_foo')
34a380bf10Simarom    >>> _ = case(res)
35a380bf10Simarom    >>> res.errors
36a380bf10Simarom    []
37a380bf10Simarom    >>> res.failures
38a380bf10Simarom    []
39a380bf10Simarom    >>> res.wasSuccessful()
40a380bf10Simarom    True
41a380bf10Simarom    >>> res.testsRun
42a380bf10Simarom    1
43a380bf10Simarom
44a380bf10SimaromAnd here is a more complex example of testing a plugin that has extra
45a380bf10Simaromarguments and reads environment variables.
46a380bf10Simarom
47a380bf10Simarom    >>> import unittest, os
48a380bf10Simarom    >>> from nose.plugins import Plugin, PluginTester
49a380bf10Simarom    >>> class FancyOutputter(Plugin):
50a380bf10Simarom    ...     name = "fancy"
51a380bf10Simarom    ...     def configure(self, options, conf):
52a380bf10Simarom    ...         Plugin.configure(self, options, conf)
53a380bf10Simarom    ...         if not self.enabled:
54a380bf10Simarom    ...             return
55a380bf10Simarom    ...         self.fanciness = 1
56a380bf10Simarom    ...         if options.more_fancy:
57a380bf10Simarom    ...             self.fanciness = 2
58a380bf10Simarom    ...         if 'EVEN_FANCIER' in self.env:
59a380bf10Simarom    ...             self.fanciness = 3
60a380bf10Simarom    ...
61a380bf10Simarom    ...     def options(self, parser, env=os.environ):
62a380bf10Simarom    ...         self.env = env
63a380bf10Simarom    ...         parser.add_option('--more-fancy', action='store_true')
64a380bf10Simarom    ...         Plugin.options(self, parser, env=env)
65a380bf10Simarom    ...
66a380bf10Simarom    ...     def report(self, stream):
67a380bf10Simarom    ...         stream.write("FANCY " * self.fanciness)
68a380bf10Simarom    ...
69a380bf10Simarom    >>> class TestFancyOutputter(PluginTester, unittest.TestCase):
70a380bf10Simarom    ...     activate = '--with-fancy' # enables the plugin
71a380bf10Simarom    ...     plugins = [FancyOutputter()]
72a380bf10Simarom    ...     args = ['--more-fancy']
73a380bf10Simarom    ...     env = {'EVEN_FANCIER': '1'}
74a380bf10Simarom    ...
75a380bf10Simarom    ...     def test_fancy_output(self):
76a380bf10Simarom    ...         assert "FANCY FANCY FANCY" in self.output, (
77a380bf10Simarom    ...                                         "got: %s" % self.output)
78a380bf10Simarom    ...     def makeSuite(self):
79a380bf10Simarom    ...         class TC(unittest.TestCase):
80a380bf10Simarom    ...             def runTest(self):
81a380bf10Simarom    ...                 raise ValueError("I hate fancy stuff")
82a380bf10Simarom    ...         return [TC('runTest')]
83a380bf10Simarom    ...
84a380bf10Simarom    >>> res = unittest.TestResult()
85a380bf10Simarom    >>> case = TestFancyOutputter('test_fancy_output')
86a380bf10Simarom    >>> _ = case(res)
87a380bf10Simarom    >>> res.errors
88a380bf10Simarom    []
89a380bf10Simarom    >>> res.failures
90a380bf10Simarom    []
91a380bf10Simarom    >>> res.wasSuccessful()
92a380bf10Simarom    True
93a380bf10Simarom    >>> res.testsRun
94a380bf10Simarom    1
95a380bf10Simarom
96a380bf10Simarom"""
97a380bf10Simarom
98a380bf10Simaromimport re
99a380bf10Simaromimport sys
100a380bf10Simaromfrom warnings import warn
101a380bf10Simarom
102a380bf10Simaromtry:
103a380bf10Simarom    from io import StringIO
104a380bf10Simaromexcept ImportError:
105a380bf10Simarom    from io import StringIO
106a380bf10Simarom
107a380bf10Simarom__all__ = ['PluginTester', 'run']
108a380bf10Simarom
109a380bf10Simaromfrom os import getpid
class MultiProcessFile(object):
    """
    helper for testing multiprocessing

    multiprocessing poses a problem for doctests, since the strategy
    of replacing sys.stdout/stderr with file-like objects then
    inspecting the results won't work: the child processes will
    write to the objects, but the data will not be reflected
    in the parent doctest-ing process.

    The solution is to create file-like objects which will interact with
    multiprocessing in a more desirable way.

    All processes can write to this object, but only the creator can read.
    This allows the testing system to see a unified picture of I/O.

    Writes are put on a shared queue; the creating process drains that
    queue into a private StringIO whenever it reads (``getvalue()``,
    ``seek()``, iteration).  Any other attribute is delegated to that
    StringIO.
    """
    def __init__(self):
        # per advice at:
        #    http://docs.python.org/library/multiprocessing.html#all-platforms
        self.__master = getpid()
        self.__queue = Manager().Queue()
        self.__buffer = StringIO()
        self.softspace = 0

    def buffer(self):
        """Move everything waiting on the queue into the local buffer.

        Does nothing when called from a process other than the creator.
        Queued chunks are grouped by writer and appended to the end of
        the buffer in sorted writer order, with the parent's own output
        placed last.
        """
        if getpid() != self.__master:
            return

        from queue import Empty
        from collections import defaultdict
        cache = defaultdict(list)
        while True:
            try:
                pid, data = self.__queue.get_nowait()
            except Empty:
                break
            if pid == ():
                # show parent output after children
                # this is what users see, usually
                pid = (1e100,)  # googol!
            cache[pid].append(data)
        if not cache:
            # nothing new: leave the read position untouched
            return
        # Always append (whence=2 is "end of stream").  Writing at the
        # current cursor would overwrite earlier output whenever a reader
        # had rewound the buffer with seek(0).
        self.__buffer.seek(0, 2)
        for pid in sorted(cache):
            self.__buffer.write("".join(cache[pid]))

    def write(self, data):
        """Queue ``data`` for the creator process; callable from any process."""
        # note that these pids are in the form of current_process()._identity
        # rather than OS pids
        from multiprocessing import current_process
        pid = current_process()._identity
        self.__queue.put((pid, data))

    def __iter__(self):
        "getattr doesn't work for iter()"
        self.buffer()
        return self.__buffer

    def seek(self, offset, whence=0):
        """Drain pending writes, then seek within the local buffer."""
        self.buffer()
        return self.__buffer.seek(offset, whence)

    def getvalue(self):
        """Drain pending writes and return the whole buffered text."""
        self.buffer()
        return self.__buffer.getvalue()

    def __getattr__(self, attr):
        """Delegate unknown attributes to the underlying StringIO."""
        # If our own private attributes are missing (instance created
        # without __init__, e.g. while being unpickled), looking up
        # self.__buffer below would re-enter __getattr__ forever.
        if attr.startswith('_MultiProcessFile__'):
            raise AttributeError(attr)
        return getattr(self.__buffer, attr)
172a380bf10Simarom
# Pick the stream class used to capture nose output: the
# multiprocessing-aware file when multiprocessing can be imported,
# otherwise a plain StringIO.
try:
    from multiprocessing import Manager
except ImportError:
    Buffer = StringIO
else:
    Buffer = MultiProcessFile
178a380bf10Simarom
class PluginTester(object):
    """A mixin for testing nose plugins in their runtime environment.

    Subclass this and mix in unittest.TestCase to run integration/functional
    tests on your plugin.  When setUp() is called, the stub test suite is
    executed with your plugin so that during an actual test you can inspect the
    artifacts of how your plugin interacted with the stub test suite.

    - activate

      - the argument to send nosetests to activate the plugin. When left
        as None, no activation argument is added to the command line.

    - suitepath

      - if set, this is the path of the suite to test. Otherwise, you
        will need to use the hook, makeSuite()

    - plugins

      - the list of plugins to make available during the run. Note
        that this does not mean these plugins will be *enabled* during
        the run -- only the plugins enabled by the activate argument
        or other settings in argv or env will be enabled.

    - args

      - a list of arguments to add to the nosetests command, in addition to
        the activate argument

    - env

      - optional dict of environment variables to send nosetests

    - ignoreFiles

      - if not None, assigned to the ``ignoreFiles`` attribute of the
        Config used for the run

    - argv

      - the command line built by setUp(); any value assigned beforehand
        is overwritten

    After setUp() has run, ``self.nose`` holds the TestProgram instance and
    ``self.output`` an AccessDecorator over the captured output stream.
    """
    activate = None
    suitepath = None
    args = None
    env = {}
    argv = None
    plugins = []
    ignoreFiles = None

    def makeSuite(self):
        """returns a suite object of tests to run (unittest.TestSuite())

        If self.suitepath is None, this must be implemented. The returned suite
        object will be executed with all plugins activated.  It may return
        None.

        Here is an example of a basic suite object you can return ::

            >>> import unittest
            >>> class SomeTest(unittest.TestCase):
            ...     def runTest(self):
            ...         raise ValueError("Now do something, plugin!")
            ...
            >>> unittest.TestSuite([SomeTest()]) # doctest: +ELLIPSIS
            <unittest...TestSuite tests=[<...SomeTest testMethod=runTest>]>

        """
        raise NotImplementedError

    def _execPlugin(self):
        """execute the plugin on the internal test suite.

        Builds a Config from ``env``, ``plugins`` and a fresh capture
        stream, runs TestProgram with ``self.argv`` without exiting the
        interpreter, and stores the results on ``self.nose`` and
        ``self.output``.
        """
        from nose.config import Config
        from nose.core import TestProgram
        from nose.plugins.manager import PluginManager

        suite = None
        stream = Buffer()
        conf = Config(env=self.env,
                      stream=stream,
                      plugins=PluginManager(plugins=self.plugins))
        if self.ignoreFiles is not None:
            conf.ignoreFiles = self.ignoreFiles
        if not self.suitepath:
            # no path given: the subclass supplies the tests itself
            suite = self.makeSuite()

        self.nose = TestProgram(argv=self.argv, config=conf, suite=suite,
                                exit=False)
        self.output = AccessDecorator(stream)

    def setUp(self):
        """runs nosetests with the specified test suite, all plugins
        activated.

        The command line is assembled as ``nosetests [activate] [args...]
        [suitepath]`` and stored in ``self.argv`` before the run.
        """
        self.argv = ['nosetests']
        # activate defaults to None; putting None into argv would hand a
        # non-string argument to the option parser.
        if self.activate is not None:
            self.argv.append(self.activate)
        if self.args:
            self.argv.extend(self.args)
        if self.suitepath:
            self.argv.append(self.suitepath)

        self._execPlugin()
273a380bf10Simarom
274a380bf10Simarom
class AccessDecorator(object):
    """Convenience wrapper around a captured output stream.

    On construction the stream is rewound and read in full; that snapshot
    backs ``in`` tests and ``str()``.  Iteration walks the lines of the
    underlying stream from its beginning.
    """
    stream = None
    _buf = None

    def __init__(self, stream):
        """Snapshot ``stream`` (which must support seek() and read())."""
        self.stream = stream
        stream.seek(0)
        self._buf = stream.read()
        # rewind again so the stream itself can still be read from the start
        stream.seek(0)

    def __contains__(self, val):
        """Return True if ``val`` occurs in the captured text."""
        return val in self._buf

    def __iter__(self):
        """Iterate over the lines of the stream, starting at the top."""
        # Rewind first: iterating a file-like object consumes its read
        # position, so without this a second loop would yield nothing.
        self.stream.seek(0)
        return iter(self.stream)

    def __str__(self):
        """Return the captured text."""
        return self._buf
289a380bf10Simarom
290a380bf10Simarom
def blankline_separated_blocks(text):
    """Yield ``text`` in chunks, each ending with its separator line.

    A separator is a line that is empty after stripping whitespace, or a
    line of three or more ``=`` characters and nothing else.  Line endings
    are preserved; trailing text after the last separator is yielded as a
    final chunk.
    """
    pending = []
    for raw in text.splitlines(True):
        pending.append(raw)
        stripped = raw.strip()
        is_rule = stripped.startswith('===') and stripped.strip('=') == ''
        if stripped == '' or is_rule:
            yield "".join(pending)
            pending = []
    if pending:
        yield "".join(pending)
302a380bf10Simarom
303a380bf10Simarom
def remove_stack_traces(out):
    """Replace the stack portion of each traceback in ``out`` with ``...``.

    The text is processed one blank-line-separated block at a time; within
    a block, a traceback is rewritten as its header line, a ``...`` line,
    and then the exception name and message.
    """
    # this regexp taken from Python 2.5's doctest
    pattern = re.compile(r"""
        # Grab the traceback header.  Different versions of Python have
        # said different things on the first traceback line.
        ^(?P<hdr> Traceback\ \(
            (?: most\ recent\ call\ last
            |   innermost\ last
            ) \) :
        )
        \s* $                   # toss trailing whitespace on the header.
        (?P<stack> .*?)         # don't blink: absorb stuff until...
        ^(?=\w)                 #     a line *starts* with alphanum.
        .*?(?P<exception> \w+ ) # exception name
        (?P<msg> [:\n] .*)      # the rest
        """, re.VERBOSE | re.MULTILINE | re.DOTALL)
    replacement = r"\g<hdr>\n...\n\g<exception>\g<msg>"
    return "".join(
        pattern.sub(replacement, chunk)
        for chunk in blankline_separated_blocks(out))
324a380bf10Simarom
325a380bf10Simarom
def simplify_warnings(out):
    """Reduce each warning report in ``out`` to ``Category: message``.

    The leading ``<file>:<lineno>:`` location and the line that follows
    the warning message are dropped.
    """
    pattern = r"""
        # Cut the file and line no, up to the warning name
        ^.*:\d+:\s
        (?P<category>\w+): \s+        # warning category
        (?P<detail>.+) $ \n?          # warning message
        ^ .* $                        # stack frame
        """
    return re.sub(pattern, r"\g<category>: \g<detail>", out,
                  flags=re.VERBOSE | re.MULTILINE)
335a380bf10Simarom
336a380bf10Simarom
def remove_timings(out):
    """Replace the elapsed time in ``Ran N test(s) in X.XXXs`` with ``...``."""
    timing = re.compile(r"Ran (\d+ tests?) in [0-9.]+s")
    return timing.sub(r"Ran \1 in ...s", out)
340a380bf10Simarom
341a380bf10Simarom
def munge_nose_output_for_doctest(out):
    """Modify nose output to make it easy to use in doctests.

    Applies, in order: traceback stack removal, warning simplification and
    timing removal, then strips leading and trailing whitespace.
    """
    for cleanup in (remove_stack_traces, simplify_warnings, remove_timings):
        out = cleanup(out)
    return out.strip()
348a380bf10Simarom
349a380bf10Simarom
def run(*arg, **kw):
    """
    Doctest-friendly variant of nose.run for doctests that exercise
    test runs.

    The result output is captured and printed to stdout after being
    cleaned up: timing information is replaced with an ellipsis (...),
    traceback stacks are removed, and surrounding whitespace is stripped.

    Use this wherever a doctest checks nose (or unittest) test result
    output.

    Keyword arguments handled here (everything else is passed on to
    nose.run):

    - config: the Config to use; when absent one is built from the
      optional ``plugins`` (a list is wrapped in a PluginManager) and
      ``env`` arguments
    - argv: defaults to ``['nosetests', '-v']``
    - buffer_all: when true, sys.stdout and sys.stderr are redirected into
      the capture buffer for the duration of the run; otherwise a
      DeprecationWarning is issued

    Note: do not use doctest: +ELLIPSIS when testing nose output,
    since ellipses ("test_foo ... ok") in your expected test runner
    output may match multiple lines of output, causing spurious test
    passes!
    """
    from nose import run as nose_run
    from nose.config import Config
    from nose.plugins.manager import PluginManager

    captured = Buffer()
    if 'config' not in kw:
        plugins = kw.pop('plugins', [])
        if isinstance(plugins, list):
            plugins = PluginManager(plugins=plugins)
        kw['config'] = Config(env=kw.pop('env', {}), plugins=plugins)
    kw.setdefault('argv', ['nosetests', '-v'])
    kw['config'].stream = captured

    # Either route everything written to stdout/stderr into our buffer, or
    # warn that the deprecated behavior is active. Without the redirect,
    # prints and warnings end up out of place or are lost.
    saved_stdout, saved_stderr = sys.stdout, sys.stderr
    buffer_all = kw.pop('buffer_all', False)
    if buffer_all:
        sys.stdout = sys.stderr = captured
    else:
        warn("The behavior of nose.plugins.plugintest.run() will change in "
             "the next release of nose. The current behavior does not "
             "correctly account for output to stdout and stderr. To enable "
             "correct behavior, use run_buffered() instead, or pass "
             "the keyword argument buffer_all=True to run().",
             DeprecationWarning, stacklevel=2)
    try:
        nose_run(*arg, **kw)
    finally:
        if buffer_all:
            sys.stderr = saved_stderr
            sys.stdout = saved_stdout
    print(munge_nose_output_for_doctest(captured.getvalue()))
408a380bf10Simarom
409a380bf10Simarom
def run_buffered(*arg, **kw):
    """Call run() with ``buffer_all=True``, overriding any value passed in."""
    run(*arg, **dict(kw, buffer_all=True))
413a380bf10Simarom
if __name__ == '__main__':
    # Executed as a script: run the doctests embedded in this module.
    from doctest import testmod
    testmod()
417a380bf10Simarom
418