1"""0MQ Context class."""
2# coding: utf-8
3
4# Copyright (c) PyZMQ Developers.
5# Distributed under the terms of the Lesser GNU Public License (LGPL).
6
7from libc.stdlib cimport free, malloc, realloc
8
9from libzmq cimport *
10
11cdef extern from "getpid_compat.h":
12    int getpid()
13
14from zmq.error import ZMQError
15from zmq.backend.cython.checkrc cimport _check_rc
16
17
18_instance = None
19
20cdef class Context:
21    """Context(io_threads=1)
22
23    Manage the lifecycle of a 0MQ context.
24
25    Parameters
26    ----------
27    io_threads : int
28        The number of IO threads.
29    """
30    
31    # no-op for the signature
32    def __init__(self, io_threads=1, shadow=0):
33        pass
34    
35    def __cinit__(self, int io_threads=1, size_t shadow=0, **kwargs):
36        self.handle = NULL
37        self._sockets = NULL
38        if shadow:
39            self.handle = <void *>shadow
40            self._shadow = True
41        else:
42            self._shadow = False
43            if ZMQ_VERSION_MAJOR >= 3:
44                self.handle = zmq_ctx_new()
45            else:
46                self.handle = zmq_init(io_threads)
47        
48        if self.handle == NULL:
49            raise ZMQError()
50        
51        cdef int rc = 0
52        if ZMQ_VERSION_MAJOR >= 3 and not self._shadow:
53            rc = zmq_ctx_set(self.handle, ZMQ_IO_THREADS, io_threads)
54            _check_rc(rc)
55        
56        self.closed = False
57        self._n_sockets = 0
58        self._max_sockets = 32
59        
60        self._sockets = <void **>malloc(self._max_sockets*sizeof(void *))
61        if self._sockets == NULL:
62            raise MemoryError("Could not allocate _sockets array")
63        
64        self._pid = getpid()
65    
66    def __dealloc__(self):
67        """don't touch members in dealloc, just cleanup allocations"""
68        cdef int rc
69        if self._sockets != NULL:
70            free(self._sockets)
71            self._sockets = NULL
72            self._n_sockets = 0
73
74        # we can't call object methods in dealloc as it
75        # might already be partially deleted
76        if not self._shadow:
77            self._term()
78    
79    cdef inline void _add_socket(self, void* handle):
80        """Add a socket handle to be closed when Context terminates.
81        
82        This is to be called in the Socket constructor.
83        """
84        if self._n_sockets >= self._max_sockets:
85            self._max_sockets *= 2
86            self._sockets = <void **>realloc(self._sockets, self._max_sockets*sizeof(void *))
87            if self._sockets == NULL:
88                raise MemoryError("Could not reallocate _sockets array")
89        
90        self._sockets[self._n_sockets] = handle
91        self._n_sockets += 1
92
93    cdef inline void _remove_socket(self, void* handle):
94        """Remove a socket from the collected handles.
95        
96        This should be called by Socket.close, to prevent trying to
97        close a socket a second time.
98        """
99        cdef bint found = False
100        
101        for idx in range(self._n_sockets):
102            if self._sockets[idx] == handle:
103                found=True
104                break
105        
106        if found:
107            self._n_sockets -= 1
108            if self._n_sockets:
109                # move last handle to closed socket's index
110                self._sockets[idx] = self._sockets[self._n_sockets]
111    
112    
113    @property
114    def underlying(self):
115        """The address of the underlying libzmq context"""
116        return <size_t> self.handle
117    
118    # backward-compat, though nobody is using it
119    _handle = underlying
120    
121    cdef inline int _term(self):
122        cdef int rc=0
123        if self.handle != NULL and not self.closed and getpid() == self._pid:
124            with nogil:
125                rc = zmq_ctx_destroy(self.handle)
126        self.handle = NULL
127        return rc
128    
129    def term(self):
130        """ctx.term()
131
132        Close or terminate the context.
133        
134        This can be called to close the context by hand. If this is not called,
135        the context will automatically be closed when it is garbage collected.
136        """
137        cdef int rc
138        rc = self._term()
139        self.closed = True
140    
141    def set(self, int option, optval):
142        """ctx.set(option, optval)
143
144        Set a context option.
145
146        See the 0MQ API documentation for zmq_ctx_set
147        for details on specific options.
148        
149        .. versionadded:: libzmq-3.2
150        .. versionadded:: 13.0
151
152        Parameters
153        ----------
154        option : int
155            The option to set.  Available values will depend on your
156            version of libzmq.  Examples include::
157            
158                zmq.IO_THREADS, zmq.MAX_SOCKETS
159        
160        optval : int
161            The value of the option to set.
162        """
163        cdef int optval_int_c
164        cdef int rc
165        cdef char* optval_c
166
167        if self.closed:
168            raise RuntimeError("Context has been destroyed")
169        
170        if not isinstance(optval, int):
171            raise TypeError('expected int, got: %r' % optval)
172        optval_int_c = optval
173        rc = zmq_ctx_set(self.handle, option, optval_int_c)
174        _check_rc(rc)
175
176    def get(self, int option):
177        """ctx.get(option)
178
179        Get the value of a context option.
180
181        See the 0MQ API documentation for zmq_ctx_get
182        for details on specific options.
183        
184        .. versionadded:: libzmq-3.2
185        .. versionadded:: 13.0
186
187        Parameters
188        ----------
189        option : int
190            The option to get.  Available values will depend on your
191            version of libzmq.  Examples include::
192            
193                zmq.IO_THREADS, zmq.MAX_SOCKETS
194            
195        Returns
196        -------
197        optval : int
198            The value of the option as an integer.
199        """
200        cdef int optval_int_c
201        cdef size_t sz
202        cdef int rc
203
204        if self.closed:
205            raise RuntimeError("Context has been destroyed")
206
207        rc = zmq_ctx_get(self.handle, option)
208        _check_rc(rc)
209
210        return rc
211
212    def destroy(self, linger=None):
213        """ctx.destroy(linger=None)
214        
215        Close all sockets associated with this context, and then terminate
216        the context. If linger is specified,
217        the LINGER sockopt of the sockets will be set prior to closing.
218        
219        .. warning::
220        
221            destroy involves calling ``zmq_close()``, which is **NOT** threadsafe.
222            If there are active sockets in other threads, this must not be called.
223        """
224        
225        cdef int linger_c
226        cdef bint setlinger=False
227        
228        if linger is not None:
229            linger_c = linger
230            setlinger=True
231
232        if self.handle != NULL and not self.closed and self._n_sockets:
233            while self._n_sockets:
234                if setlinger:
235                    zmq_setsockopt(self._sockets[0], ZMQ_LINGER, &linger_c, sizeof(int))
236                rc = zmq_close(self._sockets[0])
237                if rc < 0 and zmq_errno() != ZMQ_ENOTSOCK:
238                    raise ZMQError()
239                self._n_sockets -= 1
240                self._sockets[0] = self._sockets[self._n_sockets]
241        self.term()
242    
243__all__ = ['Context']
244