poll.py revision 781d71db
1import zmq
2import gevent
3from gevent import select
4
5from zmq import Poller as _original_Poller
6
7
class _Poller(_original_Poller):
    """Replacement for :class:`zmq.Poller`

    Ensures that a gevent-cooperative ("green") poll is used in calls to
    :meth:`zmq.Poller.poll`: instead of blocking inside libzmq, the calling
    greenlet waits in :func:`gevent.select.select` and re-checks for events
    with a non-blocking poll.
    """

    # Upper bound, in seconds, on how long a single green select() may block
    # before the sockets are polled again.  Works around gevent < 1.0 not
    # reliably catching edge-triggered FD events.
    _gevent_bug_timeout = 1.33

    def _get_descriptors(self):
        """Return a ``(rlist, wlist, xlist)`` tuple of file descriptors
        ready to be passed to ``gevent.select.select``.

        0MQ sockets contribute their ``zmq.FD`` descriptor to ``rlist``
        regardless of the registered flags.  Integer fds and objects with a
        ``fileno()`` method are placed in the lists matching their
        ``POLLIN`` / ``POLLOUT`` / ``POLLERR`` flags.

        Raises:
            ValueError: if ``fileno()`` fails or does not return something
                convertible to ``int``.
            TypeError: if a registered object is neither a 0MQ socket, an
                integer fd, nor an object with a ``fileno()`` method.
        """
        rlist = []
        wlist = []
        xlist = []

        for socket, flags in self.sockets:
            if isinstance(socket, zmq.Socket):
                rlist.append(socket.getsockopt(zmq.FD))
                continue
            elif isinstance(socket, int):
                fd = socket
            elif hasattr(socket, 'fileno'):
                try:
                    fd = int(socket.fileno())
                except Exception:
                    # Deliberately not a bare ``except``: gevent.Timeout,
                    # KeyboardInterrupt and SystemExit derive from
                    # BaseException and must propagate untouched instead of
                    # being masked as a ValueError.
                    raise ValueError('fileno() must return an valid integer fd')
            else:
                raise TypeError('Socket must be a 0MQ socket, an integer fd '
                                'or have a fileno() method: %r' % socket)

            if flags & zmq.POLLIN:
                rlist.append(fd)
            if flags & zmq.POLLOUT:
                wlist.append(fd)
            if flags & zmq.POLLERR:
                xlist.append(fd)

        return (rlist, wlist, xlist)

    def poll(self, timeout=-1):
        """Overridden method to ensure that the green version of
        Poller is used.

        Behaves the same as :meth:`zmq.core.Poller.poll`

        ``timeout`` is in milliseconds (it is divided by 1000 before being
        handed to ``gevent.Timeout``).  ``None`` or any negative value means
        wait indefinitely; ``0`` means poll once and return immediately.

        Returns whatever the underlying non-blocking poll reports, or an
        empty list if ``timeout`` expires first.  A ``gevent.Timeout`` that
        was not started by this call (e.g. one armed by the caller) is
        re-raised.
        """
        if timeout is None or timeout < 0:
            timeout = -1

        # Stays None when no overall deadline applies, so the handlers below
        # can always compare against it safely.
        tout = None
        if timeout > 0:
            tout = gevent.Timeout.start_new(timeout / 1000.0)

        try:
            # Loop until timeout or events available
            rlist, wlist, xlist = self._get_descriptors()
            while True:
                events = super(_Poller, self).poll(0)
                if events or timeout == 0:
                    return events

                # wait for activity on sockets in a green way
                # set a minimum poll frequency,
                # because gevent < 1.0 cannot be trusted to catch
                # edge-triggered FD events
                _bug_timeout = gevent.Timeout.start_new(self._gevent_bug_timeout)
                try:
                    select.select(rlist, wlist, xlist)
                except gevent.Timeout as t:
                    # Only our own wake-up timer is swallowed; anything else
                    # (the overall deadline or a caller's timer) propagates.
                    if t is not _bug_timeout:
                        raise
                finally:
                    _bug_timeout.cancel()

        except gevent.Timeout as t:
            # ``tout`` is None for infinite polls, so a foreign Timeout is
            # always re-raised rather than tripping over an unbound name.
            if t is not tout:
                raise
            return []
        finally:
            if tout is not None:
                tout.cancel()
95
96