poll.py revision 781d71db
1"""0MQ polling related functions and classes."""
2
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
5
6
7import zmq
8from zmq.backend import zmq_poll
9from .constants import POLLIN, POLLOUT, POLLERR
10
11#-----------------------------------------------------------------------------
12# Polling related methods
13#-----------------------------------------------------------------------------
14
15
class Poller(object):
    """A stateful poll interface that mirrors Python's built-in poll.

    Registered items are stored in ``sockets`` as ``(socket, flags)`` tuples,
    in registration order.  ``_map`` maps each registered object to its index
    in ``sockets`` so membership tests and flag updates need no list scan.
    """
    # Class-level placeholders only: __init__ rebinds both names on every
    # instance, so pollers never share registration state.
    sockets = None
    _map = {}

    def __init__(self):
        """Create a poller with nothing registered."""
        self.sockets = []
        self._map = {}

    def __contains__(self, socket):
        """Return True if `socket` is currently registered with this poller."""
        return socket in self._map

    def register(self, socket, flags=POLLIN|POLLOUT):
        """p.register(socket, flags=POLLIN|POLLOUT)

        Register a 0MQ socket or native fd for I/O monitoring.

        Registering an already registered socket replaces its flags in place
        (its position in ``sockets`` is kept).  ``register(s, 0)`` is
        equivalent to ``unregister(s)`` when `s` is registered, and is a
        no-op otherwise.

        Parameters
        ----------
        socket : zmq.Socket or native socket
            A zmq.Socket or any Python object having a ``fileno()``
            method that returns a valid file descriptor.
        flags : int
            The events to watch for.  Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
            If `flags=0`, socket will be unregistered.
        """
        if flags:
            if socket in self._map:
                # Already known: overwrite the flags at the existing slot.
                idx = self._map[socket]
                self.sockets[idx] = (socket, flags)
            else:
                # New entry goes at the end; remember where it lives.
                idx = len(self.sockets)
                self.sockets.append((socket, flags))
                self._map[socket] = idx
        elif socket in self._map:
            # unregister sockets registered with no events
            self.unregister(socket)
        else:
            # ignore new sockets with no events
            pass

    def modify(self, socket, flags=POLLIN|POLLOUT):
        """Modify the flags for an already registered 0MQ socket or native fd.

        This simply delegates to :meth:`register`, so it accepts the same
        arguments and follows the same rules (including ``flags=0``).
        """
        self.register(socket, flags)

    def unregister(self, socket):
        """Remove a 0MQ socket or native fd for I/O monitoring.

        Parameters
        ----------
        socket : Socket
            The socket instance to stop polling.

        Raises
        ------
        KeyError
            If `socket` is not currently registered.
        """
        idx = self._map.pop(socket)
        self.sockets.pop(idx)
        # Every entry that followed the removed one moved down by one slot,
        # so its recorded index must follow.  A distinct loop name is used so
        # the `socket` argument is not rebound.
        for moved, _flags in self.sockets[idx:]:
            self._map[moved] -= 1

    def poll(self, timeout=None):
        """Poll the registered 0MQ or native fds for I/O.

        Parameters
        ----------
        timeout : float, int
            The timeout in milliseconds, for compatibility with
            ``select.poll()``.  If None or negative, no `timeout` (infinite):
            -1 is handed to ``zmq_poll``.  Any other value is truncated to an
            integer number of milliseconds and passed on without unit
            conversion.

        Returns
        -------
        events : list of tuples
            The list of events that are ready to be processed.
            This is a list of tuples of the form ``(socket, event)``, where the 0MQ Socket
            or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second.
            It is common to call ``events = dict(poller.poll())``,
            which turns the list of tuples into a mapping of ``socket : event``.
        """
        if timeout is None or timeout < 0:
            timeout = -1
        else:
            # Coerce every numeric type (not just float) to a plain int so
            # zmq_poll always receives an integer millisecond count.
            timeout = int(timeout)
        return zmq_poll(self.sockets, timeout=timeout)
102
103
def select(rlist, wlist, xlist, timeout=None):
    """select(rlist, wlist, xlist, timeout=None) -> (rlist, wlist, xlist)

    Return the result of poll as lists of sockets ready for r/w/exception.

    This has the same interface as Python's built-in ``select.select()`` function.

    Parameters
    ----------
    timeout : float, int, optional
        The timeout in seconds, for compatibility with ``select.select()``.
        If None, no timeout (infinite).  The value is converted to whole
        milliseconds for ``zmq_poll``; a result below zero is passed as -1.
    rlist : iterable of sockets/FDs
        sockets/FDs to be polled for read events
    wlist : iterable of sockets/FDs
        sockets/FDs to be polled for write events
    xlist : iterable of sockets/FDs
        sockets/FDs to be polled for error events

    Returns
    -------
    (rlist, wlist, xlist) : tuple of lists of sockets (length 3)
        Lists correspond to sockets available for read/write/error events respectively.
    """
    if timeout is None:
        timeout = -1
    else:
        # Convert from sec -> ms; zmq_poll accepts a 3.x style timeout in ms.
        timeout = int(timeout * 1000.0)
        if timeout < 0:
            timeout = -1

    # Merge the three inputs into one flag mask per distinct socket.  A single
    # pass over each input replaces repeated list membership tests and lets
    # the arguments be any iterables (lists, tuples, sets, ...), not only
    # three lists that can be concatenated with ``+``.
    flags_by_socket = {}
    for group, flag in ((rlist, POLLIN), (wlist, POLLOUT), (xlist, POLLERR)):
        for s in group:
            flags_by_socket[s] = flags_by_socket.get(s, 0) | flag
    sockets = list(flags_by_socket.items())

    return_sockets = zmq_poll(sockets, timeout)

    # Fan the reported event masks back out into the three result lists; a
    # socket may appear in more than one of them.
    rlist, wlist, xlist = [], [], []
    for s, flags in return_sockets:
        if flags & POLLIN:
            rlist.append(s)
        if flags & POLLOUT:
            wlist.append(s)
        if flags & POLLERR:
            xlist.append(s)
    return rlist, wlist, xlist
156
157#-----------------------------------------------------------------------------
158# Symbols to export
159#-----------------------------------------------------------------------------
160
# Names exported by ``from <this module> import *``.
__all__ = [
    'Poller',
    'select',
]
162