1# -*- coding: utf-8 -*-
2#
3# test/test_daemon.py
4# Part of ‘python-daemon’, an implementation of PEP 3143.
5#
6# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
7#
8# This is free software: you may copy, modify, and/or distribute this work
9# under the terms of the Apache License, version 2.0 as published by the
10# Apache Software Foundation.
11# No warranty expressed or implied. See the file ‘LICENSE.ASF-2’ for details.
12
13""" Unit test for ‘daemon’ module.
14    """
15
16from __future__ import (absolute_import, unicode_literals)
17
18import os
19import sys
20import tempfile
21import resource
22import errno
23import signal
24import socket
25from types import ModuleType
26import collections
27import functools
28try:
29    # Standard library of Python 2.7 and later.
30    from io import StringIO
31except ImportError:
32    # Standard library of Python 2.6 and earlier.
33    from StringIO import StringIO
34
35import mock
36
37from . import scaffold
38from .scaffold import (basestring, unicode)
39from .test_pidfile import (
40        FakeFileDescriptorStringIO,
41        setup_pidfile_fixtures,
42        )
43
44import daemon
45
46
class ModuleExceptions_TestCase(scaffold.Exception_TestCase):
    """ Test cases for the exception classes of ‘daemon.daemon’.

        Each scenario pairs a dotted exception name with the parameters
        handed to `scaffold.make_exception_scenarios`: the exception
        class itself (`exc_type`), a `min_args` count, and the list of
        `types` associated with that class.

        """

    scenarios = scaffold.make_exception_scenarios([
            (
                'daemon.daemon.DaemonError',
                dict(
                    exc_type=daemon.daemon.DaemonError,
                    min_args=1,
                    types=[Exception]),
                ),
            (
                'daemon.daemon.DaemonOSEnvironmentError',
                dict(
                    exc_type=daemon.daemon.DaemonOSEnvironmentError,
                    min_args=1,
                    types=[daemon.daemon.DaemonError, OSError]),
                ),
            (
                'daemon.daemon.DaemonProcessDetachError',
                dict(
                    exc_type=daemon.daemon.DaemonProcessDetachError,
                    min_args=1,
                    types=[daemon.daemon.DaemonError, OSError]),
                ),
            ])
67
68
def setup_daemon_context_fixtures(testcase):
    """ Decorate `testcase` with fixtures for `DaemonContext` tests.

        :param testcase: A ``TestCase`` instance to decorate.
        :return: ``None``.

        After the stream and PID file fixtures are applied, the
        following attributes are set on `testcase`:

        * `fake_pidfile_path`: a temporary file name.
        * `mock_pidlockfile`: a ``MagicMock`` whose `path` attribute is
          `fake_pidfile_path`.
        * `daemon_context_args`: the `stdin`, `stdout`, `stderr`
          keyword arguments, taken from `stream_files_by_name`.
        * `test_instance`: a `DaemonContext` built from those arguments.

        """
    setup_streams_fixtures(testcase)
    setup_pidfile_fixtures(testcase)

    # Only the path name is used here; this function creates no file.
    pidfile_path = tempfile.mktemp()
    pidlockfile = mock.MagicMock()
    testcase.fake_pidfile_path = pidfile_path
    testcase.mock_pidlockfile = pidlockfile
    testcase.mock_pidlockfile.path = testcase.fake_pidfile_path

    streams = testcase.stream_files_by_name
    testcase.daemon_context_args = dict(
            stdin=streams['stdin'],
            stdout=streams['stdout'],
            stderr=streams['stderr'])
    testcase.test_instance = daemon.DaemonContext(
            **testcase.daemon_context_args)
94
# Unique sentinel returned by the patched ‘make_default_signal_map’.
fake_default_signal_map = object()


# NOTE(review): as a class decorator, ``mock.patch`` wraps the
# test-prefixed methods found on the decorated class itself — confirm
# these patches are active for tests defined only in subclasses.
@mock.patch.object(
        daemon.daemon, "is_detach_process_context_required",
        new=lambda: True)
@mock.patch.object(
        daemon.daemon, "make_default_signal_map",
        new=lambda: fake_default_signal_map)
@mock.patch.object(os, "setgid", new=lambda x: object())
@mock.patch.object(os, "setuid", new=lambda x: object())
class DaemonContext_BaseTestCase(scaffold.TestCase):
    """ Common base for the DaemonContext test case classes.

        Applies the shared DaemonContext fixtures in `setUp`.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(DaemonContext_BaseTestCase, self).setUp()
        setup_daemon_context_fixtures(self)
113
114
class DaemonContext_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for constructing a DaemonContext.

        An option is checked either by passing a unique sentinel to the
        initialiser and reading it back from the same-named attribute,
        or by constructing with no arguments and comparing the
        attribute against the expected default or derived value.

        """

    def test_instantiate(self):
        """ New instance of DaemonContext should be created. """
        self.assertIsInstance(
                self.test_instance, daemon.daemon.DaemonContext)

    def test_minimum_zero_arguments(self):
        """ Initialiser should not require any arguments. """
        self.assertIsNot(daemon.daemon.DaemonContext(), None)

    def test_has_specified_chroot_directory(self):
        """ Should have specified chroot_directory option. """
        fake_directory = object()
        context = daemon.daemon.DaemonContext(
                chroot_directory=fake_directory)
        self.assertEqual(fake_directory, context.chroot_directory)

    def test_has_specified_working_directory(self):
        """ Should have specified working_directory option. """
        fake_directory = object()
        context = daemon.daemon.DaemonContext(
                working_directory=fake_directory)
        self.assertEqual(fake_directory, context.working_directory)

    def test_has_default_working_directory(self):
        """ Should have default working_directory option. """
        context = daemon.daemon.DaemonContext()
        self.assertEqual("/", context.working_directory)

    def test_has_specified_creation_mask(self):
        """ Should have specified umask option. """
        fake_mask = object()
        context = daemon.daemon.DaemonContext(umask=fake_mask)
        self.assertEqual(fake_mask, context.umask)

    def test_has_default_creation_mask(self):
        """ Should have default umask option. """
        context = daemon.daemon.DaemonContext()
        self.assertEqual(0, context.umask)

    def test_has_specified_uid(self):
        """ Should have specified uid option. """
        fake_uid = object()
        context = daemon.daemon.DaemonContext(uid=fake_uid)
        self.assertEqual(fake_uid, context.uid)

    def test_has_derived_uid(self):
        """ Should have uid option derived from process. """
        process_uid = os.getuid()
        context = daemon.daemon.DaemonContext()
        self.assertEqual(process_uid, context.uid)

    def test_has_specified_gid(self):
        """ Should have specified gid option. """
        fake_gid = object()
        context = daemon.daemon.DaemonContext(gid=fake_gid)
        self.assertEqual(fake_gid, context.gid)

    def test_has_derived_gid(self):
        """ Should have gid option derived from process. """
        process_gid = os.getgid()
        context = daemon.daemon.DaemonContext()
        self.assertEqual(process_gid, context.gid)

    def test_has_specified_detach_process(self):
        """ Should have specified detach_process option. """
        fake_flag = object()
        context = daemon.daemon.DaemonContext(detach_process=fake_flag)
        self.assertEqual(fake_flag, context.detach_process)

    def test_has_derived_detach_process(self):
        """ Should have detach_process option derived from environment. """
        # Ask the module function first, then compare with what the
        # initialiser derived.
        required = daemon.daemon.is_detach_process_context_required()
        context = daemon.daemon.DaemonContext()
        self.assertEqual(required, context.detach_process)

    def test_has_specified_files_preserve(self):
        """ Should have specified files_preserve option. """
        fake_files = object()
        context = daemon.daemon.DaemonContext(files_preserve=fake_files)
        self.assertEqual(fake_files, context.files_preserve)

    def test_has_specified_pidfile(self):
        """ Should have the specified pidfile. """
        fake_pidfile = object()
        context = daemon.daemon.DaemonContext(pidfile=fake_pidfile)
        self.assertEqual(fake_pidfile, context.pidfile)

    def test_has_specified_stdin(self):
        """ Should have specified stdin option. """
        fake_stream = object()
        context = daemon.daemon.DaemonContext(stdin=fake_stream)
        self.assertEqual(fake_stream, context.stdin)

    def test_has_specified_stdout(self):
        """ Should have specified stdout option. """
        fake_stream = object()
        context = daemon.daemon.DaemonContext(stdout=fake_stream)
        self.assertEqual(fake_stream, context.stdout)

    def test_has_specified_stderr(self):
        """ Should have specified stderr option. """
        fake_stream = object()
        context = daemon.daemon.DaemonContext(stderr=fake_stream)
        self.assertEqual(fake_stream, context.stderr)

    def test_has_specified_signal_map(self):
        """ Should have specified signal_map option. """
        fake_signal_map = object()
        context = daemon.daemon.DaemonContext(signal_map=fake_signal_map)
        self.assertEqual(fake_signal_map, context.signal_map)

    def test_has_derived_signal_map(self):
        """ Should have signal_map option derived from system. """
        default_signal_map = daemon.daemon.make_default_signal_map()
        context = daemon.daemon.DaemonContext()
        self.assertEqual(default_signal_map, context.signal_map)
278
279
class DaemonContext_is_open_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for the read-only DaemonContext.is_open property. """

    def test_begin_false(self):
        """ Initial value of is_open should be False. """
        self.assertEqual(False, self.test_instance.is_open)

    def test_write_fails(self):
        """ Writing to is_open should fail. """
        new_value = object()
        self.assertRaises(
                AttributeError,
                setattr, self.test_instance, 'is_open', new_value)
294
295
class DaemonContext_open_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for DaemonContext.open method.

        The module-level functions that `open` is expected to call, and
        two private `DaemonContext` methods, are replaced by mocks that
        are all attached to `self.mock_module_daemon`, so the combined
        call sequence can be inspected from that single parent mock.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(DaemonContext_open_TestCase, self).setUp()

        self.test_instance._is_open = False

        # Parent mock which records the calls of every attached child.
        self.mock_module_daemon = mock.MagicMock()

        patched_module_functions = [
                "detach_process_context",
                "change_working_directory",
                "change_root_directory",
                "change_file_creation_mask",
                "change_process_owner",
                "prevent_core_dump",
                "close_all_open_files",
                "redirect_stream",
                "set_signal_handlers",
                "register_atexit_function",
                ]
        for func_name in patched_module_functions:
            module_patcher = mock.patch.object(daemon.daemon, func_name)
            mock_func = module_patcher.start()
            self.addCleanup(module_patcher.stop)
            self.mock_module_daemon.attach_mock(mock_func, func_name)

        self.mock_module_daemon.attach_mock(mock.Mock(), 'DaemonContext')

        # Sentinels returned by the patched private methods, so tests
        # can check that they are passed on unchanged.
        self.test_files_preserve_fds = object()
        self.test_signal_handler_map = object()
        patched_methods = [
                ('_get_exclude_file_descriptors',
                    self.test_files_preserve_fds),
                ('_make_signal_handler_map',
                    self.test_signal_handler_map),
                ]
        for (method_name, method_result) in patched_methods:
            method_patcher = mock.patch.object(
                    daemon.daemon.DaemonContext, method_name,
                    return_value=method_result)
            mock_method = method_patcher.start()
            self.addCleanup(method_patcher.stop)
            self.mock_module_daemon.DaemonContext.attach_mock(
                    mock_method, method_name)

    def test_performs_steps_in_expected_sequence(self):
        """ Should perform daemonisation steps in expected sequence. """
        context = self.test_instance
        context.chroot_directory = object()
        context.detach_process = True
        context.pidfile = self.mock_pidlockfile
        # Attach the PID file mock too, so entering it is recorded in
        # the same call sequence.
        self.mock_module_daemon.attach_mock(
                self.mock_pidlockfile, 'pidlockfile')

        any_arg = mock.ANY
        steps_before_redirect = [
                mock.call.change_root_directory(any_arg),
                mock.call.prevent_core_dump(),
                mock.call.change_file_creation_mask(any_arg),
                mock.call.change_working_directory(any_arg),
                mock.call.change_process_owner(any_arg, any_arg),
                mock.call.detach_process_context(),
                mock.call.DaemonContext._make_signal_handler_map(),
                mock.call.set_signal_handlers(any_arg),
                mock.call.DaemonContext._get_exclude_file_descriptors(),
                mock.call.close_all_open_files(exclude=any_arg),
                ]
        # One redirection for each of the three standard streams.
        redirect_steps = [
                mock.call.redirect_stream(any_arg, any_arg),
                mock.call.redirect_stream(any_arg, any_arg),
                mock.call.redirect_stream(any_arg, any_arg),
                ]
        steps_after_redirect = [
                mock.call.pidlockfile.__enter__(),
                mock.call.register_atexit_function(any_arg),
                ]
        expected_calls = (
                steps_before_redirect
                + redirect_steps
                + steps_after_redirect)

        context.open()
        self.mock_module_daemon.assert_has_calls(expected_calls)

    def test_returns_immediately_if_is_open(self):
        """ Should return immediately if is_open property is true. """
        self.test_instance._is_open = True
        self.test_instance.open()
        recorded_calls = self.mock_module_daemon.mock_calls
        self.assertEqual(0, len(recorded_calls))

    def test_changes_root_directory_to_chroot_directory(self):
        """ Should change root directory to `chroot_directory` option. """
        fake_chroot_directory = object()
        self.test_instance.chroot_directory = fake_chroot_directory
        self.test_instance.open()
        mock_change_root = self.mock_module_daemon.change_root_directory
        mock_change_root.assert_called_with(fake_chroot_directory)

    def test_omits_chroot_if_no_chroot_directory(self):
        """ Should omit changing root directory if no `chroot_directory`. """
        self.test_instance.chroot_directory = None
        self.test_instance.open()
        mock_change_root = self.mock_module_daemon.change_root_directory
        self.assertFalse(mock_change_root.called)

    def test_prevents_core_dump(self):
        """ Should request prevention of core dumps. """
        self.test_instance.open()
        self.mock_module_daemon.prevent_core_dump.assert_called_with()

    def test_omits_prevent_core_dump_if_prevent_core_false(self):
        """ Should omit preventing core dumps if `prevent_core` is false. """
        self.test_instance.prevent_core = False
        self.test_instance.open()
        mock_prevent_core_dump = self.mock_module_daemon.prevent_core_dump
        self.assertFalse(mock_prevent_core_dump.called)

    def test_closes_open_files(self):
        """ Should close all open files, excluding `files_preserve`. """
        self.test_instance.open()
        mock_close_files = self.mock_module_daemon.close_all_open_files
        mock_close_files.assert_called_with(
                exclude=self.test_files_preserve_fds)

    def test_changes_directory_to_working_directory(self):
        """ Should change current directory to `working_directory` option. """
        fake_working_directory = object()
        self.test_instance.working_directory = fake_working_directory
        self.test_instance.open()
        mock_chdir = self.mock_module_daemon.change_working_directory
        mock_chdir.assert_called_with(fake_working_directory)

    def test_changes_creation_mask_to_umask(self):
        """ Should change file creation mask to `umask` option. """
        fake_umask = object()
        self.test_instance.umask = fake_umask
        self.test_instance.open()
        mock_change_mask = self.mock_module_daemon.change_file_creation_mask
        mock_change_mask.assert_called_with(fake_umask)

    def test_changes_owner_to_specified_uid_and_gid(self):
        """ Should change process UID and GID to `uid` and `gid` options. """
        (fake_uid, fake_gid) = (object(), object())
        self.test_instance.uid = fake_uid
        self.test_instance.gid = fake_gid
        self.test_instance.open()
        mock_change_owner = self.mock_module_daemon.change_process_owner
        mock_change_owner.assert_called_with(fake_uid, fake_gid)

    def test_detaches_process_context(self):
        """ Should request detach of process context. """
        self.test_instance.open()
        self.mock_module_daemon.detach_process_context.assert_called_with()

    def test_omits_process_detach_if_not_required(self):
        """ Should omit detach of process context if not required. """
        self.test_instance.detach_process = False
        self.test_instance.open()
        mock_detach = self.mock_module_daemon.detach_process_context
        self.assertFalse(mock_detach.called)

    def test_sets_signal_handlers_from_signal_map(self):
        """ Should set signal handlers according to `signal_map`. """
        self.test_instance.signal_map = object()
        self.test_instance.open()
        mock_set_handlers = self.mock_module_daemon.set_signal_handlers
        mock_set_handlers.assert_called_with(self.test_signal_handler_map)

    def test_redirects_standard_streams(self):
        """ Should request redirection of standard stream files. """
        # Pair each system stream, as it is before `open`, with the
        # fixture stream of the same name.
        expected_calls = [
                mock.call(sys.stdin, self.stream_files_by_name['stdin']),
                mock.call(sys.stdout, self.stream_files_by_name['stdout']),
                mock.call(sys.stderr, self.stream_files_by_name['stderr']),
                ]
        self.test_instance.open()
        self.mock_module_daemon.redirect_stream.assert_has_calls(
                expected_calls, any_order=True)

    def test_enters_pidfile_context(self):
        """ Should enter the PID file context manager. """
        self.test_instance.pidfile = self.mock_pidlockfile
        self.test_instance.open()
        self.mock_pidlockfile.__enter__.assert_called_with()

    def test_sets_is_open_true(self):
        """ Should set the `is_open` property to True. """
        self.test_instance.open()
        self.assertEqual(True, self.test_instance.is_open)

    def test_registers_close_method_for_atexit(self):
        """ Should register the `close` method for atexit processing. """
        expected_function = self.test_instance.close
        self.test_instance.open()
        mock_register = self.mock_module_daemon.register_atexit_function
        mock_register.assert_called_with(expected_function)
509
510
class DaemonContext_close_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for DaemonContext.close method.

        Every test starts with the fixture instance marked as open.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(DaemonContext_close_TestCase, self).setUp()

        self.test_instance._is_open = True

    def test_returns_immediately_if_not_is_open(self):
        """ Should return immediately if is_open property is false. """
        instance = self.test_instance
        instance._is_open = False
        # The mock PID file must be the one attached to the instance;
        # otherwise the assertion below inspects a mock that `close`
        # could never reach and so could never fail.
        instance.pidfile = self.mock_pidlockfile
        instance.close()
        self.assertFalse(self.mock_pidlockfile.__exit__.called)

    def test_exits_pidfile_context(self):
        """ Should exit the PID file context manager. """
        instance = self.test_instance
        instance.pidfile = self.mock_pidlockfile
        instance.close()
        # No exception is in flight, so all three arguments are None.
        self.mock_pidlockfile.__exit__.assert_called_with(None, None, None)

    def test_returns_none(self):
        """ Should return None. """
        instance = self.test_instance
        expected_result = None
        result = instance.close()
        self.assertIs(result, expected_result)

    def test_sets_is_open_false(self):
        """ Should set the `is_open` property to False. """
        instance = self.test_instance
        instance.close()
        self.assertEqual(False, instance.is_open)
547
548
@mock.patch.object(daemon.daemon.DaemonContext, "open")
class DaemonContext_context_manager_enter_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for DaemonContext.__enter__ method.

        `DaemonContext.open` is patched for each test; the resulting
        mock is received as the extra test method argument.

        """

    def test_opens_daemon_context(self, mock_func_daemoncontext_open):
        """ Should open the DaemonContext. """
        self.test_instance.__enter__()
        mock_func_daemoncontext_open.assert_called_with()

    def test_returns_self_instance(self, mock_func_daemoncontext_open):
        """ Should return DaemonContext instance. """
        entered = self.test_instance.__enter__()
        self.assertIs(entered, self.test_instance)
565
566
@mock.patch.object(daemon.daemon.DaemonContext, "close")
class DaemonContext_context_manager_exit_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for DaemonContext.__exit__ method.

        `DaemonContext.close` is patched for each test; the resulting
        mock is received as the extra test method argument.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(DaemonContext_context_manager_exit_TestCase, self).setUp()

        # Unique sentinels for the three `__exit__` arguments.
        self.test_args = dict(
                exc_type=object(), exc_value=object(), traceback=object())

    def test_closes_daemon_context(self, mock_func_daemoncontext_close):
        """ Should close the DaemonContext. """
        self.test_instance.__exit__(**self.test_args)
        mock_func_daemoncontext_close.assert_called_with()

    def test_returns_none(self, mock_func_daemoncontext_close):
        """ Should return None, indicating exception was not handled. """
        exit_result = self.test_instance.__exit__(**self.test_args)
        self.assertIs(exit_result, None)
595
596
class DaemonContext_terminate_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for DaemonContext.terminate method.

        `terminate` is invoked with the arguments of a signal handler:
        a signal number and a stack frame (here ``None``).

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(DaemonContext_terminate_TestCase, self).setUp()

        (self.test_signal, self.test_frame) = (signal.SIGTERM, None)
        self.test_args = (self.test_signal, self.test_frame)

    def test_raises_system_exit(self):
        """ Should raise SystemExit. """
        terminate = self.test_instance.terminate
        self.assertRaises(SystemExit, terminate, *self.test_args)

    def test_exception_message_contains_signal_number(self):
        """ Should raise exception with a message containing signal number. """
        terminate = self.test_instance.terminate
        # NOTE(review): relies on `assertRaises` returning the caught
        # exception — confirm against the `scaffold.TestCase` base class.
        caught = self.assertRaises(SystemExit, terminate, *self.test_args)
        signal_text = unicode(self.test_signal)
        self.assertIn(signal_text, unicode(caught))
627
628
class DaemonContext_get_exclude_file_descriptors_TestCase(
        DaemonContext_BaseTestCase):
    """ Test cases for DaemonContext._get_exclude_file_descriptors function.

        The `test_files` fixture maps a descriptor number to a fake
        file object, to the bare integer itself, or to ``None``.
        `test_file_descriptors` is the set of numbers whose item is not
        ``None``, plus the descriptors of the three fixture streams.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(
                DaemonContext_get_exclude_file_descriptors_TestCase,
                self).setUp()

        self.test_files = {
                2: FakeFileDescriptorStringIO(),
                5: 5,
                11: FakeFileDescriptorStringIO(),
                17: None,
                23: FakeFileDescriptorStringIO(),
                37: 37,
                42: FakeFileDescriptorStringIO(),
                }

        descriptors = set()
        for (fd, item) in self.test_files.items():
            # Items which carry a `_fileno` attribute get the number of
            # the key they are stored under.
            if hasattr(item, '_fileno'):
                item._fileno = fd
            if item is not None:
                descriptors.add(fd)
        for stream_name in ['stdin', 'stdout', 'stderr']:
            stream = self.stream_files_by_name[stream_name]
            descriptors.add(stream.fileno())
        self.test_file_descriptors = descriptors

    def test_returns_expected_file_descriptors(self):
        """ Should return expected set of file descriptors. """
        context = self.test_instance
        context.files_preserve = list(self.test_files.values())
        result = context._get_exclude_file_descriptors()
        self.assertEqual(self.test_file_descriptors, result)

    def test_returns_stream_redirects_if_no_files_preserve(self):
        """ Should return only stream redirects if no files_preserve. """
        context = self.test_instance
        context.files_preserve = None
        stream_descriptors = set()
        for stream in self.stream_files_by_name.values():
            stream_descriptors.add(stream.fileno())
        result = context._get_exclude_file_descriptors()
        self.assertEqual(stream_descriptors, result)

    def test_returns_empty_set_if_no_files(self):
        """ Should return empty set if no file options. """
        context = self.test_instance
        context.files_preserve = None
        context.stdin = None
        context.stdout = None
        context.stderr = None
        result = context._get_exclude_file_descriptors()
        self.assertEqual(set(), result)

    def test_omits_non_file_streams(self):
        """ Should omit non-file stream attributes. """
        context = self.test_instance
        context.files_preserve = list(self.test_files.values())
        expected_descriptors = self.test_file_descriptors.copy()
        for (stream_name, stream) in self.stream_files_by_name.items():
            # Replace the stream with an object which is not a file;
            # the replaced stream's descriptor is no longer expected.
            setattr(context, stream_name, object())
            expected_descriptors.discard(stream.fileno())
        result = context._get_exclude_file_descriptors()
        self.assertEqual(expected_descriptors, result)

    def test_includes_verbatim_streams_without_file_descriptor(self):
        """ Should include verbatim any stream without a file descriptor. """
        context = self.test_instance
        context.files_preserve = list(self.test_files.values())
        raising_fileno = mock.MagicMock(
                spec=sys.__stdin__.fileno,
                side_effect=ValueError)
        expected_items = self.test_file_descriptors.copy()
        for (stream_name, stream) in self.stream_files_by_name.items():
            replacement_stream = StringIO()
            if not hasattr(replacement_stream, 'fileno'):
                # Python < 3 StringIO doesn't have ‘fileno’ at all.
                # Add a method which raises an exception.
                replacement_stream.fileno = raising_fileno
            setattr(context, stream_name, replacement_stream)
            # The replacement itself is expected in place of the
            # original stream's descriptor.
            expected_items.discard(stream.fileno())
            expected_items.add(replacement_stream)
        result = context._get_exclude_file_descriptors()
        self.assertEqual(expected_items, result)

    def test_omits_none_streams(self):
        """ Should omit any stream attribute which is None. """
        context = self.test_instance
        context.files_preserve = list(self.test_files.values())
        expected_descriptors = self.test_file_descriptors.copy()
        for (stream_name, stream) in self.stream_files_by_name.items():
            setattr(context, stream_name, None)
            expected_descriptors.discard(stream.fileno())
        result = context._get_exclude_file_descriptors()
        self.assertEqual(expected_descriptors, result)
734
735
class DaemonContext_make_signal_handler_TestCase(DaemonContext_BaseTestCase):
    """ Test cases for DaemonContext._make_signal_handler function.

        Covers the handler returned for a ``None`` target, for a method
        name, for an unknown name, and for an arbitrary object.

        """

    def test_returns_ignore_for_none(self):
        """ Should return SIG_IGN when None handler specified. """
        handler = self.test_instance._make_signal_handler(None)
        self.assertEqual(signal.SIG_IGN, handler)

    def test_returns_method_for_name(self):
        """ Should return method of DaemonContext when name specified. """
        context = self.test_instance
        handler = context._make_signal_handler('terminate')
        self.assertEqual(context.terminate, handler)

    def test_raises_error_for_unknown_name(self):
        """ Should raise AttributeError for unknown method name. """
        make_handler = self.test_instance._make_signal_handler
        self.assertRaises(AttributeError, make_handler, 'b0gUs')

    def test_returns_object_for_object(self):
        """ Should return same object for any other object. """
        arbitrary_target = object()
        handler = self.test_instance._make_signal_handler(arbitrary_target)
        self.assertEqual(arbitrary_target, handler)
771
772
class DaemonContext_make_signal_handler_map_TestCase(
        DaemonContext_BaseTestCase):
    """ Test cases for DaemonContext._make_signal_handler_map function. """

    def setUp(self):
        """ Set up test fixtures. """
        super(DaemonContext_make_signal_handler_map_TestCase, self).setUp()

        # Three arbitrary signal-to-target pairs; unique objects keep
        # every key and every target distinct.
        self.test_instance.signal_map = dict(
                (object(), object())
                for __ in range(3))
        signal_map = self.test_instance.signal_map

        # One distinct fake handler for each target in the signal map.
        self.test_signal_handlers = dict(
                (target, object())
                for target in signal_map.values())
        # The map expected once each target is replaced by its handler.
        self.test_signal_handler_map = dict(
                (signal_key, self.test_signal_handlers[target])
                for (signal_key, target) in signal_map.items())

        def fake_make_signal_handler(target):
            return self.test_signal_handlers[target]

        patcher = mock.patch.object(
                daemon.daemon.DaemonContext, "_make_signal_handler",
                side_effect=fake_make_signal_handler)
        self.mock_func_make_signal_handler = patcher.start()
        self.addCleanup(patcher.stop)

    def test_returns_constructed_signal_handler_items(self):
        """ Should return items as constructed via make_signal_handler. """
        result = self.test_instance._make_signal_handler_map()
        self.assertEqual(self.test_signal_handler_map, result)
810
811
try:
    FileNotFoundError
except NameError:
    # Python 2 has no FileNotFoundError; build the equivalent IOError
    # (errno ENOENT) from whatever further arguments are given.
    def FileNotFoundError(*args, **kwargs):
        """ Make an IOError instance with errno set to ENOENT. """
        return IOError(errno.ENOENT, *args, **kwargs)
817
818
@mock.patch.object(os, "chdir")
class change_working_directory_TestCase(scaffold.TestCase):
    """ Test cases for change_working_directory function. """

    def setUp(self):
        """ Set up test fixtures. """
        super(change_working_directory_TestCase, self).setUp()

        # An opaque object suffices: `os.chdir` is patched for every test.
        self.test_directory = object()
        self.test_args = dict(
                directory=self.test_directory,
                )

    def test_changes_working_directory_to_specified_directory(
            self,
            mock_func_os_chdir):
        """ Should change working directory to specified directory. """
        daemon.daemon.change_working_directory(**self.test_args)
        mock_func_os_chdir.assert_called_with(self.test_directory)

    def test_raises_daemon_error_on_os_error(
            self,
            mock_func_os_chdir):
        """ Should raise a DaemonError on receiving an IOError. """
        chdir_error = FileNotFoundError("No such directory")
        mock_func_os_chdir.side_effect = chdir_error
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_working_directory, **self.test_args)
        # The original error should be chained as the cause.
        self.assertEqual(chdir_error, exc.__cause__)

    def test_error_message_contains_original_error_message(
            self,
            mock_func_os_chdir):
        """ Should raise a DaemonError with original message. """
        chdir_error = FileNotFoundError("No such directory")
        mock_func_os_chdir.side_effect = chdir_error
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_working_directory, **self.test_args)
        self.assertIn(unicode(chdir_error), unicode(exc))
866
867
@mock.patch.object(os, "chroot")
@mock.patch.object(os, "chdir")
class change_root_directory_TestCase(scaffold.TestCase):
    """ Test cases for change_root_directory function. """

    def setUp(self):
        """ Set up test fixtures. """
        super(change_root_directory_TestCase, self).setUp()

        # An opaque object suffices: `os.chdir` and `os.chroot` are
        # both patched for every test.
        self.test_directory = object()
        self.test_args = dict(
                directory=self.test_directory,
                )

    def test_changes_working_directory_to_specified_directory(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should change working directory to specified directory. """
        daemon.daemon.change_root_directory(**self.test_args)
        mock_func_os_chdir.assert_called_with(self.test_directory)

    def test_changes_root_directory_to_specified_directory(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should change root directory to specified directory. """
        daemon.daemon.change_root_directory(**self.test_args)
        mock_func_os_chroot.assert_called_with(self.test_directory)

    def test_raises_daemon_error_on_os_error_from_chdir(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should raise a DaemonError on receiving an IOError from chdir. """
        chdir_error = FileNotFoundError("No such directory")
        mock_func_os_chdir.side_effect = chdir_error
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_root_directory, **self.test_args)
        self.assertEqual(chdir_error, exc.__cause__)

    def test_raises_daemon_error_on_os_error_from_chroot(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should raise a DaemonError on receiving an OSError from chroot. """
        chroot_error = OSError(errno.EPERM, "No chroot for you!")
        mock_func_os_chroot.side_effect = chroot_error
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_root_directory, **self.test_args)
        self.assertEqual(chroot_error, exc.__cause__)

    def test_error_message_contains_original_error_message(
            self,
            mock_func_os_chdir, mock_func_os_chroot):
        """ Should raise a DaemonError with original message. """
        chdir_error = FileNotFoundError("No such directory")
        mock_func_os_chdir.side_effect = chdir_error
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_root_directory, **self.test_args)
        self.assertIn(unicode(chdir_error), unicode(exc))
938
939
@mock.patch.object(os, "umask")
class change_file_creation_mask_TestCase(scaffold.TestCase):
    """ Test cases for change_file_creation_mask function. """

    def setUp(self):
        """ Set up test fixtures. """
        super(change_file_creation_mask_TestCase, self).setUp()

        # An opaque object suffices: `os.umask` is patched for every test.
        self.test_mask = object()
        self.test_args = dict(
                mask=self.test_mask,
                )

    def test_changes_umask_to_specified_mask(self, mock_func_os_umask):
        """ Should change umask to specified mask. """
        daemon.daemon.change_file_creation_mask(**self.test_args)
        mock_func_os_umask.assert_called_with(self.test_mask)

    def test_raises_daemon_error_on_os_error_from_chdir(
            self,
            mock_func_os_umask):
        """ Should raise a DaemonError on receiving an OSError from umask. """
        umask_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
        mock_func_os_umask.side_effect = umask_error
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_file_creation_mask, **self.test_args)
        self.assertEqual(umask_error, exc.__cause__)

    def test_error_message_contains_original_error_message(
            self,
            mock_func_os_umask):
        """ Should raise a DaemonError with original message. """
        umask_error = FileNotFoundError("No such directory")
        mock_func_os_umask.side_effect = umask_error
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.change_file_creation_mask, **self.test_args)
        self.assertIn(unicode(umask_error), unicode(exc))
985
986
@mock.patch.object(os, "setgid")
@mock.patch.object(os, "setuid")
class change_process_owner_TestCase(scaffold.TestCase):
    """ Test cases for change_process_owner function.

        Both `os.setuid` and `os.setgid` are patched for every test;
        the mocks are passed to each test method in that order.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(change_process_owner_TestCase, self).setUp()

        # Opaque objects suffice, because the `os` functions are patched.
        self.test_uid = object()
        self.test_gid = object()
        self.test_args = dict(
                uid=self.test_uid,
                gid=self.test_gid,
                )

    def test_changes_gid_and_uid_in_order(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should change process GID and UID in correct order.

            Since the process requires appropriate privilege to use
            either of `setuid` or `setgid`, changing the UID must be
            done last.

            """
        args = self.test_args
        # Attach both mocks to one parent, so their calls are recorded
        # in a single sequence whose order can be asserted.
        mock_module_os = mock.MagicMock()
        mock_module_os.attach_mock(mock_func_os_setgid, "setgid")
        mock_module_os.attach_mock(mock_func_os_setuid, "setuid")
        daemon.daemon.change_process_owner(**args)
        mock_module_os.assert_has_calls([
                mock.call.setgid(self.test_gid),
                mock.call.setuid(self.test_uid),
                ])

    def test_changes_group_id_to_gid(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should change process GID to specified value. """
        args = self.test_args
        gid = self.test_gid
        daemon.daemon.change_process_owner(**args)
        # `assert_called_with` checks the argument; `assert_called` does not.
        mock_func_os_setgid.assert_called_with(gid)

    def test_changes_user_id_to_uid(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should change process UID to specified value. """
        args = self.test_args
        uid = self.test_uid
        daemon.daemon.change_process_owner(**args)
        # `assert_called_with` checks the argument; `assert_called` does not.
        mock_func_os_setuid.assert_called_with(uid)

    def test_raises_daemon_error_on_os_error_from_setgid(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should raise a DaemonError on receiving an OSError from setgid. """
        args = self.test_args
        test_error = OSError(errno.EPERM, "No switching for you!")
        mock_func_os_setgid.side_effect = test_error
        expected_error = daemon.daemon.DaemonOSEnvironmentError
        exc = self.assertRaises(
                expected_error,
                daemon.daemon.change_process_owner, **args)
        # The original error should be chained as the cause.
        self.assertEqual(test_error, exc.__cause__)

    def test_raises_daemon_error_on_os_error_from_setuid(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should raise a DaemonError on receiving an OSError from setuid. """
        args = self.test_args
        test_error = OSError(errno.EPERM, "No switching for you!")
        mock_func_os_setuid.side_effect = test_error
        expected_error = daemon.daemon.DaemonOSEnvironmentError
        exc = self.assertRaises(
                expected_error,
                daemon.daemon.change_process_owner, **args)
        # The original error should be chained as the cause.
        self.assertEqual(test_error, exc.__cause__)

    def test_error_message_contains_original_error_message(
            self,
            mock_func_os_setuid, mock_func_os_setgid):
        """ Should raise a DaemonError with original message. """
        args = self.test_args
        test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
        mock_func_os_setuid.side_effect = test_error
        expected_error = daemon.daemon.DaemonOSEnvironmentError
        exc = self.assertRaises(
                expected_error,
                daemon.daemon.change_process_owner, **args)
        self.assertIn(unicode(test_error), unicode(exc))
1074
1075
# A (soft, hard) pair, as returned by the fake `getrlimit` functions
# in this module.
RLimitResult = collections.namedtuple('RLimitResult', 'soft hard')

# Unique sentinel patched in as `resource.RLIMIT_CORE` during tests.
fake_RLIMIT_CORE = object()
1079
@mock.patch.object(resource, "RLIMIT_CORE", new=fake_RLIMIT_CORE)
@mock.patch.object(resource, "setrlimit", side_effect=(lambda x, y: None))
@mock.patch.object(resource, "getrlimit", side_effect=(lambda x: None))
class prevent_core_dump_TestCase(scaffold.TestCase):
    """ Test cases for prevent_core_dump function. """

    def setUp(self):
        """ Set up test fixtures. """
        super(prevent_core_dump_TestCase, self).setUp()

    def test_sets_core_limit_to_zero(
            self,
            mock_func_resource_getrlimit, mock_func_resource_setrlimit):
        """ Should set the RLIMIT_CORE resource to zero. """
        # Both the soft and the hard limit are expected to be zero.
        zero_limit = (0, 0)
        daemon.daemon.prevent_core_dump()
        mock_func_resource_getrlimit.assert_called_with(fake_RLIMIT_CORE)
        mock_func_resource_setrlimit.assert_called_with(
                fake_RLIMIT_CORE, zero_limit)

    def test_raises_error_when_no_core_resource(
            self,
            mock_func_resource_getrlimit, mock_func_resource_setrlimit):
        """ Should raise DaemonError if no RLIMIT_CORE resource. """
        getrlimit_error = ValueError(
                "Bogus platform doesn't have RLIMIT_CORE")

        def fake_getrlimit(res):
            # Fail only when asked about the core dump resource.
            if res != resource.RLIMIT_CORE:
                return None
            raise getrlimit_error

        mock_func_resource_getrlimit.side_effect = fake_getrlimit
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.prevent_core_dump)
        self.assertEqual(getrlimit_error, exc.__cause__)
1117
1118
@mock.patch.object(os, "close")
class close_file_descriptor_if_open_TestCase(scaffold.TestCase):
    """ Test cases for close_file_descriptor_if_open function. """

    def setUp(self):
        """ Set up test fixtures. """
        super(close_file_descriptor_if_open_TestCase, self).setUp()

        # Arbitrary descriptor number; `os.close` is patched for every test.
        self.fake_fd = 274

    def test_requests_file_descriptor_close(self, mock_func_os_close):
        """ Should request close of file descriptor. """
        daemon.daemon.close_file_descriptor_if_open(self.fake_fd)
        mock_func_os_close.assert_called_with(self.fake_fd)

    def test_ignores_badfd_error_on_close(self, mock_func_os_close):
        """ Should ignore OSError EBADF when closing. """
        # An exception instance as `side_effect` is raised on each call.
        mock_func_os_close.side_effect = OSError(
                errno.EBADF, "Bad file descriptor")
        daemon.daemon.close_file_descriptor_if_open(self.fake_fd)
        mock_func_os_close.assert_called_with(self.fake_fd)

    def test_raises_error_if_oserror_on_close(self, mock_func_os_close):
        """ Should raise DaemonError if an OSError occurs when closing. """
        close_error = OSError(object(), "Unexpected error")
        mock_func_os_close.side_effect = close_error
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.close_file_descriptor_if_open, self.fake_fd)
        self.assertEqual(close_error, exc.__cause__)

    def test_raises_error_if_ioerror_on_close(self, mock_func_os_close):
        """ Should raise DaemonError if an IOError occurs when closing. """
        close_error = IOError(object(), "Unexpected error")
        mock_func_os_close.side_effect = close_error
        exc = self.assertRaises(
                daemon.daemon.DaemonOSEnvironmentError,
                daemon.daemon.close_file_descriptor_if_open, self.fake_fd)
        self.assertEqual(close_error, exc.__cause__)
1170
1171
class maxfd_TestCase(scaffold.TestCase):
    """ Test cases for module MAXFD constant. """

    def test_positive(self):
        """ Should be a positive number. """
        self.assertTrue(daemon.daemon.MAXFD > 0)

    def test_integer(self):
        """ Should be an integer. """
        value = daemon.daemon.MAXFD
        self.assertEqual(int(value), value)

    def test_reasonably_high(self):
        """ Should be reasonably high for default open files limit.

            If the system reports a limit of “infinity” on maximum
            file descriptors, we still need a finite number in order
            to close “all” of them. Ensure this is reasonably high
            to catch most use cases.

            """
        expected_minimum = 2048
        maxfd = daemon.daemon.MAXFD
        failure_message = (
                "MAXFD should be at least {minimum!r}"
                " (got {maxfd!r})").format(
                    minimum=expected_minimum, maxfd=maxfd)
        self.assertTrue(maxfd >= expected_minimum, msg=failure_message)
1202
1203
# Fake numeric limits used by the file descriptor test cases.
fake_default_maxfd = 8
fake_rlimit_nofile_large = 2468

# Unique sentinels patched in for the `resource` module constants.
fake_RLIMIT_NOFILE = object()
fake_RLIM_INFINITY = object()
1208
def fake_getrlimit_nofile_soft_infinity(resource):
    """ Fake `getrlimit`: the NOFILE soft limit is the fake infinity.

        Returns `NotImplemented` for any resource other than the
        fake RLIMIT_NOFILE.

        """
    if resource != fake_RLIMIT_NOFILE:
        return NotImplemented
    return RLimitResult(soft=fake_RLIM_INFINITY, hard=object())
1214
def fake_getrlimit_nofile_hard_infinity(resource):
    """ Fake `getrlimit`: the NOFILE hard limit is the fake infinity.

        Returns `NotImplemented` for any resource other than the
        fake RLIMIT_NOFILE.

        """
    if resource != fake_RLIMIT_NOFILE:
        return NotImplemented
    return RLimitResult(soft=object(), hard=fake_RLIM_INFINITY)
1220
def fake_getrlimit_nofile_hard_large(resource):
    """ Fake `getrlimit`: the NOFILE hard limit is a large finite number.

        Returns `NotImplemented` for any resource other than the
        fake RLIMIT_NOFILE.

        """
    if resource != fake_RLIMIT_NOFILE:
        return NotImplemented
    return RLimitResult(soft=object(), hard=fake_rlimit_nofile_large)
1226
@mock.patch.object(daemon.daemon, "MAXFD", new=fake_default_maxfd)
@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
@mock.patch.object(
        resource, "getrlimit",
        side_effect=fake_getrlimit_nofile_hard_large)
class get_maximum_file_descriptors_TestCase(scaffold.TestCase):
    """ Test cases for get_maximum_file_descriptors function. """

    def test_returns_system_hard_limit(self, mock_func_resource_getrlimit):
        """ Should return process hard limit on number of files. """
        result = daemon.daemon.get_maximum_file_descriptors()
        self.assertEqual(fake_rlimit_nofile_large, result)

    def test_returns_module_default_if_hard_limit_infinity(
            self, mock_func_resource_getrlimit):
        """ Should return module MAXFD if hard limit is infinity. """
        # Replace the class-level fake with one reporting an infinite
        # hard limit.
        mock_func_resource_getrlimit.side_effect = (
                fake_getrlimit_nofile_hard_infinity)
        result = daemon.daemon.get_maximum_file_descriptors()
        self.assertEqual(fake_default_maxfd, result)
1250
1251
def fake_get_maximum_file_descriptors():
    """ Fake `get_maximum_file_descriptors`: return the fake default. """
    maxfd = fake_default_maxfd
    return maxfd
1254
@mock.patch.object(resource, "RLIMIT_NOFILE", new=fake_RLIMIT_NOFILE)
@mock.patch.object(resource, "RLIM_INFINITY", new=fake_RLIM_INFINITY)
@mock.patch.object(
        resource, "getrlimit",
        new=fake_getrlimit_nofile_soft_infinity)
@mock.patch.object(
        daemon.daemon, "get_maximum_file_descriptors",
        new=fake_get_maximum_file_descriptors)
@mock.patch.object(daemon.daemon, "close_file_descriptor_if_open")
class close_all_open_files_TestCase(scaffold.TestCase):
    """ Test cases for close_all_open_files function.

        Only the `close_file_descriptor_if_open` patch creates a mock
        that is passed to each test method; the other patches specify
        their replacement with `new`.

        """

    def test_requests_all_open_files_to_close(
            self, mock_func_close_file_descriptor_if_open):
        """ Should request close of all open files. """
        expected_file_descriptors = range(fake_default_maxfd)
        expected_calls = [
                mock.call(fd) for fd in expected_file_descriptors]
        daemon.daemon.close_all_open_files()
        mock_func_close_file_descriptor_if_open.assert_has_calls(
                expected_calls, any_order=True)

    def test_requests_all_but_excluded_files_to_close(
            self, mock_func_close_file_descriptor_if_open):
        """ Should request close of all open files but those excluded. """
        test_exclude = set([3, 7])
        args = dict(
                exclude=test_exclude,
                )
        expected_file_descriptors = set(
                fd for fd in range(fake_default_maxfd)
                if fd not in test_exclude)
        expected_calls = [
                mock.call(fd) for fd in expected_file_descriptors]
        daemon.daemon.close_all_open_files(**args)
        mock_func_close_file_descriptor_if_open.assert_has_calls(
                expected_calls, any_order=True)
        # `assert_has_calls` only proves the expected calls were made.
        # Also verify that no excluded file descriptor was closed.
        actual_calls = mock_func_close_file_descriptor_if_open.call_args_list
        for excluded_fd in test_exclude:
            self.assertNotIn(mock.call(excluded_fd), actual_calls)
1292
1293
class detach_process_context_TestCase(scaffold.TestCase):
    """ Test cases for detach_process_context function. """

    class FakeOSExit(SystemExit):
        """ Fake exception raised for os._exit(). """

    def setUp(self):
        """ Set up test fixtures. """
        super(detach_process_context_TestCase, self).setUp()

        # Parent mock which records calls to the patched `os` functions
        # in a single ordered sequence.
        self.mock_module_os = mock.MagicMock(wraps=os)

        def raise_os_exit(status=None):
            raise self.FakeOSExit(status)

        # By default, every fork returns PID 0.
        self.mock_func_os_fork = self._patch_os_func(
                "fork", side_effect=iter([0, 0]))
        self.mock_func_os_setsid = self._patch_os_func("setsid")
        self.mock_func_os_force_exit = self._patch_os_func(
                "_exit", side_effect=raise_os_exit)

    def _patch_os_func(self, name, **kwargs):
        """ Patch `os` attribute `name`; attach the mock to the parent. """
        patcher = mock.patch.object(os, name, **kwargs)
        mock_func = patcher.start()
        self.addCleanup(patcher.stop)
        self.mock_module_os.attach_mock(mock_func, name)
        return mock_func

    @staticmethod
    def _make_fake_fork(results):
        """ Make a fake `fork` yielding each of `results` in turn.

            An item which is an exception instance is raised instead
            of returned.

            """
        results_iter = iter(results)

        def fake_fork():
            item = next(results_iter)
            if isinstance(item, Exception):
                raise item
            return item

        return fake_fork

    def test_parent_exits(self):
        """ Parent process should exit. """
        self.mock_func_os_fork.side_effect = iter([23])
        self.assertRaises(
                self.FakeOSExit,
                daemon.daemon.detach_process_context)
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call._exit(0),
                ])

    def test_first_fork_error_raises_error(self):
        """ Error on first fork should raise DaemonProcessDetachError. """
        fork_error = OSError(13, "Bad stuff happened")
        self.mock_func_os_fork.side_effect = self._make_fake_fork(
                [fork_error])
        exc = self.assertRaises(
                daemon.daemon.DaemonProcessDetachError,
                daemon.daemon.detach_process_context)
        self.assertEqual(fork_error, exc.__cause__)
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                ])

    def test_child_starts_new_process_group(self):
        """ Child should start new process group. """
        daemon.daemon.detach_process_context()
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call.setsid(),
                ])

    def test_child_forks_next_parent_exits(self):
        """ Child should fork, then exit if parent. """
        self.mock_func_os_fork.side_effect = iter([0, 42])
        self.assertRaises(
                self.FakeOSExit,
                daemon.daemon.detach_process_context)
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call.setsid(),
                mock.call.fork(),
                mock.call._exit(0),
                ])

    def test_second_fork_error_reports_to_stderr(self):
        """ Error on second fork should raise DaemonProcessDetachError. """
        fork_error = OSError(17, "Nasty stuff happened")
        self.mock_func_os_fork.side_effect = self._make_fake_fork(
                [0, fork_error])
        exc = self.assertRaises(
                daemon.daemon.DaemonProcessDetachError,
                daemon.daemon.detach_process_context)
        self.assertEqual(fork_error, exc.__cause__)
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call.setsid(),
                mock.call.fork(),
                ])

    def test_child_forks_next_child_continues(self):
        """ Child should fork, then continue if child. """
        daemon.daemon.detach_process_context()
        self.mock_module_os.assert_has_calls([
                mock.call.fork(),
                mock.call.setsid(),
                mock.call.fork(),
                ])
1419
1420
@mock.patch("os.getppid", return_value=765)
class is_process_started_by_init_TestCase(scaffold.TestCase):
    """ Test cases for is_process_started_by_init function. """

    def test_returns_false_by_default(self, mock_func_os_getppid):
        """ Should return False under normal circumstances. """
        result = daemon.daemon.is_process_started_by_init()
        self.assertIs(result, False)

    def test_returns_true_if_parent_process_is_init(
            self, mock_func_os_getppid):
        """ Should return True if parent process is `init`. """
        # Report PID 1 as the parent process.
        mock_func_os_getppid.return_value = 1
        result = daemon.daemon.is_process_started_by_init()
        self.assertIs(result, True)
1439
1440
class is_socket_TestCase(scaffold.TestCase):
    """ Test cases for is_socket function.

        `socket.fromfd` is patched to return a mock socket whose
        `getsockopt` raises an ENOTSOCK error by default.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(is_socket_TestCase, self).setUp()

        def fake_getsockopt(level, optname, buflen=None):
            result = object()
            # Compare by value: identity of equal integers is an
            # implementation detail, so `is` is not reliable here.
            if optname == socket.SO_TYPE:
                result = socket.SOCK_RAW
            return result

        self.fake_socket_getsockopt_func = fake_getsockopt

        self.fake_socket_error = socket.error(
                errno.ENOTSOCK,
                "Socket operation on non-socket")

        self.mock_socket = mock.MagicMock(spec=socket.socket)
        self.mock_socket.getsockopt.side_effect = self.fake_socket_error

        def fake_socket_fromfd(fd, family, type, proto=None):
            return self.mock_socket

        func_patcher_socket_fromfd = mock.patch.object(
                socket, "fromfd",
                side_effect=fake_socket_fromfd)
        func_patcher_socket_fromfd.start()
        self.addCleanup(func_patcher_socket_fromfd.stop)

    def test_returns_false_by_default(self):
        """ Should return False under normal circumstances. """
        test_fd = 23
        expected_result = False
        result = daemon.daemon.is_socket(test_fd)
        self.assertIs(result, expected_result)

    def test_returns_true_if_stdin_is_socket(self):
        """ Should return True if `stdin` is a socket. """
        test_fd = 23
        getsockopt = self.mock_socket.getsockopt
        getsockopt.side_effect = self.fake_socket_getsockopt_func
        expected_result = True
        result = daemon.daemon.is_socket(test_fd)
        self.assertIs(result, expected_result)

    def test_returns_false_if_stdin_socket_raises_error(self):
        """ Should return True if `stdin` is a socket and raises error. """
        # Despite this method's name, the expected result is True.
        test_fd = 23
        getsockopt = self.mock_socket.getsockopt
        getsockopt.side_effect = socket.error(
                object(), "Weird socket stuff")
        expected_result = True
        result = daemon.daemon.is_socket(test_fd)
        self.assertIs(result, expected_result)
1497
1498
class is_process_started_by_superserver_TestCase(scaffold.TestCase):
    """ Test cases for is_process_started_by_superserver function. """

    def setUp(self):
        """ Set up test fixtures. """
        super(is_process_started_by_superserver_TestCase, self).setUp()

        # Tests replace this to control the answer for `stdin`.
        self.fake_stdin_is_socket_func = (lambda: False)

        def fake_is_socket(fd):
            # Only the original `stdin` descriptor can be a socket.
            if fd != sys.__stdin__.fileno():
                return False
            return self.fake_stdin_is_socket_func()

        patcher = mock.patch.object(
                daemon.daemon, "is_socket",
                side_effect=fake_is_socket)
        patcher.start()
        self.addCleanup(patcher.stop)

    def test_returns_false_by_default(self):
        """ Should return False under normal circumstances. """
        result = daemon.daemon.is_process_started_by_superserver()
        self.assertIs(result, False)

    def test_returns_true_if_stdin_is_socket(self):
        """ Should return True if `stdin` is a socket. """
        self.fake_stdin_is_socket_func = (lambda: True)
        result = daemon.daemon.is_process_started_by_superserver()
        self.assertIs(result, True)
1533
1534
@mock.patch.object(
        daemon.daemon, "is_process_started_by_superserver",
        return_value=False)
@mock.patch.object(
        daemon.daemon, "is_process_started_by_init",
        return_value=False)
class is_detach_process_context_required_TestCase(scaffold.TestCase):
    """ Test cases for is_detach_process_context_required function.

        Both helper predicates are patched to return False for every
        test method. Decorators are applied bottom-up, so each test
        receives the `is_process_started_by_init` mock first and the
        `is_process_started_by_superserver` mock second.

        """

    def test_returns_true_by_default(
            self,
            mock_func_is_process_started_by_init,
            mock_func_is_process_started_by_superserver):
        """ Should return True when neither predicate reports True. """
        outcome = daemon.daemon.is_detach_process_context_required()
        self.assertIs(outcome, True)

    def test_returns_false_if_started_by_init(
            self,
            mock_func_is_process_started_by_init,
            mock_func_is_process_started_by_superserver):
        """ Should return False if current process started by init. """
        mock_func_is_process_started_by_init.return_value = True
        outcome = daemon.daemon.is_detach_process_context_required()
        self.assertIs(outcome, False)

    def test_returns_true_if_started_by_superserver(
            self,
            mock_func_is_process_started_by_init,
            mock_func_is_process_started_by_superserver):
        """ Should return False if current process started by superserver.

            NOTE(review): the method name says "returns_true" but the
            assertion expects False; the name looks stale.

            """
        mock_func_is_process_started_by_superserver.return_value = True
        outcome = daemon.daemon.is_detach_process_context_required()
        self.assertIs(outcome, False)
1572
1573
def setup_streams_fixtures(testcase):
    """ Set up common test fixtures for standard streams.

        Attaches three mappings to `testcase`:

        * `stream_file_paths`: stream name to a generated path name.
        * `stream_files_by_name`: stream name to a fake file object.
        * `stream_files_by_path`: generated path name to the same fake
          file object as is registered under the stream's name.

        """
    stream_names = ['stdin', 'stdout', 'stderr']

    # `tempfile.mktemp` only produces a name; this function does not
    # create any file at the generated paths.
    testcase.stream_file_paths = dict(
            stdin=tempfile.mktemp(),
            stdout=tempfile.mktemp(),
            stderr=tempfile.mktemp(),
            )

    files_by_name = {}
    for name in stream_names:
        files_by_name[name] = FakeFileDescriptorStringIO()
    testcase.stream_files_by_name = files_by_name

    files_by_path = {}
    for name in stream_names:
        path = testcase.stream_file_paths[name]
        files_by_path[path] = files_by_name[name]
    testcase.stream_files_by_path = files_by_path
1592
1593
@mock.patch.object(os, "dup2")
class redirect_stream_TestCase(scaffold.TestCase):
    """ Test cases for redirect_stream function.

        `os.dup2` is patched for every test method; the resulting mock
        is passed to each test as its extra argument. `os.open` is
        patched per test in `setUp`.

        """

    def setUp(self):
        """ Set up test fixtures.

            Creates fake system, target and null-device streams, and
            replaces `os.open` with a fake that only knows how to open
            `os.devnull`.

            """
        super(redirect_stream_TestCase, self).setUp()

        self.test_system_stream = FakeFileDescriptorStringIO()
        self.test_target_stream = FakeFileDescriptorStringIO()
        self.test_null_file = FakeFileDescriptorStringIO()

        def fake_os_open(path, flag, mode=None):
            """ Return the fake null file's descriptor for `os.devnull`.

                :raises OSError: with `errno.ENOENT` for any other path.

                """
            if path == os.devnull:
                result = self.test_null_file.fileno()
            else:
                # Construct via `OSError` with an explicit errno: the
                # name `FileNotFoundError` does not exist on Python 2,
                # while on Python 3 this call yields a
                # `FileNotFoundError` instance with `errno` populated.
                raise OSError(errno.ENOENT, "No such file", path)
            return result

        func_patcher_os_open = mock.patch.object(
                os, "open",
                side_effect=fake_os_open)
        self.mock_func_os_open = func_patcher_os_open.start()
        self.addCleanup(func_patcher_os_open.stop)

    def test_duplicates_target_file_descriptor(
            self, mock_func_os_dup2):
        """ Should duplicate file descriptor from target to system stream. """
        system_stream = self.test_system_stream
        system_fileno = system_stream.fileno()
        target_stream = self.test_target_stream
        target_fileno = target_stream.fileno()
        daemon.daemon.redirect_stream(system_stream, target_stream)
        mock_func_os_dup2.assert_called_with(target_fileno, system_fileno)

    def test_duplicates_null_file_descriptor_by_default(
            self, mock_func_os_dup2):
        """ Should by default duplicate the null file to the system stream.

            With a target of None, the null device is expected to be
            opened read-write and its descriptor duplicated onto the
            system stream's descriptor.

            """
        system_stream = self.test_system_stream
        system_fileno = system_stream.fileno()
        target_stream = None
        null_path = os.devnull
        null_flag = os.O_RDWR
        null_file = self.test_null_file
        null_fileno = null_file.fileno()
        daemon.daemon.redirect_stream(system_stream, target_stream)
        self.mock_func_os_open.assert_called_with(null_path, null_flag)
        mock_func_os_dup2.assert_called_with(null_fileno, system_fileno)
1642
1643
class make_default_signal_map_TestCase(scaffold.TestCase):
    """ Test cases for make_default_signal_map function. """

    def setUp(self):
        """ Set up test fixtures.

            Swaps the `signal` name in `daemon.daemon` for a fake module
            carrying a fixed set of signal attributes, and records the
            signal map expected for that fake module.

            """
        super(make_default_signal_map_TestCase, self).setUp()

        # `str(...)` gives the native string type of the running Python,
        # regardless of the `unicode_literals` import.
        fake_module = ModuleType(str('signal'))
        for signal_name in [
                'SIGHUP',
                'SIGCLD',
                'SIGSEGV',
                'SIGTSTP',
                'SIGTTIN',
                'SIGTTOU',
                'SIGTERM',
                ]:
            # Each signal is a unique opaque object, not a number.
            setattr(fake_module, signal_name, object())
        self.fake_signal_module = fake_module

        signal_patcher = mock.patch.object(
                daemon.daemon, "signal", new=fake_module)
        signal_patcher.start()
        self.addCleanup(signal_patcher.stop)

        # Expected map, keyed by the fake module's signal objects.
        expected_targets = [
                ('SIGTSTP', None),
                ('SIGTTIN', None),
                ('SIGTTOU', None),
                ('SIGTERM', 'terminate'),
                ]
        self.default_signal_map = {}
        for (signal_name, target) in expected_targets:
            signal_id = getattr(fake_module, signal_name)
            self.default_signal_map[signal_id] = target

    def test_returns_constructed_signal_map(self):
        """ Should return map per default. """
        signal_map = daemon.daemon.make_default_signal_map()
        self.assertEqual(self.default_signal_map, signal_map)

    def test_returns_signal_map_with_only_ids_in_signal_module(self):
        """ Should return map with only signals in the `signal` module.

            The `signal` module is documented to only define those
            signals which exist on the running system. Therefore the
            default map should not contain any signals which are not
            defined in the `signal` module.

            """
        # Drop the signal from the expectation first, while its key
        # object can still be looked up on the fake module.
        missing_signal = self.fake_signal_module.SIGTTOU
        del self.default_signal_map[missing_signal]
        delattr(self.fake_signal_module, 'SIGTTOU')
        signal_map = daemon.daemon.make_default_signal_map()
        self.assertEqual(self.default_signal_map, signal_map)
1702
1703
@mock.patch.object(daemon.daemon.signal, "signal")
class set_signal_handlers_TestCase(scaffold.TestCase):
    """ Test cases for set_signal_handlers function.

        `signal.signal` (as seen by `daemon.daemon`) is patched for
        every test method; the mock is passed as the extra argument.

        """

    def setUp(self):
        """ Set up test fixtures. """
        super(set_signal_handlers_TestCase, self).setUp()

        # Handlers are opaque objects; the patched `signal.signal`
        # only records them.
        self.signal_handler_map = {
                signal.SIGQUIT: object(),
                signal.SIGSEGV: object(),
                signal.SIGINT: object(),
                }

    def test_sets_signal_handler_for_each_item(self, mock_func_signal_signal):
        """ Should set signal handler for each item in map. """
        signal_handler_map = self.signal_handler_map
        # The expected calls follow the map's own iteration order.
        # NOTE(review): this presumes `set_signal_handlers` iterates the
        # map in that same order; confirm if the comparison ever fails.
        expected_calls = [
                mock.call(signal_number, handler)
                for (signal_number, handler) in signal_handler_map.items()]
        daemon.daemon.set_signal_handlers(signal_handler_map)
        # `assertEqual`, not the deprecated alias `assertEquals` (removed
        # from `unittest.TestCase` in Python 3.12).
        self.assertEqual(expected_calls, mock_func_signal_signal.mock_calls)
1727
@mock.patch.object(daemon.daemon.atexit, "register")
class register_atexit_function_TestCase(scaffold.TestCase):
    """ Test cases for register_atexit_function function.

        `atexit.register` (as seen by `daemon.daemon`) is patched for
        every test method; the mock is passed as the extra argument.

        """

    def test_registers_function_for_atexit_processing(
            self, mock_func_atexit_register):
        """ Should pass the specified function to `atexit.register`. """
        # An opaque sentinel is enough: the patched `register` merely
        # records the argument it was given.
        sentinel_func = object()
        daemon.daemon.register_atexit_function(sentinel_func)
        mock_func_atexit_register.assert_called_with(sentinel_func)
1738
1739
1740# Local variables:
1741# coding: utf-8
1742# mode: python
1743# End:
1744# vim: fileencoding=utf-8 filetype=python :
1745