1"""0MQ Socket class."""
2
3#
4#    Copyright (c) 2010-2011 Brian E. Granger & Min Ragan-Kelley
5#
6#    This file is part of pyzmq.
7#
8#    pyzmq is free software; you can redistribute it and/or modify it under
9#    the terms of the Lesser GNU General Public License as published by
10#    the Free Software Foundation; either version 3 of the License, or
11#    (at your option) any later version.
12#
13#    pyzmq is distributed in the hope that it will be useful,
14#    but WITHOUT ANY WARRANTY; without even the implied warranty of
15#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16#    Lesser GNU General Public License for more details.
17#
18#    You should have received a copy of the Lesser GNU General Public License
19#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20#
21
22#-----------------------------------------------------------------------------
23# Cython Imports
24#-----------------------------------------------------------------------------
25
26# get version-independent aliases:
27cdef extern from "pyversion_compat.h":
28    pass
29
30from libc.errno cimport ENAMETOOLONG
31from libc.string cimport memcpy
32
33from cpython cimport PyBytes_FromStringAndSize
34from cpython cimport PyBytes_AsString, PyBytes_Size
35from cpython cimport Py_DECREF, Py_INCREF
36
37from buffers cimport asbuffer_r, viewfromobject_r
38
39from libzmq cimport *
40from message cimport Frame, copy_zmq_msg_bytes
41
42from context cimport Context
43
44cdef extern from "Python.h":
45    ctypedef int Py_ssize_t
46
47cdef extern from "ipcmaxlen.h":
48    int get_ipc_path_max_len()
49
50cdef extern from "getpid_compat.h":
51    int getpid()
52
53
54#-----------------------------------------------------------------------------
55# Python Imports
56#-----------------------------------------------------------------------------
57
58import copy as copy_mod
59import time
60import sys
61import random
62import struct
63import codecs
64
65from zmq.utils import jsonapi
66
try:
    # Python 2: prefer the C-accelerated pickle implementation.
    import cPickle
    pickle = cPickle
except ImportError:
    # cPickle is unavailable (e.g. Python 3): fall back to the stdlib module.
    # Only ImportError is caught so that unrelated failures (and
    # KeyboardInterrupt / SystemExit) are not silently swallowed.
    cPickle = None
    import pickle
73
74import zmq
75from zmq.backend.cython import constants
76from zmq.backend.cython.constants import *
77from zmq.backend.cython.checkrc cimport _check_rc
78from zmq.error import ZMQError, ZMQBindError, _check_version
79from zmq.utils.strtypes import bytes,unicode,basestring
80
81#-----------------------------------------------------------------------------
82# Code
83#-----------------------------------------------------------------------------
84
85IPC_PATH_MAX_LEN = get_ipc_path_max_len()
86
87# inline some small socket submethods:
88# true methods frequently cannot be inlined, acc. Cython docs
89
cdef inline _check_closed(Socket s):
    """Raise ZMQError(ENOTSOCK) if the socket is flagged as closed.

    This is a shallow check: only the socket's own ``_closed`` flag is
    consulted, libzmq is not queried.
    """
    if not s._closed:
        return
    raise ZMQError(ENOTSOCK)
97
cdef inline _check_closed_deep(Socket s):
    """Thoroughly check whether the socket has been closed,
    even if by another entity (e.g. ctx.destroy).

    When the socket is not already flagged as closed, libzmq is asked for
    the socket's ZMQ_TYPE; an ENOTSOCK failure means the handle is no longer
    a valid socket, in which case the ``_closed`` flag is updated as well.

    Only used by the `closed` property.

    Returns True if closed, False otherwise.  Any failure of the probe other
    than ENOTSOCK is raised by ``_check_rc``.
    """
    cdef int rc
    cdef int stype
    cdef size_t sz = sizeof(int)

    if s._closed:
        return True

    rc = zmq_getsockopt(s.handle, ZMQ_TYPE, <void *>&stype, &sz)
    if rc < 0 and zmq_errno() == ENOTSOCK:
        # closed behind our back: remember it so later checks are cheap
        s._closed = True
        return True

    _check_rc(rc)
    return False
120
cdef inline Frame _recv_frame(void *handle, int flags=0, track=False):
    """Receive one message without copying it and return it as a Frame."""
    cdef int rc
    cdef Frame frame = zmq.Frame(track=track)

    # the receive runs without the GIL held
    with nogil:
        rc = zmq_msg_recv(&frame.zmq_msg, handle, flags)

    _check_rc(rc)
    return frame
132
cdef inline object _recv_copy(void *handle, int flags=0):
    """Receive a message and return a copy of its content.

    The message is received into a temporary ``zmq_msg_t``, its bytes are
    copied out with ``copy_zmq_msg_bytes`` and the temporary message is
    closed again before returning -- also when the receive (or the copy)
    fails, so that an initialised message is never left unclosed.

    Raises whatever ``_check_rc`` raises for a failed ``zmq_msg_recv``.
    """
    cdef zmq_msg_t zmq_msg
    cdef int rc

    with nogil:
        zmq_msg_init(&zmq_msg)
        rc = zmq_msg_recv(&zmq_msg, handle, flags)

    try:
        # _check_rc runs before the close below, so it sees the errno
        # left behind by zmq_msg_recv
        _check_rc(rc)
        msg_bytes = copy_zmq_msg_bytes(&zmq_msg)
    finally:
        # release the zmq_msg_t on success and on failure alike
        zmq_msg_close(&zmq_msg)
    return msg_bytes
143
cdef inline object _send_frame(void *handle, Frame msg, int flags=0):
    """Send a Frame on the given socket handle without copying its data.

    Returns the ``tracker`` attribute of the Frame that was passed in.
    """
    cdef int rc
    # Send a fast_copy() of the Frame rather than the Frame itself, so the
    # original message isn't garbage collected.  This is not a real copy of
    # the data, just another reference to it.
    cdef Frame shadow = msg.fast_copy()

    with nogil:
        rc = zmq_msg_send(&shadow.zmq_msg, handle, flags)

    _check_rc(rc)
    return msg.tracker
158
159
cdef inline object _send_copy(void *handle, object msg, int flags=0):
    """Send a message on the given socket handle, copying its content."""
    cdef zmq_msg_t data
    cdef char *buf
    cdef Py_ssize_t buf_len = 0
    cdef int init_rc, send_rc, close_rc

    # get a C pointer/length view of the Python object's buffer
    asbuffer_r(msg, <void **>&buf, &buf_len)

    # The content is copied into a fresh zmq message before sending, which
    # avoids any complications with the GIL, etc.
    # If zmq_msg_init_* fails we must not call zmq_msg_close (Bus Error),
    # hence the check right here, before anything else touches `data`.
    init_rc = zmq_msg_init_size(&data, buf_len)
    _check_rc(init_rc)

    with nogil:
        memcpy(zmq_msg_data(&data), buf, zmq_msg_size(&data))
        send_rc = zmq_msg_send(&data, handle, flags)
        close_rc = zmq_msg_close(&data)

    # the message is closed in every case; errors are reported afterwards
    _check_rc(send_rc)
    _check_rc(close_rc)
183
184
cdef class Socket:
    """Socket(context, socket_type)

    A 0MQ socket.

    These objects will generally be constructed via the socket() method of a Context object.

    Note: 0MQ Sockets are *not* threadsafe. **DO NOT** share them across threads.

    Parameters
    ----------
    context : Context
        The 0MQ Context this Socket belongs to.
    socket_type : int
        The socket type, which can be any of the 0MQ socket types:
        REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB.

    See Also
    --------
    .Context.socket : method for creating a socket bound to a Context.
    """

    # no-op for the signature
    def __init__(self, context=None, socket_type=-1, shadow=0):
        pass

    def __cinit__(self, Context context=None, int socket_type=-1, size_t shadow=0, *args, **kwargs):
        # Either wrap an existing libzmq socket (`shadow` is its address)
        # or create a new one of `socket_type` on `context`.
        self.handle = NULL
        self.context = context
        if shadow:
            self._shadow = True
            self.handle = <void *>shadow
        else:
            if context is None:
                raise TypeError("context must be specified")
            if socket_type < 0:
                raise TypeError("socket_type must be specified")
            self._shadow = False
            self.handle = zmq_socket(context.handle, socket_type)
        if self.handle == NULL:
            raise ZMQError()
        self._closed = False
        # remember the creating process: close() and __dealloc__ only act
        # on the handle when called from that same process
        self._pid = getpid()
        if context:
            context._add_socket(self.handle)

    def __dealloc__(self):
        """remove from context's list

        But be careful that context might not exist if called during gc
        """
        if self.handle != NULL and not self._shadow and getpid() == self._pid:
            # during gc, self.context might be NULL
            if self.context and not self.context.closed:
                self.context._remove_socket(self.handle)

    @property
    def underlying(self):
        """The address of the underlying libzmq socket"""
        return <size_t> self.handle

    @property
    def closed(self):
        """Whether the socket is closed (performs the deep check)."""
        return _check_closed_deep(self)

    def close(self, linger=None):
        """s.close(linger=None)

        Close the socket.

        If linger is specified, LINGER sockopt will be set prior to closing.

        This can be called to close the socket by hand. If this is not
        called, the socket will automatically be closed when it is
        garbage collected.
        """
        cdef int rc=0
        cdef int linger_c
        cdef bint setlinger=False

        if linger is not None:
            linger_c = linger
            setlinger=True

        if self.handle != NULL and not self._closed and getpid() == self._pid:
            if setlinger:
                # the return code of this call is not checked
                zmq_setsockopt(self.handle, ZMQ_LINGER, &linger_c, sizeof(int))
            rc = zmq_close(self.handle)
            if rc != 0 and zmq_errno() != ENOTSOCK:
                # ignore ENOTSOCK (closed by Context)
                _check_rc(rc)
            self._closed = True
            # during gc, self.context might be NULL
            if self.context:
                self.context._remove_socket(self.handle)
            self.handle = NULL

    def set(self, int option, optval):
        """s.set(option, optval)

        Set socket options.

        See the 0MQ API documentation for details on specific options.

        Parameters
        ----------
        option : int
            The option to set.  Available values will depend on your
            version of libzmq.  Examples include::

                zmq.SUBSCRIBE, UNSUBSCRIBE, IDENTITY, HWM, LINGER, FD

        optval : int or bytes
            The value of the option to set.

        Raises
        ------
        TypeError
            If `optval` is unicode, or not of the type (bytes or int)
            expected for `option`.
        ZMQError
            If the socket is closed or libzmq rejects the option.
        """
        cdef int64_t optval_int64_c
        cdef int optval_int_c
        cdef int rc
        cdef char* optval_c
        cdef Py_ssize_t sz

        _check_closed(self)
        if isinstance(optval, unicode):
            raise TypeError("unicode not allowed, use setsockopt_string")

        if option in zmq.constants.bytes_sockopts:
            if not isinstance(optval, bytes):
                raise TypeError('expected bytes, got: %r' % optval)
            optval_c = PyBytes_AsString(optval)
            sz = PyBytes_Size(optval)
            rc = zmq_setsockopt(
                    self.handle, option,
                    optval_c, sz
                )
        elif option in zmq.constants.int64_sockopts:
            if not isinstance(optval, int):
                raise TypeError('expected int, got: %r' % optval)
            optval_int64_c = optval
            rc = zmq_setsockopt(
                    self.handle, option,
                    &optval_int64_c, sizeof(int64_t)
                )
        else:
            # default is to assume int, which is what most new sockopts will be
            # this lets pyzmq work with newer libzmq which may add constants
            # pyzmq has not yet added, rather than artificially raising. Invalid
            # sockopts will still raise just the same, but it will be libzmq doing
            # the raising.
            if not isinstance(optval, int):
                raise TypeError('expected int, got: %r' % optval)
            optval_int_c = optval
            rc = zmq_setsockopt(
                    self.handle, option,
                    &optval_int_c, sizeof(int)
                )

        _check_rc(rc)

    def get(self, int option):
        """s.get(option)

        Get the value of a socket option.

        See the 0MQ API documentation for details on specific options.

        Parameters
        ----------
        option : int
            The option to get.  Available values will depend on your
            version of libzmq.  Examples include::

                zmq.IDENTITY, HWM, LINGER, FD, EVENTS

        Returns
        -------
        optval : int or bytes
            The value of the option as a bytestring or int.
        """
        cdef int64_t optval_int64_c
        cdef int optval_int_c
        cdef fd_t optval_fd_c
        cdef char identity_str_c [255]
        cdef size_t sz
        cdef int rc

        _check_closed(self)

        if option in zmq.constants.bytes_sockopts:
            # bytes options are read into a fixed 255-byte buffer;
            # libzmq updates sz to the length actually written
            sz = 255
            rc = zmq_getsockopt(self.handle, option, <void *>identity_str_c, &sz)
            _check_rc(rc)
            # strip null-terminated strings *except* identity
            if option != ZMQ_IDENTITY and sz > 0 and (<char *>identity_str_c)[sz-1] == b'\0':
                sz -= 1
            result = PyBytes_FromStringAndSize(<char *>identity_str_c, sz)
        elif option in zmq.constants.int64_sockopts:
            sz = sizeof(int64_t)
            rc = zmq_getsockopt(self.handle, option, <void *>&optval_int64_c, &sz)
            _check_rc(rc)
            result = optval_int64_c
        elif option in zmq.constants.fd_sockopts:
            sz = sizeof(fd_t)
            rc = zmq_getsockopt(self.handle, option, <void *>&optval_fd_c, &sz)
            _check_rc(rc)
            result = optval_fd_c
        else:
            # default is to assume int, which is what most new sockopts will be
            # this lets pyzmq work with newer libzmq which may add constants
            # pyzmq has not yet added, rather than artificially raising. Invalid
            # sockopts will still raise just the same, but it will be libzmq doing
            # the raising.
            sz = sizeof(int)
            rc = zmq_getsockopt(self.handle, option, <void *>&optval_int_c, &sz)
            _check_rc(rc)
            result = optval_int_c

        return result

    def bind(self, addr):
        """s.bind(addr)

        Bind the socket to an address.

        This causes the socket to listen on a network port. Sockets on the
        other side of this connection will use ``Socket.connect(addr)`` to
        connect to this socket.

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported include
            tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.
        """
        cdef int rc
        cdef char* c_addr

        _check_closed(self)
        if isinstance(addr, unicode):
            addr = addr.encode('utf-8')
        if not isinstance(addr, bytes):
            raise TypeError('expected str, got: %r' % addr)
        c_addr = addr
        rc = zmq_bind(self.handle, c_addr)
        if rc != 0:
            # give a more helpful message for over-long ipc paths;
            # every other failure falls through to _check_rc below
            if IPC_PATH_MAX_LEN and zmq_errno() == ENAMETOOLONG:
                # py3compat: addr is bytes, but msg wants str
                if str is unicode:
                    addr = addr.decode('utf-8', 'replace')
                path = addr.split('://', 1)[-1]
                msg = ('ipc path "{0}" is longer than {1} '
                                'characters (sizeof(sockaddr_un.sun_path)). '
                                'zmq.IPC_PATH_MAX_LEN constant can be used '
                                'to check addr length (if it is defined).'
                                .format(path, IPC_PATH_MAX_LEN))
                raise ZMQError(msg=msg)
        _check_rc(rc)

    def connect(self, addr):
        """s.connect(addr)

        Connect to a remote 0MQ socket.

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported are
            tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.
        """
        cdef int rc
        cdef char* c_addr

        _check_closed(self)
        if isinstance(addr, unicode):
            addr = addr.encode('utf-8')
        if not isinstance(addr, bytes):
            raise TypeError('expected str, got: %r' % addr)
        c_addr = addr

        rc = zmq_connect(self.handle, c_addr)
        # same error reporting path as bind/monitor/set/get
        _check_rc(rc)

    def unbind(self, addr):
        """s.unbind(addr)

        Unbind from an address (undoes a call to bind).

        .. versionadded:: libzmq-3.2
        .. versionadded:: 13.0

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported are
            tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.
        """
        cdef int rc
        cdef char* c_addr

        _check_version((3,2), "unbind")
        _check_closed(self)
        if isinstance(addr, unicode):
            addr = addr.encode('utf-8')
        if not isinstance(addr, bytes):
            raise TypeError('expected str, got: %r' % addr)
        c_addr = addr

        rc = zmq_unbind(self.handle, c_addr)
        # same error reporting path as bind/monitor/set/get
        _check_rc(rc)

    def disconnect(self, addr):
        """s.disconnect(addr)

        Disconnect from a remote 0MQ socket (undoes a call to connect).

        .. versionadded:: libzmq-3.2
        .. versionadded:: 13.0

        Parameters
        ----------
        addr : str
            The address string. This has the form 'protocol://interface:port',
            for example 'tcp://127.0.0.1:5555'. Protocols supported are
            tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
            encoded to utf-8 first.
        """
        cdef int rc
        cdef char* c_addr

        _check_version((3,2), "disconnect")
        _check_closed(self)
        if isinstance(addr, unicode):
            addr = addr.encode('utf-8')
        if not isinstance(addr, bytes):
            raise TypeError('expected str, got: %r' % addr)
        c_addr = addr

        rc = zmq_disconnect(self.handle, c_addr)
        # same error reporting path as bind/monitor/set/get
        _check_rc(rc)

    def monitor(self, addr, int events=ZMQ_EVENT_ALL):
        """s.monitor(addr, events=EVENT_ALL)

        Start publishing socket events on inproc.
        See libzmq docs for zmq_monitor for details.

        While this function is available from libzmq 3.2,
        pyzmq cannot parse monitor messages from libzmq prior to 4.0.

        .. versionadded:: libzmq-3.2
        .. versionadded:: 14.0

        Parameters
        ----------
        addr : str
            The inproc url used for monitoring. Passing None as
            the addr will cause an existing socket monitor to be
            deregistered.
        events : int [default: zmq.EVENT_ALL]
            The zmq event bitmask for which events will be sent to the monitor.
        """
        cdef int rc, c_flags
        cdef char* c_addr = NULL

        _check_version((3,2), "monitor")
        # like every other socket operation: refuse to touch a closed socket
        _check_closed(self)
        if addr is not None:
            if isinstance(addr, unicode):
                addr = addr.encode('utf-8')
            if not isinstance(addr, bytes):
                raise TypeError('expected str, got: %r' % addr)
            c_addr = addr
        c_flags = events
        rc = zmq_socket_monitor(self.handle, c_addr, c_flags)
        _check_rc(rc)

    #-------------------------------------------------------------------------
    # Sending and receiving messages
    #-------------------------------------------------------------------------

    cpdef object send(self, object data, int flags=0, copy=True, track=False):
        """s.send(data, flags=0, copy=True, track=False)

        Send a message on this socket.

        This queues the message to be sent by the IO thread at a later time.

        Parameters
        ----------
        data : object, str, Frame
            The content of the message.
        flags : int
            Any supported flag: NOBLOCK, SNDMORE.
        copy : bool
            Should the message be sent in a copying or non-copying manner.
        track : bool
            Should the message be tracked for notification that ZMQ has
            finished with it? (ignored if copy=True)

        Returns
        -------
        None : if `copy` or not track
            None if message was sent, raises an exception otherwise.
        MessageTracker : if track and not copy
            a MessageTracker object, whose `pending` property will
            be True until the send is completed.

        Raises
        ------
        TypeError
            If a unicode object is passed
        ValueError
            If `track=True`, but an untracked Frame is passed.
        ZMQError
            If the send does not succeed for any reason.

        """
        _check_closed(self)

        if isinstance(data, unicode):
            raise TypeError("unicode not allowed, use send_string")

        if copy:
            # for a Frame, hand its `buffer` attribute to _send_copy,
            # which copies the content into a new zmq message
            if isinstance(data, Frame):
                data = data.buffer
            return _send_copy(self.handle, data, flags)
        else:
            if isinstance(data, Frame):
                if track and not data.tracker:
                    raise ValueError('Not a tracked message')
                msg = data
            else:
                msg = Frame(data, track=track)
            return _send_frame(self.handle, msg, flags)

    cpdef object recv(self, int flags=0, copy=True, track=False):
        """s.recv(flags=0, copy=True, track=False)

        Receive a message.

        Parameters
        ----------
        flags : int
            Any supported flag: NOBLOCK. If NOBLOCK is set, this method
            will raise a ZMQError with EAGAIN if a message is not ready.
            If NOBLOCK is not set, then this method will block until a
            message arrives.
        copy : bool
            Should the message be received in a copying or non-copying manner?
            If False a Frame object is returned, if True a string copy of
            message is returned.
        track : bool
            Should the message be tracked for notification that ZMQ has
            finished with it? (ignored if copy=True)

        Returns
        -------
        msg : bytes, Frame
            The received message frame.  If `copy` is False, then it will be a Frame,
            otherwise it will be bytes.

        Raises
        ------
        ZMQError
            for any of the reasons zmq_msg_recv might fail.
        """
        _check_closed(self)

        if copy:
            return _recv_copy(self.handle, flags)
        else:
            frame = _recv_frame(self.handle, flags, track)
            # use get(), which this class defines itself, rather than a
            # getsockopt alias that only exists if a subclass provides it
            frame.more = self.get(zmq.RCVMORE)
            return frame
670    
671
672__all__ = ['Socket', 'IPC_PATH_MAX_LEN']
673