1# coding: utf-8
2"""tornado IOLoop API with zmq compatibility
3
4If you have tornado ≥ 3.0, this is a subclass of tornado's IOLoop,
5otherwise we ship a minimal subset of tornado in zmq.eventloop.minitornado.
6
7The minimal shipped version of tornado's IOLoop does not include
8support for concurrent futures - this will only be available if you
9have tornado ≥ 3.0.
10"""
11
12# Copyright (C) PyZMQ Developers
13# Distributed under the terms of the Modified BSD License.
14
15from __future__ import absolute_import, division, with_statement
16
17import os
18import time
19import warnings
20
21from zmq import (
22    Poller,
23    POLLIN, POLLOUT, POLLERR,
24    ZMQError, ETERM,
25)
26
27try:
28    import tornado
29    tornado_version = tornado.version_info
30except (ImportError, AttributeError):
31    tornado_version = ()
32
33try:
34    # tornado ≥ 3
35    from tornado.ioloop import PollIOLoop, PeriodicCallback
36    from tornado.log import gen_log
37except ImportError:
38    from .minitornado.ioloop import PollIOLoop, PeriodicCallback
39    from .minitornado.log import gen_log
40
41
42class DelayedCallback(PeriodicCallback):
43    """Schedules the given callback to be called once.
44
45    The callback is called once, after callback_time milliseconds.
46
47    `start` must be called after the DelayedCallback is created.
48
49    The timeout is calculated from when `start` is called.
50    """
51    def __init__(self, callback, callback_time, io_loop=None):
52        # PeriodicCallback require callback_time to be positive
53        warnings.warn("""DelayedCallback is deprecated.
54        Use loop.add_timeout instead.""", DeprecationWarning)
55        callback_time = max(callback_time, 1e-3)
56        super(DelayedCallback, self).__init__(callback, callback_time, io_loop)
57
58    def start(self):
59        """Starts the timer."""
60        self._running = True
61        self._firstrun = True
62        self._next_timeout = time.time() + self.callback_time / 1000.0
63        self.io_loop.add_timeout(self._next_timeout, self._run)
64
65    def _run(self):
66        if not self._running: return
67        self._running = False
68        try:
69            self.callback()
70        except Exception:
71            gen_log.error("Error in delayed callback", exc_info=True)
72
73
74class ZMQPoller(object):
75    """A poller that can be used in the tornado IOLoop.
76
77    This simply wraps a regular zmq.Poller, scaling the timeout
78    by 1000, so that it is in seconds rather than milliseconds.
79    """
80
81    def __init__(self):
82        self._poller = Poller()
83
84    @staticmethod
85    def _map_events(events):
86        """translate IOLoop.READ/WRITE/ERROR event masks into zmq.POLLIN/OUT/ERR"""
87        z_events = 0
88        if events & IOLoop.READ:
89            z_events |= POLLIN
90        if events & IOLoop.WRITE:
91            z_events |= POLLOUT
92        if events & IOLoop.ERROR:
93            z_events |= POLLERR
94        return z_events
95
96    @staticmethod
97    def _remap_events(z_events):
98        """translate zmq.POLLIN/OUT/ERR event masks into IOLoop.READ/WRITE/ERROR"""
99        events = 0
100        if z_events & POLLIN:
101            events |= IOLoop.READ
102        if z_events & POLLOUT:
103            events |= IOLoop.WRITE
104        if z_events & POLLERR:
105            events |= IOLoop.ERROR
106        return events
107
108    def register(self, fd, events):
109        return self._poller.register(fd, self._map_events(events))
110
111    def modify(self, fd, events):
112        return self._poller.modify(fd, self._map_events(events))
113
114    def unregister(self, fd):
115        return self._poller.unregister(fd)
116
117    def poll(self, timeout):
118        """poll in seconds rather than milliseconds.
119
120        Event masks will be IOLoop.READ/WRITE/ERROR
121        """
122        z_events = self._poller.poll(1000*timeout)
123        return [ (fd,self._remap_events(evt)) for (fd,evt) in z_events ]
124
125    def close(self):
126        pass
127
128
129class ZMQIOLoop(PollIOLoop):
130    """ZMQ subclass of tornado's IOLoop"""
131    def initialize(self, impl=None, **kwargs):
132        impl = ZMQPoller() if impl is None else impl
133        super(ZMQIOLoop, self).initialize(impl=impl, **kwargs)
134
135    @staticmethod
136    def instance():
137        """Returns a global `IOLoop` instance.
138
139        Most applications have a single, global `IOLoop` running on the
140        main thread.  Use this method to get this instance from
141        another thread.  To get the current thread's `IOLoop`, use `current()`.
142        """
143        # install ZMQIOLoop as the active IOLoop implementation
144        # when using tornado 3
145        if tornado_version >= (3,):
146            PollIOLoop.configure(ZMQIOLoop)
147        return PollIOLoop.instance()
148
149    def start(self):
150        try:
151            super(ZMQIOLoop, self).start()
152        except ZMQError as e:
153            if e.errno == ETERM:
154                # quietly return on ETERM
155                pass
156            else:
157                raise e
158
159
160if tornado_version >= (3,0) and tornado_version < (3,1):
161    def backport_close(self, all_fds=False):
162        """backport IOLoop.close to 3.0 from 3.1 (supports fd.close() method)"""
163        from zmq.eventloop.minitornado.ioloop import PollIOLoop as mini_loop
164        return mini_loop.close.__get__(self)(all_fds)
165    ZMQIOLoop.close = backport_close
166
167
168# public API name
169IOLoop = ZMQIOLoop
170
171
172def install():
173    """set the tornado IOLoop instance with the pyzmq IOLoop.
174
175    After calling this function, tornado's IOLoop.instance() and pyzmq's
176    IOLoop.instance() will return the same object.
177
178    An assertion error will be raised if tornado's IOLoop has been initialized
179    prior to calling this function.
180    """
181    from tornado import ioloop
182    # check if tornado's IOLoop is already initialized to something other
183    # than the pyzmq IOLoop instance:
184    assert (not ioloop.IOLoop.initialized()) or \
185        ioloop.IOLoop.instance() is IOLoop.instance(), "tornado IOLoop already initialized"
186
187    if tornado_version >= (3,):
188        # tornado 3 has an official API for registering new defaults, yay!
189        ioloop.IOLoop.configure(ZMQIOLoop)
190    else:
191        # we have to set the global instance explicitly
192        ioloop.IOLoop._instance = IOLoop.instance()
193
194