1b2732e9dSimarom# coding: utf-8
2b2732e9dSimarom"""tornado IOLoop API with zmq compatibility
3b2732e9dSimarom
4b2732e9dSimaromIf you have tornado ≥ 3.0, this is a subclass of tornado's IOLoop,
5b2732e9dSimaromotherwise we ship a minimal subset of tornado in zmq.eventloop.minitornado.
6b2732e9dSimarom
7b2732e9dSimaromThe minimal shipped version of tornado's IOLoop does not include
8b2732e9dSimaromsupport for concurrent futures - this will only be available if you
9b2732e9dSimaromhave tornado ≥ 3.0.
10b2732e9dSimarom"""
11b2732e9dSimarom
12b2732e9dSimarom# Copyright (C) PyZMQ Developers
13b2732e9dSimarom# Distributed under the terms of the Modified BSD License.
14b2732e9dSimarom
15b2732e9dSimaromfrom __future__ import absolute_import, division, with_statement
16b2732e9dSimarom
17b2732e9dSimaromimport os
18b2732e9dSimaromimport time
19b2732e9dSimaromimport warnings
20b2732e9dSimarom
21b2732e9dSimaromfrom zmq import (
22b2732e9dSimarom    Poller,
23b2732e9dSimarom    POLLIN, POLLOUT, POLLERR,
24b2732e9dSimarom    ZMQError, ETERM,
25b2732e9dSimarom)
26b2732e9dSimarom
27b2732e9dSimaromtry:
28b2732e9dSimarom    import tornado
29b2732e9dSimarom    tornado_version = tornado.version_info
30b2732e9dSimaromexcept (ImportError, AttributeError):
31b2732e9dSimarom    tornado_version = ()
32b2732e9dSimarom
33b2732e9dSimaromtry:
34b2732e9dSimarom    # tornado ≥ 3
35b2732e9dSimarom    from tornado.ioloop import PollIOLoop, PeriodicCallback
36b2732e9dSimarom    from tornado.log import gen_log
37b2732e9dSimaromexcept ImportError:
38b2732e9dSimarom    from .minitornado.ioloop import PollIOLoop, PeriodicCallback
39b2732e9dSimarom    from .minitornado.log import gen_log
40b2732e9dSimarom
41b2732e9dSimarom
class DelayedCallback(PeriodicCallback):
    """Schedules the given callback to be called once.

    The callback is called once, after callback_time milliseconds.

    `start` must be called after the DelayedCallback is created.

    The timeout is calculated from when `start` is called.

    Deprecated: constructing an instance emits a DeprecationWarning;
    use ``loop.add_timeout`` instead.
    """
    def __init__(self, callback, callback_time, io_loop=None):
        """Create the (not yet started) one-shot timer.

        Parameters
        ----------
        callback : callable
            Called with no arguments when the delay expires.
        callback_time : float
            Delay in milliseconds; values below 1e-3 are raised to 1e-3.
        io_loop : optional
            Forwarded unchanged to ``PeriodicCallback.__init__``.
        """
        # stacklevel=2 attributes the warning to the code that constructs the
        # DelayedCallback instead of to this module, so it is reported against
        # (and filterable by) the caller that actually needs to migrate.
        warnings.warn("""DelayedCallback is deprecated.
        Use loop.add_timeout instead.""", DeprecationWarning, stacklevel=2)
        # PeriodicCallback requires callback_time to be positive
        callback_time = max(callback_time, 1e-3)
        super(DelayedCallback, self).__init__(callback, callback_time, io_loop)

    def start(self):
        """Starts the timer.

        Schedules `_run` on ``self.io_loop`` at ``time.time()`` plus
        ``callback_time`` converted from milliseconds to seconds.
        """
        self._running = True
        self._firstrun = True
        self._next_timeout = time.time() + self.callback_time / 1000.0
        self.io_loop.add_timeout(self._next_timeout, self._run)

    def _run(self):
        """Invoke the callback once, logging (not propagating) any Exception."""
        # If `_running` was cleared after `start` (presumably by the inherited
        # stop() -- confirm against PeriodicCallback), the timer was cancelled.
        if not self._running:
            return
        # Clear the flag before calling so a second invocation is a no-op.
        self._running = False
        try:
            self.callback()
        except Exception:
            gen_log.error("Error in delayed callback", exc_info=True)
72b2732e9dSimarom
73b2732e9dSimarom
class ZMQPoller(object):
    """A poller that can be used in the tornado IOLoop.

    This simply wraps a regular zmq.Poller, scaling the timeout
    by 1000, so that it is in seconds rather than milliseconds.
    Event masks are translated between IOLoop.READ/WRITE/ERROR and
    zmq.POLLIN/POLLOUT/POLLERR in both directions.
    """

    def __init__(self):
        self._poller = Poller()

    @staticmethod
    def _map_events(events):
        """translate IOLoop.READ/WRITE/ERROR event masks into zmq.POLLIN/OUT/ERR"""
        z_events = 0
        if events & IOLoop.READ:
            z_events |= POLLIN
        if events & IOLoop.WRITE:
            z_events |= POLLOUT
        if events & IOLoop.ERROR:
            z_events |= POLLERR
        return z_events

    @staticmethod
    def _remap_events(z_events):
        """translate zmq.POLLIN/OUT/ERR event masks into IOLoop.READ/WRITE/ERROR"""
        events = 0
        if z_events & POLLIN:
            events |= IOLoop.READ
        if z_events & POLLOUT:
            events |= IOLoop.WRITE
        if z_events & POLLERR:
            events |= IOLoop.ERROR
        return events

    def register(self, fd, events):
        """Register `fd` with the wrapped poller for the given IOLoop event mask."""
        return self._poller.register(fd, self._map_events(events))

    def modify(self, fd, events):
        """Change the IOLoop event mask that `fd` is polled for."""
        return self._poller.modify(fd, self._map_events(events))

    def unregister(self, fd):
        """Remove `fd` from the wrapped poller."""
        return self._poller.unregister(fd)

    def poll(self, timeout):
        """poll in seconds rather than milliseconds.

        `timeout` is multiplied by 1000 before being handed to the wrapped
        poller.  ``None`` is passed through unscaled (zmq.Poller treats it as
        "no timeout") instead of failing on ``1000 * None``.

        Returns a list of ``(fd, events)`` pairs.
        Event masks will be IOLoop.READ/WRITE/ERROR
        """
        z_timeout = None if timeout is None else 1000 * timeout
        z_events = self._poller.poll(z_timeout)
        return [(fd, self._remap_events(evt)) for (fd, evt) in z_events]

    def close(self):
        """No-op: present so the object satisfies the IOLoop poller interface."""
        pass
127b2732e9dSimarom
128b2732e9dSimarom
class ZMQIOLoop(PollIOLoop):
    """ZMQ subclass of tornado's IOLoop

    Identical to PollIOLoop except that it polls with a ZMQPoller by
    default and returns quietly from `start` when zmq reports ETERM.
    """
    def initialize(self, impl=None, **kwargs):
        """Initialize the loop, defaulting the poller `impl` to a new ZMQPoller.

        Any other keyword arguments are forwarded to PollIOLoop.initialize.
        """
        impl = ZMQPoller() if impl is None else impl
        super(ZMQIOLoop, self).initialize(impl=impl, **kwargs)

    @staticmethod
    def instance():
        """Returns a global `IOLoop` instance.

        Most applications have a single, global `IOLoop` running on the
        main thread.  Use this method to get this instance from
        another thread.  To get the current thread's `IOLoop`, use `current()`.
        """
        # install ZMQIOLoop as the active IOLoop implementation
        # when using tornado 3
        if tornado_version >= (3,):
            PollIOLoop.configure(ZMQIOLoop)
        return PollIOLoop.instance()

    def start(self):
        """Run the loop; return quietly if it is interrupted by ETERM.

        Any other ZMQError is propagated to the caller.
        """
        try:
            super(ZMQIOLoop, self).start()
        except ZMQError as e:
            if e.errno != ETERM:
                # bare raise keeps the original traceback intact
                raise
            # quietly return on ETERM
158b2732e9dSimarom
159b2732e9dSimarom
# Only tornado 3.0.x gets the patched close(); other versions keep
# whatever PollIOLoop.close they already have.
if (3, 0) <= tornado_version < (3, 1):
    def backport_close(self, all_fds=False):
        """Backport of IOLoop.close from 3.1 to 3.0 (supports fd.close() method).

        Delegates to the close() implementation of the bundled minitornado
        PollIOLoop, bound to this loop instance.
        """
        from zmq.eventloop.minitornado.ioloop import PollIOLoop as MiniPollIOLoop
        # bind minitornado's close() to this loop, then call it
        bound_close = MiniPollIOLoop.close.__get__(self)
        return bound_close(all_fds)
    ZMQIOLoop.close = backport_close


# public API name: the rest of this module refers to the loop class as IOLoop
IOLoop = ZMQIOLoop
170b2732e9dSimarom
171b2732e9dSimarom
def install():
    """set the tornado IOLoop instance with the pyzmq IOLoop.

    After calling this function, tornado's IOLoop.instance() and pyzmq's
    IOLoop.instance() will return the same object.

    An AssertionError will be raised if tornado's IOLoop has been initialized
    to something other than the pyzmq IOLoop instance prior to calling this
    function.  The check is an explicit raise rather than an ``assert``
    statement so that it is still enforced under ``python -O``.
    """
    from tornado import ioloop
    # check if tornado's IOLoop is already initialized to something other
    # than the pyzmq IOLoop instance:
    if ioloop.IOLoop.initialized() and \
            ioloop.IOLoop.instance() is not IOLoop.instance():
        raise AssertionError("tornado IOLoop already initialized")

    if tornado_version >= (3,):
        # tornado 3 has an official API for registering new defaults, yay!
        ioloop.IOLoop.configure(ZMQIOLoop)
    else:
        # we have to set the global instance explicitly
        ioloop.IOLoop._instance = IOLoop.instance()
193b2732e9dSimarom
194