test_cffi_backend.py revision 781d71db
1# -*- coding: utf8 -*-
2
3import sys
4import time
5
6from unittest import TestCase
7
8from zmq.tests import BaseZMQTestCase, SkipTest
9
10try:
11    from zmq.backend.cffi import (
12        zmq_version_info,
13        PUSH, PULL, IDENTITY,
14        REQ, REP, POLLIN, POLLOUT,
15    )
16    from zmq.backend.cffi._cffi import ffi, C
17    have_ffi_backend = True
18except ImportError:
19    have_ffi_backend = False
20
21
22class TestCFFIBackend(TestCase):
23
24    def setUp(self):
25        if not have_ffi_backend or not 'PyPy' in sys.version:
26            raise SkipTest('PyPy Tests Only')
27
28    def test_zmq_version_info(self):
29        version = zmq_version_info()
30
31        assert version[0] in range(2,11)
32
33    def test_zmq_ctx_new_destroy(self):
34        ctx = C.zmq_ctx_new()
35
36        assert ctx != ffi.NULL
37        assert 0 == C.zmq_ctx_destroy(ctx)
38
39    def test_zmq_socket_open_close(self):
40        ctx = C.zmq_ctx_new()
41        socket = C.zmq_socket(ctx, PUSH)
42
43        assert ctx != ffi.NULL
44        assert ffi.NULL != socket
45        assert 0 == C.zmq_close(socket)
46        assert 0 == C.zmq_ctx_destroy(ctx)
47
48    def test_zmq_setsockopt(self):
49        ctx = C.zmq_ctx_new()
50        socket = C.zmq_socket(ctx, PUSH)
51
52        identity = ffi.new('char[3]', 'zmq')
53        ret = C.zmq_setsockopt(socket, IDENTITY, ffi.cast('void*', identity), 3)
54
55        assert ret == 0
56        assert ctx != ffi.NULL
57        assert ffi.NULL != socket
58        assert 0 == C.zmq_close(socket)
59        assert 0 == C.zmq_ctx_destroy(ctx)
60
61    def test_zmq_getsockopt(self):
62        ctx = C.zmq_ctx_new()
63        socket = C.zmq_socket(ctx, PUSH)
64
65        identity = ffi.new('char[]', 'zmq')
66        ret = C.zmq_setsockopt(socket, IDENTITY, ffi.cast('void*', identity), 3)
67        assert ret == 0
68
69        option_len = ffi.new('size_t*', 3)
70        option = ffi.new('char*')
71        ret = C.zmq_getsockopt(socket,
72                            IDENTITY,
73                            ffi.cast('void*', option),
74                            option_len)
75
76        assert ret == 0
77        assert ffi.string(ffi.cast('char*', option))[0] == "z"
78        assert ffi.string(ffi.cast('char*', option))[1] == "m"
79        assert ffi.string(ffi.cast('char*', option))[2] == "q"
80        assert ctx != ffi.NULL
81        assert ffi.NULL != socket
82        assert 0 == C.zmq_close(socket)
83        assert 0 == C.zmq_ctx_destroy(ctx)
84
85    def test_zmq_bind(self):
86        ctx = C.zmq_ctx_new()
87        socket = C.zmq_socket(ctx, 8)
88
89        assert 0 == C.zmq_bind(socket, 'tcp://*:4444')
90        assert ctx != ffi.NULL
91        assert ffi.NULL != socket
92        assert 0 == C.zmq_close(socket)
93        assert 0 == C.zmq_ctx_destroy(ctx)
94
95    def test_zmq_bind_connect(self):
96        ctx = C.zmq_ctx_new()
97
98        socket1 = C.zmq_socket(ctx, PUSH)
99        socket2 = C.zmq_socket(ctx, PULL)
100
101        assert 0 == C.zmq_bind(socket1, 'tcp://*:4444')
102        assert 0 == C.zmq_connect(socket2, 'tcp://127.0.0.1:4444')
103        assert ctx != ffi.NULL
104        assert ffi.NULL != socket1
105        assert ffi.NULL != socket2
106        assert 0 == C.zmq_close(socket1)
107        assert 0 == C.zmq_close(socket2)
108        assert 0 == C.zmq_ctx_destroy(ctx)
109
110    def test_zmq_msg_init_close(self):
111        zmq_msg = ffi.new('zmq_msg_t*')
112
113        assert ffi.NULL != zmq_msg
114        assert 0 == C.zmq_msg_init(zmq_msg)
115        assert 0 == C.zmq_msg_close(zmq_msg)
116
117    def test_zmq_msg_init_size(self):
118        zmq_msg = ffi.new('zmq_msg_t*')
119
120        assert ffi.NULL != zmq_msg
121        assert 0 == C.zmq_msg_init_size(zmq_msg, 10)
122        assert 0 == C.zmq_msg_close(zmq_msg)
123
124    def test_zmq_msg_init_data(self):
125        zmq_msg = ffi.new('zmq_msg_t*')
126        message = ffi.new('char[5]', 'Hello')
127
128        assert 0 == C.zmq_msg_init_data(zmq_msg,
129                                        ffi.cast('void*', message),
130                                        5,
131                                        ffi.NULL,
132                                        ffi.NULL)
133
134        assert ffi.NULL != zmq_msg
135        assert 0 == C.zmq_msg_close(zmq_msg)
136
137    def test_zmq_msg_data(self):
138        zmq_msg = ffi.new('zmq_msg_t*')
139        message = ffi.new('char[]', 'Hello')
140        assert 0 == C.zmq_msg_init_data(zmq_msg,
141                                        ffi.cast('void*', message),
142                                        5,
143                                        ffi.NULL,
144                                        ffi.NULL)
145
146        data = C.zmq_msg_data(zmq_msg)
147
148        assert ffi.NULL != zmq_msg
149        assert ffi.string(ffi.cast("char*", data)) == 'Hello'
150        assert 0 == C.zmq_msg_close(zmq_msg)
151
152
153    def test_zmq_send(self):
154        ctx = C.zmq_ctx_new()
155
156        sender = C.zmq_socket(ctx, REQ)
157        receiver = C.zmq_socket(ctx, REP)
158
159        assert 0 == C.zmq_bind(receiver, 'tcp://*:7777')
160        assert 0 == C.zmq_connect(sender, 'tcp://127.0.0.1:7777')
161
162        time.sleep(0.1)
163
164        zmq_msg = ffi.new('zmq_msg_t*')
165        message = ffi.new('char[5]', 'Hello')
166
167        C.zmq_msg_init_data(zmq_msg,
168                            ffi.cast('void*', message),
169                            ffi.cast('size_t', 5),
170                            ffi.NULL,
171                            ffi.NULL)
172
173        assert 5 == C.zmq_msg_send(zmq_msg, sender, 0)
174        assert 0 == C.zmq_msg_close(zmq_msg)
175        assert C.zmq_close(sender) == 0
176        assert C.zmq_close(receiver) == 0
177        assert C.zmq_ctx_destroy(ctx) == 0
178
179    def test_zmq_recv(self):
180        ctx = C.zmq_ctx_new()
181
182        sender = C.zmq_socket(ctx, REQ)
183        receiver = C.zmq_socket(ctx, REP)
184
185        assert 0 == C.zmq_bind(receiver, 'tcp://*:2222')
186        assert 0 == C.zmq_connect(sender, 'tcp://127.0.0.1:2222')
187
188        time.sleep(0.1)
189
190        zmq_msg = ffi.new('zmq_msg_t*')
191        message = ffi.new('char[5]', 'Hello')
192
193        C.zmq_msg_init_data(zmq_msg,
194                            ffi.cast('void*', message),
195                            ffi.cast('size_t', 5),
196                            ffi.NULL,
197                            ffi.NULL)
198
199        zmq_msg2 = ffi.new('zmq_msg_t*')
200        C.zmq_msg_init(zmq_msg2)
201
202        assert 5 == C.zmq_msg_send(zmq_msg, sender, 0)
203        assert 5 == C.zmq_msg_recv(zmq_msg2, receiver, 0)
204        assert 5 == C.zmq_msg_size(zmq_msg2)
205        assert b"Hello" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
206                                      C.zmq_msg_size(zmq_msg2))[:]
207        assert C.zmq_close(sender) == 0
208        assert C.zmq_close(receiver) == 0
209        assert C.zmq_ctx_destroy(ctx) == 0
210
211    def test_zmq_poll(self):
212        ctx = C.zmq_ctx_new()
213
214        sender = C.zmq_socket(ctx, REQ)
215        receiver = C.zmq_socket(ctx, REP)
216
217        r1 = C.zmq_bind(receiver, 'tcp://*:3333')
218        r2 = C.zmq_connect(sender, 'tcp://127.0.0.1:3333')
219
220        zmq_msg = ffi.new('zmq_msg_t*')
221        message = ffi.new('char[5]', 'Hello')
222
223        C.zmq_msg_init_data(zmq_msg,
224                            ffi.cast('void*', message),
225                            ffi.cast('size_t', 5),
226                            ffi.NULL,
227                            ffi.NULL)
228
229        receiver_pollitem = ffi.new('zmq_pollitem_t*')
230        receiver_pollitem.socket = receiver
231        receiver_pollitem.fd = 0
232        receiver_pollitem.events = POLLIN | POLLOUT
233        receiver_pollitem.revents = 0
234
235        ret = C.zmq_poll(ffi.NULL, 0, 0)
236        assert ret == 0
237
238        ret = C.zmq_poll(receiver_pollitem, 1, 0)
239        assert ret == 0
240
241        ret = C.zmq_msg_send(zmq_msg, sender, 0)
242        print(ffi.string(C.zmq_strerror(C.zmq_errno())))
243        assert ret == 5
244
245        time.sleep(0.2)
246
247        ret = C.zmq_poll(receiver_pollitem, 1, 0)
248        assert ret == 1
249
250        assert int(receiver_pollitem.revents) & POLLIN
251        assert not int(receiver_pollitem.revents) & POLLOUT
252
253        zmq_msg2 = ffi.new('zmq_msg_t*')
254        C.zmq_msg_init(zmq_msg2)
255
256        ret_recv = C.zmq_msg_recv(zmq_msg2, receiver, 0)
257        assert ret_recv == 5
258
259        assert 5 == C.zmq_msg_size(zmq_msg2)
260        assert b"Hello" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
261                                    C.zmq_msg_size(zmq_msg2))[:]
262
263        sender_pollitem = ffi.new('zmq_pollitem_t*')
264        sender_pollitem.socket = sender
265        sender_pollitem.fd = 0
266        sender_pollitem.events = POLLIN | POLLOUT
267        sender_pollitem.revents = 0
268
269        ret = C.zmq_poll(sender_pollitem, 1, 0)
270        assert ret == 0
271
272        zmq_msg_again = ffi.new('zmq_msg_t*')
273        message_again = ffi.new('char[11]', 'Hello Again')
274
275        C.zmq_msg_init_data(zmq_msg_again,
276                            ffi.cast('void*', message_again),
277                            ffi.cast('size_t', 11),
278                            ffi.NULL,
279                            ffi.NULL)
280
281        assert 11 == C.zmq_msg_send(zmq_msg_again, receiver, 0)
282
283        time.sleep(0.2)
284
285        assert 0 <= C.zmq_poll(sender_pollitem, 1, 0)
286        assert int(sender_pollitem.revents) & POLLIN
287        assert 11 == C.zmq_msg_recv(zmq_msg2, sender, 0)
288        assert 11 == C.zmq_msg_size(zmq_msg2)
289        assert b"Hello Again" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
290                                            int(C.zmq_msg_size(zmq_msg2)))[:]
291        assert 0 == C.zmq_close(sender)
292        assert 0 == C.zmq_close(receiver)
293        assert 0 == C.zmq_ctx_destroy(ctx)
294        assert 0 == C.zmq_msg_close(zmq_msg)
295        assert 0 == C.zmq_msg_close(zmq_msg2)
296        assert 0 == C.zmq_msg_close(zmq_msg_again)
297
298    def test_zmq_stopwatch_functions(self):
299        stopwatch = C.zmq_stopwatch_start()
300        ret = C.zmq_stopwatch_stop(stopwatch)
301
302        assert ffi.NULL != stopwatch
303        assert 0 < int(ret)
304
305    def test_zmq_sleep(self):
306        try:
307            C.zmq_sleep(1)
308        except Exception as e:
309            raise AssertionError("Error executing zmq_sleep(int)")
310
311