tldk_sock.c revision e18a033b
1/*
2 * Copyright (c) 2017  Intel Corporation.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27#include <ngx_tldk.h>
28#include <tldk_sock.h>
29
30#include <rte_malloc.h>
31#include <rte_errno.h>
32
/*
 * Call counters for the socket API entry points interposed by this file.
 * The values are printed by dump_sock_stats().
 */
struct tldk_sock_stat {
	uint64_t nb_accept;	/* accept4() counter */
	uint64_t nb_close;	/* close() counter */
	uint64_t nb_readv;	/* readv() counter */
	uint64_t nb_recv;	/* recv() counter */
	uint64_t nb_setopts;	/* setsockopt() counter */
	uint64_t nb_shutdown;	/* shutdown() counter */
	uint64_t nb_writev;	/* writev() counter */
};

/* File-local counter set; starts at zero as a static object. */
static struct tldk_sock_stat sock_stat;
44
45/* One socket/file table per worker */
46struct tldk_stbl stbl = {
47	.snum = 0,
48};
49
/*
 * Pointers to the next definitions of the calls interposed by this file.
 * They are resolved by stub_init() through dlsym(RTLD_NEXT, ...) and are
 * invoked for descriptors that sd_to_sock() does not map to a TLDK socket.
 */
static int (*real_accept4)(int sd, struct sockaddr *addr, socklen_t *addrlen,
	int flags);
static int (*real_close)(int sd);
static ssize_t (*real_readv)(int sd, const struct iovec *iov, int iovcnt);
static ssize_t (*real_recv)(int sd, void *buf, size_t len, int flags);
static int (*real_setsockopt)(int sd, int level, int optname,
	const void *optval, socklen_t optlen);
static int (*real_shutdown)(int sd, int how);
static ssize_t (*real_writev)(int sd, const struct iovec *iov, int iovcnt);
57
58static inline uint32_t
59get_socks(struct tldk_sock_list *list, struct tldk_sock *rs[],
60	uint32_t num)
61{
62	struct tldk_sock *s;
63	uint32_t i, n;
64
65	n = RTE_MIN(list->num, num);
66	for (i = 0, s = LIST_FIRST(&list->head);
67			i != n;
68			i++, s = LIST_NEXT(s, link)) {
69		rs[i] = s;
70	}
71
72	/* we retrieved all free entries */
73	if (s == NULL)
74		LIST_INIT(&list->head);
75	else
76		LIST_FIRST(&list->head) = s;
77
78	list->num -= n;
79	return n;
80}
81
/*
 * Take a single entry from list.
 * Returns the entry, or NULL when get_socks() could not supply one.
 */
static inline struct tldk_sock *
get_sock(struct tldk_sock_list *list)
{
	struct tldk_sock *entry[1];

	return (get_socks(list, entry, 1) == 1) ? entry[0] : NULL;
}
92
93static inline void
94put_socks(struct tldk_sock_list *list, struct tldk_sock *fs[], uint32_t num)
95{
96	uint32_t i;
97
98	for (i = 0; i != num; i++)
99		LIST_INSERT_HEAD(&list->head, fs[i], link);
100	list->num += num;
101}
102
103static inline void
104put_sock(struct tldk_sock_list *list, struct tldk_sock *s)
105{
106	put_socks(list, &s, 1);
107}
108
109static inline void
110rem_sock(struct tldk_sock_list *list, struct tldk_sock *s)
111{
112	LIST_REMOVE(s, link);
113	list->num--;
114}
115
116static void
117term_sock(struct tldk_sock *ts)
118{
119	tle_event_idle(ts->erev);
120	tle_event_idle(ts->rxev);
121	tle_event_idle(ts->txev);
122	tle_tcp_stream_close(ts->s);
123	ts->s = NULL;
124	ts->posterr = 0;
125}
126
127static int32_t
128close_sock(struct tldk_sock *ts)
129{
130	if (ts->s == NULL)
131		return EBADF;
132	term_sock(ts);
133	rem_sock(&stbl.use, ts);
134	put_sock(&stbl.free, ts);
135	return 0;
136}
137
138static void
139dump_sock_stats(void)
140{
141	RTE_LOG(NOTICE, USER1, "%s(worker=%lu)={\n"
142		"nb_accept=%" PRIu64 ";\n"
143		"nb_close=%" PRIu64 ";\n"
144		"nb_readv=%" PRIu64 ";\n"
145		"nb_recv=%" PRIu64 ";\n"
146		"nb_setopts=%" PRIu64 ";\n"
147		"nb_shutdown=%" PRIu64 ";\n"
148		"nb_writev=%" PRIu64 ";\n"
149		"};\n",
150		__func__,
151		ngx_worker,
152		sock_stat.nb_accept,
153		sock_stat.nb_close,
154		sock_stat.nb_readv,
155		sock_stat.nb_recv,
156		sock_stat.nb_setopts,
157		sock_stat.nb_shutdown,
158		sock_stat.nb_writev);
159}
160
161void
162tldk_stbl_fini(void)
163{
164	dump_sock_stats();
165	tldk_dump_event_stats();
166	rte_free(stbl.sd);
167	tle_evq_destroy(stbl.txeq);
168	tle_evq_destroy(stbl.rxeq);
169	tle_evq_destroy(stbl.ereq);
170	tle_evq_destroy(stbl.syneq);
171}
172
173#define INIT_FUNC(func) do { \
174	real_##func = dlsym(RTLD_NEXT, #func); \
175	RTE_ASSERT(real_##func); \
176} while (0)
177
178static void __attribute__((constructor))
179stub_init(void)
180{
181	INIT_FUNC(accept4);
182	INIT_FUNC(close);
183	INIT_FUNC(readv);
184	INIT_FUNC(recv);
185	INIT_FUNC(setsockopt);
186	INIT_FUNC(shutdown);
187	INIT_FUNC(writev);
188}
189
190#undef INIT_FUNC
191
192int
193tldk_stbl_init(const ngx_cycle_t *cycle, const struct tldk_ctx *tc)
194{
195	uint32_t i, lc, sid, sn;
196	size_t sz;
197	struct tle_evq_param eprm;
198	struct rlimit rlim;
199
200	lc = tc->cf->lcore;
201	sn = tc->cf->nb_stream;
202	sid = rte_lcore_to_socket_id(lc);
203
204	if (sn < cycle->listening.nelts + cycle->connection_n)
205		return -EINVAL;
206
207	if (getrlimit(RLIMIT_NOFILE, &rlim) != 0)
208		return -errno;
209
210	stbl.nosd = rlim.rlim_max;
211
212	/* allocate event queues */
213
214	memset(&eprm, 0, sizeof(eprm));
215	eprm.socket_id = sid;
216	eprm.max_events = sn;
217
218	stbl.syneq = tle_evq_create(&eprm);
219	stbl.ereq = tle_evq_create(&eprm);
220	stbl.rxeq = tle_evq_create(&eprm);
221	stbl.txeq = tle_evq_create(&eprm);
222
223	RTE_LOG(NOTICE, USER1, "%s(lcore=%u, worker=%lu): "
224		"synevq=%p, erevq=%p, rxevq=%p, txevq=%p\n",
225		__func__, lc, ngx_worker,
226		stbl.syneq, stbl.ereq, stbl.rxeq, stbl.txeq);
227	if (stbl.syneq == NULL || stbl.ereq == NULL || stbl.rxeq == NULL ||
228			stbl.txeq == NULL)
229		return -ENOMEM;
230
231	LIST_INIT(&stbl.lstn.head);
232	LIST_INIT(&stbl.free.head);
233	LIST_INIT(&stbl.use.head);
234
235	sz = sn * sizeof(*stbl.sd);
236	stbl.sd = rte_zmalloc_socket(NULL, sz, RTE_CACHE_LINE_SIZE,
237		rte_lcore_to_socket_id(lc));
238
239	if (stbl.sd == NULL) {
240		RTE_LOG(ERR, USER1, "%s(lcore=%u, worker=%lu): "
241			"failed to allocate %zu bytes\n",
242			__func__, lc, ngx_worker, sz);
243		return -ENOMEM;
244	}
245
246	stbl.snum = sn;
247
248	/* init listen socks */
249	for (i = 0; i != cycle->listening.nelts; i++) {
250		stbl.sd[i].rxev = tle_event_alloc(stbl.syneq, stbl.sd + i);
251		stbl.sd[i].txev = tle_event_alloc(stbl.txeq, stbl.sd + i);
252		stbl.sd[i].erev = tle_event_alloc(stbl.ereq, stbl.sd + i);
253		put_sock(&stbl.lstn, stbl.sd + i);
254	}
255
256	/* init worker connection socks */
257	for (; i != sn; i++) {
258		stbl.sd[i].rxev = tle_event_alloc(stbl.rxeq, stbl.sd + i);
259		stbl.sd[i].txev = tle_event_alloc(stbl.txeq, stbl.sd + i);
260		stbl.sd[i].erev = tle_event_alloc(stbl.ereq, stbl.sd + i);
261		put_sock(&stbl.free, stbl.sd + i);
262	}
263
264	return 0;
265}
266
267int
268tldk_open_bind_listen(struct tldk_ctx *tcx, int domain, int type,
269	const struct sockaddr *addr, socklen_t addrlen, int backlog)
270{
271	int32_t rc;
272	struct tldk_sock *ts;
273	struct tle_tcp_stream_param sprm;
274
275	ts = get_sock(&stbl.lstn);
276	if (ts == NULL) {
277		errno = ENOBUFS;
278		return -1;
279	}
280
281	tle_event_active(ts->erev, TLE_SEV_DOWN);
282	tle_event_active(ts->rxev, TLE_SEV_DOWN);
283	tle_event_active(ts->txev, TLE_SEV_DOWN);
284
285	/* setup stream parameters */
286
287	memset(&sprm, 0, sizeof(sprm));
288
289	sprm.cfg.err_ev = ts->erev;
290	sprm.cfg.recv_ev = ts->rxev;
291	sprm.cfg.send_ev = ts->txev;
292
293	memcpy(&sprm.addr.local, addr, addrlen);
294	sprm.addr.remote.ss_family = sprm.addr.local.ss_family;
295
296	ts->s = tle_tcp_stream_open(tcx->ctx, &sprm);
297	if (ts->s != NULL)
298		rc = tle_tcp_stream_listen(ts->s);
299	else
300		rc = -rte_errno;
301
302	if (rc != 0) {
303		term_sock(ts);
304		put_sock(&stbl.lstn, ts);
305		errno = -rc;
306		return -1;
307	}
308
309	return SOCK_TO_SD(ts);
310}
311
312/*
313 * socket API
314 */
315
316int
317close(int sd)
318{
319	int32_t rc;
320	struct tldk_sock *ts;
321
322	FE_TRACE("worker#%lu: %s(%d);\n",
323		ngx_worker, __func__, sd);
324
325	ts = sd_to_sock(sd);
326	if (ts == NULL)
327		return real_close(sd);
328
329	sock_stat.nb_close++;
330
331	rc = close_sock(ts);
332	if (rc != 0) {
333		errno =-rc;
334		return -1;
335	}
336	return 0;
337}
338
339int
340shutdown(int sd, int how)
341{
342	struct tldk_sock *ts;
343
344	FE_TRACE("worker#%lu: %s(%d, %#x);\n",
345		ngx_worker, __func__, sd, how);
346
347	ts = sd_to_sock(sd);
348	if (ts == NULL)
349		return real_shutdown(sd, how);
350
351	sock_stat.nb_shutdown++;
352
353	errno = ENOTSUP;
354	return -1;
355}
356
357
358int
359accept4(int sd, struct sockaddr *addr, socklen_t *addrlen, int flags)
360{
361	uint32_t n, slen;
362	struct tle_stream *s;
363	struct tldk_sock *cs, *ts;
364	struct tle_tcp_stream_cfg prm;
365	struct tle_tcp_stream_addr sa;
366
367	FE_TRACE("worker#%lu: %s(%d, %p, %p, %#x);\n",
368		ngx_worker, __func__, sd, addr, addrlen, flags);
369
370	ts = sd_to_sock(sd);
371	if (ts == NULL)
372		return real_accept4(sd, addr, addrlen, flags);
373	else if (ts->s == NULL) {
374		errno = EBADF;
375		return -1;
376	}
377
378	sock_stat.nb_accept++;
379
380	n = ts->acpt.num;
381	if (n == 0) {
382		n = tle_tcp_stream_accept(ts->s, ts->acpt.buf,
383			RTE_DIM(ts->acpt.buf));
384		if (n == 0) {
385			errno = EAGAIN;
386			return -1;
387		}
388	}
389
390	s = ts->acpt.buf[n - 1];
391	ts->acpt.num = n - 1;
392
393	tle_event_raise(ts->rxev);
394
395	cs = get_sock(&stbl.free);
396	if (cs == NULL) {
397		tle_tcp_stream_close(s);
398		errno = ENOBUFS;
399		return -1;
400	}
401
402	cs->s = s;
403	put_sock(&stbl.use, cs);
404
405	tle_event_active(cs->erev, TLE_SEV_DOWN);
406	tle_event_active(cs->rxev, TLE_SEV_DOWN);
407	tle_event_active(cs->txev, TLE_SEV_DOWN);
408
409	memset(&prm, 0, sizeof(prm));
410	prm.recv_ev = cs->rxev;
411	prm.send_ev = cs->txev;
412	prm.err_ev = cs->erev;
413	tle_tcp_stream_update_cfg(&s, &prm, 1);
414
415	if (tle_tcp_stream_get_addr(s, &sa) == 0) {
416
417		if (sa.remote.ss_family == AF_INET)
418			slen = sizeof(struct sockaddr_in);
419		else if (sa.remote.ss_family == AF_INET6)
420			slen = sizeof(struct sockaddr_in6);
421		else
422			slen = 0;
423
424		slen = RTE_MIN(slen, *addrlen);
425		memcpy(addr, &sa.remote, slen);
426		*addrlen = slen;
427	}
428
429	return SOCK_TO_SD(cs);
430}
431
432ssize_t
433recv(int sd, void *buf, size_t len, int flags)
434{
435	ssize_t sz;
436	struct tldk_sock *ts;
437	struct iovec iv;
438
439	FE_TRACE("worker#%lu: %s(%d, %p, %zu, %#x);\n",
440		ngx_worker, __func__, sd, buf, len, flags);
441
442	ts = sd_to_sock(sd);
443	if (ts == NULL)
444		return real_recv(sd, buf, len, flags);
445	else if (ts->s == NULL) {
446		errno = EBADF;
447		return -1;
448	}
449
450	sock_stat.nb_recv++;
451
452	iv.iov_base = buf;
453	iv.iov_len = len;
454
455	sz = tle_tcp_stream_readv(ts->s, &iv, 1);
456	if (sz < 0)
457		errno = rte_errno;
458	else if (sz == 0 && ts->posterr == 0) {
459		errno = EAGAIN;
460		sz = -1;
461	}
462
463	FE_TRACE("worker#%lu: %s(%d, %p, %zu, %#x) returns %zd;\n",
464		ngx_worker, __func__, sd, buf, len, flags, sz);
465	return sz;
466}
467
468ssize_t
469readv(int sd, const struct iovec *iov, int iovcnt)
470{
471	ssize_t sz;
472	struct tldk_sock *ts;
473	struct tldk_ctx *tcx;
474
475	FE_TRACE("worker#%lu: %s(%d, %p, %d);\n",
476		ngx_worker, __func__, sd, iov, iovcnt);
477
478	tcx =  wrk2ctx + ngx_worker;
479	ts = sd_to_sock(sd);
480	if (ts == NULL)
481		return real_readv(sd, iov, iovcnt);
482	else if (ts->s == NULL || tcx == NULL) {
483		errno = EBADF;
484		return -1;
485	}
486
487	sock_stat.nb_readv++;
488
489	sz = tle_tcp_stream_readv(ts->s, iov, iovcnt);
490	if (sz < 0)
491		errno = rte_errno;
492	else if (sz == 0 && ts->posterr == 0) {
493		errno = EAGAIN;
494		sz = -1;
495	}
496
497	FE_TRACE("worker#%lu: %s(%d, %p, %d) returns %zd;\n",
498		ngx_worker, __func__, sd, iov, iovcnt, sz);
499	return sz;
500}
501
502ssize_t
503writev(int sd, const struct iovec *iov, int iovcnt)
504{
505	ssize_t sz;
506	struct tldk_sock *ts;
507	struct tldk_ctx *tcx;
508
509	FE_TRACE("worker#%lu: %s(%d, %p, %d);\n",
510		ngx_worker, __func__, sd, iov, iovcnt);
511
512	tcx =  wrk2ctx + ngx_worker;
513	ts = sd_to_sock(sd);
514	if (ts == NULL)
515		return real_writev(sd, iov, iovcnt);
516	else if (ts->s == NULL || tcx == NULL) {
517		errno = EBADF;
518		return -1;
519	}
520
521	sock_stat.nb_writev++;
522
523	sz = tle_tcp_stream_writev(ts->s, tcx->mpool, iov, iovcnt);
524	if (sz < 0)
525		errno = rte_errno;
526
527	FE_TRACE("worker#%lu: %s(%d, %p, %d) returns %zd;\n",
528		ngx_worker, __func__, sd, iov, iovcnt, sz);
529	return sz;
530}
531
532int
533setsockopt(int sd, int level, int optname, const void *optval, socklen_t optlen)
534{
535	struct tldk_sock *ts;
536
537	FE_TRACE("worker#%lu: %s(%d, %#x, %#x, %p, %d);\n",
538		ngx_worker, __func__, sd, level, optname, optval, optlen);
539
540	ts = sd_to_sock(sd);
541	if (ts == NULL)
542		return real_setsockopt(sd, level, optname, optval, optlen);
543	else if (ts->s == NULL) {
544		errno = EBADF;
545		return -1;
546	}
547
548	return 0;
549}
550