1b2732e9dSimarom#!/usr/bin/env python
3b2732e9dSimarom# Copyright 2009 Facebook
5b2732e9dSimarom# Licensed under the Apache License, Version 2.0 (the "License"); you may
6b2732e9dSimarom# not use this file except in compliance with the License. You may obtain
7b2732e9dSimarom# a copy of the License at
9b2732e9dSimarom#     http://www.apache.org/licenses/LICENSE-2.0
11b2732e9dSimarom# Unless required by applicable law or agreed to in writing, software
12b2732e9dSimarom# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13b2732e9dSimarom# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14b2732e9dSimarom# License for the specific language governing permissions and limitations
15b2732e9dSimarom# under the License.
"""An I/O event loop for non-blocking sockets.

Typical applications will use a single `IOLoop` object, in the
`IOLoop.instance` singleton.  The `IOLoop.start` method should usually
be called at the end of the ``main()`` function.  Atypical applications may
use more than one `IOLoop`, such as one `IOLoop` per thread, or per `unittest`
case.

In addition to I/O events, the `IOLoop` can also schedule time-based events.
`IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`.
"""
29b2732e9dSimaromfrom __future__ import absolute_import, division, print_function, with_statement
31b2732e9dSimaromimport datetime
32b2732e9dSimaromimport errno
33b2732e9dSimaromimport functools
34b2732e9dSimaromimport heapq
35b2732e9dSimaromimport logging
36b2732e9dSimaromimport numbers
37b2732e9dSimaromimport os
38b2732e9dSimaromimport select
39b2732e9dSimaromimport sys
40b2732e9dSimaromimport threading
41b2732e9dSimaromimport time
42b2732e9dSimaromimport traceback
44b2732e9dSimaromfrom .concurrent import Future, TracebackFuture
45b2732e9dSimaromfrom .log import app_log, gen_log
46b2732e9dSimaromfrom . import stack_context
47b2732e9dSimaromfrom .util import Configurable
50b2732e9dSimarom    import signal
51b2732e9dSimaromexcept ImportError:
52b2732e9dSimarom    signal = None
55b2732e9dSimarom    import thread  # py2
56b2732e9dSimaromexcept ImportError:
57b2732e9dSimarom    import _thread as thread  # py3
59b2732e9dSimaromfrom .platform.auto import set_close_exec, Waker
class TimeoutError(Exception):
    """Raised by `IOLoop.run_sync` when its ``timeout`` expires first."""
class IOLoop(Configurable):
    """A level-triggered I/O loop.

    We use ``epoll`` (Linux) or ``kqueue`` (BSD and Mac OS X) if they
    are available, or else we fall back on select(). If you are
    implementing a system that needs to handle thousands of
    simultaneous connections, you should use a system that supports
    either ``epoll`` or ``kqueue``.

    Example usage for a simple TCP server::

        import errno
        import functools
        import ioloop
        import socket

        def connection_ready(sock, fd, events):
            while True:
                try:
                    connection, address = sock.accept()
                except socket.error as e:
                    if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                        raise
                    return
                connection.setblocking(0)
                handle_connection(connection, address)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(0)
        sock.bind(("", port))
        sock.listen(128)

        io_loop = ioloop.IOLoop.instance()
        callback = functools.partial(connection_ready, sock)
        io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
        io_loop.start()

    """
    # Constants from the epoll module
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # Our events map exactly to the epoll events
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    # ERROR covers both the error and the hang-up conditions.
    ERROR = _EPOLLERR | _EPOLLHUP

    # Global lock for creating global IOLoop instance
    _instance_lock = threading.Lock()

    # Thread-local storage; its ``instance`` attribute holds the IOLoop
    # that `current()` returns for the calling thread.
    _current = threading.local()
126b2732e9dSimarom    @staticmethod
127b2732e9dSimarom    def instance():
128b2732e9dSimarom        """Returns a global `IOLoop` instance.
130b2732e9dSimarom        Most applications have a single, global `IOLoop` running on the
131b2732e9dSimarom        main thread.  Use this method to get this instance from
132b2732e9dSimarom        another thread.  To get the current thread's `IOLoop`, use `current()`.
133b2732e9dSimarom        """
134b2732e9dSimarom        if not hasattr(IOLoop, "_instance"):
135b2732e9dSimarom            with IOLoop._instance_lock:
136b2732e9dSimarom                if not hasattr(IOLoop, "_instance"):
137b2732e9dSimarom                    # New instance after double check
138b2732e9dSimarom                    IOLoop._instance = IOLoop()
139b2732e9dSimarom        return IOLoop._instance
141b2732e9dSimarom    @staticmethod
142b2732e9dSimarom    def initialized():
143b2732e9dSimarom        """Returns true if the singleton instance has been created."""
144b2732e9dSimarom        return hasattr(IOLoop, "_instance")
146b2732e9dSimarom    def install(self):
147b2732e9dSimarom        """Installs this `IOLoop` object as the singleton instance.
149b2732e9dSimarom        This is normally not necessary as `instance()` will create
150b2732e9dSimarom        an `IOLoop` on demand, but you may want to call `install` to use
151b2732e9dSimarom        a custom subclass of `IOLoop`.
152b2732e9dSimarom        """
153b2732e9dSimarom        assert not IOLoop.initialized()
154b2732e9dSimarom        IOLoop._instance = self
156b2732e9dSimarom    @staticmethod
157b2732e9dSimarom    def current():
158b2732e9dSimarom        """Returns the current thread's `IOLoop`.
160b2732e9dSimarom        If an `IOLoop` is currently running or has been marked as current
161b2732e9dSimarom        by `make_current`, returns that instance.  Otherwise returns
162b2732e9dSimarom        `IOLoop.instance()`, i.e. the main thread's `IOLoop`.
164b2732e9dSimarom        A common pattern for classes that depend on ``IOLoops`` is to use
165b2732e9dSimarom        a default argument to enable programs with multiple ``IOLoops``
166b2732e9dSimarom        but not require the argument for simpler applications::
168b2732e9dSimarom            class MyClass(object):
169b2732e9dSimarom                def __init__(self, io_loop=None):
170b2732e9dSimarom                    self.io_loop = io_loop or IOLoop.current()
172b2732e9dSimarom        In general you should use `IOLoop.current` as the default when
173b2732e9dSimarom        constructing an asynchronous object, and use `IOLoop.instance`
174b2732e9dSimarom        when you mean to communicate to the main thread from a different
175b2732e9dSimarom        one.
176b2732e9dSimarom        """
177b2732e9dSimarom        current = getattr(IOLoop._current, "instance", None)
178b2732e9dSimarom        if current is None:
179b2732e9dSimarom            return IOLoop.instance()
180b2732e9dSimarom        return current
182b2732e9dSimarom    def make_current(self):
183b2732e9dSimarom        """Makes this the `IOLoop` for the current thread.
185b2732e9dSimarom        An `IOLoop` automatically becomes current for its thread
186b2732e9dSimarom        when it is started, but it is sometimes useful to call
187b2732e9dSimarom        `make_current` explictly before starting the `IOLoop`,
188b2732e9dSimarom        so that code run at startup time can find the right
189b2732e9dSimarom        instance.
190b2732e9dSimarom        """
191b2732e9dSimarom        IOLoop._current.instance = self
193b2732e9dSimarom    @staticmethod
194b2732e9dSimarom    def clear_current():
195b2732e9dSimarom        IOLoop._current.instance = None
197b2732e9dSimarom    @classmethod
198b2732e9dSimarom    def configurable_base(cls):
199b2732e9dSimarom        return IOLoop
    @classmethod
    def configurable_default(cls):
        """Return the `IOLoop` implementation class to use by default.

        In this copy of the module the answer is always ``ZMQIOLoop``.
        """
        # this is the only patch to IOLoop:
        # NOTE(review): imported inside the method rather than at module
        # level -- presumably to avoid an import cycle with zmq.eventloop;
        # confirm before moving it.
        from zmq.eventloop.ioloop import ZMQIOLoop
        return ZMQIOLoop
        # the remainder of this method is unused,
        # but left for preservation reasons
        # Everything below is unreachable because of the ``return`` above.
        # It would pick epoll, then kqueue, then select, in that order.
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop
218b2732e9dSimarom    def initialize(self):
219b2732e9dSimarom        pass
221b2732e9dSimarom    def close(self, all_fds=False):
222b2732e9dSimarom        """Closes the `IOLoop`, freeing any resources used.
224b2732e9dSimarom        If ``all_fds`` is true, all file descriptors registered on the
225b2732e9dSimarom        IOLoop will be closed (not just the ones created by the
226b2732e9dSimarom        `IOLoop` itself).
228b2732e9dSimarom        Many applications will only use a single `IOLoop` that runs for the
229b2732e9dSimarom        entire lifetime of the process.  In that case closing the `IOLoop`
230b2732e9dSimarom        is not necessary since everything will be cleaned up when the
231b2732e9dSimarom        process exits.  `IOLoop.close` is provided mainly for scenarios
232b2732e9dSimarom        such as unit tests, which create and destroy a large number of
233b2732e9dSimarom        ``IOLoops``.
235b2732e9dSimarom        An `IOLoop` must be completely stopped before it can be closed.  This
236b2732e9dSimarom        means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
237b2732e9dSimarom        be allowed to return before attempting to call `IOLoop.close()`.
238b2732e9dSimarom        Therefore the call to `close` will usually appear just after
239b2732e9dSimarom        the call to `start` rather than near the call to `stop`.
241b2732e9dSimarom        .. versionchanged:: 3.1
242b2732e9dSimarom           If the `IOLoop` implementation supports non-integer objects
243b2732e9dSimarom           for "file descriptors", those objects will have their
244b2732e9dSimarom           ``close`` method when ``all_fds`` is true.
245b2732e9dSimarom        """
246b2732e9dSimarom        raise NotImplementedError()
248b2732e9dSimarom    def add_handler(self, fd, handler, events):
249b2732e9dSimarom        """Registers the given handler to receive the given events for fd.
251b2732e9dSimarom        The ``events`` argument is a bitwise or of the constants
252b2732e9dSimarom        ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.
254b2732e9dSimarom        When an event occurs, ``handler(fd, events)`` will be run.
255b2732e9dSimarom        """
256b2732e9dSimarom        raise NotImplementedError()
258b2732e9dSimarom    def update_handler(self, fd, events):
259b2732e9dSimarom        """Changes the events we listen for fd."""
260b2732e9dSimarom        raise NotImplementedError()
262b2732e9dSimarom    def remove_handler(self, fd):
263b2732e9dSimarom        """Stop listening for events on fd."""
264b2732e9dSimarom        raise NotImplementedError()
266b2732e9dSimarom    def set_blocking_signal_threshold(self, seconds, action):
267b2732e9dSimarom        """Sends a signal if the `IOLoop` is blocked for more than
268b2732e9dSimarom        ``s`` seconds.
270b2732e9dSimarom        Pass ``seconds=None`` to disable.  Requires Python 2.6 on a unixy
271b2732e9dSimarom        platform.
273b2732e9dSimarom        The action parameter is a Python signal handler.  Read the
274b2732e9dSimarom        documentation for the `signal` module for more information.
275b2732e9dSimarom        If ``action`` is None, the process will be killed if it is
276b2732e9dSimarom        blocked for too long.
277b2732e9dSimarom        """
278b2732e9dSimarom        raise NotImplementedError()
280b2732e9dSimarom    def set_blocking_log_threshold(self, seconds):
281b2732e9dSimarom        """Logs a stack trace if the `IOLoop` is blocked for more than
282b2732e9dSimarom        ``s`` seconds.
284b2732e9dSimarom        Equivalent to ``set_blocking_signal_threshold(seconds,
285b2732e9dSimarom        self.log_stack)``
286b2732e9dSimarom        """
287b2732e9dSimarom        self.set_blocking_signal_threshold(seconds, self.log_stack)
289b2732e9dSimarom    def log_stack(self, signal, frame):
290b2732e9dSimarom        """Signal handler to log the stack trace of the current thread.
292b2732e9dSimarom        For use with `set_blocking_signal_threshold`.
293b2732e9dSimarom        """
294b2732e9dSimarom        gen_log.warning('IOLoop blocked for %f seconds in\n%s',
295b2732e9dSimarom                        self._blocking_signal_threshold,
296b2732e9dSimarom                        ''.join(traceback.format_stack(frame)))
298b2732e9dSimarom    def start(self):
299b2732e9dSimarom        """Starts the I/O loop.
301b2732e9dSimarom        The loop will run until one of the callbacks calls `stop()`, which
302b2732e9dSimarom        will make the loop stop after the current event iteration completes.
303b2732e9dSimarom        """
304b2732e9dSimarom        raise NotImplementedError()
306b2732e9dSimarom    def stop(self):
307b2732e9dSimarom        """Stop the I/O loop.
309b2732e9dSimarom        If the event loop is not currently running, the next call to `start()`
310b2732e9dSimarom        will return immediately.
312b2732e9dSimarom        To use asynchronous methods from otherwise-synchronous code (such as
313b2732e9dSimarom        unit tests), you can start and stop the event loop like this::
315b2732e9dSimarom          ioloop = IOLoop()
316b2732e9dSimarom          async_method(ioloop=ioloop, callback=ioloop.stop)
317b2732e9dSimarom          ioloop.start()
319b2732e9dSimarom        ``ioloop.start()`` will return after ``async_method`` has run
320b2732e9dSimarom        its callback, whether that callback was invoked before or
321b2732e9dSimarom        after ``ioloop.start``.
323b2732e9dSimarom        Note that even after `stop` has been called, the `IOLoop` is not
324b2732e9dSimarom        completely stopped until `IOLoop.start` has also returned.
325b2732e9dSimarom        Some work that was scheduled before the call to `stop` may still
326b2732e9dSimarom        be run before the `IOLoop` shuts down.
327b2732e9dSimarom        """
328b2732e9dSimarom        raise NotImplementedError()
330b2732e9dSimarom    def run_sync(self, func, timeout=None):
331b2732e9dSimarom        """Starts the `IOLoop`, runs the given function, and stops the loop.
333b2732e9dSimarom        If the function returns a `.Future`, the `IOLoop` will run
334b2732e9dSimarom        until the future is resolved.  If it raises an exception, the
335b2732e9dSimarom        `IOLoop` will stop and the exception will be re-raised to the
336b2732e9dSimarom        caller.
338b2732e9dSimarom        The keyword-only argument ``timeout`` may be used to set
339b2732e9dSimarom        a maximum duration for the function.  If the timeout expires,
340b2732e9dSimarom        a `TimeoutError` is raised.
342b2732e9dSimarom        This method is useful in conjunction with `tornado.gen.coroutine`
343b2732e9dSimarom        to allow asynchronous calls in a ``main()`` function::
345b2732e9dSimarom            @gen.coroutine
346b2732e9dSimarom            def main():
347b2732e9dSimarom                # do stuff...
349b2732e9dSimarom            if __name__ == '__main__':
350b2732e9dSimarom                IOLoop.instance().run_sync(main)
351b2732e9dSimarom        """
352b2732e9dSimarom        future_cell = [None]
354b2732e9dSimarom        def run():
355b2732e9dSimarom            try:
356b2732e9dSimarom                result = func()
357b2732e9dSimarom            except Exception:
358b2732e9dSimarom                future_cell[0] = TracebackFuture()
359b2732e9dSimarom                future_cell[0].set_exc_info(sys.exc_info())
360b2732e9dSimarom            else:
361b2732e9dSimarom                if isinstance(result, Future):
362b2732e9dSimarom                    future_cell[0] = result
363b2732e9dSimarom                else:
364b2732e9dSimarom                    future_cell[0] = Future()
365b2732e9dSimarom                    future_cell[0].set_result(result)
366b2732e9dSimarom            self.add_future(future_cell[0], lambda future: self.stop())
367b2732e9dSimarom        self.add_callback(run)
368b2732e9dSimarom        if timeout is not None:
369b2732e9dSimarom            timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
370b2732e9dSimarom        self.start()
371b2732e9dSimarom        if timeout is not None:
372b2732e9dSimarom            self.remove_timeout(timeout_handle)
373b2732e9dSimarom        if not future_cell[0].done():
374b2732e9dSimarom            raise TimeoutError('Operation timed out after %s seconds' % timeout)
375b2732e9dSimarom        return future_cell[0].result()
377b2732e9dSimarom    def time(self):
378b2732e9dSimarom        """Returns the current time according to the `IOLoop`'s clock.
380b2732e9dSimarom        The return value is a floating-point number relative to an
381b2732e9dSimarom        unspecified time in the past.
383b2732e9dSimarom        By default, the `IOLoop`'s time function is `time.time`.  However,
384b2732e9dSimarom        it may be configured to use e.g. `time.monotonic` instead.
385b2732e9dSimarom        Calls to `add_timeout` that pass a number instead of a
386b2732e9dSimarom        `datetime.timedelta` should use this function to compute the
387b2732e9dSimarom        appropriate time, so they can work no matter what time function
388b2732e9dSimarom        is chosen.
389b2732e9dSimarom        """
390b2732e9dSimarom        return time.time()
392b2732e9dSimarom    def add_timeout(self, deadline, callback):
393b2732e9dSimarom        """Runs the ``callback`` at the time ``deadline`` from the I/O loop.
395b2732e9dSimarom        Returns an opaque handle that may be passed to
396b2732e9dSimarom        `remove_timeout` to cancel.
398b2732e9dSimarom        ``deadline`` may be a number denoting a time (on the same
399b2732e9dSimarom        scale as `IOLoop.time`, normally `time.time`), or a
400b2732e9dSimarom        `datetime.timedelta` object for a deadline relative to the
401b2732e9dSimarom        current time.
403b2732e9dSimarom        Note that it is not safe to call `add_timeout` from other threads.
404b2732e9dSimarom        Instead, you must use `add_callback` to transfer control to the
405b2732e9dSimarom        `IOLoop`'s thread, and then call `add_timeout` from there.
406b2732e9dSimarom        """
407b2732e9dSimarom        raise NotImplementedError()
409b2732e9dSimarom    def remove_timeout(self, timeout):
410b2732e9dSimarom        """Cancels a pending timeout.
412b2732e9dSimarom        The argument is a handle as returned by `add_timeout`.  It is
413b2732e9dSimarom        safe to call `remove_timeout` even if the callback has already
414b2732e9dSimarom        been run.
415b2732e9dSimarom        """
416b2732e9dSimarom        raise NotImplementedError()
418b2732e9dSimarom    def add_callback(self, callback, *args, **kwargs):
419b2732e9dSimarom        """Calls the given callback on the next I/O loop iteration.
421b2732e9dSimarom        It is safe to call this method from any thread at any time,
422b2732e9dSimarom        except from a signal handler.  Note that this is the **only**
423b2732e9dSimarom        method in `IOLoop` that makes this thread-safety guarantee; all
424b2732e9dSimarom        other interaction with the `IOLoop` must be done from that
425b2732e9dSimarom        `IOLoop`'s thread.  `add_callback()` may be used to transfer
426b2732e9dSimarom        control from other threads to the `IOLoop`'s thread.
428b2732e9dSimarom        To add a callback from a signal handler, see
429b2732e9dSimarom        `add_callback_from_signal`.
430b2732e9dSimarom        """
431b2732e9dSimarom        raise NotImplementedError()
433b2732e9dSimarom    def add_callback_from_signal(self, callback, *args, **kwargs):
434b2732e9dSimarom        """Calls the given callback on the next I/O loop iteration.
436b2732e9dSimarom        Safe for use from a Python signal handler; should not be used
437b2732e9dSimarom        otherwise.
439b2732e9dSimarom        Callbacks added with this method will be run without any
440b2732e9dSimarom        `.stack_context`, to avoid picking up the context of the function
441b2732e9dSimarom        that was interrupted by the signal.
442b2732e9dSimarom        """
443b2732e9dSimarom        raise NotImplementedError()
445b2732e9dSimarom    def add_future(self, future, callback):
446b2732e9dSimarom        """Schedules a callback on the ``IOLoop`` when the given
447b2732e9dSimarom        `.Future` is finished.
449b2732e9dSimarom        The callback is invoked with one argument, the
450b2732e9dSimarom        `.Future`.
451b2732e9dSimarom        """
452b2732e9dSimarom        assert isinstance(future, Future)
453b2732e9dSimarom        callback = stack_context.wrap(callback)
454b2732e9dSimarom        future.add_done_callback(
455b2732e9dSimarom            lambda future: self.add_callback(callback, future))
457b2732e9dSimarom    def _run_callback(self, callback):
458b2732e9dSimarom        """Runs a callback with error handling.
460b2732e9dSimarom        For use in subclasses.
461b2732e9dSimarom        """
462b2732e9dSimarom        try:
463b2732e9dSimarom            callback()
464b2732e9dSimarom        except Exception:
465b2732e9dSimarom            self.handle_callback_exception(callback)
467b2732e9dSimarom    def handle_callback_exception(self, callback):
468b2732e9dSimarom        """This method is called whenever a callback run by the `IOLoop`
469b2732e9dSimarom        throws an exception.
471b2732e9dSimarom        By default simply logs the exception as an error.  Subclasses
472b2732e9dSimarom        may override this method to customize reporting of exceptions.
474b2732e9dSimarom        The exception itself is not passed explicitly, but is available
475b2732e9dSimarom        in `sys.exc_info`.
476b2732e9dSimarom        """
477b2732e9dSimarom        app_log.error("Exception in callback %r", callback, exc_info=True)
480b2732e9dSimaromclass PollIOLoop(IOLoop):
481b2732e9dSimarom    """Base class for IOLoops built around a select-like function.
483b2732e9dSimarom    For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
484b2732e9dSimarom    (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
485b2732e9dSimarom    `tornado.platform.select.SelectIOLoop` (all platforms).
486b2732e9dSimarom    """
487b2732e9dSimarom    def initialize(self, impl, time_func=None):
488b2732e9dSimarom        super(PollIOLoop, self).initialize()
489b2732e9dSimarom        self._impl = impl
490b2732e9dSimarom        if hasattr(self._impl, 'fileno'):
491b2732e9dSimarom            set_close_exec(self._impl.fileno())
492b2732e9dSimarom        self.time_func = time_func or time.time
493b2732e9dSimarom        self._handlers = {}
494b2732e9dSimarom        self._events = {}
495b2732e9dSimarom        self._callbacks = []
496b2732e9dSimarom        self._callback_lock = threading.Lock()
497b2732e9dSimarom        self._timeouts = []
498b2732e9dSimarom        self._cancellations = 0
499b2732e9dSimarom        self._running = False
500b2732e9dSimarom        self._stopped = False
501b2732e9dSimarom        self._closing = False
502b2732e9dSimarom        self._thread_ident = None
503b2732e9dSimarom        self._blocking_signal_threshold = None
505b2732e9dSimarom        # Create a pipe that we send bogus data to when we want to wake
506b2732e9dSimarom        # the I/O loop when it is idle
507b2732e9dSimarom        self._waker = Waker()
508b2732e9dSimarom        self.add_handler(self._waker.fileno(),
509b2732e9dSimarom                         lambda fd, events: self._waker.consume(),
510b2732e9dSimarom                         self.READ)
512b2732e9dSimarom    def close(self, all_fds=False):
513b2732e9dSimarom        with self._callback_lock:
514b2732e9dSimarom            self._closing = True
515b2732e9dSimarom        self.remove_handler(self._waker.fileno())
516b2732e9dSimarom        if all_fds:
517b2732e9dSimarom            for fd in self._handlers.keys():
518b2732e9dSimarom                try:
519b2732e9dSimarom                    close_method = getattr(fd, 'close', None)
520b2732e9dSimarom                    if close_method is not None:
521b2732e9dSimarom                        close_method()
522b2732e9dSimarom                    else:
523b2732e9dSimarom                        os.close(fd)
524b2732e9dSimarom                except Exception:
525b2732e9dSimarom                    gen_log.debug("error closing fd %s", fd, exc_info=True)
526b2732e9dSimarom        self._waker.close()
527b2732e9dSimarom        self._impl.close()
529b2732e9dSimarom    def add_handler(self, fd, handler, events):
530b2732e9dSimarom        self._handlers[fd] = stack_context.wrap(handler)
531b2732e9dSimarom        self._impl.register(fd, events | self.ERROR)
533b2732e9dSimarom    def update_handler(self, fd, events):
534b2732e9dSimarom        self._impl.modify(fd, events | self.ERROR)
536b2732e9dSimarom    def remove_handler(self, fd):
537b2732e9dSimarom        self._handlers.pop(fd, None)
538b2732e9dSimarom        self._events.pop(fd, None)
539b2732e9dSimarom        try:
540b2732e9dSimarom            self._impl.unregister(fd)
541b2732e9dSimarom        except Exception:
542b2732e9dSimarom            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
544b2732e9dSimarom    def set_blocking_signal_threshold(self, seconds, action):
545b2732e9dSimarom        if not hasattr(signal, "setitimer"):
546b2732e9dSimarom            gen_log.error("set_blocking_signal_threshold requires a signal module "
547b2732e9dSimarom                          "with the setitimer method")
548b2732e9dSimarom            return
549b2732e9dSimarom        self._blocking_signal_threshold = seconds
550b2732e9dSimarom        if seconds is not None:
551b2732e9dSimarom            signal.signal(signal.SIGALRM,
552b2732e9dSimarom                          action if action is not None else signal.SIG_DFL)
554b2732e9dSimarom    def start(self):
555b2732e9dSimarom        if not logging.getLogger().handlers:
556b2732e9dSimarom            # The IOLoop catches and logs exceptions, so it's
557b2732e9dSimarom            # important that log output be visible.  However, python's
558b2732e9dSimarom            # default behavior for non-root loggers (prior to python
559b2732e9dSimarom            # 3.2) is to print an unhelpful "no handlers could be
560b2732e9dSimarom            # found" message rather than the actual log entry, so we
561b2732e9dSimarom            # must explicitly configure logging if we've made it this
562b2732e9dSimarom            # far without anything.
563b2732e9dSimarom            logging.basicConfig()
564b2732e9dSimarom        if self._stopped:
565b2732e9dSimarom            self._stopped = False
566b2732e9dSimarom            return
567b2732e9dSimarom        old_current = getattr(IOLoop._current, "instance", None)
568b2732e9dSimarom        IOLoop._current.instance = self
569b2732e9dSimarom        self._thread_ident = thread.get_ident()
570b2732e9dSimarom        self._running = True
572b2732e9dSimarom        # signal.set_wakeup_fd closes a race condition in event loops:
573b2732e9dSimarom        # a signal may arrive at the beginning of select/poll/etc
574b2732e9dSimarom        # before it goes into its interruptible sleep, so the signal
575b2732e9dSimarom        # will be consumed without waking the select.  The solution is
576b2732e9dSimarom        # for the (C, synchronous) signal handler to write to a pipe,
577b2732e9dSimarom        # which will then be seen by select.
578b2732e9dSimarom        #
579b2732e9dSimarom        # In python's signal handling semantics, this only matters on the
580b2732e9dSimarom        # main thread (fortunately, set_wakeup_fd only works on the main
581b2732e9dSimarom        # thread and will raise a ValueError otherwise).
582b2732e9dSimarom        #
583b2732e9dSimarom        # If someone has already set a wakeup fd, we don't want to
584b2732e9dSimarom        # disturb it.  This is an issue for twisted, which does its
585b2732e9dSimarom        # SIGCHILD processing in response to its own wakeup fd being
586b2732e9dSimarom        # written to.  As long as the wakeup fd is registered on the IOLoop,
587b2732e9dSimarom        # the loop will still wake up and everything should work.
588b2732e9dSimarom        old_wakeup_fd = None
589b2732e9dSimarom        if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
590b2732e9dSimarom            # requires python 2.6+, unix.  set_wakeup_fd exists but crashes
591b2732e9dSimarom            # the python process on windows.
592b2732e9dSimarom            try:
593b2732e9dSimarom                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
594b2732e9dSimarom                if old_wakeup_fd != -1:
595b2732e9dSimarom                    # Already set, restore previous value.  This is a little racy,
596b2732e9dSimarom                    # but there's no clean get_wakeup_fd and in real use the
597b2732e9dSimarom                    # IOLoop is just started once at the beginning.
598b2732e9dSimarom                    signal.set_wakeup_fd(old_wakeup_fd)
599b2732e9dSimarom                    old_wakeup_fd = None
600b2732e9dSimarom            except ValueError:  # non-main thread
601b2732e9dSimarom                pass
603b2732e9dSimarom        while True:
604b2732e9dSimarom            poll_timeout = 3600.0
606b2732e9dSimarom            # Prevent IO event starvation by delaying new callbacks
607b2732e9dSimarom            # to the next iteration of the event loop.
608b2732e9dSimarom            with self._callback_lock:
609b2732e9dSimarom                callbacks = self._callbacks
610b2732e9dSimarom                self._callbacks = []
611b2732e9dSimarom            for callback in callbacks:
612b2732e9dSimarom                self._run_callback(callback)
614b2732e9dSimarom            if self._timeouts:
615b2732e9dSimarom                now = self.time()
616b2732e9dSimarom                while self._timeouts:
617b2732e9dSimarom                    if self._timeouts[0].callback is None:
618b2732e9dSimarom                        # the timeout was cancelled
619b2732e9dSimarom                        heapq.heappop(self._timeouts)
620b2732e9dSimarom                        self._cancellations -= 1
621b2732e9dSimarom                    elif self._timeouts[0].deadline <= now:
622b2732e9dSimarom                        timeout = heapq.heappop(self._timeouts)
623b2732e9dSimarom                        self._run_callback(timeout.callback)
624b2732e9dSimarom                    else:
625b2732e9dSimarom                        seconds = self._timeouts[0].deadline - now
626b2732e9dSimarom                        poll_timeout = min(seconds, poll_timeout)
627b2732e9dSimarom                        break
628b2732e9dSimarom                if (self._cancellations > 512
629b2732e9dSimarom                        and self._cancellations > (len(self._timeouts) >> 1)):
630b2732e9dSimarom                    # Clean up the timeout queue when it gets large and it's
631b2732e9dSimarom                    # more than half cancellations.
632b2732e9dSimarom                    self._cancellations = 0
633b2732e9dSimarom                    self._timeouts = [x for x in self._timeouts
634b2732e9dSimarom                                      if x.callback is not None]
635b2732e9dSimarom                    heapq.heapify(self._timeouts)
637b2732e9dSimarom            if self._callbacks:
638b2732e9dSimarom                # If any callbacks or timeouts called add_callback,
639b2732e9dSimarom                # we don't want to wait in poll() before we run them.
640b2732e9dSimarom                poll_timeout = 0.0
642b2732e9dSimarom            if not self._running:
643b2732e9dSimarom                break
645b2732e9dSimarom            if self._blocking_signal_threshold is not None:
646b2732e9dSimarom                # clear alarm so it doesn't fire while poll is waiting for
647b2732e9dSimarom                # events.
648b2732e9dSimarom                signal.setitimer(signal.ITIMER_REAL, 0, 0)
650b2732e9dSimarom            try:
651b2732e9dSimarom                event_pairs = self._impl.poll(poll_timeout)
652b2732e9dSimarom            except Exception as e:
653b2732e9dSimarom                # Depending on python version and IOLoop implementation,
654b2732e9dSimarom                # different exception types may be thrown and there are
655b2732e9dSimarom                # two ways EINTR might be signaled:
656b2732e9dSimarom                # * e.errno == errno.EINTR
657b2732e9dSimarom                # * e.args is like (errno.EINTR, 'Interrupted system call')
658b2732e9dSimarom                if (getattr(e, 'errno', None) == errno.EINTR or
659b2732e9dSimarom                    (isinstance(getattr(e, 'args', None), tuple) and
660b2732e9dSimarom                     len(e.args) == 2 and e.args[0] == errno.EINTR)):
661b2732e9dSimarom                    continue
662b2732e9dSimarom                else:
663b2732e9dSimarom                    raise
665b2732e9dSimarom            if self._blocking_signal_threshold is not None:
666b2732e9dSimarom                signal.setitimer(signal.ITIMER_REAL,
667b2732e9dSimarom                                 self._blocking_signal_threshold, 0)
669b2732e9dSimarom            # Pop one fd at a time from the set of pending fds and run
670b2732e9dSimarom            # its handler. Since that handler may perform actions on
671b2732e9dSimarom            # other file descriptors, there may be reentrant calls to
672b2732e9dSimarom            # this IOLoop that update self._events
673b2732e9dSimarom            self._events.update(event_pairs)
674b2732e9dSimarom            while self._events:
675b2732e9dSimarom                fd, events = self._events.popitem()
676b2732e9dSimarom                try:
677b2732e9dSimarom                    self._handlers[fd](fd, events)
678b2732e9dSimarom                except (OSError, IOError) as e:
679b2732e9dSimarom                    if e.args[0] == errno.EPIPE:
680b2732e9dSimarom                        # Happens when the client closes the connection
681b2732e9dSimarom                        pass
682b2732e9dSimarom                    else:
683b2732e9dSimarom                        app_log.error("Exception in I/O handler for fd %s",
684b2732e9dSimarom                                      fd, exc_info=True)
685b2732e9dSimarom                except Exception:
686b2732e9dSimarom                    app_log.error("Exception in I/O handler for fd %s",
687b2732e9dSimarom                                  fd, exc_info=True)
688b2732e9dSimarom        # reset the stopped flag so another start/stop pair can be issued
689b2732e9dSimarom        self._stopped = False
690b2732e9dSimarom        if self._blocking_signal_threshold is not None:
691b2732e9dSimarom            signal.setitimer(signal.ITIMER_REAL, 0, 0)
692b2732e9dSimarom        IOLoop._current.instance = old_current
693b2732e9dSimarom        if old_wakeup_fd is not None:
694b2732e9dSimarom            signal.set_wakeup_fd(old_wakeup_fd)
696b2732e9dSimarom    def stop(self):
697b2732e9dSimarom        self._running = False
698b2732e9dSimarom        self._stopped = True
699b2732e9dSimarom        self._waker.wake()
701b2732e9dSimarom    def time(self):
702b2732e9dSimarom        return self.time_func()
704b2732e9dSimarom    def add_timeout(self, deadline, callback):
705b2732e9dSimarom        timeout = _Timeout(deadline, stack_context.wrap(callback), self)
706b2732e9dSimarom        heapq.heappush(self._timeouts, timeout)
707b2732e9dSimarom        return timeout
709b2732e9dSimarom    def remove_timeout(self, timeout):
710b2732e9dSimarom        # Removing from a heap is complicated, so just leave the defunct
711b2732e9dSimarom        # timeout object in the queue (see discussion in
712b2732e9dSimarom        # http://docs.python.org/library/heapq.html).
713b2732e9dSimarom        # If this turns out to be a problem, we could add a garbage
714b2732e9dSimarom        # collection pass whenever there are too many dead timeouts.
715b2732e9dSimarom        timeout.callback = None
716b2732e9dSimarom        self._cancellations += 1
718b2732e9dSimarom    def add_callback(self, callback, *args, **kwargs):
719b2732e9dSimarom        with self._callback_lock:
720b2732e9dSimarom            if self._closing:
721b2732e9dSimarom                raise RuntimeError("IOLoop is closing")
722b2732e9dSimarom            list_empty = not self._callbacks
723b2732e9dSimarom            self._callbacks.append(functools.partial(
724b2732e9dSimarom                stack_context.wrap(callback), *args, **kwargs))
725b2732e9dSimarom        if list_empty and thread.get_ident() != self._thread_ident:
726b2732e9dSimarom            # If we're in the IOLoop's thread, we know it's not currently
727b2732e9dSimarom            # polling.  If we're not, and we added the first callback to an
728b2732e9dSimarom            # empty list, we may need to wake it up (it may wake up on its
729b2732e9dSimarom            # own, but an occasional extra wake is harmless).  Waking
730b2732e9dSimarom            # up a polling IOLoop is relatively expensive, so we try to
731b2732e9dSimarom            # avoid it when we can.
732b2732e9dSimarom            self._waker.wake()
734b2732e9dSimarom    def add_callback_from_signal(self, callback, *args, **kwargs):
735b2732e9dSimarom        with stack_context.NullContext():
736b2732e9dSimarom            if thread.get_ident() != self._thread_ident:
737b2732e9dSimarom                # if the signal is handled on another thread, we can add
738b2732e9dSimarom                # it normally (modulo the NullContext)
739b2732e9dSimarom                self.add_callback(callback, *args, **kwargs)
740b2732e9dSimarom            else:
741b2732e9dSimarom                # If we're on the IOLoop's thread, we cannot use
742b2732e9dSimarom                # the regular add_callback because it may deadlock on
743b2732e9dSimarom                # _callback_lock.  Blindly insert into self._callbacks.
744b2732e9dSimarom                # This is safe because the GIL makes list.append atomic.
745b2732e9dSimarom                # One subtlety is that if the signal interrupted the
746b2732e9dSimarom                # _callback_lock block in IOLoop.start, we may modify
747b2732e9dSimarom                # either the old or new version of self._callbacks,
748b2732e9dSimarom                # but either way will work.
749b2732e9dSimarom                self._callbacks.append(functools.partial(
750b2732e9dSimarom                    stack_context.wrap(callback), *args, **kwargs))
753b2732e9dSimaromclass _Timeout(object):
754b2732e9dSimarom    """An IOLoop timeout, a UNIX timestamp and a callback"""
756b2732e9dSimarom    # Reduce memory overhead when there are lots of pending callbacks
757b2732e9dSimarom    __slots__ = ['deadline', 'callback']
759b2732e9dSimarom    def __init__(self, deadline, callback, io_loop):
760b2732e9dSimarom        if isinstance(deadline, numbers.Real):
761b2732e9dSimarom            self.deadline = deadline
762b2732e9dSimarom        elif isinstance(deadline, datetime.timedelta):
763b2732e9dSimarom            self.deadline = io_loop.time() + _Timeout.timedelta_to_seconds(deadline)
764b2732e9dSimarom        else:
765b2732e9dSimarom            raise TypeError("Unsupported deadline %r" % deadline)
766b2732e9dSimarom        self.callback = callback
768b2732e9dSimarom    @staticmethod
769b2732e9dSimarom    def timedelta_to_seconds(td):
770b2732e9dSimarom        """Equivalent to td.total_seconds() (introduced in python 2.7)."""
771b2732e9dSimarom        return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
773b2732e9dSimarom    # Comparison methods to sort by deadline, with object id as a tiebreaker
774b2732e9dSimarom    # to guarantee a consistent ordering.  The heapq module uses __le__
775b2732e9dSimarom    # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons
776b2732e9dSimarom    # use __lt__).
777b2732e9dSimarom    def __lt__(self, other):
778b2732e9dSimarom        return ((self.deadline, id(self)) <
779b2732e9dSimarom                (other.deadline, id(other)))
781b2732e9dSimarom    def __le__(self, other):
782b2732e9dSimarom        return ((self.deadline, id(self)) <=
783b2732e9dSimarom                (other.deadline, id(other)))
class PeriodicCallback(object):
    """Schedules the given callback to be called periodically.

    The callback is called every ``callback_time`` milliseconds.

    `start` must be called after the `PeriodicCallback` is created.
    """
    def __init__(self, callback, callback_time, io_loop=None):
        """Create a stopped periodic callback.

        ``callback`` is called with no arguments.  ``callback_time`` is
        the period in milliseconds and must be positive, otherwise
        `ValueError` is raised.  ``io_loop`` defaults to
        `IOLoop.current()`.
        """
        self.callback = callback
        if callback_time <= 0:
            raise ValueError("Periodic callback must have a positive callback_time")
        self.callback_time = callback_time
        self.io_loop = io_loop or IOLoop.current()
        self._running = False
        self._timeout = None

    def start(self):
        """Starts the timer."""
        self._running = True
        self._next_timeout = self.io_loop.time()
        self._schedule_next()

    def stop(self):
        """Stops the timer."""
        self._running = False
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def _run(self):
        """Invoke the callback once, then schedule the next invocation.

        Exceptions derived from `Exception` raised by the callback are
        logged and do not stop the timer.
        """
        if not self._running:
            return
        try:
            self.callback()
        except Exception:
            app_log.error("Error in periodic callback", exc_info=True)
        self._schedule_next()

    def _schedule_next(self):
        """Register `_run` for the next period boundary after now.

        Periods that were missed entirely (for example because the loop
        was blocked for longer than ``callback_time``) are skipped rather
        than run back to back.
        """
        if self._running:
            current_time = self.io_loop.time()
            if self._next_timeout <= current_time:
                period = self.callback_time / 1000.0
                # Jump over every missed period in one step instead of
                # adding one period at a time, which takes time
                # proportional to the backlog and never terminates when
                # ``period`` is too small to change the float.
                missed = (current_time - self._next_timeout) // period
                self._next_timeout += (missed + 1) * period
                if self._next_timeout <= current_time:
                    # Float rounding in the floor division can leave us
                    # at or just behind "now"; never schedule in the past.
                    self._next_timeout = current_time + period
            self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)