The multiprocess plugin enables you to distribute your test run among a set of
worker processes that run tests in parallel. This can speed up CPU-bound test
runs (as long as the number of worker processes is around the number of
processors or cores available), but is mainly useful for IO-bound tests that
spend most of their time waiting for data to arrive from someplace else.

.. note ::

   See :doc:`../doc_tests/test_multiprocess/multiprocess` for
   additional documentation and examples. Use of this plugin on Python
   2.5 or earlier requires the multiprocessing_ module, also available
   from PyPI.

.. _multiprocessing : http://code.google.com/p/python-multiprocessing/

How tests are distributed
=========================

The ideal case would be to dispatch each test to a worker process
separately. This ideal is not attainable in all cases, however, because many
test suites depend on context (class, module or package) fixtures.

The plugin can't know (unless you tell it -- see below!) if a context fixture
can be called many times concurrently (is re-entrant), or if it can be shared
among tests running in different processes. Therefore, if a context has
fixtures, the default behavior is to dispatch the entire suite to a worker as
a unit.

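The default rule described above can be pictured with a small toy model. This
is only a sketch: ``Context`` and ``plan_dispatch`` are hypothetical names made
up for illustration and are not part of nose or of this plugin.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Context:
    """Hypothetical stand-in for a class, module or package context."""
    name: str
    tests: List[str] = field(default_factory=list)
    has_fixtures: bool = False


def plan_dispatch(context: Context) -> List[List[str]]:
    """Return the work units that would be handed to workers."""
    if context.has_fixtures:
        # Fixtures are present and nothing is known about them,
        # so the whole suite stays together as one unit.
        return [list(context.tests)]
    # No fixtures: every test can travel to a worker on its own.
    return [[name] for name in context.tests]


plain = Context("test_math", ["test_add", "test_mul"])
fixtured = Context("test_db", ["test_read", "test_write"], has_fixtures=True)
print(plan_dispatch(plain))     # [['test_add'], ['test_mul']]
print(plan_dispatch(fixtured))  # [['test_read', 'test_write']]
```
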
Controlling distribution
========================

There are two context-level variables that you can use to control this default
behavior.

If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True``
in the context, and the plugin will dispatch tests in suites bound to that
context as if the context had no fixtures. This means that the fixtures will
execute concurrently and multiple times, typically once per test.

If a context's fixtures can be shared by tests running in different processes
-- such as a package-level fixture that starts an external http server or
initializes a shared database -- then set ``_multiprocess_shared_ = True`` in
the context. These fixtures will then execute in the primary nose process, and
tests in those contexts will be individually dispatched to run in parallel.

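As an illustration, a test module might set the two variables as sketched
below. The module layout, the ``SHARED_DIR`` path and the class name are
assumptions chosen for the example. Whether a given fixture really is
re-entrant or shareable is something only the test author can judge.

```python
import os
import shutil
import tempfile
import unittest

# Module context: the fixtures below only touch an external resource, so they
# are assumed safe to share and are marked to run once in the main process.
_multiprocess_shared_ = True

# Hypothetical fixed location standing in for a shared database or server.
SHARED_DIR = os.path.join(tempfile.gettempdir(), "nose_mp_shared_example")


def setUpModule():
    os.makedirs(SHARED_DIR, exist_ok=True)


def tearDownModule():
    shutil.rmtree(SHARED_DIR, ignore_errors=True)


class TestWithReentrantFixture(unittest.TestCase):
    # Class context: setUpClass builds fresh in-memory state on every call, so
    # it is assumed re-entrant and may run once per dispatched test.
    _multiprocess_can_split_ = True

    @classmethod
    def setUpClass(cls):
        cls.values = [1, 2, 3]

    def test_sum(self):
        self.assertEqual(sum(self.values), 6)

    def test_shared_resource_exists(self):
        self.assertTrue(os.path.isdir(SHARED_DIR))
```

Note that the shared fixture communicates through the file system rather than
through module globals, since state held in the memory of the main process is
not necessarily visible to the workers.
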
How results are collected and reported
======================================

As each test or suite executes in a worker process, results (failures, errors,
and specially handled exceptions like SkipTest) are collected in that
process. When the worker process finishes, it returns results to the main
nose process. There, any progress output is printed (dots!), and the
results from the test run are combined into a consolidated result
set. When results have been received for all dispatched tests, or all
workers have died, the result summary is output as normal.

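The merging step can be sketched as follows. ``BatchResult`` and
``merge_batch`` are hypothetical names invented for this illustration; the
plugin's own result objects and consolidation code differ in detail.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class BatchResult:
    """Hypothetical, picklable summary that a worker could send back."""
    tests_run: int = 0
    failures: List[Tuple[str, str]] = field(default_factory=list)
    errors: List[Tuple[str, str]] = field(default_factory=list)


def merge_batch(total: BatchResult, batch: BatchResult) -> None:
    """Fold one worker's batch into the running total kept by the parent."""
    total.tests_run += batch.tests_run
    total.failures.extend(batch.failures)
    total.errors.extend(batch.errors)


total = BatchResult()
batches = [
    BatchResult(tests_run=3),
    BatchResult(tests_run=2, failures=[("test_x", "AssertionError")]),
]
for batch in batches:
    merge_batch(total, batch)

print(total.tests_run, len(total.failures), len(total.errors))  # 5 1 0
```
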
Not all test suites will benefit from, or even operate correctly using, this
plugin. For example, CPU-bound tests will run more slowly if you don't have
multiple processors. There are also some differences in plugin
interactions and behaviors due to the way in which tests are dispatched and
loaded. In general, test loading under this plugin operates as if it were
always in directed mode instead of discovered mode. For instance, doctests
in test modules will always be found when using this plugin with the doctest
plugin.

But the biggest issue you will face is probably concurrency. Unless you
have kept your tests as religiously pure unit tests, with no side-effects, no
ordering issues, and no external dependencies, chances are you will experience
odd, intermittent and inexplicable failures and errors when using this
plugin. This doesn't necessarily mean the plugin is broken; it may mean that
your test suite is not safe for concurrency.

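One common source of such failures is tests that write to a fixed path. A
minimal sketch of a test that avoids this by giving every test its own scratch
directory is shown below; the class and file names are made up for the
example.

```python
import os
import tempfile
import unittest


class TestReport(unittest.TestCase):
    def setUp(self):
        # A private directory per test avoids collisions with tests that
        # run at the same time in other worker processes.
        self._tmp = tempfile.TemporaryDirectory()
        self.path = os.path.join(self._tmp.name, "report.txt")

    def tearDown(self):
        self._tmp.cleanup()

    def test_write(self):
        with open(self.path, "w") as fh:
            fh.write("ok")
        with open(self.path) as fh:
            self.assertEqual(fh.read(), "ok")
```

Had ``setUp`` used a hard-coded file name in a shared directory instead, two
workers running this test at once could overwrite each other's file.
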
New Features in 1.1.0
=====================

* functions generated by test generators are now added to the worker queue,
  so they can run in parallel on the workers.
* fixed timeout functionality: functions are now terminated with a
  TimedOutException when they exceed their execution time. The
  worker processes are not terminated.
* added ``--process-restartworker`` option to restart workers once they are
  done. This helps control memory usage, since accumulated memory leaks can
  make long runs very difficult.
* added global ``_instantiate_plugins`` to configure which plugins are started
  on the worker processes.

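The idea behind the timeout behavior, interrupting a slow function while
leaving the process itself running, can be sketched with a signal handler that
raises an exception derived from ``KeyboardInterrupt``. This is a simplified
illustration and not the plugin's mechanism: it uses ``SIGALRM`` with an
interval timer inside a single process, works only on POSIX systems from the
main thread, and ``SketchTimedOut`` and ``run_with_timeout`` are hypothetical
names.

```python
import signal
import time


class SketchTimedOut(KeyboardInterrupt):
    """Hypothetical timeout error; not the plugin's TimedOutException."""


def _on_alarm(signum, frame):
    raise SketchTimedOut("Timed Out")


def run_with_timeout(func, seconds):
    """Run func(), raising SketchTimedOut if it takes longer than `seconds`.

    POSIX only, and must be called from the main thread.
    """
    old = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        return func()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, old)       # restore the old handler


def slow_test():
    try:
        time.sleep(5)
    except Exception:
        # A broad handler like this would swallow an ordinary exception,
        # but not one derived from KeyboardInterrupt.
        pass


try:
    run_with_timeout(slow_test, 0.2)
except SketchTimedOut:
    print("slow_test timed out; this process is still running")
```

Deriving the exception from ``KeyboardInterrupt`` means that test code with a
broad ``except Exception`` clause cannot accidentally suppress the timeout.
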
import logging
import os
import sys
import time
import traceback
import unittest
import pickle
import signal
import nose.case
from nose.core import TextTestRunner
from nose import failure
from nose import loader
from nose.plugins.base import Plugin
from nose.pyversion import bytes_
from nose.result import TextTestResult
from nose.suite import ContextSuite
from nose.util import test_address
try:
    # 2.7+
    from unittest.runner import _WritelnDecorator
except ImportError:
    from unittest import _WritelnDecorator
from queue import Empty
from warnings import warn
try:
    from io import StringIO
except ImportError:
    import io

# this is a list of plugin classes that will be checked for and created inside
# each worker process
_instantiate_plugins = None

log = logging.getLogger(__name__)

Process = Queue = Pool = Event = Value = Array = None

# have to inherit KeyboardInterrupt so it will interrupt the process properly
class TimedOutException(KeyboardInterrupt):
    def __init__(self, value = "Timed Out"):
        self.value = value
    def __str__(self):
        return repr(self.value)

def _import_mp():
    global Process, Queue, Pool, Event, Value, Array
    try:
        from multiprocessing import Manager, Process
        #prevent the server process created in the manager which holds Python
        #objects and allows other processes to manipulate them using proxies
        #to interrupt on SIGINT (keyboardinterrupt) so that the communication
        #channel between subprocesses and main process is still usable after
        #ctrl+C is received in the main process.
        old=signal.signal(signal.SIGINT, signal.SIG_IGN)
        m = Manager()
        #reset it back so main process will receive a KeyboardInterrupt
        #exception on ctrl+c
        signal.signal(signal.SIGINT, old)
        Queue, Pool, Event, Value, Array = (
                m.Queue, m.Pool, m.Event, m.Value, m.Array
        )
    except ImportError:
        warn("multiprocessing module is not available, multiprocess plugin "
             "cannot be used", RuntimeWarning)

class TestLet:
    def __init__(self, case):
        try:
            self._id = case.id()
        except AttributeError:
            pass
        self._short_description = case.shortDescription()
        self._str = str(case)

    def id(self):
        return self._id

    def shortDescription(self):
        return self._short_description

    def __str__(self):
        return self._str

class MultiProcess(Plugin):
    """
    Run tests in multiple processes. Requires the multiprocessing module.
    """
    score = 1000
    status = {}

    def options(self, parser, env):
        """
        Register command-line options.
        """
        parser.add_option("--processes", action="store",
                          default=env.get('NOSE_PROCESSES', 0),
                          dest="multiprocess_workers",
                          metavar="NUM",
                          help="Spread test run among this many processes. "
                          "Set a number equal to the number of processors "
                          "or cores in your machine for best results. "
                          "Pass a negative number to have the number of "
                          "processes automatically set to the number of "
                          "cores. Passing 0 means to disable parallel "
                          "testing. Default is 0 unless NOSE_PROCESSES is "
                          "set. "
                          "[NOSE_PROCESSES]")
        parser.add_option("--process-timeout", action="store",
                          default=env.get('NOSE_PROCESS_TIMEOUT', 10),
                          dest="multiprocess_timeout",
                          metavar="SECONDS",
                          help="Set timeout for return of results from each "
                          "test runner process. Default is 10. "
                          "[NOSE_PROCESS_TIMEOUT]")
        parser.add_option("--process-restartworker", action="store_true",
                          default=env.get('NOSE_PROCESS_RESTARTWORKER', False),
                          dest="multiprocess_restartworker",
                          help="If set, will restart each worker process once"
                          " their tests are done, this helps control memory "
                          "leaks from killing the system. "
                          "[NOSE_PROCESS_RESTARTWORKER]")

    def configure(self, options, config):
        """
        Configure plugin.
        """
        try:
            self.status.pop('active')
        except KeyError:
            pass
        if not hasattr(options, 'multiprocess_workers'):
            self.enabled = False
            return
        # don't start inside of a worker process
        if config.worker:
            return
        self.config = config
        try:
            workers = int(options.multiprocess_workers)
        except (TypeError, ValueError):
            workers = 0
        if workers:
            _import_mp()
            if Process is None:
                self.enabled = False
                return
            # Negative number of workers will cause multiprocessing to hang.
            # Set the number of workers to the CPU count to avoid this.
            if workers < 0:
                try:
                    import multiprocessing
                    workers = multiprocessing.cpu_count()
                except NotImplementedError:
                    self.enabled = False
                    return
            self.enabled = True
            self.config.multiprocess_workers = workers
            t = float(options.multiprocess_timeout)
            self.config.multiprocess_timeout = t
            r = int(options.multiprocess_restartworker)
            self.config.multiprocess_restartworker = r
            self.status['active'] = True

    def prepareTestLoader(self, loader):
        """Remember loader class so MultiProcessTestRunner can instantiate
        the right loader.
        """
        self.loaderClass = loader.__class__

    def prepareTestRunner(self, runner):
        """Replace test runner with MultiProcessTestRunner.
        """
        # replace with our runner class
        return MultiProcessTestRunner(stream=runner.stream,
                                      verbosity=self.config.verbosity,
                                      config=self.config,
                                      loaderClass=self.loaderClass)

def signalhandler(sig, frame):
    raise TimedOutException()

class MultiProcessTestRunner(TextTestRunner):
    waitkilltime = 5.0 # max time to wait to terminate a process that does not
                       # respond to SIGILL
    def __init__(self, **kw):
        self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
        super(MultiProcessTestRunner, self).__init__(**kw)

    def collect(self, test, testQueue, tasks, to_teardown, result):
        # dispatch and collect results
        # put indexes only on queue because tests aren't picklable
        for case in self.nextBatch(test):
            log.debug("Next batch %s (%s)", case, type(case))
            if (isinstance(case, nose.case.Test) and
                isinstance(case.test, failure.Failure)):
                log.debug("Case is a Failure")
                case(result) # run here to capture the failure
                continue
            # handle shared fixtures
            if isinstance(case, ContextSuite) and case.context is failure.Failure:
                log.debug("Case is a Failure")
                case(result) # run here to capture the failure
                continue
            elif isinstance(case, ContextSuite) and self.sharedFixtures(case):
                log.debug("%s has shared fixtures", case)
                try:
                    case.setUp()
                except (KeyboardInterrupt, SystemExit):
                    raise
                except:
                    log.debug("%s setup failed", sys.exc_info())
                    result.addError(case, sys.exc_info())
                else:
                    to_teardown.append(case)
                    if case.factory:
                        ancestors=case.factory.context.get(case, [])
                        for an in ancestors[:2]:
                            #log.debug('reset ancestor %s', an)
                            if getattr(an, '_multiprocess_shared_', False):
                                an._multiprocess_can_split_=True
                            #an._multiprocess_shared_=False
                    self.collect(case, testQueue, tasks, to_teardown, result)

            else:
                test_addr = self.addtask(testQueue,tasks,case)
                log.debug("Queued test %s (%s) to %s",
                          len(tasks), test_addr, testQueue)

    def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
        currentaddr = Value('c',bytes_(''))
        currentstart = Value('d',time.time())
        keyboardCaught = Event()
        p = Process(target=runner,
                   args=(iworker, testQueue,
                         resultQueue,
                         currentaddr,
                         currentstart,
                         keyboardCaught,
                         shouldStop,
                         self.loaderClass,
                         result.__class__,
                         pickle.dumps(self.config)))
        p.currentaddr = currentaddr
        p.currentstart = currentstart
        p.keyboardCaught = keyboardCaught
        old = signal.signal(signal.SIGILL, signalhandler)
        p.start()
        signal.signal(signal.SIGILL, old)
        return p

    def run(self, test):
        """
        Execute the test (which may be a test suite). If the test is a suite,
        distribute it out among as many processes as have been configured, at
        as fine a level as is possible given the context fixtures defined in
        the suite or any sub-suites.

        """
        log.debug("%s.run(%s) (%s)", self, test, os.getpid())
        wrapper = self.config.plugins.prepareTest(test)
        if wrapper is not None:
            test = wrapper

        # plugins can decorate or capture the output stream
        wrapped = self.config.plugins.setOutputStream(self.stream)
        if wrapped is not None:
            self.stream = wrapped

        testQueue = Queue()
        resultQueue = Queue()
        tasks = []
        completed = []
        workers = []
        to_teardown = []
        shouldStop = Event()

        result = self._makeResult()
        start = time.time()

        self.collect(test, testQueue, tasks, to_teardown, result)

        log.debug("Starting %s workers", self.config.multiprocess_workers)
        for i in range(self.config.multiprocess_workers):
            p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
            workers.append(p)
            log.debug("Started worker process %s", i+1)

        total_tasks = len(tasks)
        # need to keep track of the next time to check for timeouts in case
        # more than one process times out at the same time.
        nexttimeout=self.config.multiprocess_timeout
        thrownError = None

        try:
            while tasks:
                log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
                          len(completed), total_tasks,nexttimeout)
                try:
                    iworker, addr, newtask_addrs, batch_result = resultQueue.get(
                                                            timeout=nexttimeout)
                    log.debug('Results received for worker %d, %s, new tasks: %d',
                              iworker,addr,len(newtask_addrs))
                    try:
                        try:
                            tasks.remove(addr)
                        except ValueError:
                            log.warning('worker %s failed to remove from tasks: %s',
                                        iworker,addr)
                        total_tasks += len(newtask_addrs)
                        tasks.extend(newtask_addrs)
                    except KeyError:
                        log.debug("Got result for unknown task? %s", addr)
                        log.debug("current: %s",str(list(tasks)[0]))
                    else:
                        completed.append([addr,batch_result])
                    self.consolidate(result, batch_result)
                    if (self.config.stopOnError
                        and not result.wasSuccessful()):
                        # set the stop condition
                        shouldStop.set()
                        break
                    if self.config.multiprocess_restartworker:
                        log.debug('joining worker %s',iworker)
                        # wait for the worker, but it is not that important if
                        # the worker cannot be joined; in fact, workers that
                        # add to testQueue will not terminate until all their
                        # items are read
                        workers[iworker].join(timeout=1)
                        if not shouldStop.is_set() and not testQueue.empty():
                            log.debug('starting new process on worker %s',iworker)
                            workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
                except Empty:
                    log.debug("Timed out with %s tasks pending "
                              "(empty testQueue=%r): %s",
                              len(tasks),testQueue.empty(),str(tasks))
                    any_alive = False
                    for iworker, w in enumerate(workers):
                        if w.is_alive():
                            worker_addr = bytes_(w.currentaddr.value,'ascii')
                            timeprocessing = time.time() - w.currentstart.value
                            if ( len(worker_addr) == 0
                                    and timeprocessing > self.config.multiprocess_timeout-0.1):
                                log.debug('worker %d has finished its work item, '
                                          'but is not exiting? do we wait for it?',
                                          iworker)
                            else:
                                any_alive = True
                            if (len(worker_addr) > 0
                                and timeprocessing > self.config.multiprocess_timeout-0.1):
                                log.debug('timed out worker %s: %s',
                                          iworker,worker_addr)
                                w.currentaddr.value = bytes_('')
                                # If the process is in C++ code, sending a SIGILL
                                # might not raise a Python KeyboardInterrupt
                                # exception; therefore, send multiple signals
                                # until an exception is caught. If this takes too
                                # long, then terminate the process
                                w.keyboardCaught.clear()
                                startkilltime = time.time()
                                while not w.keyboardCaught.is_set() and w.is_alive():
                                    if time.time()-startkilltime > self.waitkilltime:
                                        # have to terminate...
                                        log.error("terminating worker %s",iworker)
                                        w.terminate()
                                        # there is a small probability that the
                                        # terminated process might send a result,
                                        # which has to be specially handled or
                                        # else processes might get orphaned.
                                        workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
                                        break
                                    os.kill(w.pid, signal.SIGILL)
                                    time.sleep(0.1)
                    if not any_alive and testQueue.empty():
                        log.debug("All workers dead")
                        break
                nexttimeout=self.config.multiprocess_timeout
                for w in workers:
                    if w.is_alive() and len(w.currentaddr.value) > 0:
                        timeprocessing = time.time()-w.currentstart.value
                        if timeprocessing <= self.config.multiprocess_timeout:
                            nexttimeout = min(nexttimeout,
                                self.config.multiprocess_timeout-timeprocessing)
            log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))

        except (KeyboardInterrupt, SystemExit) as e:
            log.info('parent received ctrl-c when waiting for test results')
            thrownError = e
            #resultQueue.get(False)

            result.addError(test, sys.exc_info())

        try:
            for case in to_teardown:
                log.debug("Tearing down shared fixtures for %s", case)
                try:
                    case.tearDown()
                except (KeyboardInterrupt, SystemExit):
                    raise
                except:
                    result.addError(case, sys.exc_info())

            stop = time.time()

            # first write since can freeze on shutting down processes
            result.printErrors()
            result.printSummary(start, stop)
            self.config.plugins.finalize(result)

            if thrownError is None:
                log.debug("Tell all workers to stop")
                for w in workers:
                    if w.is_alive():
                        testQueue.put('STOP', block=False)

            # wait for the workers to end
            for iworker,worker in enumerate(workers):
                if worker.is_alive():
                    log.debug('joining worker %s',iworker)
                    worker.join()
                    if worker.is_alive():
                        log.debug('failed to join worker %s',iworker)
        except (KeyboardInterrupt, SystemExit):
            log.info('parent received ctrl-c when shutting down: stop all processes')
            for worker in workers:
                if worker.is_alive():
                    worker.terminate()

            if thrownError: raise thrownError
            else: raise

        return result

    def addtask(testQueue,tasks,case):
        arg = None
        if isinstance(case,nose.case.Test) and hasattr(case.test,'arg'):
            # this removes the top level descriptor and allows real function
            # name to be returned
            case.test.descriptor = None
            arg = case.test.arg
        test_addr = MultiProcessTestRunner.address(case)
        testQueue.put((test_addr,arg), block=False)
        if arg is not None:
            test_addr += str(arg)
        if tasks is not None:
            tasks.append(test_addr)
        return test_addr
    addtask = staticmethod(addtask)

    def address(case):
        if hasattr(case, 'address'):
            file, mod, call = case.address()
        elif hasattr(case, 'context'):
            file, mod, call = test_address(case.context)
        else:
            raise Exception("Unable to convert %s to address" % case)
        parts = []
        if file is None:
            if mod is None:
                raise Exception("Unaddressable case %s" % case)
            else:
                parts.append(mod)
        else:
            # strip __init__.py(c) from end of file part
            # if present, having it there confuses loader
            dirname, basename = os.path.split(file)
            if basename.startswith('__init__'):
                file = dirname
            parts.append(file)
        if call is not None:
            parts.append(call)
        return ':'.join(map(str, parts))
    address = staticmethod(address)

    def nextBatch(self, test):
        # allows tests or suites to mark themselves as not safe
        # for multiprocess execution
        if hasattr(test, 'context'):
            if not getattr(test.context, '_multiprocess_', True):
                return

        if ((isinstance(test, ContextSuite)
             and test.hasFixtures(self.checkCanSplit))
            or not getattr(test, 'can_split', True)
            or not isinstance(test, unittest.TestSuite)):
            # regular test case, or a suite with context fixtures

            # special case: when run like nosetests path/to/module.py
            # the top-level suite has only one item, and it shares
            # the same context as that item. In that case, we want the
            # item, not the top-level suite
            if isinstance(test, ContextSuite):
                contained = list(test)
                if (len(contained) == 1
                    and getattr(contained[0],
                                'context', None) == test.context):
                    test = contained[0]
            yield test
        else:
            # Suite is without fixtures at this level; but it may have
            # fixtures at any deeper level, so we need to examine it all
            # the way down to the case level
            for case in test:
                for batch in self.nextBatch(case):
                    yield batch

    def checkCanSplit(context, fixt):
        """
        Callback that we use to check whether the fixtures found in a
        context or ancestor are ones we care about.

        Contexts can tell us that their fixtures are reentrant by setting
        _multiprocess_can_split_. So if we see that, we return False to
        disregard those fixtures.
        """
        if not fixt:
            return False
        if getattr(context, '_multiprocess_can_split_', False):
            return False
        return True
    checkCanSplit = staticmethod(checkCanSplit)

    def sharedFixtures(self, case):
        context = getattr(case, 'context', None)
        if not context:
            return False
        return getattr(context, '_multiprocess_shared_', False)

    def consolidate(self, result, batch_result):
        log.debug("batch result is %s", batch_result)
        try:
            output, testsRun, failures, errors, errorClasses = batch_result
        except ValueError:
            log.debug("result in unexpected format %s", batch_result)
            failure.Failure(*sys.exc_info())(result)
            return
        self.stream.write(output)
        result.testsRun += testsRun
        result.failures.extend(failures)
        result.errors.extend(errors)
        for key, (storage, label, isfail) in list(errorClasses.items()):
            if key not in result.errorClasses:
                # Ordinarily storage is a result attribute, but it's
                # only processed through the errorClasses dict, so
                # it's ok to fake it here
                result.errorClasses[key] = ([], label, isfail)
            mystorage, _junk, _junk = result.errorClasses[key]
            mystorage.extend(storage)
        log.debug("Ran %s tests (total: %s)", testsRun, result.testsRun)


def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
           keyboardCaught, shouldStop, loaderClass, resultClass, config):
    try:
        try:
            return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
                    keyboardCaught, shouldStop, loaderClass, resultClass, config)
        except KeyboardInterrupt:
            log.debug('Worker %s keyboard interrupt, stopping', ix)
    except Empty:
        log.debug("Worker %s timed out waiting for tasks", ix)

def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
           keyboardCaught, shouldStop, loaderClass, resultClass, config):

    config = pickle.loads(config)
    dummy_parser = config.parserClass()
    if _instantiate_plugins is not None:
        for pluginclass in _instantiate_plugins:
            plugin = pluginclass()
            plugin.addOptions(dummy_parser, {})
            config.plugins.addPlugin(plugin)
    config.plugins.configure(config.options, config)
    config.plugins.begin()
    log.debug("Worker %s executing, pid=%d", ix, os.getpid())
    loader = loaderClass(config=config)
    loader.suiteClass.suiteClass = NoSharedFixtureContextSuite

    def get():
        return testQueue.get(timeout=config.multiprocess_timeout)

    def makeResult():
        stream = _WritelnDecorator(StringIO())
        result = resultClass(stream, descriptions=1,
                             verbosity=config.verbosity,
                             config=config)
        plug_result = config.plugins.prepareTestResult(result)
        if plug_result:
            return plug_result
        return result

    def batch(result):
        failures = [(TestLet(c), err) for c, err in result.failures]
        errors = [(TestLet(c), err) for c, err in result.errors]
        errorClasses = {}
        for key, (storage, label, isfail) in list(result.errorClasses.items()):
            errorClasses[key] = ([(TestLet(c), err) for c, err in storage],
                                 label, isfail)
        return (
            result.stream.getvalue(),
            result.testsRun,
            failures,
            errors,
            errorClasses)
    for test_addr, arg in iter(get, 'STOP'):
        if shouldStop.is_set():
            # no exception is being handled here, so log.exception
            # would only append an empty traceback
            log.debug('Worker %d STOPPED', ix)
            break
        result = makeResult()
        test = loader.loadTestsFromNames([test_addr])
        test.testQueue = testQueue
        test.tasks = []
        test.arg = arg
        log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
        try:
            if arg is not None:
                test_addr = test_addr + str(arg)
            currentaddr.value = bytes_(test_addr)
            currentstart.value = time.time()
            test(result)
            currentaddr.value = bytes_('')
            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
        except KeyboardInterrupt as e:
            # This handler also serves timeouts; the isinstance check
            # below only makes sense if TimedOutException is a
            # KeyboardInterrupt subclass.
            timeout = isinstance(e, TimedOutException)
            if timeout:
                keyboardCaught.set()
            if len(currentaddr.value):
                if timeout:
                    msg = 'Worker %s timed out, failing current test %s'
                else:
                    msg = 'Worker %s keyboard interrupt, failing current test %s'
                log.exception(msg, ix, test_addr)
                currentaddr.value = bytes_('')
                failure.Failure(*sys.exc_info())(result)
                resultQueue.put((ix, test_addr, test.tasks, batch(result)))
            else:
                if timeout:
                    msg = 'Worker %s test %s timed out'
                else:
                    msg = 'Worker %s test %s keyboard interrupt'
                log.debug(msg, ix, test_addr)
                resultQueue.put((ix, test_addr, test.tasks, batch(result)))
            if not timeout:
                raise
        except SystemExit:
            currentaddr.value = bytes_('')
            log.exception('Worker %s system exit', ix)
            raise
        except:
            currentaddr.value = bytes_('')
            log.exception("Worker %s error running test or returning "
                            "results", ix)
            failure.Failure(*sys.exc_info())(result)
            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
        if config.multiprocess_restartworker:
            break
    log.debug("Worker %s ending", ix)


class NoSharedFixtureContextSuite(ContextSuite):
    """
    Context suite that never fires shared fixtures.

    When a context sets _multiprocess_shared_, fixtures in that context
    are executed by the main process. Using this suite class prevents them
    from executing in the runner process as well.

    """
    testQueue = None
    tasks = None
    arg = None
    def setupContext(self, context):
        if getattr(context, '_multiprocess_shared_', False):
            return
        super(NoSharedFixtureContextSuite, self).setupContext(context)

    def teardownContext(self, context):
        if getattr(context, '_multiprocess_shared_', False):
            return
        super(NoSharedFixtureContextSuite, self).teardownContext(context)
    def run(self, result):
        """Run tests in suite inside of suite fixtures.
        """
        # proxy the result for myself
        log.debug("suite %s (%s) run called, tests: %s",
                  id(self), self, self._tests)
        if self.resultProxy:
            result, orig = self.resultProxy(result, self), result
        else:
            result, orig = result, result
        try:
            #log.debug('setUp for %s', id(self));
            self.setUp()
        except KeyboardInterrupt:
            raise
        except:
            self.error_context = 'setup'
            result.addError(self, self._exc_info())
            return
        try:
            for test in self._tests:
                if (isinstance(test, nose.case.Test)
                    and self.arg is not None):
                    test.test.arg = self.arg
                else:
                    test.arg = self.arg
                test.testQueue = self.testQueue
                test.tasks = self.tasks
                if result.shouldStop:
                    log.debug("stopping")
                    break
                # each nose.case.Test will create its own result proxy
                # so the cases need the original result, to avoid proxy
                # chains
                #log.debug('running test %s in suite %s', test, self);
                try:
                    test(orig)
                except KeyboardInterrupt as e:
                    timeout = isinstance(e, TimedOutException)
                    if timeout:
                        msg = 'Timeout when running test %s in suite %s'
                    else:
                        msg = 'KeyboardInterrupt when running test %s in suite %s'
                    log.debug(msg, test, self)
                    err = (TimedOutException, TimedOutException(str(test)),
                           sys.exc_info()[2])
                    test.config.plugins.addError(test, err)
                    orig.addError(test, err)
                    if not timeout:
                        raise
        finally:
            self.has_run = True
            try:
                #log.debug('tearDown for %s', id(self));
                self.tearDown()
            except KeyboardInterrupt:
                raise
            except:
                self.error_context = 'teardown'
                result.addError(self, self._exc_info())