1#!/usr/bin/env python
2#
3# Copyright 2009 Facebook
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
17"""An I/O event loop for non-blocking sockets.
18
19Typical applications will use a single `IOLoop` object, in the
20`IOLoop.instance` singleton.  The `IOLoop.start` method should usually
21be called at the end of the ``main()`` function.  Atypical applications may
22use more than one `IOLoop`, such as one `IOLoop` per thread, or per `unittest`
23case.
24
25In addition to I/O events, the `IOLoop` can also schedule time-based events.
26`IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`.
27"""
28
29from __future__ import absolute_import, division, print_function, with_statement
30
import datetime
import errno
import functools
import heapq
import logging
import math
import numbers
import os
import select
import sys
import threading
import time
import traceback
43
44from .concurrent import Future, TracebackFuture
45from .log import app_log, gen_log
46from . import stack_context
47from .util import Configurable
48
49try:
50    import signal
51except ImportError:
52    signal = None
53
54try:
55    import thread  # py2
56except ImportError:
57    import _thread as thread  # py3
58
59from .platform.auto import set_close_exec, Waker
60
61
class TimeoutError(Exception):
    """Raised by `IOLoop.run_sync` when the loop stops before the
    operation has completed (for example because its timeout expired).
    """
64
65
class IOLoop(Configurable):
    """A level-triggered I/O loop.

    We use ``epoll`` (Linux) or ``kqueue`` (BSD and Mac OS X) if they
    are available, or else we fall back on select(). If you are
    implementing a system that needs to handle thousands of
    simultaneous connections, you should use a system that supports
    either ``epoll`` or ``kqueue``.

    Note that in this patched copy `configurable_default` always returns
    ``zmq.eventloop.ioloop.ZMQIOLoop``; the platform selection described
    above is retained in the source but is not reached.

    Example usage for a simple TCP server::

        import errno
        import functools
        import ioloop
        import socket

        def connection_ready(sock, fd, events):
            while True:
                try:
                    connection, address = sock.accept()
                except socket.error as e:
                    if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                        raise
                    return
                connection.setblocking(0)
                handle_connection(connection, address)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(0)
        sock.bind(("", port))
        sock.listen(128)

        io_loop = ioloop.IOLoop.instance()
        callback = functools.partial(connection_ready, sock)
        io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
        io_loop.start()

    """
    # Constants from the epoll module
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # Our events map exactly to the epoll events
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP

    # Global lock for creating global IOLoop instance
    _instance_lock = threading.Lock()

    # Per-thread storage for the "current" IOLoop (see `current()`).
    _current = threading.local()

    @staticmethod
    def instance():
        """Returns a global `IOLoop` instance.

        Most applications have a single, global `IOLoop` running on the
        main thread.  Use this method to get this instance from
        another thread.  To get the current thread's `IOLoop`, use `current()`.
        """
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    # Re-check under the lock so that two racing threads
                    # cannot both create an instance.
                    IOLoop._instance = IOLoop()
        return IOLoop._instance

    @staticmethod
    def initialized():
        """Returns true if the singleton instance has been created."""
        return hasattr(IOLoop, "_instance")

    def install(self):
        """Installs this `IOLoop` object as the singleton instance.

        This is normally not necessary as `instance()` will create
        an `IOLoop` on demand, but you may want to call `install` to use
        a custom subclass of `IOLoop`.

        Fails an assertion if a singleton instance already exists.
        """
        assert not IOLoop.initialized()
        IOLoop._instance = self

    @staticmethod
    def current():
        """Returns the current thread's `IOLoop`.

        If an `IOLoop` is currently running or has been marked as current
        by `make_current`, returns that instance.  Otherwise returns
        `IOLoop.instance()`, i.e. the main thread's `IOLoop`.

        A common pattern for classes that depend on ``IOLoops`` is to use
        a default argument to enable programs with multiple ``IOLoops``
        but not require the argument for simpler applications::

            class MyClass(object):
                def __init__(self, io_loop=None):
                    self.io_loop = io_loop or IOLoop.current()

        In general you should use `IOLoop.current` as the default when
        constructing an asynchronous object, and use `IOLoop.instance`
        when you mean to communicate to the main thread from a different
        one.
        """
        current = getattr(IOLoop._current, "instance", None)
        if current is None:
            return IOLoop.instance()
        return current

    def make_current(self):
        """Makes this the `IOLoop` for the current thread.

        An `IOLoop` automatically becomes current for its thread
        when it is started, but it is sometimes useful to call
        `make_current` explicitly before starting the `IOLoop`,
        so that code run at startup time can find the right
        instance.
        """
        IOLoop._current.instance = self

    @staticmethod
    def clear_current():
        """Clears the calling thread's current `IOLoop`.

        After this call `current()` falls back to `IOLoop.instance()`
        for this thread until another loop is made current.
        """
        IOLoop._current.instance = None

    @classmethod
    def configurable_base(cls):
        """Returns `IOLoop`, the base of this configurable hierarchy.

        NOTE(review): presumably consulted by `Configurable` (imported
        from ``.util``, not visible here) -- confirm against that class.
        """
        return IOLoop

    @classmethod
    def configurable_default(cls):
        """Returns the implementation class used by default.

        This patched copy always returns
        ``zmq.eventloop.ioloop.ZMQIOLoop``.  The import is done inside the
        method, so ``zmq`` is only required once a default implementation
        is actually requested.
        """
        # this is the only patch to IOLoop:
        from zmq.eventloop.ioloop import ZMQIOLoop
        return ZMQIOLoop
        # the remainder of this method is unused (unreachable),
        # but left for preservation reasons
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

    def initialize(self):
        """Initialization hook; a no-op here, overridden by subclasses."""
        pass

    def close(self, all_fds=False):
        """Closes the `IOLoop`, freeing any resources used.

        If ``all_fds`` is true, all file descriptors registered on the
        IOLoop will be closed (not just the ones created by the
        `IOLoop` itself).

        Many applications will only use a single `IOLoop` that runs for the
        entire lifetime of the process.  In that case closing the `IOLoop`
        is not necessary since everything will be cleaned up when the
        process exits.  `IOLoop.close` is provided mainly for scenarios
        such as unit tests, which create and destroy a large number of
        ``IOLoops``.

        An `IOLoop` must be completely stopped before it can be closed.  This
        means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
        be allowed to return before attempting to call `IOLoop.close()`.
        Therefore the call to `close` will usually appear just after
        the call to `start` rather than near the call to `stop`.

        .. versionchanged:: 3.1
           If the `IOLoop` implementation supports non-integer objects
           for "file descriptors", those objects will have their
           ``close`` method called when ``all_fds`` is true.
        """
        raise NotImplementedError()

    def add_handler(self, fd, handler, events):
        """Registers the given handler to receive the given events for fd.

        The ``events`` argument is a bitwise or of the constants
        ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.

        When an event occurs, ``handler(fd, events)`` will be run.
        """
        raise NotImplementedError()

    def update_handler(self, fd, events):
        """Changes the events we listen for fd."""
        raise NotImplementedError()

    def remove_handler(self, fd):
        """Stop listening for events on fd."""
        raise NotImplementedError()

    def set_blocking_signal_threshold(self, seconds, action):
        """Sends a signal if the `IOLoop` is blocked for more than
        ``seconds`` seconds.

        Pass ``seconds=None`` to disable.  Requires Python 2.6 on a unixy
        platform.

        The action parameter is a Python signal handler.  Read the
        documentation for the `signal` module for more information.
        If ``action`` is None, the process will be killed if it is
        blocked for too long.
        """
        raise NotImplementedError()

    def set_blocking_log_threshold(self, seconds):
        """Logs a stack trace if the `IOLoop` is blocked for more than
        ``seconds`` seconds.

        Equivalent to ``set_blocking_signal_threshold(seconds,
        self.log_stack)``
        """
        self.set_blocking_signal_threshold(seconds, self.log_stack)

    def log_stack(self, signal, frame):
        """Signal handler to log the stack trace of the current thread.

        For use with `set_blocking_signal_threshold`.  The ``signal``
        argument is accepted for the signal-handler signature but is not
        used; ``frame`` is the interrupted frame whose stack is logged.
        """
        gen_log.warning('IOLoop blocked for %f seconds in\n%s',
                        self._blocking_signal_threshold,
                        ''.join(traceback.format_stack(frame)))

    def start(self):
        """Starts the I/O loop.

        The loop will run until one of the callbacks calls `stop()`, which
        will make the loop stop after the current event iteration completes.
        """
        raise NotImplementedError()

    def stop(self):
        """Stop the I/O loop.

        If the event loop is not currently running, the next call to `start()`
        will return immediately.

        To use asynchronous methods from otherwise-synchronous code (such as
        unit tests), you can start and stop the event loop like this::

          ioloop = IOLoop()
          async_method(ioloop=ioloop, callback=ioloop.stop)
          ioloop.start()

        ``ioloop.start()`` will return after ``async_method`` has run
        its callback, whether that callback was invoked before or
        after ``ioloop.start``.

        Note that even after `stop` has been called, the `IOLoop` is not
        completely stopped until `IOLoop.start` has also returned.
        Some work that was scheduled before the call to `stop` may still
        be run before the `IOLoop` shuts down.
        """
        raise NotImplementedError()

    def run_sync(self, func, timeout=None):
        """Starts the `IOLoop`, runs the given function, and stops the loop.

        If the function returns a `.Future`, the `IOLoop` will run
        until the future is resolved.  If it raises an exception, the
        `IOLoop` will stop and the exception will be re-raised to the
        caller.

        The optional ``timeout`` argument (in seconds, on the scale of
        `IOLoop.time`) may be used to set a maximum duration for the
        function.  If the loop stops before the function's result is
        available -- because the timeout expired, or because the loop
        was stopped by something else -- a `TimeoutError` is raised.

        This method is useful in conjunction with `tornado.gen.coroutine`
        to allow asynchronous calls in a ``main()`` function::

            @gen.coroutine
            def main():
                # do stuff...

            if __name__ == '__main__':
                IOLoop.instance().run_sync(main)
        """
        # One-element list so the nested function can rebind the future
        # (this file supports Python 2, which has no ``nonlocal``).
        future_cell = [None]

        def run():
            try:
                result = func()
            except Exception:
                future_cell[0] = TracebackFuture()
                future_cell[0].set_exc_info(sys.exc_info())
            else:
                if isinstance(result, Future):
                    future_cell[0] = result
                else:
                    future_cell[0] = Future()
                    future_cell[0].set_result(result)
            self.add_future(future_cell[0], lambda future: self.stop())
        self.add_callback(run)
        if timeout is not None:
            timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
        try:
            self.start()
        finally:
            # Always cancel the stop-timeout, even if start() raised, so a
            # stale timeout cannot stop a later run of this loop.
            if timeout is not None:
                self.remove_timeout(timeout_handle)
        future = future_cell[0]
        # ``future`` is still None if start() returned before ``run`` was
        # executed (e.g. stop() had been called beforehand); report that
        # the same way as an unfinished future instead of failing with an
        # AttributeError.
        if future is None or not future.done():
            raise TimeoutError('Operation timed out after %s seconds' % timeout)
        return future.result()

    def time(self):
        """Returns the current time according to the `IOLoop`'s clock.

        The return value is a floating-point number relative to an
        unspecified time in the past.

        By default, the `IOLoop`'s time function is `time.time`.  However,
        it may be configured to use e.g. `time.monotonic` instead.
        Calls to `add_timeout` that pass a number instead of a
        `datetime.timedelta` should use this function to compute the
        appropriate time, so they can work no matter what time function
        is chosen.
        """
        return time.time()

    def add_timeout(self, deadline, callback):
        """Runs the ``callback`` at the time ``deadline`` from the I/O loop.

        Returns an opaque handle that may be passed to
        `remove_timeout` to cancel.

        ``deadline`` may be a number denoting a time (on the same
        scale as `IOLoop.time`, normally `time.time`), or a
        `datetime.timedelta` object for a deadline relative to the
        current time.

        Note that it is not safe to call `add_timeout` from other threads.
        Instead, you must use `add_callback` to transfer control to the
        `IOLoop`'s thread, and then call `add_timeout` from there.
        """
        raise NotImplementedError()

    def remove_timeout(self, timeout):
        """Cancels a pending timeout.

        The argument is a handle as returned by `add_timeout`.  It is
        safe to call `remove_timeout` even if the callback has already
        been run.
        """
        raise NotImplementedError()

    def add_callback(self, callback, *args, **kwargs):
        """Calls the given callback on the next I/O loop iteration.

        It is safe to call this method from any thread at any time,
        except from a signal handler.  Note that this is the **only**
        method in `IOLoop` that makes this thread-safety guarantee; all
        other interaction with the `IOLoop` must be done from that
        `IOLoop`'s thread.  `add_callback()` may be used to transfer
        control from other threads to the `IOLoop`'s thread.

        To add a callback from a signal handler, see
        `add_callback_from_signal`.
        """
        raise NotImplementedError()

    def add_callback_from_signal(self, callback, *args, **kwargs):
        """Calls the given callback on the next I/O loop iteration.

        Safe for use from a Python signal handler; should not be used
        otherwise.

        Callbacks added with this method will be run without any
        `.stack_context`, to avoid picking up the context of the function
        that was interrupted by the signal.
        """
        raise NotImplementedError()

    def add_future(self, future, callback):
        """Schedules a callback on the ``IOLoop`` when the given
        `.Future` is finished.

        The callback is invoked with one argument, the
        `.Future`.
        """
        assert isinstance(future, Future)
        callback = stack_context.wrap(callback)
        # The done-callback only hands the work to add_callback, so the
        # user's callback itself is run by the loop.
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

    def _run_callback(self, callback):
        """Runs a callback with error handling.

        Any `Exception` raised by ``callback`` is passed to
        `handle_callback_exception` rather than propagated.

        For use in subclasses.
        """
        try:
            callback()
        except Exception:
            self.handle_callback_exception(callback)

    def handle_callback_exception(self, callback):
        """This method is called whenever a callback run by the `IOLoop`
        throws an exception.

        By default simply logs the exception as an error.  Subclasses
        may override this method to customize reporting of exceptions.

        The exception itself is not passed explicitly, but is available
        in `sys.exc_info`.
        """
        app_log.error("Exception in callback %r", callback, exc_info=True)
478
479
class PollIOLoop(IOLoop):
    """Base class for IOLoops built around a select-like function.

    For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
    (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
    `tornado.platform.select.SelectIOLoop` (all platforms).
    """
    def initialize(self, impl, time_func=None):
        """Sets up the loop's state around the polling object ``impl``.

        ``impl`` is used through its ``register``, ``modify``,
        ``unregister``, ``poll`` and ``close`` methods.  If it has a
        ``fileno`` method, that descriptor is passed to `set_close_exec`.
        ``time_func`` is the clock returned by `time`; it defaults to
        `time.time`.
        """
        super(PollIOLoop, self).initialize()
        self._impl = impl
        if hasattr(self._impl, 'fileno'):
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}   # fd -> wrapped handler
        self._events = {}     # fd -> events polled but not yet dispatched
        self._callbacks = []
        self._callback_lock = threading.Lock()
        self._timeouts = []   # heap of _Timeout objects
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None
        self._blocking_signal_threshold = None

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

    def close(self, all_fds=False):
        """Closes the loop's waker and polling object.

        After this call `add_callback` raises ``RuntimeError``.  If
        ``all_fds`` is true, every remaining registered fd is closed as
        well: through its own ``close`` method if it has one, otherwise
        with `os.close`.  Errors while closing an fd are logged at debug
        level and otherwise ignored.
        """
        with self._callback_lock:
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:
            # Iterate over a snapshot: closing an fd object may unregister
            # it from this loop, which would otherwise modify the dict
            # while it is being iterated (an error on Python 3).
            for fd in list(self._handlers):
                try:
                    close_method = getattr(fd, 'close', None)
                    if close_method is not None:
                        close_method()
                    else:
                        os.close(fd)
                except Exception:
                    gen_log.debug("error closing fd %s", fd, exc_info=True)
        self._waker.close()
        self._impl.close()

    def add_handler(self, fd, handler, events):
        """Registers ``handler`` for ``events`` on ``fd``.

        The handler is wrapped with `stack_context.wrap`, and
        ``IOLoop.ERROR`` is always added to the requested events.
        """
        self._handlers[fd] = stack_context.wrap(handler)
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        """Changes the events listened for on ``fd`` (``ERROR`` is always
        included)."""
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        """Stops listening on ``fd`` and drops any of its pending events.

        A failure to unregister the fd from the polling object is logged
        at debug level and otherwise ignored.
        """
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

    def set_blocking_signal_threshold(self, seconds, action):
        """Arranges for ``SIGALRM`` when the loop is blocked too long.

        Stores ``seconds`` as the threshold used by `start` to arm the
        interval timer (``None`` disables it).  When ``seconds`` is not
        ``None``, ``action`` is installed as the ``SIGALRM`` handler, or
        ``signal.SIG_DFL`` if ``action`` is ``None``.  If the ``signal``
        module has no ``setitimer``, an error is logged and nothing else
        happens.
        """
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)

    def start(self):
        """Runs the loop until `stop` is called.

        Each iteration runs the queued callbacks, then the due timeouts,
        then polls for I/O events and dispatches them to their handlers.
        If `stop` was called while the loop was not running, this call
        only clears that request and returns immediately.

        The loop's per-run state (current-loop marker, signal wakeup fd,
        blocking-signal timer, stop flag) is restored on exit, including
        when an exception propagates out of the loop.
        """
        if not logging.getLogger().handlers:
            # The IOLoop catches and logs exceptions, so it's
            # important that log output be visible.  However, python's
            # default behavior for non-root loggers (prior to python
            # 3.2) is to print an unhelpful "no handlers could be
            # found" message rather than the actual log entry, so we
            # must explicitly configure logging if we've made it this
            # far without anything.
            logging.basicConfig()
        if self._stopped:
            self._stopped = False
            return
        old_current = getattr(IOLoop._current, "instance", None)
        IOLoop._current.instance = self
        self._thread_ident = thread.get_ident()
        self._running = True

        # signal.set_wakeup_fd closes a race condition in event loops:
        # a signal may arrive at the beginning of select/poll/etc
        # before it goes into its interruptible sleep, so the signal
        # will be consumed without waking the select.  The solution is
        # for the (C, synchronous) signal handler to write to a pipe,
        # which will then be seen by select.
        #
        # In python's signal handling semantics, this only matters on the
        # main thread (fortunately, set_wakeup_fd only works on the main
        # thread and will raise a ValueError otherwise).
        #
        # If someone has already set a wakeup fd, we don't want to
        # disturb it.  This is an issue for twisted, which does its
        # SIGCHILD processing in response to its own wakeup fd being
        # written to.  As long as the wakeup fd is registered on the IOLoop,
        # the loop will still wake up and everything should work.
        old_wakeup_fd = None
        if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
            # requires python 2.6+, unix.  set_wakeup_fd exists but crashes
            # the python process on windows.
            try:
                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                if old_wakeup_fd != -1:
                    # Already set, restore previous value.  This is a little racy,
                    # but there's no clean get_wakeup_fd and in real use the
                    # IOLoop is just started once at the beginning.
                    signal.set_wakeup_fd(old_wakeup_fd)
                    old_wakeup_fd = None
            except ValueError:  # non-main thread
                pass

        try:
            while True:
                poll_timeout = 3600.0

                # Prevent IO event starvation by delaying new callbacks
                # to the next iteration of the event loop.
                with self._callback_lock:
                    callbacks = self._callbacks
                    self._callbacks = []
                for callback in callbacks:
                    self._run_callback(callback)

                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:
                            # the timeout was cancelled
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            timeout = heapq.heappop(self._timeouts)
                            self._run_callback(timeout.callback)
                        else:
                            seconds = self._timeouts[0].deadline - now
                            poll_timeout = min(seconds, poll_timeout)
                            break
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):
                        # Clean up the timeout queue when it gets large and it's
                        # more than half cancellations.
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                if self._callbacks:
                    # If any callbacks or timeouts called add_callback,
                    # we don't want to wait in poll() before we run them.
                    poll_timeout = 0.0

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:
                    # clear alarm so it doesn't fire while poll is waiting for
                    # events.
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    # Depending on python version and IOLoop implementation,
                    # different exception types may be thrown and there are
                    # two ways EINTR might be signaled:
                    # * e.errno == errno.EINTR
                    # * e.args is like (errno.EINTR, 'Interrupted system call')
                    if (getattr(e, 'errno', None) == errno.EINTR or
                        (isinstance(getattr(e, 'args', None), tuple) and
                         len(e.args) == 2 and e.args[0] == errno.EINTR)):
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                # Pop one fd at a time from the set of pending fds and run
                # its handler. Since that handler may perform actions on
                # other file descriptors, there may be reentrant calls to
                # this IOLoop that update self._events
                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        self._handlers[fd](fd, events)
                    except (OSError, IOError) as e:
                        # Guard against exceptions constructed without
                        # arguments; indexing args[0] unconditionally would
                        # raise IndexError out of the loop.
                        if e.args and e.args[0] == errno.EPIPE:
                            # Happens when the client closes the connection
                            pass
                        else:
                            app_log.error("Exception in I/O handler for fd %s",
                                          fd, exc_info=True)
                    except Exception:
                        app_log.error("Exception in I/O handler for fd %s",
                                      fd, exc_info=True)
        finally:
            # Runs on normal exit and when an exception escapes the loop,
            # so a failed run does not leave this loop marked as current
            # or leave its waker installed as the signal wakeup fd.
            # reset the stopped flag so another start/stop pair can be issued
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)

    def stop(self):
        """Requests that the loop stop and wakes it if it is polling."""
        self._running = False
        self._stopped = True
        self._waker.wake()

    def time(self):
        """Returns the current time from the configured ``time_func``."""
        return self.time_func()

    def add_timeout(self, deadline, callback):
        """Schedules ``callback`` to run at ``deadline``.

        ``deadline`` is a number on the scale of `time` or a
        `datetime.timedelta` relative to now.  Returns the `_Timeout`
        object, which serves as the handle for `remove_timeout`.
        """
        timeout = _Timeout(deadline, stack_context.wrap(callback), self)
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def remove_timeout(self, timeout):
        """Cancels ``timeout`` by clearing its callback.

        The entry stays in the heap; `start` discards cancelled entries
        when they reach the top, and compacts the heap when cancelled
        entries become numerous.
        """
        # Removing from a heap is complicated, so just leave the defunct
        # timeout object in the queue (see discussion in
        # http://docs.python.org/library/heapq.html).
        timeout.callback = None
        self._cancellations += 1

    def add_callback(self, callback, *args, **kwargs):
        """Queues ``callback(*args, **kwargs)`` for the next iteration.

        Raises ``RuntimeError`` if the loop is closing.  When called from
        a thread other than the loop's, and the queue was empty, the loop
        is woken so it does not keep sleeping in ``poll``.
        """
        with self._callback_lock:
            if self._closing:
                raise RuntimeError("IOLoop is closing")
            list_empty = not self._callbacks
            self._callbacks.append(functools.partial(
                stack_context.wrap(callback), *args, **kwargs))
        if list_empty and thread.get_ident() != self._thread_ident:
            # If we're in the IOLoop's thread, we know it's not currently
            # polling.  If we're not, and we added the first callback to an
            # empty list, we may need to wake it up (it may wake up on its
            # own, but an occasional extra wake is harmless).  Waking
            # up a polling IOLoop is relatively expensive, so we try to
            # avoid it when we can.
            self._waker.wake()

    def add_callback_from_signal(self, callback, *args, **kwargs):
        """Queues ``callback(*args, **kwargs)`` from a signal handler.

        The callback is wrapped inside a `stack_context.NullContext`, so
        it does not pick up the interrupted code's stack context.
        """
        with stack_context.NullContext():
            if thread.get_ident() != self._thread_ident:
                # if the signal is handled on another thread, we can add
                # it normally (modulo the NullContext)
                self.add_callback(callback, *args, **kwargs)
            else:
                # If we're on the IOLoop's thread, we cannot use
                # the regular add_callback because it may deadlock on
                # _callback_lock.  Blindly insert into self._callbacks.
                # This is safe because the GIL makes list.append atomic.
                # One subtlety is that if the signal interrupted the
                # _callback_lock block in IOLoop.start, we may modify
                # either the old or new version of self._callbacks,
                # but either way will work.
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))
751
752
class _Timeout(object):
    """A pending `IOLoop` timeout: a numeric deadline plus its callback."""

    # Slots avoid a per-instance __dict__, which matters when a large
    # number of timeouts are pending at once.
    __slots__ = ['deadline', 'callback']

    def __init__(self, deadline, callback, io_loop):
        """Stores ``callback`` together with an absolute deadline.

        ``deadline`` is either a real number, used as-is, or a
        `datetime.timedelta`, which is added to ``io_loop.time()``.
        Any other type raises ``TypeError``.
        """
        if isinstance(deadline, datetime.timedelta):
            # Relative deadline: anchor it to the loop's current clock.
            now = io_loop.time()
            deadline = now + _Timeout.timedelta_to_seconds(deadline)
        elif not isinstance(deadline, numbers.Real):
            raise TypeError("Unsupported deadline %r" % deadline)
        self.deadline = deadline
        self.callback = callback

    @staticmethod
    def timedelta_to_seconds(td):
        """Equivalent to td.total_seconds() (introduced in python 2.7)."""
        whole_seconds = td.days * 24 * 3600 + td.seconds
        total_micros = whole_seconds * 10 ** 6 + td.microseconds
        return total_micros / float(10 ** 6)

    # Ordering is by deadline; the object id breaks ties so that two
    # timeouts with the same deadline still compare consistently.  Both
    # operators are defined because heapq uses __le__ in python2.5 and
    # __lt__ in 2.6+ (as do sort() and most other comparisons).
    def __lt__(self, other):
        mine = (self.deadline, id(self))
        theirs = (other.deadline, id(other))
        return mine < theirs

    def __le__(self, other):
        mine = (self.deadline, id(self))
        theirs = (other.deadline, id(other))
        return mine <= theirs
784
785
class PeriodicCallback(object):
    """Schedules the given callback to be called periodically.

    The callback is called every ``callback_time`` milliseconds.

    `start` must be called after the `PeriodicCallback` is created.
    """
    def __init__(self, callback, callback_time, io_loop=None):
        """Stores the callback and its period without scheduling anything.

        ``callback_time`` is in milliseconds and must be positive,
        otherwise ``ValueError`` is raised.  ``io_loop`` defaults to
        `IOLoop.current()`.
        """
        self.callback = callback
        if callback_time <= 0:
            raise ValueError("Periodic callback must have a positive callback_time")
        self.callback_time = callback_time
        self.io_loop = io_loop or IOLoop.current()
        self._running = False
        self._timeout = None

    def start(self):
        """Starts the timer."""
        self._running = True
        self._next_timeout = self.io_loop.time()
        self._schedule_next()

    def stop(self):
        """Stops the timer."""
        self._running = False
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def _run(self):
        """Timeout handler: runs the callback once and reschedules.

        Does nothing if the timer has been stopped.  An `Exception`
        raised by the callback is logged and does not prevent the next
        run from being scheduled.
        """
        if not self._running:
            return
        try:
            self.callback()
        except Exception:
            app_log.error("Error in periodic callback", exc_info=True)
        self._schedule_next()

    def _schedule_next(self):
        """Schedules the next run on the first period boundary after now.

        Boundaries that have already passed are skipped rather than run
        late.  The number of periods to skip is computed directly instead
        of stepping one period at a time, so a large jump of the loop's
        clock (or a long stall) costs constant time rather than time
        proportional to the size of the jump.
        """
        if self._running:
            current_time = self.io_loop.time()

            if self._next_timeout <= current_time:
                callback_time_sec = self.callback_time / 1000.0
                # Whole periods that fit in the gap, plus one to land
                # strictly after the current time.
                periods = math.floor(
                    (current_time - self._next_timeout) /
                    callback_time_sec) + 1
                self._next_timeout += periods * callback_time_sec

            self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
830