2a380bf10SimaromTesting Plugins
The plugin interface is well-tested enough that you can unit test your
use of its hooks with a reasonable level of confidence. However, there is
also a mixin for unittest.TestCase called PluginTester that is designed to
test plugins in their native runtime environment.
10a380bf10SimaromHere's a simple example with a do-nothing plugin and a composed suite.
12a380bf10Simarom    >>> import unittest
13a380bf10Simarom    >>> from nose.plugins import Plugin, PluginTester
14a380bf10Simarom    >>> class FooPlugin(Plugin):
15a380bf10Simarom    ...     pass
16a380bf10Simarom    >>> class TestPluginFoo(PluginTester, unittest.TestCase):
17a380bf10Simarom    ...     activate = '--with-foo'
18a380bf10Simarom    ...     plugins = [FooPlugin()]
19a380bf10Simarom    ...     def test_foo(self):
20a380bf10Simarom    ...         for line in self.output:
21a380bf10Simarom    ...             # i.e. check for patterns
22a380bf10Simarom    ...             pass
23a380bf10Simarom    ...
24a380bf10Simarom    ...         # or check for a line containing ...
25a380bf10Simarom    ...         assert "ValueError" in self.output
26a380bf10Simarom    ...     def makeSuite(self):
27a380bf10Simarom    ...         class TC(unittest.TestCase):
28a380bf10Simarom    ...             def runTest(self):
29a380bf10Simarom    ...                 raise ValueError("I hate foo")
30a380bf10Simarom    ...         return [TC('runTest')]
31a380bf10Simarom    ...
32a380bf10Simarom    >>> res = unittest.TestResult()
33a380bf10Simarom    >>> case = TestPluginFoo('test_foo')
34a380bf10Simarom    >>> _ = case(res)
35a380bf10Simarom    >>> res.errors
36a380bf10Simarom    []
37a380bf10Simarom    >>> res.failures
38a380bf10Simarom    []
39a380bf10Simarom    >>> res.wasSuccessful()
40a380bf10Simarom    True
41a380bf10Simarom    >>> res.testsRun
42a380bf10Simarom    1
44a380bf10SimaromAnd here is a more complex example of testing a plugin that has extra
45a380bf10Simaromarguments and reads environment variables.
47a380bf10Simarom    >>> import unittest, os
48a380bf10Simarom    >>> from nose.plugins import Plugin, PluginTester
49a380bf10Simarom    >>> class FancyOutputter(Plugin):
50a380bf10Simarom    ...     name = "fancy"
51a380bf10Simarom    ...     def configure(self, options, conf):
52a380bf10Simarom    ...         Plugin.configure(self, options, conf)
53a380bf10Simarom    ...         if not self.enabled:
54a380bf10Simarom    ...             return
55a380bf10Simarom    ...         self.fanciness = 1
56a380bf10Simarom    ...         if options.more_fancy:
57a380bf10Simarom    ...             self.fanciness = 2
58a380bf10Simarom    ...         if 'EVEN_FANCIER' in self.env:
59a380bf10Simarom    ...             self.fanciness = 3
60a380bf10Simarom    ...
61a380bf10Simarom    ...     def options(self, parser, env=os.environ):
62a380bf10Simarom    ...         self.env = env
63a380bf10Simarom    ...         parser.add_option('--more-fancy', action='store_true')
64a380bf10Simarom    ...         Plugin.options(self, parser, env=env)
65a380bf10Simarom    ...
66a380bf10Simarom    ...     def report(self, stream):
67a380bf10Simarom    ...         stream.write("FANCY " * self.fanciness)
68a380bf10Simarom    ...
69a380bf10Simarom    >>> class TestFancyOutputter(PluginTester, unittest.TestCase):
70a380bf10Simarom    ...     activate = '--with-fancy' # enables the plugin
71a380bf10Simarom    ...     plugins = [FancyOutputter()]
72a380bf10Simarom    ...     args = ['--more-fancy']
73a380bf10Simarom    ...     env = {'EVEN_FANCIER': '1'}
74a380bf10Simarom    ...
75a380bf10Simarom    ...     def test_fancy_output(self):
76a380bf10Simarom    ...         assert "FANCY FANCY FANCY" in self.output, (
77a380bf10Simarom    ...                                         "got: %s" % self.output)
78a380bf10Simarom    ...     def makeSuite(self):
79a380bf10Simarom    ...         class TC(unittest.TestCase):
80a380bf10Simarom    ...             def runTest(self):
81a380bf10Simarom    ...                 raise ValueError("I hate fancy stuff")
82a380bf10Simarom    ...         return [TC('runTest')]
83a380bf10Simarom    ...
84a380bf10Simarom    >>> res = unittest.TestResult()
85a380bf10Simarom    >>> case = TestFancyOutputter('test_fancy_output')
86a380bf10Simarom    >>> _ = case(res)
87a380bf10Simarom    >>> res.errors
88a380bf10Simarom    []
89a380bf10Simarom    >>> res.failures
90a380bf10Simarom    []
91a380bf10Simarom    >>> res.wasSuccessful()
92a380bf10Simarom    True
93a380bf10Simarom    >>> res.testsRun
94a380bf10Simarom    1
98a380bf10Simaromimport re
99a380bf10Simaromimport sys
100a380bf10Simaromfrom warnings import warn
103a380bf10Simarom    from io import StringIO
104a380bf10Simaromexcept ImportError:
105a380bf10Simarom    from io import StringIO
107a380bf10Simarom__all__ = ['PluginTester', 'run']
109a380bf10Simaromfrom os import getpid
class MultiProcessFile(object):
    """
    helper for testing multiprocessing

    multiprocessing poses a problem for doctests, since the strategy
    of replacing sys.stdout/stderr with file-like objects then
    inspecting the results won't work: the child processes will
    write to the objects, but the data will not be reflected
    in the parent doctest-ing process.

    The solution is to create file-like objects which will interact with
    multiprocessing in a more desirable way.

    All processes can write to this object, but only the creator can read.
    This allows the testing system to see a unified picture of I/O.
    """
    def __init__(self):
        # per advice at:
        #    http://docs.python.org/library/multiprocessing.html#all-platforms
        self.__master = getpid()
        self.__queue = Manager().Queue()
        self.__buffer = StringIO()
        self.softspace = 0

    def buffer(self):
        """Move everything waiting in the queue into the local buffer.

        Does nothing unless called in the process that created this object.
        Queued chunks are grouped per writer and appended to the buffer in
        sorted writer-identity order, with the parent's own output last.
        """
        if getpid() != self.__master:
            return

        from queue import Empty
        from collections import defaultdict
        cache = defaultdict(str)
        while True:
            try:
                pid, data = self.__queue.get_nowait()
            except Empty:
                break
            if pid == ():
                # show parent output after children
                # this is what users see, usually
                pid = (1e100,)  # googol!
            cache[pid] += data
        for pid in sorted(cache):
            self.__buffer.write(cache[pid])

    def write(self, data):
        """Queue *data*, tagged with the identity of the writing process."""
        # note that these pids are in the form of current_process()._identity
        # rather than OS pids
        from multiprocessing import current_process
        pid = current_process()._identity
        self.__queue.put((pid, data))

    def __iter__(self):
        """Return the local buffer for iteration, after draining the queue.

        Defined explicitly because getattr doesn't work for iter().
        """
        self.buffer()
        return self.__buffer

    def seek(self, offset, whence=0):
        """Drain the queue, then seek the local buffer."""
        self.buffer()
        return self.__buffer.seek(offset, whence)

    def getvalue(self):
        """Drain the queue, then return the full contents of the buffer."""
        self.buffer()
        return self.__buffer.getvalue()

    def __getattr__(self, attr):
        """Delegate any other attribute to the local buffer.

        Raises AttributeError for this class's own private attributes.
        __getattr__ only runs when normal lookup has failed, so reaching it
        for one of those names means the instance was never initialised
        (for example an object created without __init__, as unpickling
        does).  Delegating in that state would look up the missing buffer,
        re-enter this method, and recurse until RecursionError.
        """
        if attr.startswith('_MultiProcessFile__'):
            raise AttributeError(attr)
        return getattr(self.__buffer, attr)
# Pick the stream class used to capture nose output: the
# multiprocessing-aware file when multiprocessing can be imported,
# otherwise a plain in-memory text buffer.
try:
    from multiprocessing import Manager
    Buffer = MultiProcessFile
except ImportError:
    Buffer = StringIO
class PluginTester(object):
    """A mixin for testing nose plugins in their runtime environment.

    Subclass this and mix in unittest.TestCase to run integration/functional
    tests on your plugin.  When setUp() is called, the stub test suite is
    executed with your plugin so that during an actual test you can inspect the
    artifacts of how your plugin interacted with the stub test suite.

    - activate

      - the argument to send nosetests to activate the plugin; left out of
        the command line when it is None

    - suitepath

      - if set, this is the path of the suite to test. Otherwise, you
        will need to use the hook, makeSuite()

    - plugins

      - the list of plugins to make available during the run. Note
        that this does not mean these plugins will be *enabled* during
        the run -- only the plugins enabled by the activate argument
        or other settings in argv or env will be enabled.

    - args

      - a list of arguments to add to the nosetests command, in addition to
        the activate argument

    - env

      - optional dict of environment variables to send nosetests

    - ignoreFiles

      - if not None, assigned to the ignoreFiles attribute of the Config
        used for the run

    After setUp() has run, ``self.argv`` holds the command line that was
    used, ``self.nose`` the TestProgram instance and ``self.output`` the
    captured output stream wrapped in an AccessDecorator.
    """
    activate = None
    suitepath = None
    args = None
    # NOTE: env and plugins are class-level mutables shared by every subclass
    # that does not override them; override them rather than mutating in place.
    env = {}
    argv = None
    plugins = []
    ignoreFiles = None

    def makeSuite(self):
        """returns a suite object of tests to run (unittest.TestSuite())

        If self.suitepath is None, this must be implemented. The returned suite
        object will be executed with all plugins activated.  It may return
        None.

        Here is an example of a basic suite object you can return ::

            >>> import unittest
            >>> class SomeTest(unittest.TestCase):
            ...     def runTest(self):
            ...         raise ValueError("Now do something, plugin!")
            ...
            >>> unittest.TestSuite([SomeTest()]) # doctest: +ELLIPSIS
            <unittest...TestSuite tests=[<...SomeTest testMethod=runTest>]>

        """
        raise NotImplementedError

    def _execPlugin(self):
        """execute the plugin on the internal test suite.

        Stores the TestProgram on ``self.nose`` and the captured output on
        ``self.output``.
        """
        from nose.config import Config
        from nose.core import TestProgram
        from nose.plugins.manager import PluginManager

        suite = None
        stream = Buffer()
        conf = Config(env=self.env,
                      stream=stream,
                      plugins=PluginManager(plugins=self.plugins))
        if self.ignoreFiles is not None:
            conf.ignoreFiles = self.ignoreFiles
        if not self.suitepath:
            # no path given: the subclass supplies the suite itself
            suite = self.makeSuite()

        self.nose = TestProgram(argv=self.argv, config=conf, suite=suite,
                                exit=False)
        self.output = AccessDecorator(stream)

    def setUp(self):
        """runs nosetests with the specified test suite, all plugins
        activated.

        Builds ``self.argv`` from activate, args and suitepath (in that
        order), then executes the run.
        """
        argv = ['nosetests']
        # activate defaults to None; passing None through would put a
        # non-string into the command line handed to nose.
        if self.activate is not None:
            argv.append(self.activate)
        if self.args:
            argv.extend(self.args)
        if self.suitepath:
            argv.append(self.suitepath)
        self.argv = argv

        self._execPlugin()
class AccessDecorator(object):
    """Read-only convenience view over a captured output stream.

    The full text of the stream is read once at construction time and used
    for ``in`` tests and ``str()``; iteration goes to the stream itself.
    """
    stream = None
    _buf = None

    def __init__(self, stream):
        self.stream = stream
        # Snapshot the whole content, then rewind so that later iteration
        # starts from the beginning again.
        stream.seek(0)
        snapshot = stream.read()
        stream.seek(0)
        self._buf = snapshot

    def __str__(self):
        return self._buf

    def __iter__(self):
        return iter(self.stream)

    def __contains__(self, val):
        found = val in self._buf
        return found
def blankline_separated_blocks(text):
    """Yield *text* in chunks, each ending with its separator line.

    A separator is a blank line, or a line made up only of three or more
    ``=`` characters.  Any trailing lines without a separator form a final
    chunk.
    """
    pending = []
    for raw in text.splitlines(True):
        pending.append(raw)
        stripped = raw.strip()
        is_rule = stripped.startswith('===') and not stripped.strip('=')
        if stripped and not is_rule:
            continue
        yield "".join(pending)
        pending = []
    if pending:
        yield "".join(pending)
def remove_stack_traces(out):
    """Return *out* with the stack portion of each traceback replaced by ``...``.

    The text is processed one blank-line-separated block at a time; in each
    block the traceback header, exception name and message are kept.
    """
    # this regexp taken from Python 2.5's doctest
    traceback_re = re.compile(r"""
        # Grab the traceback header.  Different versions of Python have
        # said different things on the first traceback line.
        ^(?P<hdr> Traceback\ \(
            (?: most\ recent\ call\ last
            |   innermost\ last
            ) \) :
        )
        \s* $                   # toss trailing whitespace on the header.
        (?P<stack> .*?)         # don't blink: absorb stuff until...
        ^(?=\w)                 #     a line *starts* with alphanum.
        .*?(?P<exception> \w+ ) # exception name
        (?P<msg> [:\n] .*)      # the rest
        """, re.VERBOSE | re.MULTILINE | re.DOTALL)
    collapsed = r"\g<hdr>\n...\n\g<exception>\g<msg>"
    return "".join(
        traceback_re.sub(collapsed, chunk)
        for chunk in blankline_separated_blocks(out))
def simplify_warnings(out):
    """Reduce each warning report in *out* to ``Category: message``.

    The ``file:lineno:`` prefix and the line that follows the warning
    message are removed.
    """
    pattern = r"""
        # Cut the file and line no, up to the warning name
        ^.*:\d+:\s
        (?P<category>\w+): \s+        # warning category
        (?P<detail>.+) $ \n?          # warning message
        ^ .* $                        # stack frame
        """
    return re.sub(pattern, r"\g<category>: \g<detail>", out,
                  flags=re.VERBOSE | re.MULTILINE)
def remove_timings(out):
    """Replace the elapsed time in each "Ran N tests in ...s" line with ``...``."""
    timing_re = re.compile(r"Ran (\d+ tests?) in [0-9.]+s")
    return timing_re.sub(r"Ran \1 in ...s", out)
def munge_nose_output_for_doctest(out):
    """Modify nose output to make it easy to use in doctests.

    Applies, in order: stack-trace removal, warning simplification and
    timing removal, then strips leading and trailing whitespace.
    """
    for scrub in (remove_stack_traces, simplify_warnings, remove_timings):
        out = scrub(out)
    return out.strip()
def run(*arg, **kw):
    """
    Specialized version of nose.run for use inside of doctests that
    test test runs.

    This version of run() prints the result output to stdout.  Before
    printing, the output is processed by replacing the timing
    information with an ellipsis (...), removing traceback stacks, and
    removing trailing whitespace.

    Use this version of run wherever you are writing a doctest that
    tests nose (or unittest) test result output.

    Note: do not use doctest: +ELLIPSIS when testing nose output,
    since ellipses ("test_foo ... ok") in your expected test runner
    output may match multiple lines of output, causing spurious test
    passes!

    Keyword arguments consumed here: ``plugins`` and ``env`` (only when no
    ``config`` is given), and ``buffer_all``.  ``argv`` defaults to
    ``['nosetests', '-v']``.  Everything else is passed on to nose.run.
    """
    # Inside this function the name ``run`` refers to nose's own run().
    from nose import run
    from nose.config import Config
    from nose.plugins.manager import PluginManager

    captured = Buffer()
    if 'config' not in kw:
        plugin_arg = kw.pop('plugins', [])
        if isinstance(plugin_arg, list):
            plugin_arg = PluginManager(plugins=plugin_arg)
        kw['config'] = Config(env=kw.pop('env', {}), plugins=plugin_arg)
    kw.setdefault('argv', ['nosetests', '-v'])
    kw['config'].stream = captured

    # Set up buffering so that all output goes to our buffer,
    # or warn user if deprecated behavior is active. If this is not
    # done, prints and warnings will either be out of place or
    # disappear.
    saved_stderr = sys.stderr
    saved_stdout = sys.stdout
    buffer_all = kw.pop('buffer_all', False)
    if buffer_all:
        sys.stdout = sys.stderr = captured
    else:
        warn("The behavior of nose.plugins.plugintest.run() will change in "
             "the next release of nose. The current behavior does not "
             "correctly account for output to stdout and stderr. To enable "
             "correct behavior, use run_buffered() instead, or pass "
             "the keyword argument buffer_all=True to run().",
             DeprecationWarning, stacklevel=2)
    try:
        run(*arg, **kw)
    finally:
        # only undo the redirection if we installed it
        if buffer_all:
            sys.stderr = saved_stderr
            sys.stdout = saved_stdout
    print(munge_nose_output_for_doctest(captured.getvalue()))
def run_buffered(*arg, **kw):
    """Call run() with ``buffer_all=True``, overriding any value passed in."""
    run(*arg, **dict(kw, buffer_all=True))
if __name__ == '__main__':
    # Executed as a script: run the doctests contained in this module.
    from doctest import testmod
    testmod()