selectors.py revision 420216e5
1# Backport of selectors.py from Python 3.5+ to support Python < 3.4
2# Also has the behavior specified in PEP 475 which is to retry syscalls
3# in the case of an EINTR error. This module is required because selectors34
4# does not follow this behavior and instead returns that no dile descriptor
5# events have occurred rather than retry the syscall. The decision to drop
6# support for select.devpoll is made to maintain 100% test coverage.
7
8import errno
9import math
10import select
11from collections import namedtuple, Mapping
12
13import time
14try:
15    monotonic = time.monotonic
16except (AttributeError, ImportError):  # Python 3.3<
17    monotonic = time.time
18
19EVENT_READ = (1 << 0)
20EVENT_WRITE = (1 << 1)
21
22HAS_SELECT = True  # Variable that shows whether the platform has a selector.
23_SYSCALL_SENTINEL = object()  # Sentinel in case a system call returns None.
24
25
26class SelectorError(Exception):
27    def __init__(self, errcode):
28        super(SelectorError, self).__init__()
29        self.errno = errcode
30
31    def __repr__(self):
32        return "<SelectorError errno={0}>".format(self.errno)
33
34    def __str__(self):
35        return self.__repr__()
36
37
38def _fileobj_to_fd(fileobj):
39    """ Return a file descriptor from a file object. If
40    given an integer will simply return that integer back. """
41    if isinstance(fileobj, int):
42        fd = fileobj
43    else:
44        try:
45            fd = int(fileobj.fileno())
46        except (AttributeError, TypeError, ValueError):
47            raise ValueError("Invalid file object: {0!r}".format(fileobj))
48    if fd < 0:
49        raise ValueError("Invalid file descriptor: {0}".format(fd))
50    return fd
51
52
53def _syscall_wrapper(func, recalc_timeout, *args, **kwargs):
54    """ Wrapper function for syscalls that could fail due to EINTR.
55    All functions should be retried if there is time left in the timeout
56    in accordance with PEP 475. """
57    timeout = kwargs.get("timeout", None)
58    if timeout is None:
59        expires = None
60        recalc_timeout = False
61    else:
62        timeout = float(timeout)
63        if timeout < 0.0:  # Timeout less than 0 treated as no timeout.
64            expires = None
65        else:
66            expires = monotonic() + timeout
67
68    args = list(args)
69    if recalc_timeout and "timeout" not in kwargs:
70        raise ValueError(
71            "Timeout must be in args or kwargs to be recalculated")
72
73    result = _SYSCALL_SENTINEL
74    while result is _SYSCALL_SENTINEL:
75        try:
76            result = func(*args, **kwargs)
77        # OSError is thrown by select.select
78        # IOError is thrown by select.epoll.poll
79        # select.error is thrown by select.poll.poll
80        # Aren't we thankful for Python 3.x rework for exceptions?
81        except (OSError, IOError, select.error) as e:
82            # select.error wasn't a subclass of OSError in the past.
83            errcode = None
84            if hasattr(e, "errno"):
85                errcode = e.errno
86            elif hasattr(e, "args"):
87                errcode = e.args[0]
88
89            # Also test for the Windows equivalent of EINTR.
90            is_interrupt = (errcode == errno.EINTR or (hasattr(errno, "WSAEINTR") and
91                                                       errcode == errno.WSAEINTR))
92
93            if is_interrupt:
94                if expires is not None:
95                    current_time = monotonic()
96                    if current_time > expires:
97                        raise OSError(errno=errno.ETIMEDOUT)
98                    if recalc_timeout:
99                        if "timeout" in kwargs:
100                            kwargs["timeout"] = expires - current_time
101                continue
102            if errcode:
103                raise SelectorError(errcode)
104            else:
105                raise
106    return result
107
108
109SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
110
111
112class _SelectorMapping(Mapping):
113    """ Mapping of file objects to selector keys """
114
115    def __init__(self, selector):
116        self._selector = selector
117
118    def __len__(self):
119        return len(self._selector._fd_to_key)
120
121    def __getitem__(self, fileobj):
122        try:
123            fd = self._selector._fileobj_lookup(fileobj)
124            return self._selector._fd_to_key[fd]
125        except KeyError:
126            raise KeyError("{0!r} is not registered.".format(fileobj))
127
128    def __iter__(self):
129        return iter(self._selector._fd_to_key)
130
131
132class BaseSelector(object):
133    """ Abstract Selector class
134
135    A selector supports registering file objects to be monitored
136    for specific I/O events.
137
138    A file object is a file descriptor or any object with a
139    `fileno()` method. An arbitrary object can be attached to the
140    file object which can be used for example to store context info,
141    a callback, etc.
142
143    A selector can use various implementations (select(), poll(), epoll(),
144    and kqueue()) depending on the platform. The 'DefaultSelector' class uses
145    the most efficient implementation for the current platform.
146    """
147    def __init__(self):
148        # Maps file descriptors to keys.
149        self._fd_to_key = {}
150
151        # Read-only mapping returned by get_map()
152        self._map = _SelectorMapping(self)
153
154    def _fileobj_lookup(self, fileobj):
155        """ Return a file descriptor from a file object.
156        This wraps _fileobj_to_fd() to do an exhaustive
157        search in case the object is invalid but we still
158        have it in our map. Used by unregister() so we can
159        unregister an object that was previously registered
160        even if it is closed. It is also used by _SelectorMapping
161        """
162        try:
163            return _fileobj_to_fd(fileobj)
164        except ValueError:
165
166            # Search through all our mapped keys.
167            for key in self._fd_to_key.values():
168                if key.fileobj is fileobj:
169                    return key.fd
170
171            # Raise ValueError after all.
172            raise
173
174    def register(self, fileobj, events, data=None):
175        """ Register a file object for a set of events to monitor. """
176        if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
177            raise ValueError("Invalid events: {0!r}".format(events))
178
179        key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
180
181        if key.fd in self._fd_to_key:
182            raise KeyError("{0!r} (FD {1}) is already registered"
183                           .format(fileobj, key.fd))
184
185        self._fd_to_key[key.fd] = key
186        return key
187
188    def unregister(self, fileobj):
189        """ Unregister a file object from being monitored. """
190        try:
191            key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
192        except KeyError:
193            raise KeyError("{0!r} is not registered".format(fileobj))
194        return key
195
196    def modify(self, fileobj, events, data=None):
197        """ Change a registered file object monitored events and data. """
198        # NOTE: Some subclasses optimize this operation even further.
199        try:
200            key = self._fd_to_key[self._fileobj_lookup(fileobj)]
201        except KeyError:
202            raise KeyError("{0!r} is not registered".format(fileobj))
203
204        if events != key.events:
205            self.unregister(fileobj)
206            key = self.register(fileobj, events, data)
207
208        elif data != key.data:
209            # Use a shortcut to update the data.
210            key = key._replace(data=data)
211            self._fd_to_key[key.fd] = key
212
213        return key
214
215    def select(self, timeout=None):
216        """ Perform the actual selection until some monitored file objects
217        are ready or the timeout expires. """
218        raise NotImplementedError()
219
220    def close(self):
221        """ Close the selector. This must be called to ensure that all
222        underlying resources are freed. """
223        self._fd_to_key.clear()
224        self._map = None
225
226    def get_key(self, fileobj):
227        """ Return the key associated with a registered file object. """
228        mapping = self.get_map()
229        if mapping is None:
230            raise RuntimeError("Selector is closed")
231        try:
232            return mapping[fileobj]
233        except KeyError:
234            raise KeyError("{0!r} is not registered".format(fileobj))
235
236    def get_map(self):
237        """ Return a mapping of file objects to selector keys """
238        return self._map
239
240    def _key_from_fd(self, fd):
241        """ Return the key associated to a given file descriptor
242         Return None if it is not found. """
243        try:
244            return self._fd_to_key[fd]
245        except KeyError:
246            return None
247
248    def __enter__(self):
249        return self
250
251    def __exit__(self, *args):
252        self.close()
253
254
255# Almost all platforms have select.select()
256if hasattr(select, "select"):
257    class SelectSelector(BaseSelector):
258        """ Select-based selector. """
259        def __init__(self):
260            super(SelectSelector, self).__init__()
261            self._readers = set()
262            self._writers = set()
263
264        def register(self, fileobj, events, data=None):
265            key = super(SelectSelector, self).register(fileobj, events, data)
266            if events & EVENT_READ:
267                self._readers.add(key.fd)
268            if events & EVENT_WRITE:
269                self._writers.add(key.fd)
270            return key
271
272        def unregister(self, fileobj):
273            key = super(SelectSelector, self).unregister(fileobj)
274            self._readers.discard(key.fd)
275            self._writers.discard(key.fd)
276            return key
277
278        def _select(self, r, w, timeout=None):
279            """ Wrapper for select.select because timeout is a positional arg """
280            return select.select(r, w, [], timeout)
281
282        def select(self, timeout=None):
283            # Selecting on empty lists on Windows errors out.
284            if not len(self._readers) and not len(self._writers):
285                return []
286
287            timeout = None if timeout is None else max(timeout, 0.0)
288            ready = []
289            r, w, _ = _syscall_wrapper(self._select, True, self._readers,
290                                       self._writers, timeout)
291            r = set(r)
292            w = set(w)
293            for fd in r | w:
294                events = 0
295                if fd in r:
296                    events |= EVENT_READ
297                if fd in w:
298                    events |= EVENT_WRITE
299
300                key = self._key_from_fd(fd)
301                if key:
302                    ready.append((key, events & key.events))
303            return ready
304
305
306if hasattr(select, "poll"):
307    class PollSelector(BaseSelector):
308        """ Poll-based selector """
309        def __init__(self):
310            super(PollSelector, self).__init__()
311            self._poll = select.poll()
312
313        def register(self, fileobj, events, data=None):
314            key = super(PollSelector, self).register(fileobj, events, data)
315            event_mask = 0
316            if events & EVENT_READ:
317                event_mask |= select.POLLIN
318            if events & EVENT_WRITE:
319                event_mask |= select.POLLOUT
320            self._poll.register(key.fd, event_mask)
321            return key
322
323        def unregister(self, fileobj):
324            key = super(PollSelector, self).unregister(fileobj)
325            self._poll.unregister(key.fd)
326            return key
327
328        def _wrap_poll(self, timeout=None):
329            """ Wrapper function for select.poll.poll() so that
330            _syscall_wrapper can work with only seconds. """
331            if timeout is not None:
332                if timeout <= 0:
333                    timeout = 0
334                else:
335                    # select.poll.poll() has a resolution of 1 millisecond,
336                    # round away from zero to wait *at least* timeout seconds.
337                    timeout = math.ceil(timeout * 1e3)
338
339            result = self._poll.poll(timeout)
340            return result
341
342        def select(self, timeout=None):
343            ready = []
344            fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout)
345            for fd, event_mask in fd_events:
346                events = 0
347                if event_mask & ~select.POLLIN:
348                    events |= EVENT_WRITE
349                if event_mask & ~select.POLLOUT:
350                    events |= EVENT_READ
351
352                key = self._key_from_fd(fd)
353                if key:
354                    ready.append((key, events & key.events))
355
356            return ready
357
358
359if hasattr(select, "epoll"):
360    class EpollSelector(BaseSelector):
361        """ Epoll-based selector """
362        def __init__(self):
363            super(EpollSelector, self).__init__()
364            self._epoll = select.epoll()
365
366        def fileno(self):
367            return self._epoll.fileno()
368
369        def register(self, fileobj, events, data=None):
370            key = super(EpollSelector, self).register(fileobj, events, data)
371            events_mask = 0
372            if events & EVENT_READ:
373                events_mask |= select.EPOLLIN
374            if events & EVENT_WRITE:
375                events_mask |= select.EPOLLOUT
376            _syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
377            return key
378
379        def unregister(self, fileobj):
380            key = super(EpollSelector, self).unregister(fileobj)
381            try:
382                _syscall_wrapper(self._epoll.unregister, False, key.fd)
383            except SelectorError:
384                # This can occur when the fd was closed since registry.
385                pass
386            return key
387
388        def select(self, timeout=None):
389            if timeout is not None:
390                if timeout <= 0:
391                    timeout = 0.0
392                else:
393                    # select.epoll.poll() has a resolution of 1 millisecond
394                    # but luckily takes seconds so we don't need a wrapper
395                    # like PollSelector. Just for better rounding.
396                    timeout = math.ceil(timeout * 1e3) * 1e-3
397                timeout = float(timeout)
398            else:
399                timeout = -1.0  # epoll.poll() must have a float.
400
401            # We always want at least 1 to ensure that select can be called
402            # with no file descriptors registered. Otherwise will fail.
403            max_events = max(len(self._fd_to_key), 1)
404
405            ready = []
406            fd_events = _syscall_wrapper(self._epoll.poll, True,
407                                         timeout=timeout,
408                                         maxevents=max_events)
409            for fd, event_mask in fd_events:
410                events = 0
411                if event_mask & ~select.EPOLLIN:
412                    events |= EVENT_WRITE
413                if event_mask & ~select.EPOLLOUT:
414                    events |= EVENT_READ
415
416                key = self._key_from_fd(fd)
417                if key:
418                    ready.append((key, events & key.events))
419            return ready
420
421        def close(self):
422            self._epoll.close()
423            super(EpollSelector, self).close()
424
425
426if hasattr(select, "kqueue"):
427    class KqueueSelector(BaseSelector):
428        """ Kqueue / Kevent-based selector """
429        def __init__(self):
430            super(KqueueSelector, self).__init__()
431            self._kqueue = select.kqueue()
432
433        def fileno(self):
434            return self._kqueue.fileno()
435
436        def register(self, fileobj, events, data=None):
437            key = super(KqueueSelector, self).register(fileobj, events, data)
438            if events & EVENT_READ:
439                kevent = select.kevent(key.fd,
440                                       select.KQ_FILTER_READ,
441                                       select.KQ_EV_ADD)
442
443                _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
444
445            if events & EVENT_WRITE:
446                kevent = select.kevent(key.fd,
447                                       select.KQ_FILTER_WRITE,
448                                       select.KQ_EV_ADD)
449
450                _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
451
452            return key
453
454        def unregister(self, fileobj):
455            key = super(KqueueSelector, self).unregister(fileobj)
456            if key.events & EVENT_READ:
457                kevent = select.kevent(key.fd,
458                                       select.KQ_FILTER_READ,
459                                       select.KQ_EV_DELETE)
460                try:
461                    _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
462                except SelectorError:
463                    pass
464            if key.events & EVENT_WRITE:
465                kevent = select.kevent(key.fd,
466                                       select.KQ_FILTER_WRITE,
467                                       select.KQ_EV_DELETE)
468                try:
469                    _syscall_wrapper(self._kqueue.control, False, [kevent], 0, 0)
470                except SelectorError:
471                    pass
472
473            return key
474
475        def select(self, timeout=None):
476            if timeout is not None:
477                timeout = max(timeout, 0)
478
479            max_events = len(self._fd_to_key) * 2
480            ready_fds = {}
481
482            kevent_list = _syscall_wrapper(self._kqueue.control, True,
483                                           None, max_events, timeout)
484
485            for kevent in kevent_list:
486                fd = kevent.ident
487                event_mask = kevent.filter
488                events = 0
489                if event_mask == select.KQ_FILTER_READ:
490                    events |= EVENT_READ
491                if event_mask == select.KQ_FILTER_WRITE:
492                    events |= EVENT_WRITE
493
494                key = self._key_from_fd(fd)
495                if key:
496                    if key.fd not in ready_fds:
497                        ready_fds[key.fd] = (key, events & key.events)
498                    else:
499                        old_events = ready_fds[key.fd][1]
500                        ready_fds[key.fd] = (key, (events | old_events) & key.events)
501
502            return list(ready_fds.values())
503
504        def close(self):
505            self._kqueue.close()
506            super(KqueueSelector, self).close()
507
508
509# Choose the best implementation, roughly:
510# kqueue == epoll > poll > select. Devpoll not supported. (See above)
511# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
512if 'KqueueSelector' in globals():  # Platform-specific: Mac OS and BSD
513    DefaultSelector = KqueueSelector
514elif 'EpollSelector' in globals():  # Platform-specific: Linux
515    DefaultSelector = EpollSelector
516elif 'PollSelector' in globals():  # Platform-specific: Linux
517    DefaultSelector = PollSelector
518elif 'SelectSelector' in globals():  # Platform-specific: Windows
519    DefaultSelector = SelectSelector
520else:  # Platform-specific: AppEngine
521    def no_selector(_):
522        raise ValueError("Platform does not have a selector")
523    DefaultSelector = no_selector
524    HAS_SELECT = False
525