monitoredqueue.py revision 781d71db
1"""pure Python monitored_queue function
2
3For use when Cython extension is unavailable (PyPy).
4
5Authors
6-------
7* MinRK
8"""
9
10# Copyright (C) PyZMQ Developers
11# Distributed under the terms of the Modified BSD License.
12
13import zmq
14
15def _relay(ins, outs, sides, prefix, swap_ids):
16    msg = ins.recv_multipart()
17    if swap_ids:
18        msg[:2] = msg[:2][::-1]
19    outs.send_multipart(msg)
20    sides.send_multipart([prefix] + msg)
21
22def monitored_queue(in_socket, out_socket, mon_socket,
23                    in_prefix=b'in', out_prefix=b'out'):
24
25    swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
26
27    poller = zmq.Poller()
28    poller.register(in_socket, zmq.POLLIN)
29    poller.register(out_socket, zmq.POLLIN)
30    while True:
31        events = dict(poller.poll())
32        if in_socket in events:
33            _relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
34        if out_socket in events:
35            _relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
36
37__all__ = ['monitored_queue']
38