test_device.py revision d3907f0d
1# Copyright (C) PyZMQ Developers
2# Distributed under the terms of the Modified BSD License.
3
4import time
5
6import zmq
7from zmq import devices
8from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest, PYPY
9from zmq.utils.strtypes import (bytes,unicode,basestring)
10
if PYPY:
    # Cleanup of a shared Context doesn't work on PyPy, so when running on
    # PyPy the class-level Device.context_factory is overridden with
    # zmq.Context as soon as this module is imported.
    # NOTE(review): presumably the default factory hands out a shared Context
    # instance -- confirm against zmq.devices.
    devices.Device.context_factory = zmq.Context
14
class TestDevice(BaseZMQTestCase):
    """Tests for ``zmq.devices``: Device, ThreadDevice and ThreadProxy."""

    def _select_random_ports(self, iface, count=1):
        """Return a list of *count* port numbers picked on *iface*.

        A throwaway REQ socket is bound with ``bind_to_random_port`` to obtain
        the numbers and is then closed (even if binding raises), so the caller
        can bind the ports itself afterwards.
        """
        binder = self.context.socket(zmq.REQ)
        try:
            ports = [binder.bind_to_random_port(iface) for _ in range(count)]
        finally:
            binder.close()
        # pause after closing, presumably so the ports are released again
        time.sleep(0.1)
        return ports

    def _assert_echo(self, dev, req, msg):
        """Start *dev*, send *msg* on *req* and assert it comes back unchanged."""
        dev.start()
        # pause after start(), presumably so the background device is up
        # before the request is sent
        time.sleep(.25)
        req.send(msg)
        self.assertEqual(msg, self.recv(req))

    def _assert_attributes(self, device_class):
        """Check the attributes exposed by a freshly built *device_class*."""
        dev = device_class(zmq.QUEUE, zmq.SUB, zmq.PUB)
        self.assertEqual(dev.in_type, zmq.SUB)
        self.assertEqual(dev.out_type, zmq.PUB)
        self.assertEqual(dev.device_type, zmq.QUEUE)
        self.assertEqual(dev.daemon, True)
        del dev

    def test_device_types(self):
        """Device records the device type it was constructed with."""
        for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
            dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
            self.assertEqual(dev.device_type, devtype)
            del dev

    def test_device_attributes(self):
        """Device exposes in_type, out_type, device_type and daemon."""
        self._assert_attributes(devices.Device)

    def test_tsdevice_attributes(self):
        """ThreadDevice exposes the same attributes as Device.

        This test used to build a plain ``devices.Device`` and was therefore
        an exact duplicate of ``test_device_attributes``.
        NOTE(review): ThreadDevice is assumed to be the intended subject,
        based on the test name -- confirm.
        """
        self._assert_attributes(devices.ThreadDevice)

    def test_single_socket_forwarder_connect(self):
        """A QUEUE ThreadDevice with out_type -1 echoes over connect_in/connect_out."""
        iface = 'tcp://127.0.0.1'
        cases = (
            ('connect_in', b'hello'),
            ('connect_out', b'hello again'),
        )
        for attach, msg in cases:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            try:
                port = req.bind_to_random_port(iface)
                getattr(dev, attach)('%s:%i' % (iface, port))
                self._assert_echo(dev, req, msg)
            finally:
                # close even on failure so no socket is left open on self.context
                req.close()
            del dev

    def test_single_socket_forwarder_bind(self):
        """A QUEUE ThreadDevice with out_type -1 echoes over bind_in/bind_out."""
        iface = 'tcp://127.0.0.1'
        # The second case uses bind_out (it previously repeated bind_in),
        # mirroring the connect_in/connect_out pair of the connect test.
        cases = (
            ('bind_in', b'hello'),
            ('bind_out', b'hello again'),
        )
        for attach, msg in cases:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            # select random port:
            port, = self._select_random_ports(iface)
            url = '%s:%i' % (iface, port)
            req = self.context.socket(zmq.REQ)
            try:
                req.connect(url)
                getattr(dev, attach)(url)
                self._assert_echo(dev, req, msg)
            finally:
                # close even on failure so no socket is left open on self.context
                req.close()
            del dev

    def test_proxy(self):
        """ThreadProxy forwards a pushed message to both its out and mon sockets."""
        if zmq.zmq_version_info() < (3,2):
            raise SkipTest("Proxies only in libzmq >= 3.2")
        dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
        iface = 'tcp://127.0.0.1'
        port, port2, port3 = self._select_random_ports(iface, 3)
        dev.bind_in("%s:%i" % (iface, port))
        dev.bind_out("%s:%i" % (iface, port2))
        dev.bind_mon("%s:%i" % (iface, port3))
        dev.start()
        time.sleep(0.25)
        msg = b'hello'
        # Each socket is added to self.sockets right after creation so it is
        # tracked even if a later connect/send/recv raises.
        push = self.context.socket(zmq.PUSH)
        self.sockets.append(push)
        push.connect("%s:%i" % (iface, port))
        pull = self.context.socket(zmq.PULL)
        self.sockets.append(pull)
        pull.connect("%s:%i" % (iface, port2))
        mon = self.context.socket(zmq.PULL)
        self.sockets.append(mon)
        mon.connect("%s:%i" % (iface, port3))
        push.send(msg)
        self.assertEqual(msg, self.recv(pull))
        self.assertEqual(msg, self.recv(mon))
125
if have_gevent:
    import gevent
    import zmq.green

    class TestDeviceGreen(GreenTest, BaseZMQTestCase):
        """Device test run under gevent; only defined when gevent is available."""

        def test_green_device(self):
            """zmq.green.device on a single REP socket echoes a request back."""
            rep = self.context.socket(zmq.REP)
            req = self.context.socket(zmq.REQ)
            self.sockets.extend([req, rep])
            port = rep.bind_to_random_port('tcp://127.0.0.1')
            # the same REP socket is passed as both sides of the QUEUE device
            g = gevent.spawn(zmq.green.device, zmq.QUEUE, rep, rep)
            try:
                req.connect('tcp://127.0.0.1:%i' % port)
                req.send(b'hi')
                timeout = gevent.Timeout(3)
                timeout.start()
                try:
                    receiver = gevent.spawn(req.recv)
                    self.assertEqual(receiver.get(2), b'hi')
                finally:
                    # always disarm the timer so it cannot fire after the test
                    timeout.cancel()
            finally:
                # always stop the device greenlet, even when the test fails
                g.kill(block=True)
146
147