1823b8294SYaroslav Brustinov"""
2823b8294SYaroslav BrustinovOverview
3823b8294SYaroslav Brustinov========
4823b8294SYaroslav Brustinov
5823b8294SYaroslav BrustinovThe multiprocess plugin enables you to distribute your test run among a set of
6823b8294SYaroslav Brustinovworker processes that run tests in parallel. This can speed up CPU-bound test
7823b8294SYaroslav Brustinovruns (as long as the number of work processeses is around the number of
8823b8294SYaroslav Brustinovprocessors or cores available), but is mainly useful for IO-bound tests that
9823b8294SYaroslav Brustinovspend most of their time waiting for data to arrive from someplace else.
10823b8294SYaroslav Brustinov
11823b8294SYaroslav Brustinov.. note ::
12823b8294SYaroslav Brustinov
13823b8294SYaroslav Brustinov   See :doc:`../doc_tests/test_multiprocess/multiprocess` for
14823b8294SYaroslav Brustinov   additional documentation and examples. Use of this plugin on python
15823b8294SYaroslav Brustinov   2.5 or earlier requires the multiprocessing_ module, also available
16823b8294SYaroslav Brustinov   from PyPI.
17823b8294SYaroslav Brustinov
18823b8294SYaroslav Brustinov.. _multiprocessing : http://code.google.com/p/python-multiprocessing/
19823b8294SYaroslav Brustinov
20823b8294SYaroslav BrustinovHow tests are distributed
21823b8294SYaroslav Brustinov=========================
22823b8294SYaroslav Brustinov
23823b8294SYaroslav BrustinovThe ideal case would be to dispatch each test to a worker process
24823b8294SYaroslav Brustinovseparately. This ideal is not attainable in all cases, however, because many
25823b8294SYaroslav Brustinovtest suites depend on context (class, module or package) fixtures.
26823b8294SYaroslav Brustinov
27823b8294SYaroslav BrustinovThe plugin can't know (unless you tell it -- see below!) if a context fixture
28823b8294SYaroslav Brustinovcan be called many times concurrently (is re-entrant), or if it can be shared
29823b8294SYaroslav Brustinovamong tests running in different processes. Therefore, if a context has
30823b8294SYaroslav Brustinovfixtures, the default behavior is to dispatch the entire suite to a worker as
31823b8294SYaroslav Brustinova unit.
32823b8294SYaroslav Brustinov
33823b8294SYaroslav BrustinovControlling distribution
34823b8294SYaroslav Brustinov^^^^^^^^^^^^^^^^^^^^^^^^
35823b8294SYaroslav Brustinov
36823b8294SYaroslav BrustinovThere are two context-level variables that you can use to control this default
37823b8294SYaroslav Brustinovbehavior.
38823b8294SYaroslav Brustinov
39823b8294SYaroslav BrustinovIf a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True``
40823b8294SYaroslav Brustinovin the context, and the plugin will dispatch tests in suites bound to that
41823b8294SYaroslav Brustinovcontext as if the context had no fixtures. This means that the fixtures will
42823b8294SYaroslav Brustinovexecute concurrently and multiple times, typically once per test.
43823b8294SYaroslav Brustinov
44823b8294SYaroslav BrustinovIf a context's fixtures can be shared by tests running in different processes
45823b8294SYaroslav Brustinov-- such as a package-level fixture that starts an external http server or
46823b8294SYaroslav Brustinovinitializes a shared database -- then set ``_multiprocess_shared_ = True`` in
47823b8294SYaroslav Brustinovthe context. These fixtures will then execute in the primary nose process, and
48823b8294SYaroslav Brustinovtests in those contexts will be individually dispatched to run in parallel.
49823b8294SYaroslav Brustinov
50823b8294SYaroslav BrustinovHow results are collected and reported
51823b8294SYaroslav Brustinov======================================
52823b8294SYaroslav Brustinov
53823b8294SYaroslav BrustinovAs each test or suite executes in a worker process, results (failures, errors,
54823b8294SYaroslav Brustinovand specially handled exceptions like SkipTest) are collected in that
55823b8294SYaroslav Brustinovprocess. When the worker process finishes, it returns results to the main
56823b8294SYaroslav Brustinovnose process. There, any progress output is printed (dots!), and the
57823b8294SYaroslav Brustinovresults from the test run are combined into a consolidated result
58823b8294SYaroslav Brustinovset. When results have been received for all dispatched tests, or all
59823b8294SYaroslav Brustinovworkers have died, the result summary is output as normal.
60823b8294SYaroslav Brustinov
61823b8294SYaroslav BrustinovBeware!
62823b8294SYaroslav Brustinov=======
63823b8294SYaroslav Brustinov
64823b8294SYaroslav BrustinovNot all test suites will benefit from, or even operate correctly using, this
65823b8294SYaroslav Brustinovplugin. For example, CPU-bound tests will run more slowly if you don't have
66823b8294SYaroslav Brustinovmultiple processors. There are also some differences in plugin
67823b8294SYaroslav Brustinovinteractions and behaviors due to the way in which tests are dispatched and
68823b8294SYaroslav Brustinovloaded. In general, test loading under this plugin operates as if it were
69823b8294SYaroslav Brustinovalways in directed mode instead of discovered mode. For instance, doctests
70823b8294SYaroslav Brustinovin test modules will always be found when using this plugin with the doctest
71823b8294SYaroslav Brustinovplugin.
72823b8294SYaroslav Brustinov
73823b8294SYaroslav BrustinovBut the biggest issue you will face is probably concurrency. Unless you
74823b8294SYaroslav Brustinovhave kept your tests as religiously pure unit tests, with no side-effects, no
75823b8294SYaroslav Brustinovordering issues, and no external dependencies, chances are you will experience
76823b8294SYaroslav Brustinovodd, intermittent and unexplainable failures and errors when using this
77823b8294SYaroslav Brustinovplugin. This doesn't necessarily mean the plugin is broken; it may mean that
78823b8294SYaroslav Brustinovyour test suite is not safe for concurrency.
79823b8294SYaroslav Brustinov
80823b8294SYaroslav BrustinovNew Features in 1.1.0
81823b8294SYaroslav Brustinov=====================
82823b8294SYaroslav Brustinov
83823b8294SYaroslav Brustinov* functions generated by test generators are now added to the worker queue
84823b8294SYaroslav Brustinov  making them multi-threaded.
85823b8294SYaroslav Brustinov* fixed timeout functionality, now functions will be terminated with a
86823b8294SYaroslav Brustinov  TimedOutException exception when they exceed their execution time. The
87823b8294SYaroslav Brustinov  worker processes are not terminated.
88823b8294SYaroslav Brustinov* added ``--process-restartworker`` option to restart workers once they are
89823b8294SYaroslav Brustinov  done, this helps control memory usage. Sometimes memory leaks can accumulate
90823b8294SYaroslav Brustinov  making long runs very difficult.
91823b8294SYaroslav Brustinov* added global _instantiate_plugins to configure which plugins are started
92823b8294SYaroslav Brustinov  on the worker processes.
93823b8294SYaroslav Brustinov
94823b8294SYaroslav Brustinov"""
95823b8294SYaroslav Brustinov
96823b8294SYaroslav Brustinovimport logging
97823b8294SYaroslav Brustinovimport os
98823b8294SYaroslav Brustinovimport sys
99823b8294SYaroslav Brustinovimport time
100823b8294SYaroslav Brustinovimport traceback
101823b8294SYaroslav Brustinovimport unittest
102823b8294SYaroslav Brustinovimport pickle
103823b8294SYaroslav Brustinovimport signal
104823b8294SYaroslav Brustinovimport nose.case
105823b8294SYaroslav Brustinovfrom nose.core import TextTestRunner
106823b8294SYaroslav Brustinovfrom nose import failure
107823b8294SYaroslav Brustinovfrom nose import loader
108823b8294SYaroslav Brustinovfrom nose.plugins.base import Plugin
109823b8294SYaroslav Brustinovfrom nose.pyversion import bytes_
110823b8294SYaroslav Brustinovfrom nose.result import TextTestResult
111823b8294SYaroslav Brustinovfrom nose.suite import ContextSuite
112823b8294SYaroslav Brustinovfrom nose.util import test_address
113823b8294SYaroslav Brustinovtry:
114823b8294SYaroslav Brustinov    # 2.7+
115823b8294SYaroslav Brustinov    from unittest.runner import _WritelnDecorator
116823b8294SYaroslav Brustinovexcept ImportError:
117823b8294SYaroslav Brustinov    from unittest import _WritelnDecorator
118823b8294SYaroslav Brustinovfrom Queue import Empty
119823b8294SYaroslav Brustinovfrom warnings import warn
120823b8294SYaroslav Brustinovtry:
121823b8294SYaroslav Brustinov    from cStringIO import StringIO
122823b8294SYaroslav Brustinovexcept ImportError:
123823b8294SYaroslav Brustinov    import StringIO
124823b8294SYaroslav Brustinov
125823b8294SYaroslav Brustinov# this is a list of plugin classes that will be checked for and created inside
126823b8294SYaroslav Brustinov# each worker process
127823b8294SYaroslav Brustinov_instantiate_plugins = None
128823b8294SYaroslav Brustinov
129823b8294SYaroslav Brustinovlog = logging.getLogger(__name__)
130823b8294SYaroslav Brustinov
131823b8294SYaroslav BrustinovProcess = Queue = Pool = Event = Value = Array = None
132823b8294SYaroslav Brustinov
133823b8294SYaroslav Brustinov# have to inherit KeyboardInterrupt to it will interrupt process properly
134823b8294SYaroslav Brustinovclass TimedOutException(KeyboardInterrupt):
135823b8294SYaroslav Brustinov    def __init__(self, value = "Timed Out"):
136823b8294SYaroslav Brustinov        self.value = value
137823b8294SYaroslav Brustinov    def __str__(self):
138823b8294SYaroslav Brustinov        return repr(self.value)
139823b8294SYaroslav Brustinov
140823b8294SYaroslav Brustinovdef _import_mp():
141823b8294SYaroslav Brustinov    global Process, Queue, Pool, Event, Value, Array
142823b8294SYaroslav Brustinov    try:
143823b8294SYaroslav Brustinov        from multiprocessing import Manager, Process
144823b8294SYaroslav Brustinov        #prevent the server process created in the manager which holds Python
145823b8294SYaroslav Brustinov        #objects and allows other processes to manipulate them using proxies
146823b8294SYaroslav Brustinov        #to interrupt on SIGINT (keyboardinterrupt) so that the communication
147823b8294SYaroslav Brustinov        #channel between subprocesses and main process is still usable after
148823b8294SYaroslav Brustinov        #ctrl+C is received in the main process.
149823b8294SYaroslav Brustinov        old=signal.signal(signal.SIGINT, signal.SIG_IGN)
150823b8294SYaroslav Brustinov        m = Manager()
151823b8294SYaroslav Brustinov        #reset it back so main process will receive a KeyboardInterrupt
152823b8294SYaroslav Brustinov        #exception on ctrl+c
153823b8294SYaroslav Brustinov        signal.signal(signal.SIGINT, old)
154823b8294SYaroslav Brustinov        Queue, Pool, Event, Value, Array = (
155823b8294SYaroslav Brustinov                m.Queue, m.Pool, m.Event, m.Value, m.Array
156823b8294SYaroslav Brustinov        )
157823b8294SYaroslav Brustinov    except ImportError:
158823b8294SYaroslav Brustinov        warn("multiprocessing module is not available, multiprocess plugin "
159823b8294SYaroslav Brustinov             "cannot be used", RuntimeWarning)
160823b8294SYaroslav Brustinov
161823b8294SYaroslav Brustinov
162823b8294SYaroslav Brustinovclass TestLet:
163823b8294SYaroslav Brustinov    def __init__(self, case):
164823b8294SYaroslav Brustinov        try:
165823b8294SYaroslav Brustinov            self._id = case.id()
166823b8294SYaroslav Brustinov        except AttributeError:
167823b8294SYaroslav Brustinov            pass
168823b8294SYaroslav Brustinov        self._short_description = case.shortDescription()
169823b8294SYaroslav Brustinov        self._str = str(case)
170823b8294SYaroslav Brustinov
171823b8294SYaroslav Brustinov    def id(self):
172823b8294SYaroslav Brustinov        return self._id
173823b8294SYaroslav Brustinov
174823b8294SYaroslav Brustinov    def shortDescription(self):
175823b8294SYaroslav Brustinov        return self._short_description
176823b8294SYaroslav Brustinov
177823b8294SYaroslav Brustinov    def __str__(self):
178823b8294SYaroslav Brustinov        return self._str
179823b8294SYaroslav Brustinov
180823b8294SYaroslav Brustinovclass MultiProcess(Plugin):
181823b8294SYaroslav Brustinov    """
182823b8294SYaroslav Brustinov    Run tests in multiple processes. Requires processing module.
183823b8294SYaroslav Brustinov    """
184823b8294SYaroslav Brustinov    score = 1000
185823b8294SYaroslav Brustinov    status = {}
186823b8294SYaroslav Brustinov
187823b8294SYaroslav Brustinov    def options(self, parser, env):
188823b8294SYaroslav Brustinov        """
189823b8294SYaroslav Brustinov        Register command-line options.
190823b8294SYaroslav Brustinov        """
191823b8294SYaroslav Brustinov        parser.add_option("--processes", action="store",
192823b8294SYaroslav Brustinov                          default=env.get('NOSE_PROCESSES', 0),
193823b8294SYaroslav Brustinov                          dest="multiprocess_workers",
194823b8294SYaroslav Brustinov                          metavar="NUM",
195823b8294SYaroslav Brustinov                          help="Spread test run among this many processes. "
196823b8294SYaroslav Brustinov                          "Set a number equal to the number of processors "
197823b8294SYaroslav Brustinov                          "or cores in your machine for best results. "
198823b8294SYaroslav Brustinov                          "Pass a negative number to have the number of "
199823b8294SYaroslav Brustinov                          "processes automatically set to the number of "
200823b8294SYaroslav Brustinov                          "cores. Passing 0 means to disable parallel "
201823b8294SYaroslav Brustinov                          "testing. Default is 0 unless NOSE_PROCESSES is "
202823b8294SYaroslav Brustinov                          "set. "
203823b8294SYaroslav Brustinov                          "[NOSE_PROCESSES]")
204823b8294SYaroslav Brustinov        parser.add_option("--process-timeout", action="store",
205823b8294SYaroslav Brustinov                          default=env.get('NOSE_PROCESS_TIMEOUT', 10),
206823b8294SYaroslav Brustinov                          dest="multiprocess_timeout",
207823b8294SYaroslav Brustinov                          metavar="SECONDS",
208823b8294SYaroslav Brustinov                          help="Set timeout for return of results from each "
209823b8294SYaroslav Brustinov                          "test runner process. Default is 10. "
210823b8294SYaroslav Brustinov                          "[NOSE_PROCESS_TIMEOUT]")
211823b8294SYaroslav Brustinov        parser.add_option("--process-restartworker", action="store_true",
212823b8294SYaroslav Brustinov                          default=env.get('NOSE_PROCESS_RESTARTWORKER', False),
213823b8294SYaroslav Brustinov                          dest="multiprocess_restartworker",
214823b8294SYaroslav Brustinov                          help="If set, will restart each worker process once"
215823b8294SYaroslav Brustinov                          " their tests are done, this helps control memory "
216823b8294SYaroslav Brustinov                          "leaks from killing the system. "
217823b8294SYaroslav Brustinov                          "[NOSE_PROCESS_RESTARTWORKER]")
218823b8294SYaroslav Brustinov
219823b8294SYaroslav Brustinov    def configure(self, options, config):
220823b8294SYaroslav Brustinov        """
221823b8294SYaroslav Brustinov        Configure plugin.
222823b8294SYaroslav Brustinov        """
223823b8294SYaroslav Brustinov        try:
224823b8294SYaroslav Brustinov            self.status.pop('active')
225823b8294SYaroslav Brustinov        except KeyError:
226823b8294SYaroslav Brustinov            pass
227823b8294SYaroslav Brustinov        if not hasattr(options, 'multiprocess_workers'):
228823b8294SYaroslav Brustinov            self.enabled = False
229823b8294SYaroslav Brustinov            return
230823b8294SYaroslav Brustinov        # don't start inside of a worker process
231823b8294SYaroslav Brustinov        if config.worker:
232823b8294SYaroslav Brustinov            return
233823b8294SYaroslav Brustinov        self.config = config
234823b8294SYaroslav Brustinov        try:
235823b8294SYaroslav Brustinov            workers = int(options.multiprocess_workers)
236823b8294SYaroslav Brustinov        except (TypeError, ValueError):
237823b8294SYaroslav Brustinov            workers = 0
238823b8294SYaroslav Brustinov        if workers:
239823b8294SYaroslav Brustinov            _import_mp()
240823b8294SYaroslav Brustinov            if Process is None:
241823b8294SYaroslav Brustinov                self.enabled = False
242823b8294SYaroslav Brustinov                return
243823b8294SYaroslav Brustinov            # Negative number of workers will cause multiprocessing to hang.
244823b8294SYaroslav Brustinov            # Set the number of workers to the CPU count to avoid this.
245823b8294SYaroslav Brustinov            if workers < 0:
246823b8294SYaroslav Brustinov                try:
247823b8294SYaroslav Brustinov                    import multiprocessing
248823b8294SYaroslav Brustinov                    workers = multiprocessing.cpu_count()
249823b8294SYaroslav Brustinov                except NotImplementedError:
250823b8294SYaroslav Brustinov                    self.enabled = False
251823b8294SYaroslav Brustinov                    return
252823b8294SYaroslav Brustinov            self.enabled = True
253823b8294SYaroslav Brustinov            self.config.multiprocess_workers = workers
254823b8294SYaroslav Brustinov            t = float(options.multiprocess_timeout)
255823b8294SYaroslav Brustinov            self.config.multiprocess_timeout = t
256823b8294SYaroslav Brustinov            r = int(options.multiprocess_restartworker)
257823b8294SYaroslav Brustinov            self.config.multiprocess_restartworker = r
258823b8294SYaroslav Brustinov            self.status['active'] = True
259823b8294SYaroslav Brustinov
260823b8294SYaroslav Brustinov    def prepareTestLoader(self, loader):
261823b8294SYaroslav Brustinov        """Remember loader class so MultiProcessTestRunner can instantiate
262823b8294SYaroslav Brustinov        the right loader.
263823b8294SYaroslav Brustinov        """
264823b8294SYaroslav Brustinov        self.loaderClass = loader.__class__
265823b8294SYaroslav Brustinov
266823b8294SYaroslav Brustinov    def prepareTestRunner(self, runner):
267823b8294SYaroslav Brustinov        """Replace test runner with MultiProcessTestRunner.
268823b8294SYaroslav Brustinov        """
269823b8294SYaroslav Brustinov        # replace with our runner class
270823b8294SYaroslav Brustinov        return MultiProcessTestRunner(stream=runner.stream,
271823b8294SYaroslav Brustinov                                      verbosity=self.config.verbosity,
272823b8294SYaroslav Brustinov                                      config=self.config,
273823b8294SYaroslav Brustinov                                      loaderClass=self.loaderClass)
274823b8294SYaroslav Brustinov
275823b8294SYaroslav Brustinovdef signalhandler(sig, frame):
276823b8294SYaroslav Brustinov    raise TimedOutException()
277823b8294SYaroslav Brustinov
278823b8294SYaroslav Brustinovclass MultiProcessTestRunner(TextTestRunner):
279823b8294SYaroslav Brustinov    waitkilltime = 5.0 # max time to wait to terminate a process that does not
280823b8294SYaroslav Brustinov                       # respond to SIGILL
281823b8294SYaroslav Brustinov    def __init__(self, **kw):
282823b8294SYaroslav Brustinov        self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
283823b8294SYaroslav Brustinov        super(MultiProcessTestRunner, self).__init__(**kw)
284823b8294SYaroslav Brustinov
285823b8294SYaroslav Brustinov    def collect(self, test, testQueue, tasks, to_teardown, result):
286823b8294SYaroslav Brustinov        # dispatch and collect results
287823b8294SYaroslav Brustinov        # put indexes only on queue because tests aren't picklable
288823b8294SYaroslav Brustinov        for case in self.nextBatch(test):
289823b8294SYaroslav Brustinov            log.debug("Next batch %s (%s)", case, type(case))
290823b8294SYaroslav Brustinov            if (isinstance(case, nose.case.Test) and
291823b8294SYaroslav Brustinov                isinstance(case.test, failure.Failure)):
292823b8294SYaroslav Brustinov                log.debug("Case is a Failure")
293823b8294SYaroslav Brustinov                case(result) # run here to capture the failure
294823b8294SYaroslav Brustinov                continue
295823b8294SYaroslav Brustinov            # handle shared fixtures
296823b8294SYaroslav Brustinov            if isinstance(case, ContextSuite) and case.context is failure.Failure:
297823b8294SYaroslav Brustinov                log.debug("Case is a Failure")
298823b8294SYaroslav Brustinov                case(result) # run here to capture the failure
299823b8294SYaroslav Brustinov                continue
300823b8294SYaroslav Brustinov            elif isinstance(case, ContextSuite) and self.sharedFixtures(case):
301823b8294SYaroslav Brustinov                log.debug("%s has shared fixtures", case)
302823b8294SYaroslav Brustinov                try:
303823b8294SYaroslav Brustinov                    case.setUp()
304823b8294SYaroslav Brustinov                except (KeyboardInterrupt, SystemExit):
305823b8294SYaroslav Brustinov                    raise
306823b8294SYaroslav Brustinov                except:
307823b8294SYaroslav Brustinov                    log.debug("%s setup failed", sys.exc_info())
308823b8294SYaroslav Brustinov                    result.addError(case, sys.exc_info())
309823b8294SYaroslav Brustinov                else:
310823b8294SYaroslav Brustinov                    to_teardown.append(case)
311823b8294SYaroslav Brustinov                    if case.factory:
312823b8294SYaroslav Brustinov                        ancestors=case.factory.context.get(case, [])
313823b8294SYaroslav Brustinov                        for an in ancestors[:2]:
314823b8294SYaroslav Brustinov                            #log.debug('reset ancestor %s', an)
315823b8294SYaroslav Brustinov                            if getattr(an, '_multiprocess_shared_', False):
316823b8294SYaroslav Brustinov                                an._multiprocess_can_split_=True
317823b8294SYaroslav Brustinov                            #an._multiprocess_shared_=False
318823b8294SYaroslav Brustinov                    self.collect(case, testQueue, tasks, to_teardown, result)
319823b8294SYaroslav Brustinov
320823b8294SYaroslav Brustinov            else:
321823b8294SYaroslav Brustinov                test_addr = self.addtask(testQueue,tasks,case)
322823b8294SYaroslav Brustinov                log.debug("Queued test %s (%s) to %s",
323823b8294SYaroslav Brustinov                          len(tasks), test_addr, testQueue)
324823b8294SYaroslav Brustinov
325823b8294SYaroslav Brustinov    def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
326823b8294SYaroslav Brustinov        currentaddr = Value('c',bytes_(''))
327823b8294SYaroslav Brustinov        currentstart = Value('d',time.time())
328823b8294SYaroslav Brustinov        keyboardCaught = Event()
329823b8294SYaroslav Brustinov        p = Process(target=runner,
330823b8294SYaroslav Brustinov                   args=(iworker, testQueue,
331823b8294SYaroslav Brustinov                         resultQueue,
332823b8294SYaroslav Brustinov                         currentaddr,
333823b8294SYaroslav Brustinov                         currentstart,
334823b8294SYaroslav Brustinov                         keyboardCaught,
335823b8294SYaroslav Brustinov                         shouldStop,
336823b8294SYaroslav Brustinov                         self.loaderClass,
337823b8294SYaroslav Brustinov                         result.__class__,
338823b8294SYaroslav Brustinov                         pickle.dumps(self.config)))
339823b8294SYaroslav Brustinov        p.currentaddr = currentaddr
340823b8294SYaroslav Brustinov        p.currentstart = currentstart
341823b8294SYaroslav Brustinov        p.keyboardCaught = keyboardCaught
342823b8294SYaroslav Brustinov        old = signal.signal(signal.SIGILL, signalhandler)
343823b8294SYaroslav Brustinov        p.start()
344823b8294SYaroslav Brustinov        signal.signal(signal.SIGILL, old)
345823b8294SYaroslav Brustinov        return p
346823b8294SYaroslav Brustinov
347823b8294SYaroslav Brustinov    def run(self, test):
348823b8294SYaroslav Brustinov        """
349823b8294SYaroslav Brustinov        Execute the test (which may be a test suite). If the test is a suite,
350823b8294SYaroslav Brustinov        distribute it out among as many processes as have been configured, at
351823b8294SYaroslav Brustinov        as fine a level as is possible given the context fixtures defined in
352823b8294SYaroslav Brustinov        the suite or any sub-suites.
353823b8294SYaroslav Brustinov
354823b8294SYaroslav Brustinov        """
355823b8294SYaroslav Brustinov        log.debug("%s.run(%s) (%s)", self, test, os.getpid())
356823b8294SYaroslav Brustinov        wrapper = self.config.plugins.prepareTest(test)
357823b8294SYaroslav Brustinov        if wrapper is not None:
358823b8294SYaroslav Brustinov            test = wrapper
359823b8294SYaroslav Brustinov
360823b8294SYaroslav Brustinov        # plugins can decorate or capture the output stream
361823b8294SYaroslav Brustinov        wrapped = self.config.plugins.setOutputStream(self.stream)
362823b8294SYaroslav Brustinov        if wrapped is not None:
363823b8294SYaroslav Brustinov            self.stream = wrapped
364823b8294SYaroslav Brustinov
365823b8294SYaroslav Brustinov        testQueue = Queue()
366823b8294SYaroslav Brustinov        resultQueue = Queue()
367823b8294SYaroslav Brustinov        tasks = []
368823b8294SYaroslav Brustinov        completed = []
369823b8294SYaroslav Brustinov        workers = []
370823b8294SYaroslav Brustinov        to_teardown = []
371823b8294SYaroslav Brustinov        shouldStop = Event()
372823b8294SYaroslav Brustinov
373823b8294SYaroslav Brustinov        result = self._makeResult()
374823b8294SYaroslav Brustinov        start = time.time()
375823b8294SYaroslav Brustinov
376823b8294SYaroslav Brustinov        self.collect(test, testQueue, tasks, to_teardown, result)
377823b8294SYaroslav Brustinov
378823b8294SYaroslav Brustinov        log.debug("Starting %s workers", self.config.multiprocess_workers)
379823b8294SYaroslav Brustinov        for i in range(self.config.multiprocess_workers):
380823b8294SYaroslav Brustinov            p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
381823b8294SYaroslav Brustinov            workers.append(p)
382823b8294SYaroslav Brustinov            log.debug("Started worker process %s", i+1)
383823b8294SYaroslav Brustinov
384823b8294SYaroslav Brustinov        total_tasks = len(tasks)
385823b8294SYaroslav Brustinov        # need to keep track of the next time to check for timeouts in case
386823b8294SYaroslav Brustinov        # more than one process times out at the same time.
387823b8294SYaroslav Brustinov        nexttimeout=self.config.multiprocess_timeout
388823b8294SYaroslav Brustinov        thrownError = None
389823b8294SYaroslav Brustinov
390823b8294SYaroslav Brustinov        try:
391823b8294SYaroslav Brustinov            while tasks:
392823b8294SYaroslav Brustinov                log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
393823b8294SYaroslav Brustinov                          len(completed), total_tasks,nexttimeout)
394823b8294SYaroslav Brustinov                try:
395823b8294SYaroslav Brustinov                    iworker, addr, newtask_addrs, batch_result = resultQueue.get(
396823b8294SYaroslav Brustinov                                                            timeout=nexttimeout)
397823b8294SYaroslav Brustinov                    log.debug('Results received for worker %d, %s, new tasks: %d',
398823b8294SYaroslav Brustinov                              iworker,addr,len(newtask_addrs))
399823b8294SYaroslav Brustinov                    try:
400823b8294SYaroslav Brustinov                        try:
401823b8294SYaroslav Brustinov                            tasks.remove(addr)
402823b8294SYaroslav Brustinov                        except ValueError:
403823b8294SYaroslav Brustinov                            log.warn('worker %s failed to remove from tasks: %s',
404823b8294SYaroslav Brustinov                                     iworker,addr)
405823b8294SYaroslav Brustinov                        total_tasks += len(newtask_addrs)
406823b8294SYaroslav Brustinov                        tasks.extend(newtask_addrs)
407823b8294SYaroslav Brustinov                    except KeyError:
408823b8294SYaroslav Brustinov                        log.debug("Got result for unknown task? %s", addr)
409823b8294SYaroslav Brustinov                        log.debug("current: %s",str(list(tasks)[0]))
410823b8294SYaroslav Brustinov                    else:
411823b8294SYaroslav Brustinov                        completed.append([addr,batch_result])
412823b8294SYaroslav Brustinov                    self.consolidate(result, batch_result)
413823b8294SYaroslav Brustinov                    if (self.config.stopOnError
414823b8294SYaroslav Brustinov                        and not result.wasSuccessful()):
415823b8294SYaroslav Brustinov                        # set the stop condition
416823b8294SYaroslav Brustinov                        shouldStop.set()
417823b8294SYaroslav Brustinov                        break
418823b8294SYaroslav Brustinov                    if self.config.multiprocess_restartworker:
419823b8294SYaroslav Brustinov                        log.debug('joining worker %s',iworker)
420823b8294SYaroslav Brustinov                        # wait for working, but not that important if worker
421823b8294SYaroslav Brustinov                        # cannot be joined in fact, for workers that add to
422823b8294SYaroslav Brustinov                        # testQueue, they will not terminate until all their
423823b8294SYaroslav Brustinov                        # items are read
424823b8294SYaroslav Brustinov                        workers[iworker].join(timeout=1)
425823b8294SYaroslav Brustinov                        if not shouldStop.is_set() and not testQueue.empty():
426823b8294SYaroslav Brustinov                            log.debug('starting new process on worker %s',iworker)
427823b8294SYaroslav Brustinov                            workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
428823b8294SYaroslav Brustinov                except Empty:
429823b8294SYaroslav Brustinov                    log.debug("Timed out with %s tasks pending "
430823b8294SYaroslav Brustinov                              "(empty testQueue=%r): %s",
431823b8294SYaroslav Brustinov                              len(tasks),testQueue.empty(),str(tasks))
432823b8294SYaroslav Brustinov                    any_alive = False
433823b8294SYaroslav Brustinov                    for iworker, w in enumerate(workers):
434823b8294SYaroslav Brustinov                        if w.is_alive():
435823b8294SYaroslav Brustinov                            worker_addr = bytes_(w.currentaddr.value,'ascii')
436823b8294SYaroslav Brustinov                            timeprocessing = time.time() - w.currentstart.value
437823b8294SYaroslav Brustinov                            if ( len(worker_addr) == 0
438823b8294SYaroslav Brustinov                                    and timeprocessing > self.config.multiprocess_timeout-0.1):
439823b8294SYaroslav Brustinov                                log.debug('worker %d has finished its work item, '
440823b8294SYaroslav Brustinov                                          'but is not exiting? do we wait for it?',
441823b8294SYaroslav Brustinov                                          iworker)
442823b8294SYaroslav Brustinov                            else:
443823b8294SYaroslav Brustinov                                any_alive = True
444823b8294SYaroslav Brustinov                            if (len(worker_addr) > 0
445823b8294SYaroslav Brustinov                                and timeprocessing > self.config.multiprocess_timeout-0.1):
446823b8294SYaroslav Brustinov                                log.debug('timed out worker %s: %s',
447823b8294SYaroslav Brustinov                                          iworker,worker_addr)
448823b8294SYaroslav Brustinov                                w.currentaddr.value = bytes_('')
449823b8294SYaroslav Brustinov                                # If the process is in C++ code, sending a SIGILL
450823b8294SYaroslav Brustinov                                # might not send a python KeybordInterrupt exception
451823b8294SYaroslav Brustinov                                # therefore, send multiple signals until an
452823b8294SYaroslav Brustinov                                # exception is caught. If this takes too long, then
453823b8294SYaroslav Brustinov                                # terminate the process
454823b8294SYaroslav Brustinov                                w.keyboardCaught.clear()
455823b8294SYaroslav Brustinov                                startkilltime = time.time()
456823b8294SYaroslav Brustinov                                while not w.keyboardCaught.is_set() and w.is_alive():
457823b8294SYaroslav Brustinov                                    if time.time()-startkilltime > self.waitkilltime:
458823b8294SYaroslav Brustinov                                        # have to terminate...
459823b8294SYaroslav Brustinov                                        log.error("terminating worker %s",iworker)
460823b8294SYaroslav Brustinov                                        w.terminate()
461823b8294SYaroslav Brustinov                                        # there is a small probability that the
462823b8294SYaroslav Brustinov                                        # terminated process might send a result,
463823b8294SYaroslav Brustinov                                        # which has to be specially handled or
464823b8294SYaroslav Brustinov                                        # else processes might get orphaned.
465823b8294SYaroslav Brustinov                                        workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
466823b8294SYaroslav Brustinov                                        break
467823b8294SYaroslav Brustinov                                    os.kill(w.pid, signal.SIGILL)
468823b8294SYaroslav Brustinov                                    time.sleep(0.1)
469823b8294SYaroslav Brustinov                    if not any_alive and testQueue.empty():
470823b8294SYaroslav Brustinov                        log.debug("All workers dead")
471823b8294SYaroslav Brustinov                        break
472823b8294SYaroslav Brustinov                nexttimeout=self.config.multiprocess_timeout
473823b8294SYaroslav Brustinov                for w in workers:
474823b8294SYaroslav Brustinov                    if w.is_alive() and len(w.currentaddr.value) > 0:
475823b8294SYaroslav Brustinov                        timeprocessing = time.time()-w.currentstart.value
476823b8294SYaroslav Brustinov                        if timeprocessing <= self.config.multiprocess_timeout:
477823b8294SYaroslav Brustinov                            nexttimeout = min(nexttimeout,
478823b8294SYaroslav Brustinov                                self.config.multiprocess_timeout-timeprocessing)
479823b8294SYaroslav Brustinov            log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
480823b8294SYaroslav Brustinov
481823b8294SYaroslav Brustinov        except (KeyboardInterrupt, SystemExit), e:
482823b8294SYaroslav Brustinov            log.info('parent received ctrl-c when waiting for test results')
483823b8294SYaroslav Brustinov            thrownError = e
484823b8294SYaroslav Brustinov            #resultQueue.get(False)
485823b8294SYaroslav Brustinov
486823b8294SYaroslav Brustinov            result.addError(test, sys.exc_info())
487823b8294SYaroslav Brustinov
488823b8294SYaroslav Brustinov        try:
489823b8294SYaroslav Brustinov            for case in to_teardown:
490823b8294SYaroslav Brustinov                log.debug("Tearing down shared fixtures for %s", case)
491823b8294SYaroslav Brustinov                try:
492823b8294SYaroslav Brustinov                    case.tearDown()
493823b8294SYaroslav Brustinov                except (KeyboardInterrupt, SystemExit):
494823b8294SYaroslav Brustinov                    raise
495823b8294SYaroslav Brustinov                except:
496823b8294SYaroslav Brustinov                    result.addError(case, sys.exc_info())
497823b8294SYaroslav Brustinov
498823b8294SYaroslav Brustinov            stop = time.time()
499823b8294SYaroslav Brustinov
500823b8294SYaroslav Brustinov            # first write since can freeze on shutting down processes
501823b8294SYaroslav Brustinov            result.printErrors()
502823b8294SYaroslav Brustinov            result.printSummary(start, stop)
503823b8294SYaroslav Brustinov            self.config.plugins.finalize(result)
504823b8294SYaroslav Brustinov
505823b8294SYaroslav Brustinov            if thrownError is None:
506823b8294SYaroslav Brustinov                log.debug("Tell all workers to stop")
507823b8294SYaroslav Brustinov                for w in workers:
508823b8294SYaroslav Brustinov                    if w.is_alive():
509823b8294SYaroslav Brustinov                        testQueue.put('STOP', block=False)
510823b8294SYaroslav Brustinov
511823b8294SYaroslav Brustinov            # wait for the workers to end
512823b8294SYaroslav Brustinov            for iworker,worker in enumerate(workers):
513823b8294SYaroslav Brustinov                if worker.is_alive():
514823b8294SYaroslav Brustinov                    log.debug('joining worker %s',iworker)
515823b8294SYaroslav Brustinov                    worker.join()
516823b8294SYaroslav Brustinov                    if worker.is_alive():
517823b8294SYaroslav Brustinov                        log.debug('failed to join worker %s',iworker)
518823b8294SYaroslav Brustinov        except (KeyboardInterrupt, SystemExit):
519823b8294SYaroslav Brustinov            log.info('parent received ctrl-c when shutting down: stop all processes')
520823b8294SYaroslav Brustinov            for worker in workers:
521823b8294SYaroslav Brustinov                if worker.is_alive():
522823b8294SYaroslav Brustinov                    worker.terminate()
523823b8294SYaroslav Brustinov
524823b8294SYaroslav Brustinov            if thrownError: raise thrownError
525823b8294SYaroslav Brustinov            else: raise
526823b8294SYaroslav Brustinov
527823b8294SYaroslav Brustinov        return result
528823b8294SYaroslav Brustinov
529823b8294SYaroslav Brustinov    def addtask(testQueue,tasks,case):
530823b8294SYaroslav Brustinov        arg = None
531823b8294SYaroslav Brustinov        if isinstance(case,nose.case.Test) and hasattr(case.test,'arg'):
532823b8294SYaroslav Brustinov            # this removes the top level descriptor and allows real function
533823b8294SYaroslav Brustinov            # name to be returned
534823b8294SYaroslav Brustinov            case.test.descriptor = None
535823b8294SYaroslav Brustinov            arg = case.test.arg
536823b8294SYaroslav Brustinov        test_addr = MultiProcessTestRunner.address(case)
537823b8294SYaroslav Brustinov        testQueue.put((test_addr,arg), block=False)
538823b8294SYaroslav Brustinov        if arg is not None:
539823b8294SYaroslav Brustinov            test_addr += str(arg)
540823b8294SYaroslav Brustinov        if tasks is not None:
541823b8294SYaroslav Brustinov            tasks.append(test_addr)
542823b8294SYaroslav Brustinov        return test_addr
543823b8294SYaroslav Brustinov    addtask = staticmethod(addtask)
544823b8294SYaroslav Brustinov
545823b8294SYaroslav Brustinov    def address(case):
546823b8294SYaroslav Brustinov        if hasattr(case, 'address'):
547823b8294SYaroslav Brustinov            file, mod, call = case.address()
548823b8294SYaroslav Brustinov        elif hasattr(case, 'context'):
549823b8294SYaroslav Brustinov            file, mod, call = test_address(case.context)
550823b8294SYaroslav Brustinov        else:
551823b8294SYaroslav Brustinov            raise Exception("Unable to convert %s to address" % case)
552823b8294SYaroslav Brustinov        parts = []
553823b8294SYaroslav Brustinov        if file is None:
554823b8294SYaroslav Brustinov            if mod is None:
555823b8294SYaroslav Brustinov                raise Exception("Unaddressable case %s" % case)
556823b8294SYaroslav Brustinov            else:
557823b8294SYaroslav Brustinov                parts.append(mod)
558823b8294SYaroslav Brustinov        else:
559823b8294SYaroslav Brustinov            # strip __init__.py(c) from end of file part
560823b8294SYaroslav Brustinov            # if present, having it there confuses loader
561823b8294SYaroslav Brustinov            dirname, basename = os.path.split(file)
562823b8294SYaroslav Brustinov            if basename.startswith('__init__'):
563823b8294SYaroslav Brustinov                file = dirname
564823b8294SYaroslav Brustinov            parts.append(file)
565823b8294SYaroslav Brustinov        if call is not None:
566823b8294SYaroslav Brustinov            parts.append(call)
567823b8294SYaroslav Brustinov        return ':'.join(map(str, parts))
568823b8294SYaroslav Brustinov    address = staticmethod(address)
569823b8294SYaroslav Brustinov
570823b8294SYaroslav Brustinov    def nextBatch(self, test):
571823b8294SYaroslav Brustinov        # allows tests or suites to mark themselves as not safe
572823b8294SYaroslav Brustinov        # for multiprocess execution
573823b8294SYaroslav Brustinov        if hasattr(test, 'context'):
574823b8294SYaroslav Brustinov            if not getattr(test.context, '_multiprocess_', True):
575823b8294SYaroslav Brustinov                return
576823b8294SYaroslav Brustinov
577823b8294SYaroslav Brustinov        if ((isinstance(test, ContextSuite)
578823b8294SYaroslav Brustinov             and test.hasFixtures(self.checkCanSplit))
579823b8294SYaroslav Brustinov            or not getattr(test, 'can_split', True)
580823b8294SYaroslav Brustinov            or not isinstance(test, unittest.TestSuite)):
581823b8294SYaroslav Brustinov            # regular test case, or a suite with context fixtures
582823b8294SYaroslav Brustinov
583823b8294SYaroslav Brustinov            # special case: when run like nosetests path/to/module.py
584823b8294SYaroslav Brustinov            # the top-level suite has only one item, and it shares
585823b8294SYaroslav Brustinov            # the same context as that item. In that case, we want the
586823b8294SYaroslav Brustinov            # item, not the top-level suite
587823b8294SYaroslav Brustinov            if isinstance(test, ContextSuite):
588823b8294SYaroslav Brustinov                contained = list(test)
589823b8294SYaroslav Brustinov                if (len(contained) == 1
590823b8294SYaroslav Brustinov                    and getattr(contained[0],
591823b8294SYaroslav Brustinov                                'context', None) == test.context):
592823b8294SYaroslav Brustinov                    test = contained[0]
593823b8294SYaroslav Brustinov            yield test
594823b8294SYaroslav Brustinov        else:
595823b8294SYaroslav Brustinov            # Suite is without fixtures at this level; but it may have
596823b8294SYaroslav Brustinov            # fixtures at any deeper level, so we need to examine it all
597823b8294SYaroslav Brustinov            # the way down to the case level
598823b8294SYaroslav Brustinov            for case in test:
599823b8294SYaroslav Brustinov                for batch in self.nextBatch(case):
600823b8294SYaroslav Brustinov                    yield batch
601823b8294SYaroslav Brustinov
602823b8294SYaroslav Brustinov    def checkCanSplit(context, fixt):
603823b8294SYaroslav Brustinov        """
604823b8294SYaroslav Brustinov        Callback that we use to check whether the fixtures found in a
605823b8294SYaroslav Brustinov        context or ancestor are ones we care about.
606823b8294SYaroslav Brustinov
607823b8294SYaroslav Brustinov        Contexts can tell us that their fixtures are reentrant by setting
608823b8294SYaroslav Brustinov        _multiprocess_can_split_. So if we see that, we return False to
609823b8294SYaroslav Brustinov        disregard those fixtures.
610823b8294SYaroslav Brustinov        """
611823b8294SYaroslav Brustinov        if not fixt:
612823b8294SYaroslav Brustinov            return False
613823b8294SYaroslav Brustinov        if getattr(context, '_multiprocess_can_split_', False):
614823b8294SYaroslav Brustinov            return False
615823b8294SYaroslav Brustinov        return True
616823b8294SYaroslav Brustinov    checkCanSplit = staticmethod(checkCanSplit)
617823b8294SYaroslav Brustinov
618823b8294SYaroslav Brustinov    def sharedFixtures(self, case):
619823b8294SYaroslav Brustinov        context = getattr(case, 'context', None)
620823b8294SYaroslav Brustinov        if not context:
621823b8294SYaroslav Brustinov            return False
622823b8294SYaroslav Brustinov        return getattr(context, '_multiprocess_shared_', False)
623823b8294SYaroslav Brustinov
624823b8294SYaroslav Brustinov    def consolidate(self, result, batch_result):
625823b8294SYaroslav Brustinov        log.debug("batch result is %s" , batch_result)
626823b8294SYaroslav Brustinov        try:
627823b8294SYaroslav Brustinov            output, testsRun, failures, errors, errorClasses = batch_result
628823b8294SYaroslav Brustinov        except ValueError:
629823b8294SYaroslav Brustinov            log.debug("result in unexpected format %s", batch_result)
630823b8294SYaroslav Brustinov            failure.Failure(*sys.exc_info())(result)
631823b8294SYaroslav Brustinov            return
632823b8294SYaroslav Brustinov        self.stream.write(output)
633823b8294SYaroslav Brustinov        result.testsRun += testsRun
634823b8294SYaroslav Brustinov        result.failures.extend(failures)
635823b8294SYaroslav Brustinov        result.errors.extend(errors)
636823b8294SYaroslav Brustinov        for key, (storage, label, isfail) in errorClasses.items():
637823b8294SYaroslav Brustinov            if key not in result.errorClasses:
638823b8294SYaroslav Brustinov                # Ordinarily storage is result attribute
639823b8294SYaroslav Brustinov                # but it's only processed through the errorClasses
640823b8294SYaroslav Brustinov                # dict, so it's ok to fake it here
641823b8294SYaroslav Brustinov                result.errorClasses[key] = ([], label, isfail)
642823b8294SYaroslav Brustinov            mystorage, _junk, _junk = result.errorClasses[key]
643823b8294SYaroslav Brustinov            mystorage.extend(storage)
644823b8294SYaroslav Brustinov        log.debug("Ran %s tests (total: %s)", testsRun, result.testsRun)
645823b8294SYaroslav Brustinov
646823b8294SYaroslav Brustinov
647823b8294SYaroslav Brustinovdef runner(ix, testQueue, resultQueue, currentaddr, currentstart,
648823b8294SYaroslav Brustinov           keyboardCaught, shouldStop, loaderClass, resultClass, config):
649823b8294SYaroslav Brustinov    try:
650823b8294SYaroslav Brustinov        try:
651823b8294SYaroslav Brustinov            return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
652823b8294SYaroslav Brustinov                    keyboardCaught, shouldStop, loaderClass, resultClass, config)
653823b8294SYaroslav Brustinov        except KeyboardInterrupt:
654823b8294SYaroslav Brustinov            log.debug('Worker %s keyboard interrupt, stopping',ix)
655823b8294SYaroslav Brustinov    except Empty:
656823b8294SYaroslav Brustinov        log.debug("Worker %s timed out waiting for tasks", ix)
657823b8294SYaroslav Brustinov
658823b8294SYaroslav Brustinovdef __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
659823b8294SYaroslav Brustinov           keyboardCaught, shouldStop, loaderClass, resultClass, config):
660823b8294SYaroslav Brustinov
661823b8294SYaroslav Brustinov    config = pickle.loads(config)
662823b8294SYaroslav Brustinov    dummy_parser = config.parserClass()
663823b8294SYaroslav Brustinov    if _instantiate_plugins is not None:
664823b8294SYaroslav Brustinov        for pluginclass in _instantiate_plugins:
665823b8294SYaroslav Brustinov            plugin = pluginclass()
666823b8294SYaroslav Brustinov            plugin.addOptions(dummy_parser,{})
667823b8294SYaroslav Brustinov            config.plugins.addPlugin(plugin)
668823b8294SYaroslav Brustinov    config.plugins.configure(config.options,config)
669823b8294SYaroslav Brustinov    config.plugins.begin()
670823b8294SYaroslav Brustinov    log.debug("Worker %s executing, pid=%d", ix,os.getpid())
671823b8294SYaroslav Brustinov    loader = loaderClass(config=config)
672823b8294SYaroslav Brustinov    loader.suiteClass.suiteClass = NoSharedFixtureContextSuite
673823b8294SYaroslav Brustinov
674823b8294SYaroslav Brustinov    def get():
675823b8294SYaroslav Brustinov        return testQueue.get(timeout=config.multiprocess_timeout)
676823b8294SYaroslav Brustinov
677823b8294SYaroslav Brustinov    def makeResult():
678823b8294SYaroslav Brustinov        stream = _WritelnDecorator(StringIO())
679823b8294SYaroslav Brustinov        result = resultClass(stream, descriptions=1,
680823b8294SYaroslav Brustinov                             verbosity=config.verbosity,
681823b8294SYaroslav Brustinov                             config=config)
682823b8294SYaroslav Brustinov        plug_result = config.plugins.prepareTestResult(result)
683823b8294SYaroslav Brustinov        if plug_result:
684823b8294SYaroslav Brustinov            return plug_result
685823b8294SYaroslav Brustinov        return result
686823b8294SYaroslav Brustinov
687823b8294SYaroslav Brustinov    def batch(result):
688823b8294SYaroslav Brustinov        failures = [(TestLet(c), err) for c, err in result.failures]
689823b8294SYaroslav Brustinov        errors = [(TestLet(c), err) for c, err in result.errors]
690823b8294SYaroslav Brustinov        errorClasses = {}
691823b8294SYaroslav Brustinov        for key, (storage, label, isfail) in result.errorClasses.items():
692823b8294SYaroslav Brustinov            errorClasses[key] = ([(TestLet(c), err) for c, err in storage],
693823b8294SYaroslav Brustinov                                 label, isfail)
694823b8294SYaroslav Brustinov        return (
695823b8294SYaroslav Brustinov            result.stream.getvalue(),
696823b8294SYaroslav Brustinov            result.testsRun,
697823b8294SYaroslav Brustinov            failures,
698823b8294SYaroslav Brustinov            errors,
699823b8294SYaroslav Brustinov            errorClasses)
700823b8294SYaroslav Brustinov    for test_addr, arg in iter(get, 'STOP'):
701823b8294SYaroslav Brustinov        if shouldStop.is_set():
702823b8294SYaroslav Brustinov            log.exception('Worker %d STOPPED',ix)
703823b8294SYaroslav Brustinov            break
704823b8294SYaroslav Brustinov        result = makeResult()
705823b8294SYaroslav Brustinov        test = loader.loadTestsFromNames([test_addr])
706823b8294SYaroslav Brustinov        test.testQueue = testQueue
707823b8294SYaroslav Brustinov        test.tasks = []
708823b8294SYaroslav Brustinov        test.arg = arg
709823b8294SYaroslav Brustinov        log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
710823b8294SYaroslav Brustinov        try:
711823b8294SYaroslav Brustinov            if arg is not None:
712823b8294SYaroslav Brustinov                test_addr = test_addr + str(arg)
713823b8294SYaroslav Brustinov            currentaddr.value = bytes_(test_addr)
714823b8294SYaroslav Brustinov            currentstart.value = time.time()
715823b8294SYaroslav Brustinov            test(result)
716823b8294SYaroslav Brustinov            currentaddr.value = bytes_('')
717823b8294SYaroslav Brustinov            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
718823b8294SYaroslav Brustinov        except KeyboardInterrupt, e: #TimedOutException:
719823b8294SYaroslav Brustinov            timeout = isinstance(e, TimedOutException)
720823b8294SYaroslav Brustinov            if timeout:
721823b8294SYaroslav Brustinov                keyboardCaught.set()
722823b8294SYaroslav Brustinov            if len(currentaddr.value):
723823b8294SYaroslav Brustinov                if timeout:
724823b8294SYaroslav Brustinov                    msg = 'Worker %s timed out, failing current test %s'
725823b8294SYaroslav Brustinov                else:
726823b8294SYaroslav Brustinov                    msg = 'Worker %s keyboard interrupt, failing current test %s'
727823b8294SYaroslav Brustinov                log.exception(msg,ix,test_addr)
728823b8294SYaroslav Brustinov                currentaddr.value = bytes_('')
729823b8294SYaroslav Brustinov                failure.Failure(*sys.exc_info())(result)
730823b8294SYaroslav Brustinov                resultQueue.put((ix, test_addr, test.tasks, batch(result)))
731823b8294SYaroslav Brustinov            else:
732823b8294SYaroslav Brustinov                if timeout:
733823b8294SYaroslav Brustinov                    msg = 'Worker %s test %s timed out'
734823b8294SYaroslav Brustinov                else:
735823b8294SYaroslav Brustinov                    msg = 'Worker %s test %s keyboard interrupt'
736823b8294SYaroslav Brustinov                log.debug(msg,ix,test_addr)
737823b8294SYaroslav Brustinov                resultQueue.put((ix, test_addr, test.tasks, batch(result)))
738823b8294SYaroslav Brustinov            if not timeout:
739823b8294SYaroslav Brustinov                raise
740823b8294SYaroslav Brustinov        except SystemExit:
741823b8294SYaroslav Brustinov            currentaddr.value = bytes_('')
742823b8294SYaroslav Brustinov            log.exception('Worker %s system exit',ix)
743823b8294SYaroslav Brustinov            raise
744823b8294SYaroslav Brustinov        except:
745823b8294SYaroslav Brustinov            currentaddr.value = bytes_('')
746823b8294SYaroslav Brustinov            log.exception("Worker %s error running test or returning "
747823b8294SYaroslav Brustinov                            "results",ix)
748823b8294SYaroslav Brustinov            failure.Failure(*sys.exc_info())(result)
749823b8294SYaroslav Brustinov            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
750823b8294SYaroslav Brustinov        if config.multiprocess_restartworker:
751823b8294SYaroslav Brustinov            break
752823b8294SYaroslav Brustinov    log.debug("Worker %s ending", ix)
753823b8294SYaroslav Brustinov
754823b8294SYaroslav Brustinov
755823b8294SYaroslav Brustinovclass NoSharedFixtureContextSuite(ContextSuite):
756823b8294SYaroslav Brustinov    """
757823b8294SYaroslav Brustinov    Context suite that never fires shared fixtures.
758823b8294SYaroslav Brustinov
759823b8294SYaroslav Brustinov    When a context sets _multiprocess_shared_, fixtures in that context
760823b8294SYaroslav Brustinov    are executed by the main process. Using this suite class prevents them
761823b8294SYaroslav Brustinov    from executing in the runner process as well.
762823b8294SYaroslav Brustinov
763823b8294SYaroslav Brustinov    """
764823b8294SYaroslav Brustinov    testQueue = None
765823b8294SYaroslav Brustinov    tasks = None
766823b8294SYaroslav Brustinov    arg = None
767823b8294SYaroslav Brustinov    def setupContext(self, context):
768823b8294SYaroslav Brustinov        if getattr(context, '_multiprocess_shared_', False):
769823b8294SYaroslav Brustinov            return
770823b8294SYaroslav Brustinov        super(NoSharedFixtureContextSuite, self).setupContext(context)
771823b8294SYaroslav Brustinov
772823b8294SYaroslav Brustinov    def teardownContext(self, context):
773823b8294SYaroslav Brustinov        if getattr(context, '_multiprocess_shared_', False):
774823b8294SYaroslav Brustinov            return
775823b8294SYaroslav Brustinov        super(NoSharedFixtureContextSuite, self).teardownContext(context)
776823b8294SYaroslav Brustinov    def run(self, result):
777823b8294SYaroslav Brustinov        """Run tests in suite inside of suite fixtures.
778823b8294SYaroslav Brustinov        """
779823b8294SYaroslav Brustinov        # proxy the result for myself
780823b8294SYaroslav Brustinov        log.debug("suite %s (%s) run called, tests: %s",
781823b8294SYaroslav Brustinov                  id(self), self, self._tests)
782823b8294SYaroslav Brustinov        if self.resultProxy:
783823b8294SYaroslav Brustinov            result, orig = self.resultProxy(result, self), result
784823b8294SYaroslav Brustinov        else:
785823b8294SYaroslav Brustinov            result, orig = result, result
786823b8294SYaroslav Brustinov        try:
787823b8294SYaroslav Brustinov            #log.debug('setUp for %s', id(self));
788823b8294SYaroslav Brustinov            self.setUp()
789823b8294SYaroslav Brustinov        except KeyboardInterrupt:
790823b8294SYaroslav Brustinov            raise
791823b8294SYaroslav Brustinov        except:
792823b8294SYaroslav Brustinov            self.error_context = 'setup'
793823b8294SYaroslav Brustinov            result.addError(self, self._exc_info())
794823b8294SYaroslav Brustinov            return
795823b8294SYaroslav Brustinov        try:
796823b8294SYaroslav Brustinov            for test in self._tests:
797823b8294SYaroslav Brustinov                if (isinstance(test,nose.case.Test)
798823b8294SYaroslav Brustinov                    and self.arg is not None):
799823b8294SYaroslav Brustinov                    test.test.arg = self.arg
800823b8294SYaroslav Brustinov                else:
801823b8294SYaroslav Brustinov                    test.arg = self.arg
802823b8294SYaroslav Brustinov                test.testQueue = self.testQueue
803823b8294SYaroslav Brustinov                test.tasks = self.tasks
804823b8294SYaroslav Brustinov                if result.shouldStop:
805823b8294SYaroslav Brustinov                    log.debug("stopping")
806823b8294SYaroslav Brustinov                    break
807823b8294SYaroslav Brustinov                # each nose.case.Test will create its own result proxy
808823b8294SYaroslav Brustinov                # so the cases need the original result, to avoid proxy
809823b8294SYaroslav Brustinov                # chains
810823b8294SYaroslav Brustinov                #log.debug('running test %s in suite %s', test, self);
811823b8294SYaroslav Brustinov                try:
812823b8294SYaroslav Brustinov                    test(orig)
813823b8294SYaroslav Brustinov                except KeyboardInterrupt, e:
814823b8294SYaroslav Brustinov                    timeout = isinstance(e, TimedOutException)
815823b8294SYaroslav Brustinov                    if timeout:
816823b8294SYaroslav Brustinov                        msg = 'Timeout when running test %s in suite %s'
817823b8294SYaroslav Brustinov                    else:
818823b8294SYaroslav Brustinov                        msg = 'KeyboardInterrupt when running test %s in suite %s'
819823b8294SYaroslav Brustinov                    log.debug(msg, test, self)
820823b8294SYaroslav Brustinov                    err = (TimedOutException,TimedOutException(str(test)),
821823b8294SYaroslav Brustinov                           sys.exc_info()[2])
822823b8294SYaroslav Brustinov                    test.config.plugins.addError(test,err)
823823b8294SYaroslav Brustinov                    orig.addError(test,err)
824823b8294SYaroslav Brustinov                    if not timeout:
825823b8294SYaroslav Brustinov                        raise
826823b8294SYaroslav Brustinov        finally:
827823b8294SYaroslav Brustinov            self.has_run = True
828823b8294SYaroslav Brustinov            try:
829823b8294SYaroslav Brustinov                #log.debug('tearDown for %s', id(self));
830823b8294SYaroslav Brustinov                self.tearDown()
831823b8294SYaroslav Brustinov            except KeyboardInterrupt:
832823b8294SYaroslav Brustinov                raise
833823b8294SYaroslav Brustinov            except:
834823b8294SYaroslav Brustinov                self.error_context = 'teardown'
835823b8294SYaroslav Brustinov                result.addError(self, self._exc_info())