test_monitor.py revision 781d71db
1# -*- coding: utf-8 -*-
2# Copyright (C) PyZMQ Developers
3# Distributed under the terms of the Modified BSD License.
4
5
6import sys
7import time
8import struct
9
10from unittest import TestCase
11
12import zmq
13from zmq.tests import BaseZMQTestCase, skip_if, skip_pypy
14from zmq.utils.monitor import recv_monitor_message
15
# Decorator for tests that need the monitoring API: the skip condition is
# "linked libzmq version is below 4", with the reason string shown below.
skip_lt_4 = skip_if(
    zmq.zmq_version_info() < (4,),
    "requires zmq >= 4",
)
17
class TestSocketMonitor(BaseZMQTestCase):
    """Exercise the socket event-monitoring API on a REP socket."""

    def _make_rep_and_bound_req(self):
        """Create a REP socket plus a REQ socket listening on a free port.

        Both sockets are appended to ``self.sockets``.

        Returns a ``(s_rep, endpoint)`` tuple, where ``endpoint`` is the
        ``tcp://127.0.0.1:<port>`` address the REQ socket was bound to.
        """
        s_rep = self.context.socket(zmq.REP)
        s_req = self.context.socket(zmq.REQ)
        self.sockets.extend([s_rep, s_req])
        # Do not hard-code a port: a fixed port makes the test fail whenever
        # that port is already taken (other process, parallel test run).
        port = s_req.bind_to_random_port("tcp://127.0.0.1")
        return s_rep, "tcp://127.0.0.1:%i" % port

    def _assert_connected(self, s_event, endpoint):
        """Assert that the monitor socket reports a completed connection.

        Reads one event from ``s_event``; if it is EVENT_CONNECT_DELAYED its
        endpoint is checked and the following event is read instead.  The
        resulting event must be EVENT_CONNECTED for ``endpoint``.

        ``endpoint`` is the expected address as bytes, which is how the
        parsed monitor message is compared in these tests.
        """
        m = recv_monitor_message(s_event)
        if m['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(m['endpoint'], endpoint)
            # the connect was deferred; the next event should complete it
            m = recv_monitor_message(s_event)
        self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(m['endpoint'], endpoint)

    @skip_lt_4
    def test_monitor(self):
        """Test monitoring interface for sockets."""
        s_rep, endpoint = self._make_rep_and_bound_req()

        # Monitor the REP socket, subscribing only to the events asserted on
        # below.  With EVENT_ALL, any further event queued after CONNECTED
        # would be received in place of MONITOR_STOPPED at the end of the
        # test.  NOTE(review): newer libzmq releases are understood to emit
        # handshake events here -- confirm against the targeted versions.
        events = (
            zmq.EVENT_CONNECT_DELAYED
            | zmq.EVENT_CONNECTED
            | zmq.EVENT_MONITOR_STOPPED
        )
        s_rep.monitor("inproc://monitor.rep", events)

        # create listening socket for monitor
        s_event = self.context.socket(zmq.PAIR)
        self.sockets.append(s_event)
        s_event.connect("inproc://monitor.rep")
        s_event.linger = 0

        # connecting the monitored socket should produce the connect events
        s_rep.connect(endpoint)
        self._assert_connected(s_event, endpoint.encode('ascii'))

        # test monitor can be disabled.
        s_rep.disable_monitor()
        m = recv_monitor_message(s_event)
        self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED)

    @skip_lt_4
    def test_monitor_connected(self):
        """Test connected monitoring socket."""
        s_rep, endpoint = self._make_rep_and_bound_req()

        # get_monitor_socket() returns the listening socket for the monitor
        # directly, so no explicit monitor()/connect() pair is needed here.
        s_event = s_rep.get_monitor_socket()
        s_event.linger = 0
        self.sockets.append(s_event)

        # connecting the monitored socket should produce the connect events
        s_rep.connect(endpoint)
        self._assert_connected(s_event, endpoint.encode('ascii'))
72