1"""
2Testing Plugins
3===============
4
5The plugin interface is well-tested enough to safely unit test your
6use of its hooks with some level of confidence. However, there is also
7a mixin for unittest.TestCase called PluginTester that's designed to
8test plugins in their native runtime environment.
9
10Here's a simple example with a do-nothing plugin and a composed suite.
11
12    >>> import unittest
13    >>> from nose.plugins import Plugin, PluginTester
14    >>> class FooPlugin(Plugin):
15    ...     pass
16    >>> class TestPluginFoo(PluginTester, unittest.TestCase):
17    ...     activate = '--with-foo'
18    ...     plugins = [FooPlugin()]
19    ...     def test_foo(self):
20    ...         for line in self.output:
21    ...             # i.e. check for patterns
22    ...             pass
23    ...
24    ...         # or check for a line containing ...
25    ...         assert "ValueError" in self.output
26    ...     def makeSuite(self):
27    ...         class TC(unittest.TestCase):
28    ...             def runTest(self):
29    ...                 raise ValueError("I hate foo")
30    ...         return [TC('runTest')]
31    ...
32    >>> res = unittest.TestResult()
33    >>> case = TestPluginFoo('test_foo')
34    >>> _ = case(res)
35    >>> res.errors
36    []
37    >>> res.failures
38    []
39    >>> res.wasSuccessful()
40    True
41    >>> res.testsRun
42    1
43
44And here is a more complex example of testing a plugin that has extra
45arguments and reads environment variables.
46
47    >>> import unittest, os
48    >>> from nose.plugins import Plugin, PluginTester
49    >>> class FancyOutputter(Plugin):
50    ...     name = "fancy"
51    ...     def configure(self, options, conf):
52    ...         Plugin.configure(self, options, conf)
53    ...         if not self.enabled:
54    ...             return
55    ...         self.fanciness = 1
56    ...         if options.more_fancy:
57    ...             self.fanciness = 2
58    ...         if 'EVEN_FANCIER' in self.env:
59    ...             self.fanciness = 3
60    ...
61    ...     def options(self, parser, env=os.environ):
62    ...         self.env = env
63    ...         parser.add_option('--more-fancy', action='store_true')
64    ...         Plugin.options(self, parser, env=env)
65    ...
66    ...     def report(self, stream):
67    ...         stream.write("FANCY " * self.fanciness)
68    ...
69    >>> class TestFancyOutputter(PluginTester, unittest.TestCase):
70    ...     activate = '--with-fancy' # enables the plugin
71    ...     plugins = [FancyOutputter()]
72    ...     args = ['--more-fancy']
73    ...     env = {'EVEN_FANCIER': '1'}
74    ...
75    ...     def test_fancy_output(self):
76    ...         assert "FANCY FANCY FANCY" in self.output, (
77    ...                                         "got: %s" % self.output)
78    ...     def makeSuite(self):
79    ...         class TC(unittest.TestCase):
80    ...             def runTest(self):
81    ...                 raise ValueError("I hate fancy stuff")
82    ...         return [TC('runTest')]
83    ...
84    >>> res = unittest.TestResult()
85    >>> case = TestFancyOutputter('test_fancy_output')
86    >>> _ = case(res)
87    >>> res.errors
88    []
89    >>> res.failures
90    []
91    >>> res.wasSuccessful()
92    True
93    >>> res.testsRun
94    1
95
96"""
97
98import re
99import sys
100from warnings import warn
101
102try:
103    from io import StringIO
104except ImportError:
105    from io import StringIO
106
107__all__ = ['PluginTester', 'run']
108
109from os import getpid
110class MultiProcessFile(object):
111    """
112    helper for testing multiprocessing
113
114    multiprocessing poses a problem for doctests, since the strategy
115    of replacing sys.stdout/stderr with file-like objects then
116    inspecting the results won't work: the child processes will
117    write to the objects, but the data will not be reflected
118    in the parent doctest-ing process.
119
120    The solution is to create file-like objects which will interact with
121    multiprocessing in a more desirable way.
122
123    All processes can write to this object, but only the creator can read.
124    This allows the testing system to see a unified picture of I/O.
125    """
126    def __init__(self):
127        # per advice at:
128        #    http://docs.python.org/library/multiprocessing.html#all-platforms
129        self.__master = getpid()
130        self.__queue = Manager().Queue()
131        self.__buffer = StringIO()
132        self.softspace = 0
133
134    def buffer(self):
135        if getpid() != self.__master:
136            return
137
138        from queue import Empty
139        from collections import defaultdict
140        cache = defaultdict(str)
141        while True:
142            try:
143                pid, data = self.__queue.get_nowait()
144            except Empty:
145                break
146            if pid == ():
147                #show parent output after children
148                #this is what users see, usually
149                pid = ( 1e100, ) # googol!
150            cache[pid] += data
151        for pid in sorted(cache):
152            #self.__buffer.write( '%s wrote: %r\n' % (pid, cache[pid]) ) #DEBUG
153            self.__buffer.write( cache[pid] )
154    def write(self, data):
155        # note that these pids are in the form of current_process()._identity
156        # rather than OS pids
157        from multiprocessing import current_process
158        pid = current_process()._identity
159        self.__queue.put((pid, data))
160    def __iter__(self):
161        "getattr doesn't work for iter()"
162        self.buffer()
163        return self.__buffer
164    def seek(self, offset, whence=0):
165        self.buffer()
166        return self.__buffer.seek(offset, whence)
167    def getvalue(self):
168        self.buffer()
169        return self.__buffer.getvalue()
170    def __getattr__(self, attr):
171        return getattr(self.__buffer, attr)
172
173try:
174    from multiprocessing import Manager
175    Buffer = MultiProcessFile
176except ImportError:
177    Buffer = StringIO
178
179class PluginTester(object):
180    """A mixin for testing nose plugins in their runtime environment.
181
182    Subclass this and mix in unittest.TestCase to run integration/functional
183    tests on your plugin.  When setUp() is called, the stub test suite is
184    executed with your plugin so that during an actual test you can inspect the
185    artifacts of how your plugin interacted with the stub test suite.
186
187    - activate
188
189      - the argument to send nosetests to activate the plugin
190
191    - suitepath
192
193      - if set, this is the path of the suite to test. Otherwise, you
194        will need to use the hook, makeSuite()
195
196    - plugins
197
198      - the list of plugins to make available during the run. Note
199        that this does not mean these plugins will be *enabled* during
200        the run -- only the plugins enabled by the activate argument
201        or other settings in argv or env will be enabled.
202
203    - args
204
205      - a list of arguments to add to the nosetests command, in addition to
206        the activate argument
207
208    - env
209
210      - optional dict of environment variables to send nosetests
211
212    """
213    activate = None
214    suitepath = None
215    args = None
216    env = {}
217    argv = None
218    plugins = []
219    ignoreFiles = None
220
221    def makeSuite(self):
222        """returns a suite object of tests to run (unittest.TestSuite())
223
224        If self.suitepath is None, this must be implemented. The returned suite
225        object will be executed with all plugins activated.  It may return
226        None.
227
228        Here is an example of a basic suite object you can return ::
229
230            >>> import unittest
231            >>> class SomeTest(unittest.TestCase):
232            ...     def runTest(self):
233            ...         raise ValueError("Now do something, plugin!")
234            ...
235            >>> unittest.TestSuite([SomeTest()]) # doctest: +ELLIPSIS
236            <unittest...TestSuite tests=[<...SomeTest testMethod=runTest>]>
237
238        """
239        raise NotImplementedError
240
241    def _execPlugin(self):
242        """execute the plugin on the internal test suite.
243        """
244        from nose.config import Config
245        from nose.core import TestProgram
246        from nose.plugins.manager import PluginManager
247
248        suite = None
249        stream = Buffer()
250        conf = Config(env=self.env,
251                      stream=stream,
252                      plugins=PluginManager(plugins=self.plugins))
253        if self.ignoreFiles is not None:
254            conf.ignoreFiles = self.ignoreFiles
255        if not self.suitepath:
256            suite = self.makeSuite()
257
258        self.nose = TestProgram(argv=self.argv, config=conf, suite=suite,
259                                exit=False)
260        self.output = AccessDecorator(stream)
261
262    def setUp(self):
263        """runs nosetests with the specified test suite, all plugins
264        activated.
265        """
266        self.argv = ['nosetests', self.activate]
267        if self.args:
268            self.argv.extend(self.args)
269        if self.suitepath:
270            self.argv.append(self.suitepath)
271
272        self._execPlugin()
273
274
275class AccessDecorator(object):
276    stream = None
277    _buf = None
278    def __init__(self, stream):
279        self.stream = stream
280        stream.seek(0)
281        self._buf = stream.read()
282        stream.seek(0)
283    def __contains__(self, val):
284        return val in self._buf
285    def __iter__(self):
286        return iter(self.stream)
287    def __str__(self):
288        return self._buf
289
290
291def blankline_separated_blocks(text):
292    "a bunch of === characters is also considered a blank line"
293    block = []
294    for line in text.splitlines(True):
295        block.append(line)
296        line = line.strip()
297        if not line or line.startswith('===') and not line.strip('='):
298            yield "".join(block)
299            block = []
300    if block:
301        yield "".join(block)
302
303
304def remove_stack_traces(out):
305    # this regexp taken from Python 2.5's doctest
306    traceback_re = re.compile(r"""
307        # Grab the traceback header.  Different versions of Python have
308        # said different things on the first traceback line.
309        ^(?P<hdr> Traceback\ \(
310            (?: most\ recent\ call\ last
311            |   innermost\ last
312            ) \) :
313        )
314        \s* $                   # toss trailing whitespace on the header.
315        (?P<stack> .*?)         # don't blink: absorb stuff until...
316        ^(?=\w)                 #     a line *starts* with alphanum.
317        .*?(?P<exception> \w+ ) # exception name
318        (?P<msg> [:\n] .*)      # the rest
319        """, re.VERBOSE | re.MULTILINE | re.DOTALL)
320    blocks = []
321    for block in blankline_separated_blocks(out):
322        blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<exception>\g<msg>", block))
323    return "".join(blocks)
324
325
326def simplify_warnings(out):
327    warn_re = re.compile(r"""
328        # Cut the file and line no, up to the warning name
329        ^.*:\d+:\s
330        (?P<category>\w+): \s+        # warning category
331        (?P<detail>.+) $ \n?          # warning message
332        ^ .* $                        # stack frame
333        """, re.VERBOSE | re.MULTILINE)
334    return warn_re.sub(r"\g<category>: \g<detail>", out)
335
336
337def remove_timings(out):
338    return re.sub(
339        r"Ran (\d+ tests?) in [0-9.]+s", r"Ran \1 in ...s", out)
340
341
342def munge_nose_output_for_doctest(out):
343    """Modify nose output to make it easy to use in doctests."""
344    out = remove_stack_traces(out)
345    out = simplify_warnings(out)
346    out = remove_timings(out)
347    return out.strip()
348
349
350def run(*arg, **kw):
351    """
352    Specialized version of nose.run for use inside of doctests that
353    test test runs.
354
355    This version of run() prints the result output to stdout.  Before
356    printing, the output is processed by replacing the timing
357    information with an ellipsis (...), removing traceback stacks, and
358    removing trailing whitespace.
359
360    Use this version of run wherever you are writing a doctest that
361    tests nose (or unittest) test result output.
362
363    Note: do not use doctest: +ELLIPSIS when testing nose output,
364    since ellipses ("test_foo ... ok") in your expected test runner
365    output may match multiple lines of output, causing spurious test
366    passes!
367    """
368    from nose import run
369    from nose.config import Config
370    from nose.plugins.manager import PluginManager
371
372    buffer = Buffer()
373    if 'config' not in kw:
374        plugins = kw.pop('plugins', [])
375        if isinstance(plugins, list):
376            plugins = PluginManager(plugins=plugins)
377        env = kw.pop('env', {})
378        kw['config'] = Config(env=env, plugins=plugins)
379    if 'argv' not in kw:
380        kw['argv'] = ['nosetests', '-v']
381    kw['config'].stream = buffer
382
383    # Set up buffering so that all output goes to our buffer,
384    # or warn user if deprecated behavior is active. If this is not
385    # done, prints and warnings will either be out of place or
386    # disappear.
387    stderr = sys.stderr
388    stdout = sys.stdout
389    if kw.pop('buffer_all', False):
390        sys.stdout = sys.stderr = buffer
391        restore = True
392    else:
393        restore = False
394        warn("The behavior of nose.plugins.plugintest.run() will change in "
395             "the next release of nose. The current behavior does not "
396             "correctly account for output to stdout and stderr. To enable "
397             "correct behavior, use run_buffered() instead, or pass "
398             "the keyword argument buffer_all=True to run().",
399             DeprecationWarning, stacklevel=2)
400    try:
401        run(*arg, **kw)
402    finally:
403        if restore:
404            sys.stderr = stderr
405            sys.stdout = stdout
406    out = buffer.getvalue()
407    print(munge_nose_output_for_doctest(out))
408
409
410def run_buffered(*arg, **kw):
411    kw['buffer_all'] = True
412    run(*arg, **kw)
413
414if __name__ == '__main__':
415    import doctest
416    doctest.testmod()
417
418