1d3907f0dSYaroslav Brustinov#
2d3907f0dSYaroslav Brustinov# Copyright 2009 Facebook
3d3907f0dSYaroslav Brustinov#
4d3907f0dSYaroslav Brustinov# Licensed under the Apache License, Version 2.0 (the "License"); you may
5d3907f0dSYaroslav Brustinov# not use this file except in compliance with the License. You may obtain
6d3907f0dSYaroslav Brustinov# a copy of the License at
7d3907f0dSYaroslav Brustinov#
8d3907f0dSYaroslav Brustinov#     http://www.apache.org/licenses/LICENSE-2.0
9d3907f0dSYaroslav Brustinov#
10d3907f0dSYaroslav Brustinov# Unless required by applicable law or agreed to in writing, software
11d3907f0dSYaroslav Brustinov# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12d3907f0dSYaroslav Brustinov# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13d3907f0dSYaroslav Brustinov# License for the specific language governing permissions and limitations
14d3907f0dSYaroslav Brustinov# under the License.
15d3907f0dSYaroslav Brustinov
16d3907f0dSYaroslav Brustinov"""A utility class to send to and recv from a non-blocking socket."""
17d3907f0dSYaroslav Brustinov
18d3907f0dSYaroslav Brustinovfrom __future__ import with_statement
19d3907f0dSYaroslav Brustinov
20d3907f0dSYaroslav Brustinovimport sys
21d3907f0dSYaroslav Brustinov
22d3907f0dSYaroslav Brustinovimport zmq
23d3907f0dSYaroslav Brustinovfrom zmq.utils import jsonapi
24d3907f0dSYaroslav Brustinov
25d3907f0dSYaroslav Brustinovtry:
26d3907f0dSYaroslav Brustinov    import cPickle as pickle
27d3907f0dSYaroslav Brustinovexcept ImportError:
28d3907f0dSYaroslav Brustinov    import pickle
29d3907f0dSYaroslav Brustinov
30d3907f0dSYaroslav Brustinovfrom .ioloop import IOLoop
31d3907f0dSYaroslav Brustinov
32d3907f0dSYaroslav Brustinovtry:
33d3907f0dSYaroslav Brustinov    # gen_log will only import from >= 3.0
34d3907f0dSYaroslav Brustinov    from tornado.log import gen_log
35d3907f0dSYaroslav Brustinov    from tornado import stack_context
36d3907f0dSYaroslav Brustinovexcept ImportError:
37d3907f0dSYaroslav Brustinov    from .minitornado.log import gen_log
38d3907f0dSYaroslav Brustinov    from .minitornado import stack_context
39d3907f0dSYaroslav Brustinov
40d3907f0dSYaroslav Brustinovtry:
41d3907f0dSYaroslav Brustinov    from queue import Queue
42d3907f0dSYaroslav Brustinovexcept ImportError:
43d3907f0dSYaroslav Brustinov    from Queue import Queue
44d3907f0dSYaroslav Brustinov
45d3907f0dSYaroslav Brustinovfrom zmq.utils.strtypes import bytes, unicode, basestring
46d3907f0dSYaroslav Brustinov
try:
    # Probe for the builtin; it is absent on interpreters that dropped it.
    callable
except NameError:
    def callable(obj):
        """Return True if *obj* exposes a ``__call__`` attribute."""
        return hasattr(obj, '__call__')
51d3907f0dSYaroslav Brustinov
52d3907f0dSYaroslav Brustinov
class ZMQStream(object):
    """A utility class to register callbacks when a zmq socket sends and receives

    For use with zmq.eventloop.ioloop

    There are three main methods

    Methods:

    * **on_recv(callback, copy=True):**
        register a callback to be run every time the socket has something to receive
    * **on_send(callback):**
        register a callback to be run every time you call send
    * **send(self, msg, flags=0, copy=True, track=False, callback=None):**
        perform a send that will trigger the callback
        if callback is passed, on_send is also called.

        There are also send_multipart(), send_string(), send_json(), send_pyobj()

    Two other methods for deactivating the callbacks:

    * **stop_on_recv():**
        turn off the recv callback
    * **stop_on_send():**
        turn off the send callback

    which simply call ``on_<evt>(None)``.

    The entire socket interface, excluding direct recv methods, is also
    provided, primarily through direct-linking the methods.
    e.g.

    >>> stream.bind == stream.socket.bind
    True

    """

    # Class-level defaults; every instance overwrites them in __init__.
    socket = None
    io_loop = None
    poller = None

    def __init__(self, socket, io_loop=None):
        """Wrap ``socket`` and register it with an IOLoop.

        Parameters
        ----------
        socket : zmq socket
            The socket whose send/recv events should be dispatched.
        io_loop : IOLoop, optional
            The loop to register with; defaults to ``IOLoop.instance()``.
        """
        self.socket = socket
        self.io_loop = io_loop or IOLoop.instance()
        # private poller, used only by flush()
        self.poller = zmq.Poller()

        # queue of (msg_parts, send_kwargs) tuples awaiting a WRITE event
        self._send_queue = Queue()
        self._recv_callback = None
        self._send_callback = None
        self._close_callback = None
        self._recv_copy = False
        self._flushed = False

        # start out watching errors only; READ/WRITE are added on demand
        self._state = self.io_loop.ERROR
        self._init_io_state()

        # shortcircuit some socket methods
        self.bind = self.socket.bind
        self.bind_to_random_port = self.socket.bind_to_random_port
        self.connect = self.socket.connect
        self.setsockopt = self.socket.setsockopt
        self.getsockopt = self.socket.getsockopt
        self.setsockopt_string = self.socket.setsockopt_string
        self.getsockopt_string = self.socket.getsockopt_string
        self.setsockopt_unicode = self.socket.setsockopt_unicode
        self.getsockopt_unicode = self.socket.getsockopt_unicode

    def stop_on_recv(self):
        """Disable callback and automatic receiving."""
        return self.on_recv(None)

    def stop_on_send(self):
        """Disable callback on sending."""
        return self.on_send(None)

    def stop_on_err(self):
        """DEPRECATED, does nothing"""
        # Logger.warn is a deprecated alias of Logger.warning
        gen_log.warning("on_err does nothing, and will be removed")

    def on_err(self, callback):
        """DEPRECATED, does nothing"""
        # Logger.warn is a deprecated alias of Logger.warning
        gen_log.warning("on_err does nothing, and will be removed")

    def on_recv(self, callback, copy=True):
        """Register a callback for when a message is ready to recv.

        There can be only one callback registered at a time, so each
        call to `on_recv` replaces previously registered callbacks.

        on_recv(None) disables recv event polling.

        Use on_recv_stream(callback) instead, to register a callback that will receive
        both this ZMQStream and the message, instead of just the message.

        Parameters
        ----------

        callback : callable
            callback must take exactly one argument, which will be a
            list, as returned by socket.recv_multipart()
            if callback is None, recv callbacks are disabled.
        copy : bool
            copy is passed directly to recv, so if copy is False,
            callback will receive Message objects. If copy is True,
            then callback will receive bytes/str objects.

        Returns : None

        Raises
        ------
        IOError
            If the stream has been closed.
        """

        self._check_closed()
        assert callback is None or callable(callback)
        self._recv_callback = stack_context.wrap(callback)
        self._recv_copy = copy
        if callback is None:
            self._drop_io_state(self.io_loop.READ)
        else:
            self._add_io_state(self.io_loop.READ)

    def on_recv_stream(self, callback, copy=True):
        """Same as on_recv, but callback will get this stream as first argument

        callback must take exactly two arguments, as it will be called as::

            callback(stream, msg)

        Useful when a single callback should be used with multiple streams.
        """
        if callback is None:
            self.stop_on_recv()
        else:
            self.on_recv(lambda msg: callback(self, msg), copy=copy)

    def on_send(self, callback):
        """Register a callback to be called on each send

        There will be two arguments::

            callback(msg, status)

        * `msg` will be the list of sendable objects that was just sent
        * `status` will be the return result of socket.send_multipart(msg) -
          MessageTracker or None. If the send raised a ZMQError, that
          exception instance is passed as `status` instead.

        Non-copying sends return a MessageTracker object whose
        `done` attribute will be True when the send is complete.
        This allows users to track when an object is safe to write to
        again.

        The second argument will always be None if copy=True
        on the send.

        Use on_send_stream(callback) to register a callback that will be passed
        this ZMQStream as the first argument, in addition to the other two.

        on_send(None) disables send callbacks.

        Parameters
        ----------

        callback : callable
            callback must take exactly two arguments, which will be
            the message being sent (always a list),
            and the return result of socket.send_multipart(msg) -
            MessageTracker or None.

            if callback is None, send callbacks are disabled.

        Raises
        ------
        IOError
            If the stream has been closed.
        """

        self._check_closed()
        assert callback is None or callable(callback)
        self._send_callback = stack_context.wrap(callback)

    def on_send_stream(self, callback):
        """Same as on_send, but callback will get this stream as first argument

        Callback will be passed three arguments::

            callback(stream, msg, status)

        Useful when a single callback should be used with multiple streams.
        """
        if callback is None:
            self.stop_on_send()
        else:
            self.on_send(lambda msg, status: callback(self, msg, status))

    def send(self, msg, flags=0, copy=True, track=False, callback=None):
        """Send a message, optionally also register a new callback for sends.
        See zmq.socket.send for details.

        The message is wrapped in a one-element list and handed to
        send_multipart(), so it is queued rather than sent immediately.
        """
        return self.send_multipart([msg], flags=flags, copy=copy, track=track, callback=callback)

    def send_multipart(self, msg, flags=0, copy=True, track=False, callback=None):
        """Send a multipart message, optionally also register a new callback for sends.
        See zmq.socket.send_multipart for details.

        The message is only queued here; the actual socket send happens
        when a WRITE event is handled (or during flush()).

        Parameters
        ----------
        msg : list
            The message parts.
        flags, copy, track
            Stored and later passed through to socket.send_multipart().
        callback : callable, optional
            If given, replaces the registered send callback (see on_send).

        Raises
        ------
        IOError
            If the stream has been closed.
        """
        # Fail before enqueueing, so a closed stream does not accumulate
        # messages that can never be delivered.
        self._check_closed()
        kwargs = dict(flags=flags, copy=copy, track=track)
        self._send_queue.put((msg, kwargs))
        callback = callback or self._send_callback
        if callback is not None:
            self.on_send(callback)
        else:
            # noop callback
            self.on_send(lambda *args: None)
        self._add_io_state(self.io_loop.WRITE)

    def send_string(self, u, flags=0, encoding='utf-8', callback=None):
        """Send a unicode message with an encoding.
        See zmq.socket.send_unicode for details.

        Raises
        ------
        TypeError
            If ``u`` is not a unicode/str object.
        """
        if not isinstance(u, basestring):
            raise TypeError("unicode/str objects only")
        return self.send(u.encode(encoding), flags=flags, callback=callback)

    send_unicode = send_string

    def send_json(self, obj, flags=0, callback=None):
        """Send json-serialized version of an object.
        See zmq.socket.send_json for details.

        Raises
        ------
        ImportError
            If no json implementation is available (``jsonapi`` is None).
        """
        if jsonapi is None:
            raise ImportError('jsonlib{1,2}, json or simplejson library is required.')
        else:
            msg = jsonapi.dumps(obj)
            return self.send(msg, flags=flags, callback=callback)

    def send_pyobj(self, obj, flags=0, protocol=-1, callback=None):
        """Send a Python object as a message using pickle to serialize.

        See zmq.socket.send_pyobj for details.
        """
        msg = pickle.dumps(obj, protocol)
        return self.send(msg, flags, callback=callback)

    def _finish_flush(self):
        """callback for unsetting _flushed flag."""
        self._flushed = False

    def flush(self, flag=zmq.POLLIN|zmq.POLLOUT, limit=None):
        """Flush pending messages.

        This method safely handles all pending incoming and/or outgoing messages,
        bypassing the inner loop, passing them to the registered callbacks.

        A limit can be specified, to prevent blocking under high load.

        flush will return the first time ANY of these conditions are met:
            * No more events matching the flag are pending.
            * the total number of events handled reaches the limit.

        Note that if ``flag & POLLIN != 0``, recv events will be flushed even if no callback
        is registered, unlike normal IOLoop operation. This allows flush to be
        used to remove *and ignore* incoming messages.

        Parameters
        ----------
        flag : int, default=POLLIN|POLLOUT
                0MQ poll flags.
                If flag & POLLIN,  recv events will be flushed.
                If flag & POLLOUT, send events will be flushed.
                Both flags can be set at once, which is the default.
        limit : None or int, optional
                The maximum number of messages to send or receive.
                Both send and recv count against this limit.
                None (or 0) means no limit.

        Returns
        -------
        int : count of events handled (both send and recv)

        Raises
        ------
        IOError
            If the stream has been closed.
        """
        self._check_closed()
        # unset self._flushed, so callbacks will execute, in case flush has
        # already been called this iteration
        already_flushed = self._flushed
        self._flushed = False
        # initialize counters
        count = 0
        # Remember what the caller asked for. The effective poll mask is
        # recomputed from this each round, so POLLOUT is picked up again if
        # a callback queues a send while we are flushing.
        requested = flag

        def update_flag():
            """Update the poll flag, to prevent registering POLLOUT events
            if we don't have pending sends."""
            mask = requested & zmq.POLLIN
            if self.sending():
                mask |= requested & zmq.POLLOUT
            return mask

        flag = update_flag()
        if not flag:
            # nothing to do
            return 0
        self.poller.register(self.socket, flag)
        events = self.poller.poll(0)
        while events and (not limit or count < limit):
            _, event = events[0]
            if event & zmq.POLLIN: # receiving
                self._handle_recv()
                count += 1
                if self.socket is None:
                    # break if socket was closed during callback
                    break
            # re-check the limit: the recv above may just have reached it
            if (event & zmq.POLLOUT and self.sending()
                    and (not limit or count < limit)):
                self._handle_send()
                count += 1
                if self.socket is None:
                    # break if socket was closed during callback
                    break

            flag = update_flag()
            if flag:
                self.poller.register(self.socket, flag)
                events = self.poller.poll(0)
            else:
                events = []
        if count: # only bypass loop if we actually flushed something
            # skip send/recv callbacks this iteration
            self._flushed = True
            # reregister them at the end of the loop
            if not already_flushed: # don't need to do it again
                self.io_loop.add_callback(self._finish_flush)
        elif already_flushed:
            # nothing handled this time: restore the flag we cleared above
            self._flushed = True

        # update ioloop poll state, which may have changed
        self._rebuild_io_state()
        return count

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = stack_context.wrap(callback)

    def close(self, linger=None):
        """Close this stream.

        Removes the socket from the IOLoop, closes it (passing ``linger``
        through to ``socket.close``) and runs the close callback, if any.
        Calling close() on an already-closed stream does nothing.
        """
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket)
            self.socket.close(linger)
            self.socket = None
            if self._close_callback:
                self._run_callback(self._close_callback)

    def receiving(self):
        """Returns True if we are currently receiving from the stream."""
        return self._recv_callback is not None

    def sending(self):
        """Returns True if we are currently sending to the stream."""
        return not self._send_queue.empty()

    def closed(self):
        """Returns True if the stream has been closed (its socket is None)."""
        return self.socket is None

    def _run_callback(self, callback, *args, **kwargs):
        """Wrap running callbacks in try/except to allow us to
        close our socket."""
        try:
            # Use a NullContext to ensure that all StackContexts are run
            # inside our blanket exception handler rather than outside.
            with stack_context.NullContext():
                callback(*args, **kwargs)
        except:
            # The bare except is deliberate: whatever was raised is always
            # re-raised below, after the stream has been closed.
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            # Close the socket on an uncaught exception from a user callback
            # (It would eventually get closed when the socket object is
            # gc'd, but we don't want to rely on gc happening before we
            # run out of file descriptors)
            self.close()
            # Re-raise the exception so that IOLoop.handle_callback_exception
            # can see it and log the error
            raise

    def _handle_events(self, fd, events):
        """This method is the actual handler for IOLoop, that gets called whenever
        an event on my socket is posted. It dispatches to _handle_recv, etc."""
        if not self.socket:
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            # dispatch events:
            if events & IOLoop.ERROR:
                gen_log.error("got POLLERR event on ZMQStream, which doesn't make sense")
                return
            if events & IOLoop.READ:
                self._handle_recv()
                if not self.socket:
                    # stream was closed by the recv callback
                    return
            if events & IOLoop.WRITE:
                self._handle_send()
                if not self.socket:
                    # stream was closed by the send callback
                    return

            # rebuild the poll state
            self._rebuild_io_state()
        except:
            # Deliberately bare: the exception is re-raised after cleanup.
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise

    def _handle_recv(self):
        """Handle a recv event.

        Does nothing while ``_flushed`` is set. Otherwise receives one
        multipart message without blocking and passes it to the recv
        callback, if one is registered; with no callback the message is
        discarded. EAGAIN is ignored, other ZMQErrors are logged.
        """
        if self._flushed:
            return
        try:
            msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                # state changed since poll event
                pass
            else:
                gen_log.error("RECV Error: %s", zmq.strerror(e.errno))
        else:
            if self._recv_callback:
                callback = self._recv_callback
                self._run_callback(callback, msg)

    def _handle_send(self):
        """Handle a send event.

        Does nothing while ``_flushed`` is set. Otherwise takes the oldest
        queued message, sends it, and passes the message and the send result
        to the send callback, if one is registered. A ZMQError from the send
        is logged and passed to the callback as the status.
        """
        if self._flushed:
            return
        if not self.sending():
            gen_log.error("Shouldn't have handled a send event")
            return

        msg, kwargs = self._send_queue.get()
        try:
            status = self.socket.send_multipart(msg, **kwargs)
        except zmq.ZMQError as e:
            gen_log.error("SEND Error: %s", e)
            status = e
        if self._send_callback:
            callback = self._send_callback
            self._run_callback(callback, msg, status)

    def _check_closed(self):
        """Raise IOError if the stream has been closed."""
        if not self.socket:
            raise IOError("Stream is closed")

    def _rebuild_io_state(self):
        """rebuild io state based on self.sending() and receiving()"""
        if self.socket is None:
            return
        state = self.io_loop.ERROR
        if self.receiving():
            state |= self.io_loop.READ
        if self.sending():
            state |= self.io_loop.WRITE
        if state != self._state:
            self._state = state
            self._update_handler(state)

    def _add_io_state(self, state):
        """Add io_state to poller."""
        if not self._state & state:
            self._state = self._state | state
            self._update_handler(self._state)

    def _drop_io_state(self, state):
        """Stop poller from watching an io_state."""
        if self._state & state:
            self._state = self._state & (~state)
            self._update_handler(self._state)

    def _update_handler(self, state):
        """Update IOLoop handler with state."""
        if self.socket is None:
            return
        self.io_loop.update_handler(self.socket, state)

    def _init_io_state(self):
        """initialize the ioloop event handler"""
        with stack_context.NullContext():
            self.io_loop.add_handler(self.socket, self._handle_events, self._state)
530