1d3907f0dSYaroslav Brustinov# -*- coding: utf8 -*-
2d3907f0dSYaroslav Brustinov# Copyright (C) PyZMQ Developers
3d3907f0dSYaroslav Brustinov# Distributed under the terms of the Modified BSD License.
4d3907f0dSYaroslav Brustinov
5d3907f0dSYaroslav Brustinov
6d3907f0dSYaroslav Brustinovimport sys
7d3907f0dSYaroslav Brustinovimport time
8d3907f0dSYaroslav Brustinov
9d3907f0dSYaroslav Brustinovfrom unittest import TestCase
10d3907f0dSYaroslav Brustinov
11d3907f0dSYaroslav Brustinovimport zmq
12d3907f0dSYaroslav Brustinovfrom zmq.eventloop import ioloop, zmqstream
13d3907f0dSYaroslav Brustinov
14d3907f0dSYaroslav Brustinovclass TestZMQStream(TestCase):
15d3907f0dSYaroslav Brustinov
16d3907f0dSYaroslav Brustinov    def setUp(self):
17d3907f0dSYaroslav Brustinov        self.context = zmq.Context()
18d3907f0dSYaroslav Brustinov        self.socket = self.context.socket(zmq.REP)
19d3907f0dSYaroslav Brustinov        self.loop = ioloop.IOLoop.instance()
20d3907f0dSYaroslav Brustinov        self.stream = zmqstream.ZMQStream(self.socket)
21d3907f0dSYaroslav Brustinov
22d3907f0dSYaroslav Brustinov    def tearDown(self):
23d3907f0dSYaroslav Brustinov        self.socket.close()
24d3907f0dSYaroslav Brustinov        self.context.term()
25d3907f0dSYaroslav Brustinov
26d3907f0dSYaroslav Brustinov    def test_callable_check(self):
27d3907f0dSYaroslav Brustinov        """Ensure callable check works (py3k)."""
28d3907f0dSYaroslav Brustinov
29d3907f0dSYaroslav Brustinov        self.stream.on_send(lambda *args: None)
30d3907f0dSYaroslav Brustinov        self.stream.on_recv(lambda *args: None)
31d3907f0dSYaroslav Brustinov        self.assertRaises(AssertionError, self.stream.on_recv, 1)
32d3907f0dSYaroslav Brustinov        self.assertRaises(AssertionError, self.stream.on_send, 1)
33d3907f0dSYaroslav Brustinov        self.assertRaises(AssertionError, self.stream.on_recv, zmq)
34d3907f0dSYaroslav Brustinov
35