monitoredqueue.pxd revision 781d71db
1"""MonitoredQueue class declarations.
2
3Authors
4-------
5* MinRK
6* Brian Granger
7"""
8
9#
10#    Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
11#
12#    This file is part of pyzmq, but is derived and adapted from zmq_queue.cpp
13#    originally from libzmq-2.1.6, used under LGPLv3
14#
15#    pyzmq is free software; you can redistribute it and/or modify it under
16#    the terms of the Lesser GNU General Public License as published by
17#    the Free Software Foundation; either version 3 of the License, or
18#    (at your option) any later version.
19#
20#    pyzmq is distributed in the hope that it will be useful,
21#    but WITHOUT ANY WARRANTY; without even the implied warranty of
22#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23#    Lesser GNU General Public License for more details.
24#
25#    You should have received a copy of the Lesser GNU General Public License
26#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
27#
28
29#-----------------------------------------------------------------------------
30# Imports
31#-----------------------------------------------------------------------------
32
33from libzmq cimport *
34
35#-----------------------------------------------------------------------------
36# MonitoredQueue C functions
37#-----------------------------------------------------------------------------
38
39cdef inline int _relay(void *insocket_, void *outsocket_, void *sidesocket_, 
40                zmq_msg_t msg, zmq_msg_t side_msg, zmq_msg_t id_msg,
41                bint swap_ids) nogil:
42    cdef int rc
43    cdef int64_t flag_2
44    cdef int flag_3
45    cdef int flags
46    cdef bint more
47    cdef size_t flagsz
48    cdef void * flag_ptr
49    
50    if ZMQ_VERSION_MAJOR < 3:
51        flagsz = sizeof (int64_t)
52        flag_ptr = &flag_2
53    else:
54        flagsz = sizeof (int)
55        flag_ptr = &flag_3
56    
57    if swap_ids:# both router, must send second identity first
58        # recv two ids into msg, id_msg
59        rc = zmq_msg_recv(&msg, insocket_, 0)
60        if rc < 0: return rc
61        
62        rc = zmq_msg_recv(&id_msg, insocket_, 0)
63        if rc < 0: return rc
64
65        # send second id (id_msg) first
66        #!!!! always send a copy before the original !!!!
67        rc = zmq_msg_copy(&side_msg, &id_msg)
68        if rc < 0: return rc
69        rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE)
70        if rc < 0: return rc
71        rc = zmq_msg_send(&id_msg, sidesocket_, ZMQ_SNDMORE)
72        if rc < 0: return rc
73        # send first id (msg) second
74        rc = zmq_msg_copy(&side_msg, &msg)
75        if rc < 0: return rc
76        rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE)
77        if rc < 0: return rc
78        rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE)
79        if rc < 0: return rc
80    while (True):
81        rc = zmq_msg_recv(&msg, insocket_, 0)
82        if rc < 0: return rc
83        # assert (rc == 0)
84        rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, flag_ptr, &flagsz)
85        if rc < 0: return rc
86        flags = 0
87        if ZMQ_VERSION_MAJOR < 3:
88            if flag_2:
89                flags |= ZMQ_SNDMORE
90        else:
91            if flag_3:
92                flags |= ZMQ_SNDMORE
93            # LABEL has been removed:
94            # rc = zmq_getsockopt (insocket_, ZMQ_RCVLABEL, flag_ptr, &flagsz)
95            # if flag_3:
96            #     flags |= ZMQ_SNDLABEL
97        # assert (rc == 0)
98
99        rc = zmq_msg_copy(&side_msg, &msg)
100        if rc < 0: return rc
101        if flags:
102            rc = zmq_msg_send(&side_msg, outsocket_, flags)
103            if rc < 0: return rc
104            # only SNDMORE for side-socket
105            rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE)
106            if rc < 0: return rc
107        else:
108            rc = zmq_msg_send(&side_msg, outsocket_, 0)
109            if rc < 0: return rc
110            rc = zmq_msg_send(&msg, sidesocket_, 0)
111            if rc < 0: return rc
112            break
113    return rc
114
115# the MonitoredQueue C function, adapted from zmq::queue.cpp :
116cdef inline int c_monitored_queue (void *insocket_, void *outsocket_,
117                        void *sidesocket_, zmq_msg_t *in_msg_ptr, 
118                        zmq_msg_t *out_msg_ptr, int swap_ids) nogil:
119    """The actual C function for a monitored queue device. 
120
121    See ``monitored_queue()`` for details.
122    """
123    
124    cdef zmq_msg_t msg
125    cdef int rc = zmq_msg_init (&msg)
126    cdef zmq_msg_t id_msg
127    rc = zmq_msg_init (&id_msg)
128    if rc < 0: return rc
129    cdef zmq_msg_t side_msg
130    rc = zmq_msg_init (&side_msg)
131    if rc < 0: return rc
132    
133    cdef zmq_pollitem_t items [2]
134    items [0].socket = insocket_
135    items [0].fd = 0
136    items [0].events = ZMQ_POLLIN
137    items [0].revents = 0
138    items [1].socket = outsocket_
139    items [1].fd = 0
140    items [1].events = ZMQ_POLLIN
141    items [1].revents = 0
142    # I don't think sidesocket should be polled?
143    # items [2].socket = sidesocket_
144    # items [2].fd = 0
145    # items [2].events = ZMQ_POLLIN
146    # items [2].revents = 0
147    
148    while (True):
149    
150        # //  Wait while there are either requests or replies to process.
151        rc = zmq_poll (&items [0], 2, -1)
152        if rc < 0: return rc
153        # //  The algorithm below asumes ratio of request and replies processed
154        # //  under full load to be 1:1. Although processing requests replies
155        # //  first is tempting it is suspectible to DoS attacks (overloading
156        # //  the system with unsolicited replies).
157        # 
158        # //  Process a request.
159        if (items [0].revents & ZMQ_POLLIN):
160            # send in_prefix to side socket
161            rc = zmq_msg_copy(&side_msg, in_msg_ptr)
162            if rc < 0: return rc
163            rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE)
164            if rc < 0: return rc
165            # relay the rest of the message
166            rc = _relay(insocket_, outsocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
167            if rc < 0: return rc
168        if (items [1].revents & ZMQ_POLLIN):
169            # send out_prefix to side socket
170            rc = zmq_msg_copy(&side_msg, out_msg_ptr)
171            if rc < 0: return rc
172            rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE)
173            if rc < 0: return rc
174            # relay the rest of the message
175            rc = _relay(outsocket_, insocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
176            if rc < 0: return rc
177    return rc
178