1b2732e9dSimarom# coding: utf-8
2b2732e9dSimarom"""zmq Socket class"""
3b2732e9dSimarom
4b2732e9dSimarom# Copyright (C) PyZMQ Developers
5b2732e9dSimarom# Distributed under the terms of the Modified BSD License.
6b2732e9dSimarom
7b2732e9dSimaromimport random
8b2732e9dSimaromimport codecs
9b2732e9dSimarom
10b2732e9dSimaromimport errno as errno_mod
11b2732e9dSimarom
12b2732e9dSimaromfrom ._cffi import (C, ffi, new_uint64_pointer, new_int64_pointer,
13b2732e9dSimarom                    new_int_pointer, new_binary_data, value_uint64_pointer,
14b2732e9dSimarom                    value_int64_pointer, value_int_pointer, value_binary_data,
15b2732e9dSimarom                    IPC_PATH_MAX_LEN)
16b2732e9dSimarom
17b2732e9dSimaromfrom .message import Frame
18b2732e9dSimaromfrom .constants import *
19b2732e9dSimarom
20b2732e9dSimaromimport zmq
21b2732e9dSimaromfrom zmq.error import ZMQError, _check_rc, _check_version
22b2732e9dSimaromfrom zmq.utils.strtypes import unicode
23b2732e9dSimarom
24b2732e9dSimarom
def new_pointer_from_opt(option, length=0):
    """Allocate the C storage used to read the socket option ``option``.

    Parameters
    ----------
    option : int
        The socket option constant.
    length : int
        Size passed to ``new_binary_data``; only used for bytes options.

    Returns
    -------
    Whatever the matching ``._cffi`` allocator returns:
    ``new_int64_pointer()`` for int64 options, ``new_binary_data(length)``
    for bytes options, and ``new_int_pointer()`` for everything else.
    """
    # The option sets are looked up at call time rather than at import time.
    from zmq.sugar.constants import bytes_sockopts, int64_sockopts

    if option in int64_sockopts:
        return new_int64_pointer()
    if option in bytes_sockopts:
        return new_binary_data(length)
    # Any option not listed in either set is handled as a plain int.
    return new_int_pointer()
36b2732e9dSimarom
def value_from_opt_pointer(option, opt_pointer, length=0):
    """Convert the C storage filled in for ``option`` into a Python value.

    Parameters
    ----------
    option : int
        The socket option constant.
    opt_pointer : cdata
        The storage that was handed to libzmq for this option.
    length : int
        Number of bytes to copy out; only used for bytes options.

    Returns
    -------
    int or bytes
        ``int(opt_pointer[0])`` for int64 options and for any option that
        is not a bytes option; otherwise a copy of the first ``length``
        bytes of the buffer.
    """
    from zmq.sugar.constants import bytes_sockopts, int64_sockopts

    # int64 membership is tested first, so an option that appeared in both
    # sets would be read as an integer.
    reads_as_int = option in int64_sockopts or option not in bytes_sockopts
    if reads_as_int:
        return int(opt_pointer[0])
    return ffi.buffer(opt_pointer, length)[:]
47b2732e9dSimarom
def initialize_opt_pointer(option, value, length=0):
    """Build the C storage that carries ``value`` when setting ``option``.

    Parameters
    ----------
    option : int
        The socket option constant.
    value : int or bytes
        The Python value to hand to libzmq.
    length : int
        Size passed to ``value_binary_data``; only used for bytes options.

    Returns
    -------
    Whatever the matching ``._cffi`` helper returns:
    ``value_int64_pointer(value)`` for int64 options,
    ``value_binary_data(value, length)`` for bytes options, and
    ``value_int_pointer(value)`` for everything else.
    """
    from zmq.sugar.constants import bytes_sockopts, int64_sockopts

    if option in int64_sockopts:
        return value_int64_pointer(value)
    if option in bytes_sockopts:
        return value_binary_data(value, length)
    # Any option not listed in either set is handled as a plain int.
    return value_int_pointer(value)
58b2732e9dSimarom
59b2732e9dSimarom
class Socket(object):
    """CFFI wrapper around a libzmq socket handle.

    A socket is either created from a context (``zmq_socket`` is called on
    ``context._zmq_ctx``) or wraps an existing handle whose address is
    passed as ``shadow``.
    """

    # Owning context object (may be None for shadow sockets).
    context = None
    # Socket type passed to the constructor.
    socket_type = None
    # cdata ``void *`` handle of the libzmq socket.
    _zmq_socket = None
    # None until __init__ succeeds, then False; True once close() has run.
    _closed = None
    # Value returned by ``context._add_socket``; handed back to
    # ``context._rm_socket`` on close.
    _ref = None
    # True when wrapping a handle that was created elsewhere.
    _shadow = False

    def __init__(self, context=None, socket_type=None, shadow=None):
        """Create a new socket, or wrap an existing one.

        Parameters
        ----------
        context : Context, optional
            The owning context. Required unless ``shadow`` is given, because
            its ``_zmq_ctx`` handle is used to create the socket.
        socket_type : int, optional
            The zmq socket type; only used when creating a new socket.
        shadow : int, optional
            Address of an existing libzmq socket to wrap instead of
            creating a new one.

        Raises
        ------
        ZMQError
            If the resulting socket handle is NULL.
        """
        self.context = context
        # Record the requested type so the declared attribute is populated.
        self.socket_type = socket_type
        if shadow is not None:
            self._zmq_socket = ffi.cast("void *", shadow)
            self._shadow = True
        else:
            self._shadow = False
            self._zmq_socket = C.zmq_socket(context._zmq_ctx, socket_type)
        if self._zmq_socket == ffi.NULL:
            raise ZMQError()
        self._closed = False
        if context:
            self._ref = context._add_socket(self)

    @property
    def underlying(self):
        """The address of the underlying libzmq socket"""
        return int(ffi.cast('size_t', self._zmq_socket))

    @property
    def closed(self):
        """Whether close() has been called on this socket."""
        return self._closed

    def close(self, linger=None):
        """Close the socket and unregister it from its context.

        Calling close() on an already-closed socket is a no-op.

        Parameters
        ----------
        linger : int, optional
            If given, ``zmq.LINGER`` is set to this value immediately
            before the socket is closed.

        Returns
        -------
        int
            The return code of ``zmq_close``, or 0 if nothing was closed.
        """
        rc = 0
        if not self._closed and hasattr(self, '_zmq_socket'):
            if self._zmq_socket is not None:
                if linger is not None:
                    # Honor the caller's linger instead of dropping it.
                    self.set(zmq.LINGER, linger)
                rc = C.zmq_close(self._zmq_socket)
            self._closed = True
            if self.context:
                self.context._rm_socket(self._ref)
        return rc

    def bind(self, address):
        """Bind the socket to ``address``.

        Parameters
        ----------
        address : bytes or unicode
            The endpoint; unicode is encoded as UTF-8.

        Raises
        ------
        ZMQError
            If ``zmq_bind`` fails. An over-long ipc path gets a dedicated
            message naming the path and the length limit.
        """
        if isinstance(address, unicode):
            address = address.encode('utf8')
        rc = C.zmq_bind(self._zmq_socket, address)
        if rc < 0:
            # Read errno once so the check and the raised error agree.
            errno = C.zmq_errno()
            if IPC_PATH_MAX_LEN and errno == errno_mod.ENAMETOOLONG:
                # py3compat: address is bytes, but msg wants str
                if str is unicode:
                    address = address.decode('utf-8', 'replace')
                path = address.split('://', 1)[-1]
                msg = ('ipc path "{0}" is longer than {1} '
                                'characters (sizeof(sockaddr_un.sun_path)).'
                                .format(path, IPC_PATH_MAX_LEN))
                raise ZMQError(errno, msg=msg)
            else:
                _check_rc(rc)

    def unbind(self, address):
        """Unbind the socket from ``address`` (checked for libzmq >= 3.2).

        Unicode addresses are encoded as UTF-8. Failures are raised through
        ``_check_rc``.
        """
        _check_version((3,2), "unbind")
        if isinstance(address, unicode):
            address = address.encode('utf8')
        rc = C.zmq_unbind(self._zmq_socket, address)
        _check_rc(rc)

    def connect(self, address):
        """Connect the socket to ``address``.

        Unicode addresses are encoded as UTF-8. Failures are raised through
        ``_check_rc``.
        """
        if isinstance(address, unicode):
            address = address.encode('utf8')
        rc = C.zmq_connect(self._zmq_socket, address)
        _check_rc(rc)

    def disconnect(self, address):
        """Disconnect the socket from ``address`` (checked for libzmq >= 3.2).

        Unicode addresses are encoded as UTF-8. Failures are raised through
        ``_check_rc``.
        """
        _check_version((3,2), "disconnect")
        if isinstance(address, unicode):
            address = address.encode('utf8')
        rc = C.zmq_disconnect(self._zmq_socket, address)
        _check_rc(rc)

    def set(self, option, value):
        """Set a socket option.

        Parameters
        ----------
        option : int
            The socket option constant.
        value : int or bytes
            The new value. bytes are only accepted for bytes options.

        Raises
        ------
        TypeError
            If ``value`` is unicode, or is bytes for a non-bytes option.
        ZMQError
            If ``zmq_setsockopt`` fails (via ``_check_rc``).
        """
        length = None
        if isinstance(value, unicode):
            raise TypeError("unicode not allowed, use bytes")

        if isinstance(value, bytes):
            if option not in zmq.constants.bytes_sockopts:
                raise TypeError("not a bytes sockopt: %s" % option)
            length = len(value)

        c_data = initialize_opt_pointer(option, value, length)

        # c_data holds the value storage followed by its size.
        c_value_pointer = c_data[0]
        c_sizet = c_data[1]

        rc = C.zmq_setsockopt(self._zmq_socket,
                               option,
                               ffi.cast('void*', c_value_pointer),
                               c_sizet)
        _check_rc(rc)

    def get(self, option):
        """Get the current value of a socket option.

        Parameters
        ----------
        option : int
            The socket option constant.

        Returns
        -------
        int or bytes
            The option value. For bytes options other than ``zmq.IDENTITY``
            a single trailing NUL byte is stripped.

        Raises
        ------
        ZMQError
            If ``zmq_getsockopt`` fails (via ``_check_rc``).
        """
        # 255 bytes of room is requested for bytes options.
        c_data = new_pointer_from_opt(option, length=255)

        # c_data holds the value storage followed by a pointer to its size.
        c_value_pointer = c_data[0]
        c_sizet_pointer = c_data[1]

        rc = C.zmq_getsockopt(self._zmq_socket,
                               option,
                               c_value_pointer,
                               c_sizet_pointer)
        _check_rc(rc)

        # The size slot is read back after the call and used as the length.
        sz = c_sizet_pointer[0]
        v = value_from_opt_pointer(option, c_value_pointer, sz)
        if option != zmq.IDENTITY and option in zmq.constants.bytes_sockopts and v.endswith(b'\0'):
            v = v[:-1]
        return v

    def send(self, message, flags=0, copy=False, track=False):
        """Send a single message frame.

        Parameters
        ----------
        message : bytes or Frame
            The payload; for a Frame its ``bytes`` are sent.
        flags : int
            Flags passed to ``zmq_msg_send``.
        copy : bool
            Accepted but not used here; the payload is always copied into
            a freshly initialised zmq message.
        track : bool
            If true, a new ``zmq.MessageTracker()`` is returned.

        Returns
        -------
        zmq.MessageTracker or None

        Raises
        ------
        TypeError
            If ``message`` is unicode.
        ZMQError
            If initialising or sending the message fails.
        """
        if isinstance(message, unicode):
            raise TypeError("Message must be in bytes, not an unicode Object")

        if isinstance(message, Frame):
            message = message.bytes

        zmq_msg = ffi.new('zmq_msg_t*')
        c_message = ffi.new('char[]', message)
        rc = C.zmq_msg_init_size(zmq_msg, len(message))
        # Do not copy into the message if it could not be initialised.
        _check_rc(rc)
        C.memcpy(C.zmq_msg_data(zmq_msg), c_message, len(message))

        rc = C.zmq_msg_send(zmq_msg, self._zmq_socket, flags)
        C.zmq_msg_close(zmq_msg)
        _check_rc(rc)

        if track:
            return zmq.MessageTracker()

    def recv(self, flags=0, copy=True, track=False):
        """Receive a single message frame.

        Parameters
        ----------
        flags : int
            Flags passed to ``zmq_msg_recv``.
        copy : bool
            If true return the frame's bytes, otherwise the Frame itself.
        track : bool
            Passed through to the Frame constructor.

        Returns
        -------
        bytes or Frame

        Raises
        ------
        ZMQError
            If ``zmq_msg_recv`` fails (via ``_check_rc``).
        """
        zmq_msg = ffi.new('zmq_msg_t*')
        C.zmq_msg_init(zmq_msg)

        rc = C.zmq_msg_recv(zmq_msg, self._zmq_socket, flags)

        if rc < 0:
            C.zmq_msg_close(zmq_msg)
            _check_rc(rc)

        # Copy the payload out before the zmq message is closed.
        _buffer = ffi.buffer(C.zmq_msg_data(zmq_msg), C.zmq_msg_size(zmq_msg))
        value = _buffer[:]
        C.zmq_msg_close(zmq_msg)

        frame = Frame(value, track=track)
        # Use get(), which this class defines; getsockopt is not defined here.
        frame.more = self.get(RCVMORE)

        if copy:
            return frame.bytes
        else:
            return frame

    def monitor(self, addr, events=-1):
        """s.monitor(addr, events=-1)

        Start publishing socket events on inproc.
        See libzmq docs for zmq_monitor for details.

        Note: requires libzmq >= 3.2

        Parameters
        ----------
        addr : str
            The inproc url used for monitoring. Passing None as
            the addr will cause an existing socket monitor to be
            deregistered.
        events : int [default: zmq.EVENT_ALL]
            The zmq event bitmask for which events will be sent to the monitor.

        Raises
        ------
        ZMQError
            If ``zmq_socket_monitor`` fails (via ``_check_rc``).
        """

        _check_version((3,2), "monitor")
        if events < 0:
            events = zmq.EVENT_ALL
        if addr is None:
            addr = ffi.NULL
        elif isinstance(addr, unicode):
            # Encode like bind/connect do, so text addresses are accepted.
            addr = addr.encode('utf8')
        rc = C.zmq_socket_monitor(self._zmq_socket, addr, events)
        _check_rc(rc)
242b2732e9dSimarom
243b2732e9dSimarom
# Names exported by ``from <this module> import *``: the Socket class
# defined above and IPC_PATH_MAX_LEN, which is imported from ``._cffi``.
__all__ = ['Socket', 'IPC_PATH_MAX_LEN']
245