#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
15b2732e9dSimarom
16b2732e9dSimarom"""A utility class to send to and recv from a non-blocking socket."""
17b2732e9dSimarom
18b2732e9dSimaromfrom __future__ import with_statement
19b2732e9dSimarom
20b2732e9dSimaromimport sys
21b2732e9dSimarom
22b2732e9dSimaromimport zmq
23b2732e9dSimaromfrom zmq.utils import jsonapi
24b2732e9dSimarom
25b2732e9dSimaromtry:
26b2732e9dSimarom    import cPickle as pickle
27b2732e9dSimaromexcept ImportError:
28b2732e9dSimarom    import pickle
29b2732e9dSimarom
30b2732e9dSimaromfrom .ioloop import IOLoop
31b2732e9dSimarom
32b2732e9dSimaromtry:
33b2732e9dSimarom    # gen_log will only import from >= 3.0
34b2732e9dSimarom    from tornado.log import gen_log
35b2732e9dSimarom    from tornado import stack_context
36b2732e9dSimaromexcept ImportError:
37b2732e9dSimarom    from .minitornado.log import gen_log
38b2732e9dSimarom    from .minitornado import stack_context
39b2732e9dSimarom
40b2732e9dSimaromtry:
41b2732e9dSimarom    from queue import Queue
42b2732e9dSimaromexcept ImportError:
43b2732e9dSimarom    from Queue import Queue
44b2732e9dSimarom
45b2732e9dSimaromfrom zmq.utils.strtypes import bytes, unicode, basestring
46b2732e9dSimarom
try:
    callable
except NameError:
    # The builtin is missing on some interpreters (it was dropped in
    # Python 3.0/3.1); provide an equivalent based on ``__call__``.
    def callable(obj):
        """Return True if *obj* exposes a ``__call__`` attribute."""
        return hasattr(obj, '__call__')
51b2732e9dSimarom
52b2732e9dSimarom
class ZMQStream(object):
    """A utility class to register callbacks when a zmq socket sends and receives

    For use with zmq.eventloop.ioloop

    There are three main methods

    Methods:

    * **on_recv(callback, copy=True):**
        register a callback to be run every time the socket has something to receive
    * **on_send(callback):**
        register a callback to be run every time you call send
    * **send(self, msg, flags=0, copy=True, track=False, callback=None):**
        perform a send that will trigger the callback
        if callback is passed, on_send is also called.

        There are also send_multipart(), send_json(), send_pyobj()

    Two other methods for deactivating the callbacks:

    * **stop_on_recv():**
        turn off the recv callback
    * **stop_on_send():**
        turn off the send callback

    which simply call ``on_<evt>(None)``.

    The entire socket interface, excluding direct recv methods, is also
    provided, primarily through direct-linking the methods.
    e.g.

    >>> stream.bind == stream.socket.bind
    True

    """

    socket = None
    io_loop = None
    poller = None

    def __init__(self, socket, io_loop=None):
        """Wrap `socket` and register it with an IOLoop.

        Parameters
        ----------
        socket : zmq socket
            the socket to wrap; several of its methods (bind, connect,
            get/setsockopt, ...) are re-exported on the stream.
        io_loop : IOLoop, optional
            the loop to register with; defaults to ``IOLoop.instance()``.
        """
        self.socket = socket
        self.io_loop = io_loop or IOLoop.instance()
        # private poller, only used by flush() to poll outside the IOLoop
        self.poller = zmq.Poller()

        # pending outgoing messages, as (msg_parts, send_kwargs) tuples
        self._send_queue = Queue()
        self._recv_callback = None
        self._send_callback = None
        self._close_callback = None
        self._recv_copy = False
        # True while _handle_recv/_handle_send should be skipped, because
        # flush() already handled events during this loop iteration
        self._flushed = False

        self._state = self.io_loop.ERROR
        self._init_io_state()

        # shortcircuit some socket methods
        self.bind = self.socket.bind
        self.bind_to_random_port = self.socket.bind_to_random_port
        self.connect = self.socket.connect
        self.setsockopt = self.socket.setsockopt
        self.getsockopt = self.socket.getsockopt
        self.setsockopt_string = self.socket.setsockopt_string
        self.getsockopt_string = self.socket.getsockopt_string
        self.setsockopt_unicode = self.socket.setsockopt_unicode
        self.getsockopt_unicode = self.socket.getsockopt_unicode

    def stop_on_recv(self):
        """Disable callback and automatic receiving."""
        return self.on_recv(None)

    def stop_on_send(self):
        """Disable callback on sending."""
        return self.on_send(None)

    def stop_on_err(self):
        """DEPRECATED, does nothing"""
        # Logger.warn is a deprecated alias (removed in Python 3.13);
        # use warning(), as _handle_events already does.
        gen_log.warning("on_err does nothing, and will be removed")

    def on_err(self, callback):
        """DEPRECATED, does nothing"""
        gen_log.warning("on_err does nothing, and will be removed")

    def on_recv(self, callback, copy=True):
        """Register a callback for when a message is ready to recv.

        There can be only one callback registered at a time, so each
        call to `on_recv` replaces previously registered callbacks.

        on_recv(None) disables recv event polling.

        Use on_recv_stream(callback) instead, to register a callback that will receive
        both this ZMQStream and the message, instead of just the message.

        Parameters
        ----------

        callback : callable
            callback must take exactly one argument, which will be a
            list, as returned by socket.recv_multipart()
            if callback is None, recv callbacks are disabled.
        copy : bool
            copy is passed directly to recv, so if copy is False,
            callback will receive Message objects. If copy is True,
            then callback will receive bytes/str objects.

        Returns : None

        Raises
        ------
        IOError
            if the stream is closed.
        """

        self._check_closed()
        assert callback is None or callable(callback)
        self._recv_callback = stack_context.wrap(callback)
        self._recv_copy = copy
        if callback is None:
            self._drop_io_state(self.io_loop.READ)
        else:
            self._add_io_state(self.io_loop.READ)

    def on_recv_stream(self, callback, copy=True):
        """Same as on_recv, but callback will get this stream as first argument

        callback must take exactly two arguments, as it will be called as::

            callback(stream, msg)

        Useful when a single callback should be used with multiple streams.
        """
        if callback is None:
            self.stop_on_recv()
        else:
            self.on_recv(lambda msg: callback(self, msg), copy=copy)

    def on_send(self, callback):
        """Register a callback to be called on each send

        There will be two arguments::

            callback(msg, status)

        * `msg` will be the list of sendable objects that was just sent
        * `status` will be the return result of socket.send_multipart(msg) -
          MessageTracker or None. If the send raised a ZMQError, `status`
          is that exception instance instead.

        Non-copying sends return a MessageTracker object whose
        `done` attribute will be True when the send is complete.
        This allows users to track when an object is safe to write to
        again.

        The second argument will always be None if copy=True
        on the send.

        Use on_send_stream(callback) to register a callback that will be passed
        this ZMQStream as the first argument, in addition to the other two.

        on_send(None) disables the send callback.

        Parameters
        ----------

        callback : callable
            callback must take exactly two arguments, which will be
            the message being sent (always a list),
            and the return result of socket.send_multipart(msg) -
            MessageTracker or None.

            if callback is None, send callbacks are disabled.

        Raises
        ------
        IOError
            if the stream is closed.
        """

        self._check_closed()
        assert callback is None or callable(callback)
        self._send_callback = stack_context.wrap(callback)

    def on_send_stream(self, callback):
        """Same as on_send, but callback will get this stream as first argument

        Callback will be passed three arguments::

            callback(stream, msg, status)

        Useful when a single callback should be used with multiple streams.
        """
        if callback is None:
            self.stop_on_send()
        else:
            self.on_send(lambda msg, status: callback(self, msg, status))

    def send(self, msg, flags=0, copy=True, track=False, callback=None):
        """Send a message, optionally also register a new callback for sends.
        See zmq.socket.send for details.

        The message is wrapped in a one-element list and handed to
        send_multipart(), so it is queued rather than sent immediately.
        """
        return self.send_multipart([msg], flags=flags, copy=copy, track=track, callback=callback)

    def send_multipart(self, msg, flags=0, copy=True, track=False, callback=None):
        """Send a multipart message, optionally also register a new callback for sends.
        See zmq.socket.send_multipart for details.

        The message is queued; the actual socket send happens in
        _handle_send(), once a WRITE event is dispatched (or in flush()).
        If `callback` is given it replaces the registered send callback.

        Raises
        ------
        IOError
            if the stream is closed.
        """
        # Check before queueing, so a closed stream does not accumulate
        # messages that can never be sent.
        self._check_closed()
        kwargs = dict(flags=flags, copy=copy, track=track)
        self._send_queue.put((msg, kwargs))
        callback = callback or self._send_callback
        if callback is not None:
            self.on_send(callback)
        else:
            # noop callback
            self.on_send(lambda *args: None)
        self._add_io_state(self.io_loop.WRITE)

    def send_string(self, u, flags=0, encoding='utf-8', callback=None):
        """Send a unicode message with an encoding.
        See zmq.socket.send_string for details.

        Raises
        ------
        TypeError
            if `u` is not a unicode/str object.
        """
        if not isinstance(u, basestring):
            raise TypeError("unicode/str objects only")
        return self.send(u.encode(encoding), flags=flags, callback=callback)

    send_unicode = send_string

    def send_json(self, obj, flags=0, callback=None):
        """Send json-serialized version of an object.
        See zmq.socket.send_json for details.
        """
        if jsonapi is None:
            raise ImportError('jsonlib{1,2}, json or simplejson library is required.')
        msg = jsonapi.dumps(obj)
        return self.send(msg, flags=flags, callback=callback)

    def send_pyobj(self, obj, flags=0, protocol=-1, callback=None):
        """Send a Python object as a message using pickle to serialize.

        See zmq.socket.send_pyobj for details.
        """
        msg = pickle.dumps(obj, protocol)
        return self.send(msg, flags, callback=callback)

    def _finish_flush(self):
        """callback for unsetting _flushed flag."""
        self._flushed = False

    def flush(self, flag=zmq.POLLIN|zmq.POLLOUT, limit=None):
        """Flush pending messages.

        This method safely handles all pending incoming and/or outgoing messages,
        bypassing the inner loop, passing them to the registered callbacks.

        A limit can be specified, to prevent blocking under high load.

        flush will return the first time ANY of these conditions are met:
            * No more events matching the flag are pending.
            * the total number of events handled reaches the limit.

        Note that if ``flag & POLLIN != 0``, recv events will be flushed even if no callback
        is registered, unlike normal IOLoop operation. This allows flush to be
        used to remove *and ignore* incoming messages.

        Parameters
        ----------
        flag : int, default=POLLIN|POLLOUT
                0MQ poll flags.
                If flag & POLLIN,  recv events will be flushed.
                If flag & POLLOUT, send events will be flushed.
                Both flags can be set at once, which is the default.
        limit : None or int, optional
                The maximum number of messages to send or receive.
                Both send and recv count against this limit.

        Returns
        -------
        int : count of events handled (both send and recv)

        Raises
        ------
        IOError
            if the stream is closed.
        """
        self._check_closed()
        # unset self._flushed, so callbacks will execute, in case flush has
        # already been called this iteration
        already_flushed = self._flushed
        self._flushed = False
        # initialize counters
        count = 0
        # Remember what the caller asked for: the effective poll flag is
        # recomputed from this on every pass, so sends queued by a recv
        # callback during the flush are still picked up.
        requested = flag

        def update_flag():
            """Update the poll flag, to prevent registering POLLOUT events
            if we don't have pending sends."""
            pollout = requested & zmq.POLLOUT if self.sending() else 0
            return requested & zmq.POLLIN | pollout

        flag = update_flag()
        if not flag:
            # nothing to do; put the skip-flag back the way we found it
            self._flushed = already_flushed
            return 0
        self.poller.register(self.socket, flag)
        events = self.poller.poll(0)
        while events and (not limit or count < limit):
            event = events[0][1]
            if event & zmq.POLLIN:  # receiving
                self._handle_recv()
                count += 1
                if self.socket is None:
                    # break if socket was closed during callback
                    break
                if limit and count >= limit:
                    # the recv used up the budget; don't also send
                    break
            if event & zmq.POLLOUT and self.sending():
                self._handle_send()
                count += 1
                if self.socket is None:
                    # break if socket was closed during callback
                    break

            flag = update_flag()
            if flag:
                self.poller.register(self.socket, flag)
                events = self.poller.poll(0)
            else:
                events = []
        if count:  # only bypass loop if we actually flushed something
            # skip send/recv callbacks this iteration
            self._flushed = True
            # reregister them at the end of the loop
            if not already_flushed:  # don't need to do it again
                self.io_loop.add_callback(self._finish_flush)
        elif already_flushed:
            self._flushed = True

        # update ioloop poll state, which may have changed
        self._rebuild_io_state()
        return count

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = stack_context.wrap(callback)

    def close(self, linger=None):
        """Close this stream.

        Removes the socket from the IOLoop, closes it (passing `linger`
        through to ``socket.close``) and runs the close callback, if any.
        Calling close() on an already-closed stream does nothing.
        """
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket)
            self.socket.close(linger)
            self.socket = None
            if self._close_callback:
                self._run_callback(self._close_callback)

    def receiving(self):
        """Returns True if we are currently receiving from the stream."""
        return self._recv_callback is not None

    def sending(self):
        """Returns True if we are currently sending to the stream."""
        return not self._send_queue.empty()

    def closed(self):
        """Returns True if the stream has been closed (socket released)."""
        return self.socket is None

    def _run_callback(self, callback, *args, **kwargs):
        """Wrap running callbacks in try/except to allow us to
        close our socket."""
        try:
            # Use a NullContext to ensure that all StackContexts are run
            # inside our blanket exception handler rather than outside.
            with stack_context.NullContext():
                callback(*args, **kwargs)
        except:
            # Deliberately bare: whatever was raised, we clean up and then
            # re-raise it unchanged.
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            # Close the socket on an uncaught exception from a user callback
            # (It would eventually get closed when the socket object is
            # gc'd, but we don't want to rely on gc happening before we
            # run out of file descriptors)
            self.close()
            # Re-raise the exception so that IOLoop.handle_callback_exception
            # can see it and log the error
            raise

    def _handle_events(self, fd, events):
        """This method is the actual handler for IOLoop, that gets called whenever
        an event on my socket is posted. It dispatches to _handle_recv, etc."""
        if not self.socket:
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            # dispatch events:
            if events & IOLoop.ERROR:
                gen_log.error("got POLLERR event on ZMQStream, which doesn't make sense")
                return
            if events & IOLoop.READ:
                self._handle_recv()
                if not self.socket:
                    # closed by the callback; nothing left to update
                    return
            if events & IOLoop.WRITE:
                self._handle_send()
                if not self.socket:
                    return

            # rebuild the poll state
            self._rebuild_io_state()
        except:
            # Deliberately bare: close the stream, then re-raise unchanged.
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise

    def _handle_recv(self):
        """Handle a recv event.

        Does nothing if flush() already ran this iteration. Otherwise
        receives one multipart message without blocking and passes it to
        the recv callback, if one is registered.
        """
        if self._flushed:
            return
        try:
            msg = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                # state changed since poll event
                pass
            else:
                gen_log.error("RECV Error: %s", zmq.strerror(e.errno))
        else:
            if self._recv_callback:
                callback = self._recv_callback
                self._run_callback(callback, msg)

    def _handle_send(self):
        """Handle a send event.

        Does nothing if flush() already ran this iteration. Otherwise pops
        one queued message, sends it, and passes the message and the send
        result (or the ZMQError raised) to the send callback.
        """
        if self._flushed:
            return
        if not self.sending():
            gen_log.error("Shouldn't have handled a send event")
            return

        msg, kwargs = self._send_queue.get()
        try:
            status = self.socket.send_multipart(msg, **kwargs)
        except zmq.ZMQError as e:
            gen_log.error("SEND Error: %s", e)
            # hand the error to the callback in place of the send result
            status = e
        if self._send_callback:
            callback = self._send_callback
            self._run_callback(callback, msg, status)

    def _check_closed(self):
        """Raise IOError if the stream has been closed."""
        if not self.socket:
            raise IOError("Stream is closed")

    def _rebuild_io_state(self):
        """rebuild io state based on self.sending() and receiving()"""
        if self.socket is None:
            return
        state = self.io_loop.ERROR
        if self.receiving():
            state |= self.io_loop.READ
        if self.sending():
            state |= self.io_loop.WRITE
        if state != self._state:
            self._state = state
            self._update_handler(state)

    def _add_io_state(self, state):
        """Add io_state to poller."""
        if not self._state & state:
            self._state = self._state | state
            self._update_handler(self._state)

    def _drop_io_state(self, state):
        """Stop poller from watching an io_state."""
        if self._state & state:
            self._state = self._state & (~state)
            self._update_handler(self._state)

    def _update_handler(self, state):
        """Update IOLoop handler with state."""
        if self.socket is None:
            return
        self.io_loop.update_handler(self.socket, state)

    def _init_io_state(self):
        """initialize the ioloop event handler"""
        with stack_context.NullContext():
            self.io_loop.add_handler(self.socket, self._handle_events, self._state)
530