# -*- coding: utf8 -*-

# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.

import functools
import logging
import os
import shutil
import sys
import tempfile

import zmq.auth
from zmq.auth.ioloop import IOLoopAuthenticator
from zmq.auth.thread import ThreadAuthenticator
from zmq.eventloop import ioloop, zmqstream
from zmq.tests import (BaseZMQTestCase, SkipTest)
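class BaseAuthTestCase(BaseZMQTestCase):
    """Shared fixture for the authentication tests.

    ``setUp`` skips the test when libzmq has no security support, starts the
    authenticator returned by :meth:`make_auth` and generates a throw-away
    server/client CURVE certificate pair under a fresh temporary directory.
    Subclasses must implement :meth:`make_auth`.
    """

    def setUp(self):
        """Skip without security support, then start auth and create certs."""
        if zmq.zmq_version_info() < (4,0):
            raise SkipTest("security is new in libzmq 4.0")
        try:
            zmq.curve_keypair()
        except zmq.ZMQError:
            raise SkipTest("security requires libzmq to be linked against libsodium")
        super(BaseAuthTestCase, self).setUp()
        # enable debug logging while we run tests
        logging.getLogger('zmq.auth').setLevel(logging.DEBUG)
        self.auth = self.make_auth()
        self.auth.start()
        try:
            self.base_dir, self.public_keys_dir, self.secret_keys_dir = self.create_certs()
        except Exception:
            # unittest does not call tearDown after a failing setUp, so the
            # authenticator started above has to be stopped here.
            self.auth.stop()
            self.auth = None
            raise

    def make_auth(self):
        """Return the authenticator under test; subclasses must override."""
        raise NotImplementedError()

    def tearDown(self):
        """Stop the authenticator and delete the generated key material."""
        try:
            if self.auth:
                self.auth.stop()
                self.auth = None
        finally:
            # Remove the secret keys even when stopping the authenticator
            # raised, so failed runs do not leave key files behind.
            try:
                self.remove_certs(self.base_dir)
            finally:
                super(BaseAuthTestCase, self).tearDown()

    def create_certs(self):
        """Create CURVE certificates for a test

        Returns ``(base_dir, public_keys_dir, secret_keys_dir)``.  All
        directories live under ``base_dir``; the caller removes it with
        :meth:`remove_certs`.
        """

        # Create temporary CURVE keypairs for this test run. We create all keys in a
        # temp directory and then move them into the appropriate private or public
        # directory.

        # mkdtemp() makes the directory accessible only to the creating user,
        # which keeps the generated secret keys private to the test run.
        base_dir = tempfile.mkdtemp()
        try:
            keys_dir = os.path.join(base_dir, 'certificates')
            public_keys_dir = os.path.join(base_dir, 'public_keys')
            secret_keys_dir = os.path.join(base_dir, 'private_keys')

            for directory in (keys_dir, public_keys_dir, secret_keys_dir):
                os.mkdir(directory)

            for name in ("server", "client"):
                zmq.auth.create_certificates(keys_dir, name)

            # Sort the generated files by suffix: "*.key" is public,
            # "*.key_secret" is private.
            for key_file in os.listdir(keys_dir):
                if key_file.endswith(".key"):
                    destination = public_keys_dir
                elif key_file.endswith(".key_secret"):
                    destination = secret_keys_dir
                else:
                    continue
                shutil.move(os.path.join(keys_dir, key_file), destination)
        except Exception:
            # Do not leave a half-populated key directory on disk.
            shutil.rmtree(base_dir, ignore_errors=True)
            raise

        return (base_dir, public_keys_dir, secret_keys_dir)

    def remove_certs(self, base_dir):
        """Remove certificates for a test"""
        shutil.rmtree(base_dir)

    def load_certs(self, secret_keys_dir):
        """Return server and client certificate keys

        Reads ``server.key_secret`` and ``client.key_secret`` from
        ``secret_keys_dir`` and returns
        ``(server_public, server_secret, client_public, client_secret)``.
        """
        loaded = []
        for name in ("server", "client"):
            secret_file = os.path.join(secret_keys_dir, name + ".key_secret")
            # each secret file yields a (public, secret) pair
            loaded.extend(zmq.auth.load_certificate(secret_file))

        server_public, server_secret, client_public, client_secret = loaded
        return server_public, server_secret, client_public, client_secret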
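class TestThreadAuthentication(BaseAuthTestCase):
    """Test authentication running in a thread"""

    def make_auth(self):
        """Return a ThreadAuthenticator bound to the test context."""
        return ThreadAuthenticator(self.context)

    def can_connect(self, server, client):
        """Check if client can connect to server using tcp transport

        Binds ``server`` to a random loopback port, connects ``client`` and
        pushes one message.  Returns True when the client receives it within
        1000 ms and False otherwise, so a False result means "nothing was
        delivered in time" rather than an observed rejection.
        """
        iface = 'tcp://127.0.0.1'
        port = server.bind_to_random_port(iface)
        client.connect("%s:%i" % (iface, port))
        msg = [b"Hello World"]
        server.send_multipart(msg)
        if not client.poll(1000):
            return False
        self.assertEqual(client.recv_multipart(), msg)
        return True

    def _null_pair(self, zap_domain=None):
        """Return a NULL PUSH/PULL pair, optionally with a ZAP domain set."""
        server = self.socket(zmq.PUSH)
        if zap_domain is not None:
            server.zap_domain = zap_domain
        client = self.socket(zmq.PULL)
        return server, client

    def _plain_pair(self, password):
        """Return a PLAIN server and a client logging in as admin/password."""
        server = self.socket(zmq.PUSH)
        server.plain_server = True
        client = self.socket(zmq.PULL)
        client.plain_username = b'admin'
        client.plain_password = password
        return server, client

    def _curve_pair(self, certs):
        """Return a CURVE server and client configured from ``certs``."""
        server_public, server_secret, client_public, client_secret = certs
        server = self.socket(zmq.PUSH)
        server.curve_publickey = server_public
        server.curve_secretkey = server_secret
        server.curve_server = True
        client = self.socket(zmq.PULL)
        client.curve_publickey = client_public
        client.curve_secretkey = client_secret
        client.curve_serverkey = server_public
        return server, client

    def test_null(self):
        """threaded auth - NULL"""
        # A default NULL connection should always succeed, and not
        # go through our authentication infrastructure at all.
        self.auth.stop()
        self.auth = None

        server, client = self._null_pair()
        self.assertTrue(self.can_connect(server, client))

        # By setting a domain we switch on authentication for NULL sockets,
        # though no policies are configured yet. The client connection
        # should still be allowed.
        server, client = self._null_pair(zap_domain=b'global')
        self.assertTrue(self.can_connect(server, client))

    def test_blacklist(self):
        """threaded auth - Blacklist"""
        # Blacklist 127.0.0.1, connection should fail
        self.auth.deny('127.0.0.1')
        # By setting a domain we switch on authentication for NULL sockets,
        # so the deny list above is consulted for this connection.
        server, client = self._null_pair(zap_domain=b'global')
        self.assertFalse(self.can_connect(server, client))

    def test_whitelist(self):
        """threaded auth - Whitelist"""
        # Whitelist 127.0.0.1, connection should pass
        self.auth.allow('127.0.0.1')
        # By setting a domain we switch on authentication for NULL sockets,
        # so the allow list above is consulted for this connection.
        server, client = self._null_pair(zap_domain=b'global')
        self.assertTrue(self.can_connect(server, client))

    def test_plain(self):
        """threaded auth - PLAIN"""

        # Try PLAIN authentication - without configuring server, connection should fail
        server, client = self._plain_pair(b'Password')
        self.assertFalse(self.can_connect(server, client))

        # Try PLAIN authentication - with server configured, connection should pass
        server, client = self._plain_pair(b'Password')
        self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})
        self.assertTrue(self.can_connect(server, client))

        # Try PLAIN authentication - with bogus credentials, connection should fail
        # (the password table configured above is still in effect)
        server, client = self._plain_pair(b'Bogus')
        self.assertFalse(self.can_connect(server, client))

        # Remove authenticator and check that a normal connection works
        self.auth.stop()
        self.auth = None

        server, client = self._null_pair()
        self.assertTrue(self.can_connect(server, client))
        client.close()
        server.close()

    def test_curve(self):
        """threaded auth - CURVE"""
        self.auth.allow('127.0.0.1')
        certs = self.load_certs(self.secret_keys_dir)

        # Try CURVE authentication - without configuring server, connection should fail
        server, client = self._curve_pair(certs)
        self.assertFalse(self.can_connect(server, client))

        # Try CURVE authentication - with server configured to CURVE_ALLOW_ANY, connection should pass
        # CURVE_ALLOW_ANY accepts every client key: the link is encrypted but
        # the client is not identified by its key.
        self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
        server, client = self._curve_pair(certs)
        self.assertTrue(self.can_connect(server, client))

        # Try CURVE authentication - with server configured, connection should pass
        # Only client keys found in public_keys_dir are accepted here.
        self.auth.configure_curve(domain='*', location=self.public_keys_dir)
        server, client = self._curve_pair(certs)
        self.assertTrue(self.can_connect(server, client))

        # Remove authenticator and check that a normal connection works
        self.auth.stop()
        self.auth = None

        # Try connecting using NULL and no authentication enabled, connection should pass
        server, client = self._null_pair()
        self.assertTrue(self.can_connect(server, client))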
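def with_ioloop(method, expect_success=True):
    """decorator for running tests with an IOLoop

    The wrapped test first runs ``method(self)`` to configure the sockets and
    the authenticator, then schedules the connection attempt and one message
    and runs ``self.io_loop`` until a message arrives or one second passes.

    With ``expect_success`` true an arriving message is the pass condition
    and the timeout fails the test; with it false the roles are swapped.  A
    passing negative test therefore shows only that nothing was delivered
    within the timeout.

    The wrapper keeps the name and docstring of ``method`` so test reports
    show the real test rather than ``test_method``.
    """
    @functools.wraps(method)
    def test_method(self):
        r = method(self)

        loop = self.io_loop
        if expect_success:
            self.pullstream.on_recv(self.on_message_succeed)
        else:
            self.pullstream.on_recv(self.on_message_fail)

        t = loop.time()
        loop.add_callback(self.attempt_connection)
        loop.add_callback(self.send_msg)
        if expect_success:
            loop.add_timeout(t + 1, self.on_test_timeout_fail)
        else:
            loop.add_timeout(t + 1, self.on_test_timeout_succeed)

        loop.start()
        # the callbacks record failures instead of raising inside the loop
        if self.fail_msg:
            self.fail(self.fail_msg)

        return r
    return test_method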
def should_auth(method):
    """Decorate an IOLoop test whose message is expected to be delivered."""
    return with_ioloop(method, expect_success=True)
def should_not_auth(method):
    """Decorate an IOLoop test whose message is expected not to arrive."""
    return with_ioloop(method, expect_success=False)
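class TestIOLoopAuthentication(BaseAuthTestCase):
    """Test authentication running in ioloop

    Each test method only configures ``self.server``, ``self.client`` and
    ``self.auth``; the ``should_auth`` / ``should_not_auth`` decorators then
    attempt the connection on ``self.io_loop`` and judge the outcome.
    """

    def setUp(self):
        """Create the IOLoop, run the base fixture and wrap both sockets."""
        self.fail_msg = None
        # make_auth() needs the loop, so it is created before the base setUp
        self.io_loop = ioloop.IOLoop()
        try:
            super(TestIOLoopAuthentication, self).setUp()
        except Exception:
            # tearDown is not run when setUp raises (a skip included), so
            # close the loop here instead of leaking it.
            self.io_loop.close(all_fds=True)
            raise
        self.server = self.socket(zmq.PUSH)
        self.client = self.socket(zmq.PULL)
        self.pushstream = zmqstream.ZMQStream(self.server, self.io_loop)
        self.pullstream = zmqstream.ZMQStream(self.client, self.io_loop)

    def make_auth(self):
        """Return an IOLoopAuthenticator running on the test's IOLoop."""
        return IOLoopAuthenticator(self.context, io_loop=self.io_loop)

    def tearDown(self):
        """Stop the authenticator, close the loop, then run the base cleanup."""
        try:
            if self.auth:
                self.auth.stop()
                self.auth = None
        finally:
            # Close the loop and let the base class delete the key files even
            # when an earlier cleanup step raised.
            try:
                self.io_loop.close(all_fds=True)
            finally:
                super(TestIOLoopAuthentication, self).tearDown()

    def _apply_curve_keys(self):
        """Load the test certificates and set the CURVE options on both sockets."""
        certs = self.load_certs(self.secret_keys_dir)
        server_public, server_secret, client_public, client_secret = certs

        self.server.curve_publickey = server_public
        self.server.curve_secretkey = server_secret
        self.server.curve_server = True

        self.client.curve_publickey = client_public
        self.client.curve_secretkey = client_secret
        self.client.curve_serverkey = server_public

    def attempt_connection(self):
        """Check if client can connect to server using tcp transport"""
        iface = 'tcp://127.0.0.1'
        port = self.server.bind_to_random_port(iface)
        self.client.connect("%s:%i" % (iface, port))

    def send_msg(self):
        """Send a message from server to a client"""
        msg = [b"Hello World"]
        self.pushstream.send_multipart(msg)

    def on_message_succeed(self, frames):
        """A message was received, as expected."""
        if frames != [b"Hello World"]:
            self.fail_msg = "Unexpected message received"
        self.io_loop.stop()

    def on_message_fail(self, frames):
        """A message was received, unexpectedly."""
        self.fail_msg = 'Received messaged unexpectedly, security failed'
        self.io_loop.stop()

    def on_test_timeout_succeed(self):
        """Test timer expired, indicates test success"""
        self.io_loop.stop()

    def on_test_timeout_fail(self):
        """Test timer expired, indicates test failure"""
        self.fail_msg = 'Test timed out'
        self.io_loop.stop()

    @should_auth
    def test_none(self):
        """ioloop auth - NONE"""
        # A default NULL connection should always succeed, and not
        # go through our authentication infrastructure at all.
        # no auth should be running
        self.auth.stop()
        self.auth = None

    @should_auth
    def test_null(self):
        """ioloop auth - NULL"""
        # By setting a domain we switch on authentication for NULL sockets,
        # though no policies are configured yet. The client connection
        # should still be allowed.
        self.server.zap_domain = b'global'

    @should_not_auth
    def test_blacklist(self):
        """ioloop auth - Blacklist"""
        # Blacklist 127.0.0.1, connection should fail
        self.auth.deny('127.0.0.1')
        self.server.zap_domain = b'global'

    @should_auth
    def test_whitelist(self):
        """ioloop auth - Whitelist"""
        # Whitelist 127.0.0.1 (no blacklist is configured in this test),
        # connection should pass
        self.auth.allow('127.0.0.1')

        self.server.setsockopt(zmq.ZAP_DOMAIN, b'global')

    @should_not_auth
    def test_plain_unconfigured_server(self):
        """ioloop auth - PLAIN, unconfigured server"""
        self.client.plain_username = b'admin'
        self.client.plain_password = b'Password'
        # Try PLAIN authentication - without configuring server, connection should fail
        self.server.plain_server = True

    @should_auth
    def test_plain_configured_server(self):
        """ioloop auth - PLAIN, configured server"""
        self.client.plain_username = b'admin'
        self.client.plain_password = b'Password'
        # Try PLAIN authentication - with server configured, connection should pass
        self.server.plain_server = True
        self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})

    @should_not_auth
    def test_plain_bogus_credentials(self):
        """ioloop auth - PLAIN, bogus credentials"""
        # The client password differs from the one the server is configured
        # with below, so the connection should fail.
        self.client.plain_username = b'admin'
        self.client.plain_password = b'Bogus'
        self.server.plain_server = True

        self.auth.configure_plain(domain='*', passwords={'admin': 'Password'})

    @should_not_auth
    def test_curve_unconfigured_server(self):
        """ioloop auth - CURVE, unconfigured server"""
        # configure_curve() is never called, so the client key is not accepted
        # even though its address is whitelisted.
        self.auth.allow('127.0.0.1')
        self._apply_curve_keys()

    @should_auth
    def test_curve_allow_any(self):
        """ioloop auth - CURVE, CURVE_ALLOW_ANY"""
        self.auth.allow('127.0.0.1')
        # CURVE_ALLOW_ANY accepts every client key: the link is encrypted but
        # the client is not identified by its key.
        self.auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
        self._apply_curve_keys()

    @should_auth
    def test_curve_configured_server(self):
        """ioloop auth - CURVE, configured server"""
        self.auth.allow('127.0.0.1')
        # Only client keys found in public_keys_dir are accepted here.
        self.auth.configure_curve(domain='*', location=self.public_keys_dir)
        self._apply_curve_keys()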