session.c revision e140d5d0
1/*
2 * Copyright (c) 2017-2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15/**
16 * @file
17 * @brief Session and session manager
18 */
19
20#include <vnet/session/session.h>
21#include <vnet/session/session_debug.h>
22#include <vnet/session/application.h>
23#include <vnet/dpo/load_balance.h>
24#include <vnet/fib/ip4_fib.h>
25
26session_main_t session_main;
27
28static inline int
29session_send_evt_to_thread (void *data, void *args, u32 thread_index,
30			    session_evt_type_t evt_type)
31{
32  session_event_t *evt;
33  svm_msg_q_msg_t msg;
34  svm_msg_q_t *mq;
35
36  mq = session_main_get_vpp_event_queue (thread_index);
37  if (PREDICT_FALSE (svm_msg_q_lock (mq)))
38    return -1;
39  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
40    {
41      svm_msg_q_unlock (mq);
42      return -2;
43    }
44  switch (evt_type)
45    {
46    case SESSION_CTRL_EVT_RPC:
47      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
48      evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
49      evt->rpc_args.fp = data;
50      evt->rpc_args.arg = args;
51      break;
52    case SESSION_IO_EVT_RX:
53    case SESSION_IO_EVT_TX:
54    case SESSION_IO_EVT_TX_FLUSH:
55    case SESSION_IO_EVT_BUILTIN_RX:
56      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
57      evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
58      evt->session_index = *(u32 *) data;
59      break;
60    case SESSION_IO_EVT_BUILTIN_TX:
61    case SESSION_CTRL_EVT_CLOSE:
62    case SESSION_CTRL_EVT_RESET:
63      msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
64      evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
65      evt->session_handle = session_handle ((session_t *) data);
66      break;
67    default:
68      clib_warning ("evt unhandled!");
69      svm_msg_q_unlock (mq);
70      return -1;
71    }
72  evt->event_type = evt_type;
73
74  svm_msg_q_add_and_unlock (mq, &msg);
75  return 0;
76}
77
/**
 * Send an IO event for fifo @a f to the vpp worker owning the session.
 *
 * Session index and destination thread are read from the fifo's master
 * fields, so these must be up to date when this is called.
 */
int
session_send_io_evt_to_thread (svm_fifo_t * f, session_evt_type_t evt_type)
{
  return session_send_evt_to_thread (&f->master_session_index, 0,
				     f->master_thread_index, evt_type);
}
84
/**
 * Send an IO event to an explicitly chosen vpp worker thread.
 *
 * @param data for IO event types this must point to a u32 session index
 *             (see session_send_evt_to_thread).
 */
int
session_send_io_evt_to_thread_custom (void *data, u32 thread_index,
				      session_evt_type_t evt_type)
{
  return session_send_evt_to_thread (data, 0, thread_index, evt_type);
}
91
/**
 * Send a control event for session @a s to the thread that owns it.
 */
int
session_send_ctrl_evt_to_thread (session_t * s, session_evt_type_t evt_type)
{
  /* only events supported are disconnect and reset */
  ASSERT (evt_type == SESSION_CTRL_EVT_CLOSE
	  || evt_type == SESSION_CTRL_EVT_RESET);
  return session_send_evt_to_thread (s, 0, s->thread_index, evt_type);
}
100
/**
 * Queue rpc @a fp on @a thread_index's event queue, even if the caller
 * runs on that same thread (no direct-call shortcut, cf.
 * session_send_rpc_evt_to_thread).
 *
 * NOTE(review): the return value of session_send_evt_to_thread is ignored,
 * so the rpc is silently dropped if the mq is full or cannot be locked.
 */
void
session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
				      void *rpc_args)
{
  session_send_evt_to_thread (fp, rpc_args, thread_index,
			      SESSION_CTRL_EVT_RPC);
}
108
109void
110session_send_rpc_evt_to_thread (u32 thread_index, void *fp, void *rpc_args)
111{
112  if (thread_index != vlib_get_thread_index ())
113    session_send_rpc_evt_to_thread_force (thread_index, fp, rpc_args);
114  else
115    {
116      void (*fnp) (void *) = fp;
117      fnp (rpc_args);
118    }
119}
120
121void
122session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
123{
124  session_t *s;
125
126  s = session_get (tc->s_index, tc->thread_index);
127  ASSERT (s->thread_index == vlib_get_thread_index ());
128  ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
129  if (!(s->flags & SESSION_F_CUSTOM_TX))
130    {
131      s->flags |= SESSION_F_CUSTOM_TX;
132      if (svm_fifo_set_event (s->tx_fifo))
133	{
134	  session_worker_t *wrk;
135	  session_evt_elt_t *elt;
136	  wrk = session_main_get_worker (tc->thread_index);
137	  if (has_prio)
138	    elt = session_evt_alloc_new (wrk);
139	  else
140	    elt = session_evt_alloc_old (wrk);
141	  elt->evt.session_index = tc->s_index;
142	  elt->evt.event_type = SESSION_IO_EVT_TX;
143	}
144    }
145}
146
147static void
148session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
149{
150  u32 thread_index = vlib_get_thread_index ();
151  session_evt_elt_t *elt;
152  session_worker_t *wrk;
153
154  /* If we are in the handler thread, or being called with the worker barrier
155   * held, just append a new event to pending disconnects vector. */
156  if (vlib_thread_is_main_w_barrier () || thread_index == s->thread_index)
157    {
158      wrk = session_main_get_worker (s->thread_index);
159      elt = session_evt_alloc_ctrl (wrk);
160      clib_memset (&elt->evt, 0, sizeof (session_event_t));
161      elt->evt.session_handle = session_handle (s);
162      elt->evt.event_type = evt;
163    }
164  else
165    session_send_ctrl_evt_to_thread (s, evt);
166}
167
/**
 * Allocate and zero-initialize a session on @a thread_index's pool.
 *
 * If the pool is about to expand and workers exist, the peekers writer
 * lock is taken so readers on other threads don't dereference a stale
 * pool pointer while the pool is reallocated.
 */
session_t *
session_alloc (u32 thread_index)
{
  session_worker_t *wrk = &session_main.wrk[thread_index];
  session_t *s;
  u8 will_expand = 0;
  pool_get_aligned_will_expand (wrk->sessions, will_expand,
				CLIB_CACHE_LINE_BYTES);
  /* If we have peekers, let them finish */
  if (PREDICT_FALSE (will_expand && vlib_num_workers ()))
    {
      clib_rwlock_writer_lock (&wrk->peekers_rw_locks);
      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
      clib_rwlock_writer_unlock (&wrk->peekers_rw_locks);
    }
  else
    {
      pool_get_aligned (wrk->sessions, s, CLIB_CACHE_LINE_BYTES);
    }
  clib_memset (s, 0, sizeof (*s));
  /* Index is implicit in the pool position */
  s->session_index = s - wrk->sessions;
  s->thread_index = thread_index;
  s->app_index = APP_INVALID_INDEX;
  return s;
}
193
194void
195session_free (session_t * s)
196{
197  if (CLIB_DEBUG)
198    {
199      u8 thread_index = s->thread_index;
200      clib_memset (s, 0xFA, sizeof (*s));
201      pool_put (session_main.wrk[thread_index].sessions, s);
202      return;
203    }
204  SESSION_EVT (SESSION_EVT_FREE, s);
205  pool_put (session_main.wrk[s->thread_index].sessions, s);
206}
207
208u8
209session_is_valid (u32 si, u8 thread_index)
210{
211  session_t *s;
212  transport_connection_t *tc;
213
214  s = pool_elt_at_index (session_main.wrk[thread_index].sessions, si);
215
216  if (!s)
217    return 1;
218
219  if (s->thread_index != thread_index || s->session_index != si)
220    return 0;
221
222  if (s->session_state == SESSION_STATE_TRANSPORT_DELETED
223      || s->session_state <= SESSION_STATE_LISTENING)
224    return 1;
225
226  tc = session_get_transport (s);
227  if (s->connection_index != tc->c_index
228      || s->thread_index != tc->thread_index || tc->s_index != si)
229    return 0;
230
231  return 1;
232}
233
234static void
235session_cleanup_notify (session_t * s, session_cleanup_ntf_t ntf)
236{
237  app_worker_t *app_wrk;
238
239  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
240  if (!app_wrk)
241    return;
242  app_worker_cleanup_notify (app_wrk, s, ntf);
243}
244
/**
 * Free session together with its rx/tx fifos.
 *
 * App is notified of the session cleanup before the fifos are returned
 * to the segment manager and the session is freed.
 */
void
session_free_w_fifos (session_t * s)
{
  session_cleanup_notify (s, SESSION_CLEANUP_SESSION);
  segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
  session_free (s);
}
252
253/**
254 * Cleans up session and lookup table.
255 *
256 * Transport connection must still be valid.
257 */
258static void
259session_delete (session_t * s)
260{
261  int rv;
262
263  /* Delete from the main lookup table. */
264  if ((rv = session_lookup_del_session (s)))
265    clib_warning ("session %u hash delete rv %d", s->session_index, rv);
266
267  session_free_w_fifos (s);
268}
269
270static session_t *
271session_alloc_for_connection (transport_connection_t * tc)
272{
273  session_t *s;
274  u32 thread_index = tc->thread_index;
275
276  ASSERT (thread_index == vlib_get_thread_index ()
277	  || transport_protocol_is_cl (tc->proto));
278
279  s = session_alloc (thread_index);
280  s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
281  s->session_state = SESSION_STATE_CLOSED;
282
283  /* Attach transport to session and vice versa */
284  s->connection_index = tc->c_index;
285  tc->s_index = s->session_index;
286  return s;
287}
288
289/**
290 * Discards bytes from buffer chain
291 *
292 * It discards n_bytes_to_drop starting at first buffer after chain_b
293 */
always_inline void
session_enqueue_discard_chain_bytes (vlib_main_t * vm, vlib_buffer_t * b,
				     vlib_buffer_t ** chain_b,
				     u32 n_bytes_to_drop)
{
  vlib_buffer_t *next = *chain_b;
  u32 to_drop = n_bytes_to_drop;
  ASSERT (b->flags & VLIB_BUFFER_NEXT_PRESENT);
  while (to_drop && (next->flags & VLIB_BUFFER_NEXT_PRESENT))
    {
      next = vlib_get_buffer (vm, next->next_buffer);
      if (next->current_length > to_drop)
	{
	  /* Partial drop: just advance this buffer's data pointer */
	  vlib_buffer_advance (next, to_drop);
	  to_drop = 0;
	}
      else
	{
	  /* Whole buffer dropped: zero its length and keep walking */
	  to_drop -= next->current_length;
	  next->current_length = 0;
	}
    }
  /* Return the last buffer visited, i.e., first with data left (if any) */
  *chain_b = next;

  /* Adjust chain length only if all requested bytes could be dropped */
  if (to_drop == 0)
    b->total_length_not_including_first_buffer -= n_bytes_to_drop;
}
321
322/**
323 * Enqueue buffer chain tail
324 */
always_inline int
session_enqueue_chain_tail (session_t * s, vlib_buffer_t * b,
			    u32 offset, u8 is_in_order)
{
  vlib_buffer_t *chain_b;
  u32 chain_bi, len, diff;
  vlib_main_t *vm = vlib_get_main ();
  u8 *data;
  u32 written = 0;
  int rv = 0;

  if (is_in_order && offset)
    {
      /* Part of the chain was already consumed (e.g., completed an ooo
       * segment): skip those bytes before enqueueing the rest in order */
      diff = offset - b->current_length;
      if (diff > b->total_length_not_including_first_buffer)
	return 0;
      chain_b = b;
      session_enqueue_discard_chain_bytes (vm, b, &chain_b, diff);
      chain_bi = vlib_get_buffer_index (vm, chain_b);
    }
  else
    chain_bi = b->next_buffer;

  do
    {
      chain_b = vlib_get_buffer (vm, chain_bi);
      data = vlib_buffer_get_current (chain_b);
      len = chain_b->current_length;
      if (!len)
	/* Empty buffer: skip to the next one in the chain */
	continue;
      if (is_in_order)
	{
	  rv = svm_fifo_enqueue (s->rx_fifo, len, data);
	  if (rv == len)
	    {
	      written += rv;
	    }
	  else if (rv < len)
	    {
	      /* Fifo full: report what was written so far */
	      return (rv > 0) ? (written + rv) : written;
	    }
	  else if (rv > len)
	    {
	      /* Fifo accepted more than this segment: the write completed
	       * previously enqueued out-of-order data */
	      written += rv;

	      /* written more than what was left in chain */
	      if (written > b->total_length_not_including_first_buffer)
		return written;

	      /* drop the bytes that have already been delivered */
	      session_enqueue_discard_chain_bytes (vm, b, &chain_b, rv - len);
	    }
	}
      else
	{
	  rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset, len, data);
	  if (rv)
	    {
	      clib_warning ("failed to enqueue multi-buffer seg");
	      return -1;
	    }
	  offset += len;
	}
    }
  while ((chain_bi = (chain_b->flags & VLIB_BUFFER_NEXT_PRESENT)
	  ? chain_b->next_buffer : 0));

  if (is_in_order)
    return written;

  return 0;
}
397
398/*
399 * Enqueue data for delivery to session peer. Does not notify peer of enqueue
400 * event but on request can queue notification events for later delivery by
401 * calling stream_server_flush_enqueue_events().
402 *
403 * @param tc Transport connection which is to be enqueued data
404 * @param b Buffer to be enqueued
405 * @param offset Offset at which to start enqueueing if out-of-order
406 * @param queue_event Flag to indicate if peer is to be notified or if event
407 *                    is to be queued. The former is useful when more data is
408 *                    enqueued and only one event is to be generated.
409 * @param is_in_order Flag to indicate if data is in order
410 * @return Number of bytes enqueued or a negative value if enqueueing failed.
411 */
int
session_enqueue_stream_connection (transport_connection_t * tc,
				   vlib_buffer_t * b, u32 offset,
				   u8 queue_event, u8 is_in_order)
{
  session_t *s;
  int enqueued = 0, rv, in_order_off;

  s = session_get (tc->s_index, tc->thread_index);

  if (is_in_order)
    {
      enqueued = svm_fifo_enqueue (s->rx_fifo,
				   b->current_length,
				   vlib_buffer_get_current (b));
      if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT)
			 && enqueued >= 0))
	{
	  /* enqueued > current_length means the write also completed
	   * previously enqueued ooo data; the chain tail must skip the
	   * bytes already delivered */
	  in_order_off = enqueued > b->current_length ? enqueued : 0;
	  rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
	  if (rv > 0)
	    enqueued += rv;
	}
    }
  else
    {
      rv = svm_fifo_enqueue_with_offset (s->rx_fifo, offset,
					 b->current_length,
					 vlib_buffer_get_current (b));
      if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && !rv))
	session_enqueue_chain_tail (s, b, offset + b->current_length, 0);
      /* if something was enqueued, report even this as success for ooo
       * segment handling */
      return rv;
    }

  if (queue_event)
    {
      /* Queue RX event on this fifo. Eventually these will need to be flushed
       * by calling stream_server_flush_enqueue_events () */
      session_worker_t *wrk;

      wrk = session_main_get_worker (s->thread_index);
      if (!(s->flags & SESSION_F_RX_EVT))
	{
	  /* At most one pending rx event per session */
	  s->flags |= SESSION_F_RX_EVT;
	  vec_add1 (wrk->session_to_enqueue[tc->proto], s->session_index);
	}
    }

  return enqueued;
}
464
/**
 * Enqueue datagram (header + payload) on session's rx fifo.
 *
 * Caller must ensure the fifo can hold the header plus the first buffer's
 * worth of data (asserted below); chain tails are enqueued best effort.
 */
int
session_enqueue_dgram_connection (session_t * s,
				  session_dgram_hdr_t * hdr,
				  vlib_buffer_t * b, u8 proto, u8 queue_event)
{
  int enqueued = 0, rv, in_order_off;

  ASSERT (svm_fifo_max_enqueue_prod (s->rx_fifo)
	  >= b->current_length + sizeof (*hdr));

  /* Datagrams are framed in the fifo as header followed by payload */
  svm_fifo_enqueue (s->rx_fifo, sizeof (session_dgram_hdr_t), (u8 *) hdr);
  enqueued = svm_fifo_enqueue (s->rx_fifo, b->current_length,
			       vlib_buffer_get_current (b));
  if (PREDICT_FALSE ((b->flags & VLIB_BUFFER_NEXT_PRESENT) && enqueued >= 0))
    {
      in_order_off = enqueued > b->current_length ? enqueued : 0;
      rv = session_enqueue_chain_tail (s, b, in_order_off, 1);
      if (rv > 0)
	enqueued += rv;
    }
  if (queue_event)
    {
      /* Queue RX event on this fifo. Eventually these will need to be flushed
       * by calling stream_server_flush_enqueue_events () */
      session_worker_t *wrk;

      wrk = session_main_get_worker (s->thread_index);
      if (!(s->flags & SESSION_F_RX_EVT))
	{
	  /* At most one pending rx event per session */
	  s->flags |= SESSION_F_RX_EVT;
	  vec_add1 (wrk->session_to_enqueue[proto], s->session_index);
	}
    }
  return enqueued;
}
500
501int
502session_tx_fifo_peek_bytes (transport_connection_t * tc, u8 * buffer,
503			    u32 offset, u32 max_bytes)
504{
505  session_t *s = session_get (tc->s_index, tc->thread_index);
506  return svm_fifo_peek (s->tx_fifo, offset, max_bytes, buffer);
507}
508
509u32
510session_tx_fifo_dequeue_drop (transport_connection_t * tc, u32 max_bytes)
511{
512  session_t *s = session_get (tc->s_index, tc->thread_index);
513  u32 rv;
514
515  rv = svm_fifo_dequeue_drop (s->tx_fifo, max_bytes);
516
517  if (svm_fifo_needs_deq_ntf (s->tx_fifo, max_bytes))
518    session_dequeue_notify (s);
519
520  return rv;
521}
522
523static inline int
524session_notify_subscribers (u32 app_index, session_t * s,
525			    svm_fifo_t * f, session_evt_type_t evt_type)
526{
527  app_worker_t *app_wrk;
528  application_t *app;
529  int i;
530
531  app = application_get (app_index);
532  if (!app)
533    return -1;
534
535  for (i = 0; i < f->n_subscribers; i++)
536    {
537      app_wrk = application_get_worker (app, f->subscribers[i]);
538      if (!app_wrk)
539	continue;
540      if (app_worker_lock_and_send_event (app_wrk, s, evt_type))
541	return -1;
542    }
543
544  return 0;
545}
546
547/**
548 * Notify session peer that new data has been enqueued.
549 *
550 * @param s 	Stream session for which the event is to be generated.
551 * @param lock 	Flag to indicate if call should lock message queue.
552 *
553 * @return 0 on success or negative number if failed to send notification.
554 */
static inline int
session_enqueue_notify_inline (session_t * s)
{
  app_worker_t *app_wrk;
  u32 session_index;
  u8 n_subscribers;

  /* Cache index and subscriber count up front: s is re-fetched after the
   * event is sent (see below) */
  session_index = s->session_index;
  n_subscribers = svm_fifo_n_subscribers (s->rx_fifo);

  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
  if (PREDICT_FALSE (!app_wrk))
    {
      SESSION_DBG ("invalid s->app_index = %d", s->app_wrk_index);
      return 0;
    }

  SESSION_EVT (SESSION_EVT_ENQ, s, svm_fifo_max_dequeue_prod (s->rx_fifo));

  /* Clear pending-rx-event flag before notifying so new data re-arms it */
  s->flags &= ~SESSION_F_RX_EVT;
  if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
						     SESSION_IO_EVT_RX)))
    return -1;

  if (PREDICT_FALSE (n_subscribers))
    {
      /* NOTE(review): s is looked up again here, presumably because the
       * session pool may move while the event is being sent — confirm */
      s = session_get (session_index, vlib_get_thread_index ());
      return session_notify_subscribers (app_wrk->app_index, s,
					 s->rx_fifo, SESSION_IO_EVT_RX);
    }

  return 0;
}
588
/**
 * Notify app that new data was enqueued; non-inline entry point.
 */
int
session_enqueue_notify (session_t * s)
{
  return session_enqueue_notify_inline (s);
}
594
595static void
596session_enqueue_notify_rpc (void *arg)
597{
598  u32 session_index = pointer_to_uword (arg);
599  session_t *s;
600
601  s = session_get_if_valid (session_index, vlib_get_thread_index ());
602  if (!s)
603    return;
604
605  session_enqueue_notify (s);
606}
607
608/**
609 * Like session_enqueue_notify, but can be called from a thread that does not
610 * own the session.
611 */
void
session_enqueue_notify_thread (session_handle_t sh)
{
  u32 thread_index = session_thread_from_handle (sh);
  u32 session_index = session_index_from_handle (sh);

  /*
   * Pass session index (u32) as opposed to handle (u64) in case pointers
   * are not 64-bit. The rpc handler re-validates the session on the
   * owning thread.
   */
  session_send_rpc_evt_to_thread (thread_index,
				  session_enqueue_notify_rpc,
				  uword_to_pointer (session_index, void *));
}
626
627int
628session_dequeue_notify (session_t * s)
629{
630  app_worker_t *app_wrk;
631
632  svm_fifo_clear_deq_ntf (s->tx_fifo);
633
634  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
635  if (PREDICT_FALSE (!app_wrk))
636    return -1;
637
638  if (PREDICT_FALSE (app_worker_lock_and_send_event (app_wrk, s,
639						     SESSION_IO_EVT_TX)))
640    return -1;
641
642  if (PREDICT_FALSE (s->tx_fifo->n_subscribers))
643    return session_notify_subscribers (app_wrk->app_index, s,
644				       s->tx_fifo, SESSION_IO_EVT_TX);
645
646  return 0;
647}
648
649/**
650 * Flushes queue of sessions that are to be notified of new data
651 * enqueued events.
652 *
653 * @param thread_index Thread index for which the flush is to be performed.
654 * @return 0 on success or a positive number indicating the number of
655 *         failures due to API queue being full.
656 */
657int
658session_main_flush_enqueue_events (u8 transport_proto, u32 thread_index)
659{
660  session_worker_t *wrk = session_main_get_worker (thread_index);
661  session_t *s;
662  int i, errors = 0;
663  u32 *indices;
664
665  indices = wrk->session_to_enqueue[transport_proto];
666
667  for (i = 0; i < vec_len (indices); i++)
668    {
669      s = session_get_if_valid (indices[i], thread_index);
670      if (PREDICT_FALSE (!s))
671	{
672	  errors++;
673	  continue;
674	}
675
676      if (PREDICT_FALSE (session_enqueue_notify_inline (s)))
677	errors++;
678    }
679
680  vec_reset_length (indices);
681  wrk->session_to_enqueue[transport_proto] = indices;
682
683  return errors;
684}
685
686int
687session_main_flush_all_enqueue_events (u8 transport_proto)
688{
689  vlib_thread_main_t *vtm = vlib_get_thread_main ();
690  int i, errors = 0;
691  for (i = 0; i < 1 + vtm->n_threads; i++)
692    errors += session_main_flush_enqueue_events (transport_proto, i);
693  return errors;
694}
695
/**
 * Handle connect result from transport: clean up half-open state,
 * allocate the session on success and notify the app.
 *
 * @param opened_state state the session enters on success (READY or
 *                     OPENED depending on caller).
 */
static inline int
session_stream_connect_notify_inline (transport_connection_t * tc, u8 is_fail,
				      session_state_t opened_state)
{
  u32 opaque = 0, new_ti, new_si;
  app_worker_t *app_wrk;
  session_t *s = 0;
  u64 ho_handle;

  /*
   * Find connection handle and cleanup half-open table
   */
  ho_handle = session_lookup_half_open_handle (tc);
  if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
    {
      SESSION_DBG ("half-open was removed!");
      return -1;
    }
  session_lookup_del_half_open (tc);

  /* Get the app's index from the handle we stored when opening connection
   * and the opaque (api_context for external apps) from transport session
   * index */
  app_wrk = app_worker_get_if_valid (ho_handle >> 32);
  if (!app_wrk)
    return -1;

  opaque = tc->s_index;

  /* On failure, notify app with a null session (s is still 0 here) */
  if (is_fail)
    return app_worker_connect_notify (app_wrk, s, opaque);

  s = session_alloc_for_connection (tc);
  s->session_state = SESSION_STATE_CONNECTING;
  s->app_wrk_index = app_wrk->wrk_index;
  /* Cache index/thread: s is re-fetched after init/notify calls below */
  new_si = s->session_index;
  new_ti = s->thread_index;

  if (app_worker_init_connected (app_wrk, s))
    {
      session_free (s);
      app_worker_connect_notify (app_wrk, 0, opaque);
      return -1;
    }

  s = session_get (new_si, new_ti);
  s->session_state = opened_state;
  session_lookup_add_connection (tc, session_handle (s));

  if (app_worker_connect_notify (app_wrk, s, opaque))
    {
      /* App rejected the session: tear it down with its fifos */
      s = session_get (new_si, new_ti);
      session_free_w_fifos (s);
      return -1;
    }

  return 0;
}
754
/**
 * Connect notify for regular sessions; session goes READY on success.
 */
int
session_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
{
  return session_stream_connect_notify_inline (tc, is_fail,
					       SESSION_STATE_READY);
}
761
/**
 * Connect notify variant that leaves the session in OPENED state.
 */
int
session_ho_stream_connect_notify (transport_connection_t * tc, u8 is_fail)
{
  return session_stream_connect_notify_inline (tc, is_fail,
					       SESSION_STATE_OPENED);
}
768
/** Arguments for the session pool-switch rpc (dgram thread migration) */
typedef struct _session_switch_pool_args
{
  u32 session_index;		/**< index of old session to clean up */
  u32 thread_index;		/**< thread owning the old session */
  u32 new_thread_index;		/**< thread the session migrated to */
  u32 new_session_index;	/**< session index on the new thread */
} session_switch_pool_args_t;
776
777/**
778 * Notify old thread of the session pool switch
779 */
static void
session_switch_pool (void *cb_args)
{
  session_switch_pool_args_t *args = (session_switch_pool_args_t *) cb_args;
  app_worker_t *app_wrk;
  session_t *s;

  /* Runs as an rpc on the thread that owns the old session */
  ASSERT (args->thread_index == vlib_get_thread_index ());
  s = session_get (args->session_index, args->thread_index);
  /* Hand tx fifo ownership over to the new session before freeing */
  s->tx_fifo->master_session_index = args->new_session_index;
  s->tx_fifo->master_thread_index = args->new_thread_index;
  transport_cleanup (session_get_transport_proto (s), s->connection_index,
		     s->thread_index);

  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
  if (app_wrk)
    {
      session_handle_t new_sh;
      new_sh = session_make_handle (args->new_session_index,
				    args->new_thread_index);
      app_worker_migrate_notify (app_wrk, s, new_sh);

      /* Trigger app read on the new thread */
      session_enqueue_notify_thread (new_sh);
    }

  session_free (s);
  /* Args were allocated by session_dgram_connect_notify; freed here */
  clib_mem_free (cb_args);
}
809
810/**
811 * Move dgram session to the right thread
812 */
813int
814session_dgram_connect_notify (transport_connection_t * tc,
815			      u32 old_thread_index, session_t ** new_session)
816{
817  session_t *new_s;
818  session_switch_pool_args_t *rpc_args;
819
820  /*
821   * Clone half-open session to the right thread.
822   */
823  new_s = session_clone_safe (tc->s_index, old_thread_index);
824  new_s->connection_index = tc->c_index;
825  new_s->rx_fifo->master_session_index = new_s->session_index;
826  new_s->rx_fifo->master_thread_index = new_s->thread_index;
827  new_s->session_state = SESSION_STATE_READY;
828  new_s->flags |= SESSION_F_IS_MIGRATING;
829  session_lookup_add_connection (tc, session_handle (new_s));
830
831  /*
832   * Ask thread owning the old session to clean it up and make us the tx
833   * fifo owner
834   */
835  rpc_args = clib_mem_alloc (sizeof (*rpc_args));
836  rpc_args->new_session_index = new_s->session_index;
837  rpc_args->new_thread_index = new_s->thread_index;
838  rpc_args->session_index = tc->s_index;
839  rpc_args->thread_index = old_thread_index;
840  session_send_rpc_evt_to_thread (rpc_args->thread_index, session_switch_pool,
841				  rpc_args);
842
843  tc->s_index = new_s->session_index;
844  new_s->connection_index = tc->c_index;
845  *new_session = new_s;
846  return 0;
847}
848
849/**
850 * Notification from transport that connection is being closed.
851 *
852 * A disconnect is sent to application but state is not removed. Once
853 * disconnect is acknowledged by application, session disconnect is called.
854 * Ultimately this leads to close being called on transport (passive close).
855 */
856void
857session_transport_closing_notify (transport_connection_t * tc)
858{
859  app_worker_t *app_wrk;
860  session_t *s;
861
862  s = session_get (tc->s_index, tc->thread_index);
863  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
864    return;
865  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
866  app_wrk = app_worker_get (s->app_wrk_index);
867  app_worker_close_notify (app_wrk, s);
868}
869
870/**
871 * Notification from transport that connection is being deleted
872 *
873 * This removes the session if it is still valid. It should be called only on
874 * previously fully established sessions. For instance failed connects should
875 * call stream_session_connect_notify and indicate that the connect has
876 * failed.
877 */
void
session_transport_delete_notify (transport_connection_t * tc)
{
  session_t *s;

  /* App might've been removed already */
  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
    return;

  switch (s->session_state)
    {
    case SESSION_STATE_CREATED:
      /* Session was created but accept notification was not yet sent to the
       * app. Cleanup everything. */
      session_lookup_del_session (s);
      segment_manager_dealloc_fifos (s->rx_fifo, s->tx_fifo);
      session_free (s);
      break;
    case SESSION_STATE_ACCEPTING:
    case SESSION_STATE_TRANSPORT_CLOSING:
    case SESSION_STATE_CLOSING:
    case SESSION_STATE_TRANSPORT_CLOSED:
      /* If transport finishes or times out before we get a reply
       * from the app, mark transport as closed and wait for reply
       * before removing the session. Cleanup session table in advance
       * because transport will soon be closed and closed sessions
       * are assumed to have been removed from the lookup table */
      session_lookup_del_session (s);
      s->session_state = SESSION_STATE_TRANSPORT_DELETED;
      session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
      svm_fifo_dequeue_drop_all (s->tx_fifo);
      break;
    case SESSION_STATE_APP_CLOSED:
      /* Cleanup lookup table as transport needs to still be valid.
       * Program transport close to ensure that all session events
       * have been cleaned up. Once transport close is called, the
       * session is just removed because both transport and app have
       * confirmed the close*/
      session_lookup_del_session (s);
      s->session_state = SESSION_STATE_TRANSPORT_DELETED;
      session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
      svm_fifo_dequeue_drop_all (s->tx_fifo);
      session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
      break;
    case SESSION_STATE_TRANSPORT_DELETED:
      /* Already handled; nothing more to do */
      break;
    case SESSION_STATE_CLOSED:
      /* Both sides done: notify transport cleanup and delete session */
      session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
      session_delete (s);
      break;
    default:
      /* Unexpected state; warn but still clean up */
      clib_warning ("session state %u", s->session_state);
      session_cleanup_notify (s, SESSION_CLEANUP_TRANSPORT);
      session_delete (s);
      break;
    }
}
935
936/**
937 * Notification from transport that it is closed
938 *
939 * Should be called by transport, prior to calling delete notify, once it
940 * knows that no more data will be exchanged. This could serve as an
941 * early acknowledgment of an active close especially if transport delete
942 * can be delayed a long time, e.g., tcp time-wait.
943 */
void
session_transport_closed_notify (transport_connection_t * tc)
{
  app_worker_t *app_wrk;
  session_t *s;

  if (!(s = session_get_if_valid (tc->s_index, tc->thread_index)))
    return;

  /* Transport thinks that app requested close but it actually didn't.
   * Can happen for tcp if fin and rst are received in close succession. */
  if (s->session_state == SESSION_STATE_READY)
    {
      /* Simulate the missing app close, then mark transport closed */
      session_transport_closing_notify (tc);
      svm_fifo_dequeue_drop_all (s->tx_fifo);
      s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
    }
  /* If app close has not been received or has not yet resulted in
   * a transport close, only mark the session transport as closed */
  else if (s->session_state <= SESSION_STATE_CLOSING)
    {
      s->session_state = SESSION_STATE_TRANSPORT_CLOSED;
    }
  /* If app also closed, switch to closed */
  else if (s->session_state == SESSION_STATE_APP_CLOSED)
    s->session_state = SESSION_STATE_CLOSED;

  /* App worker may be gone if app detached in the meantime */
  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
  if (app_wrk)
    app_worker_transport_closed_notify (app_wrk, s);
}
975
976/**
977 * Notify application that connection has been reset.
978 */
979void
980session_transport_reset_notify (transport_connection_t * tc)
981{
982  app_worker_t *app_wrk;
983  session_t *s;
984
985  s = session_get (tc->s_index, tc->thread_index);
986  svm_fifo_dequeue_drop_all (s->tx_fifo);
987  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
988    return;
989  s->session_state = SESSION_STATE_TRANSPORT_CLOSING;
990  app_wrk = app_worker_get (s->app_wrk_index);
991  app_worker_reset_notify (app_wrk, s);
992}
993
994int
995session_stream_accept_notify (transport_connection_t * tc)
996{
997  app_worker_t *app_wrk;
998  session_t *s;
999
1000  s = session_get (tc->s_index, tc->thread_index);
1001  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
1002  if (!app_wrk)
1003    return -1;
1004  s->session_state = SESSION_STATE_ACCEPTING;
1005  return app_worker_accept_notify (app_wrk, s);
1006}
1007
1008/**
1009 * Accept a stream session. Optionally ping the server by callback.
1010 */
1011int
1012session_stream_accept (transport_connection_t * tc, u32 listener_index,
1013		       u32 thread_index, u8 notify)
1014{
1015  session_t *s;
1016  int rv;
1017
1018  s = session_alloc_for_connection (tc);
1019  s->listener_handle = ((u64) thread_index << 32) | (u64) listener_index;
1020  s->session_state = SESSION_STATE_CREATED;
1021
1022  if ((rv = app_worker_init_accepted (s)))
1023    return rv;
1024
1025  session_lookup_add_connection (tc, session_handle (s));
1026
1027  /* Shoulder-tap the server */
1028  if (notify)
1029    {
1030      app_worker_t *app_wrk = app_worker_get (s->app_wrk_index);
1031      return app_worker_accept_notify (app_wrk, s);
1032    }
1033
1034  return 0;
1035}
1036
/**
 * Open a connectionless (dgram) session towards @a rmt.
 *
 * Session and fifos are allocated immediately since dgram transports have
 * no connection establishment phase; the app is notified right away.
 *
 * @return VNET_API_ERROR_SESSION_CONNECT if transport connect fails, -1 if
 *         session initialization fails, else the connect notify result.
 */
int
session_open_cl (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
  transport_connection_t *tc;
  transport_endpoint_cfg_t *tep;
  app_worker_t *app_wrk;
  session_handle_t sh;
  session_t *s;
  int rv;

  tep = session_endpoint_to_transport_cfg (rmt);
  rv = transport_connect (rmt->transport_proto, tep);
  if (rv < 0)
    {
      SESSION_DBG ("Transport failed to open connection.");
      return VNET_API_ERROR_SESSION_CONNECT;
    }

  /* Non-negative rv is the half-open connection index */
  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);

  /* For dgram type of service, allocate session and fifos now */
  app_wrk = app_worker_get (app_wrk_index);
  s = session_alloc_for_connection (tc);
  s->app_wrk_index = app_wrk->wrk_index;
  s->session_state = SESSION_STATE_OPENED;
  if (app_worker_init_connected (app_wrk, s))
    {
      session_free (s);
      return -1;
    }

  sh = session_handle (s);
  session_lookup_add_connection (tc, sh);
  return app_worker_connect_notify (app_wrk, s, opaque);
}
1072
/**
 * Open a connection over a stream (virtual-circuit) transport.
 *
 * No session is allocated here; for stream transports the session is only
 * created once the connection is established and the connect notification
 * is delivered.
 */
int
session_open_vc (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
{
  transport_connection_t *tc;
  transport_endpoint_cfg_t *tep;
  u64 handle;
  int rv;

  tep = session_endpoint_to_transport_cfg (rmt);
  rv = transport_connect (rmt->transport_proto, tep);
  if (rv < 0)
    {
      SESSION_DBG ("Transport failed to open connection.");
      return VNET_API_ERROR_SESSION_CONNECT;
    }

  /* On success, transport_connect returns the half-open connection index */
  tc = transport_get_half_open (rmt->transport_proto, (u32) rv);

  /* If transport offers a stream service, only allocate session once the
   * connection has been established.
   * Add connection to half-open table and save app and tc index. The
   * latter is needed to help establish the connection while the former
   * is needed when the connect notify comes and we have to notify the
   * external app
   */
  handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
  session_lookup_add_half_open (tc, handle);

  /* Store api_context (opaque) for when the reply comes. Not the nicest
   * thing but better than allocating a separate half-open pool.
   */
  tc->s_index = opaque;
  if (transport_half_open_has_fifos (rmt->transport_proto))
    return session_ho_stream_connect_notify (tc, 0 /* is_fail */ );
  return 0;
}
1109
1110int
1111session_open_app (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1112{
1113  session_endpoint_cfg_t *sep = (session_endpoint_cfg_t *) rmt;
1114  transport_endpoint_cfg_t *tep_cfg = session_endpoint_to_transport_cfg (sep);
1115
1116  sep->app_wrk_index = app_wrk_index;
1117  sep->opaque = opaque;
1118
1119  return transport_connect (rmt->transport_proto, tep_cfg);
1120}
1121
typedef int (*session_open_service_fn) (u32, session_endpoint_t *, u32);

/* *INDENT-OFF* */
/* Connect handlers indexed by transport service type (see session_open) */
static session_open_service_fn session_open_srv_fns[TRANSPORT_N_SERVICES] = {
  session_open_vc,
  session_open_cl,
  session_open_app,
};
/* *INDENT-ON* */
1131
1132/**
1133 * Ask transport to open connection to remote transport endpoint.
1134 *
1135 * Stores handle for matching request with reply since the call can be
1136 * asynchronous. For instance, for TCP the 3-way handshake must complete
1137 * before reply comes. Session is only created once connection is established.
1138 *
1139 * @param app_index Index of the application requesting the connect
1140 * @param st Session type requested.
1141 * @param tep Remote transport endpoint
1142 * @param opaque Opaque data (typically, api_context) the application expects
1143 * 		 on open completion.
1144 */
1145int
1146session_open (u32 app_wrk_index, session_endpoint_t * rmt, u32 opaque)
1147{
1148  transport_service_type_t tst;
1149  tst = transport_protocol_service_type (rmt->transport_proto);
1150  return session_open_srv_fns[tst] (app_wrk_index, rmt, opaque);
1151}
1152
1153/**
1154 * Ask transport to listen on session endpoint.
1155 *
1156 * @param s Session for which listen will be called. Note that unlike
1157 * 	    established sessions, listen sessions are not associated to a
1158 * 	    thread.
1159 * @param sep Local endpoint to be listened on.
1160 */
1161int
1162session_listen (session_t * ls, session_endpoint_cfg_t * sep)
1163{
1164  transport_endpoint_t *tep;
1165  u32 tc_index, s_index;
1166
1167  /* Transport bind/listen */
1168  tep = session_endpoint_to_transport (sep);
1169  s_index = ls->session_index;
1170  tc_index = transport_start_listen (session_get_transport_proto (ls),
1171				     s_index, tep);
1172
1173  if (tc_index == (u32) ~ 0)
1174    return -1;
1175
1176  /* Attach transport to session. Lookup tables are populated by the app
1177   * worker because local tables (for ct sessions) are not backed by a fib */
1178  ls = listen_session_get (s_index);
1179  ls->connection_index = tc_index;
1180
1181  return 0;
1182}
1183
1184/**
1185 * Ask transport to stop listening on local transport endpoint.
1186 *
1187 * @param s Session to stop listening on. It must be in state LISTENING.
1188 */
1189int
1190session_stop_listen (session_t * s)
1191{
1192  transport_proto_t tp = session_get_transport_proto (s);
1193  transport_connection_t *tc;
1194
1195  if (s->session_state != SESSION_STATE_LISTENING)
1196    return -1;
1197
1198  tc = transport_get_listener (tp, s->connection_index);
1199  if (!tc)
1200    return VNET_API_ERROR_ADDRESS_NOT_IN_USE;
1201
1202  if (!(tc->flags & TRANSPORT_CONNECTION_F_NO_LOOKUP))
1203    session_lookup_del_connection (tc);
1204  transport_stop_listen (tp, s->connection_index);
1205  return 0;
1206}
1207
1208/**
1209 * Initialize session closing procedure.
1210 *
1211 * Request is always sent to session node to ensure that all outstanding
1212 * requests are served before transport is notified.
1213 */
1214void
1215session_close (session_t * s)
1216{
1217  if (!s)
1218    return;
1219
1220  if (s->session_state >= SESSION_STATE_CLOSING)
1221    {
1222      /* Session will only be removed once both app and transport
1223       * acknowledge the close */
1224      if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED
1225	  || s->session_state == SESSION_STATE_TRANSPORT_DELETED)
1226	session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1227      return;
1228    }
1229
1230  s->session_state = SESSION_STATE_CLOSING;
1231  session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_CLOSE);
1232}
1233
1234/**
1235 * Force a close without waiting for data to be flushed
1236 */
1237void
1238session_reset (session_t * s)
1239{
1240  if (s->session_state >= SESSION_STATE_CLOSING)
1241    return;
1242  /* Drop all outstanding tx data */
1243  svm_fifo_dequeue_drop_all (s->tx_fifo);
1244  s->session_state = SESSION_STATE_CLOSING;
1245  session_program_transport_ctrl_evt (s, SESSION_CTRL_EVT_RESET);
1246}
1247
1248/**
1249 * Notify transport the session can be disconnected. This should eventually
1250 * result in a delete notification that allows us to cleanup session state.
1251 * Called for both active/passive disconnects.
1252 *
1253 * Must be called from the session's thread.
1254 */
1255void
1256session_transport_close (session_t * s)
1257{
1258  if (s->session_state >= SESSION_STATE_APP_CLOSED)
1259    {
1260      if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1261	s->session_state = SESSION_STATE_CLOSED;
1262      /* If transport is already deleted, just free the session */
1263      else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1264	session_free_w_fifos (s);
1265      return;
1266    }
1267
1268  /* If the tx queue wasn't drained, the transport can continue to try
1269   * sending the outstanding data (in closed state it cannot). It MUST however
1270   * at one point, either after sending everything or after a timeout, call
1271   * delete notify. This will finally lead to the complete cleanup of the
1272   * session.
1273   */
1274  s->session_state = SESSION_STATE_APP_CLOSED;
1275
1276  transport_close (session_get_transport_proto (s), s->connection_index,
1277		   s->thread_index);
1278}
1279
1280/**
1281 * Force transport close
1282 */
1283void
1284session_transport_reset (session_t * s)
1285{
1286  if (s->session_state >= SESSION_STATE_APP_CLOSED)
1287    {
1288      if (s->session_state == SESSION_STATE_TRANSPORT_CLOSED)
1289	s->session_state = SESSION_STATE_CLOSED;
1290      else if (s->session_state >= SESSION_STATE_TRANSPORT_DELETED)
1291	session_free_w_fifos (s);
1292      return;
1293    }
1294
1295  s->session_state = SESSION_STATE_APP_CLOSED;
1296  transport_reset (session_get_transport_proto (s), s->connection_index,
1297		   s->thread_index);
1298}
1299
1300/**
1301 * Cleanup transport and session state.
1302 *
1303 * Notify transport of the cleanup and free the session. This should
1304 * be called only if transport reported some error and is already
1305 * closed.
1306 */
1307void
1308session_transport_cleanup (session_t * s)
1309{
1310  /* Delete from main lookup table before we axe the the transport */
1311  session_lookup_del_session (s);
1312  if (s->session_state != SESSION_STATE_TRANSPORT_DELETED)
1313    transport_cleanup (session_get_transport_proto (s), s->connection_index,
1314		       s->thread_index);
1315  /* Since we called cleanup, no delete notification will come. So, make
1316   * sure the session is properly freed. */
1317  session_free_w_fifos (s);
1318}
1319
1320/**
1321 * Allocate event queues in the shared-memory segment
1322 *
1323 * That can either be a newly created memfd segment, that will need to be
1324 * mapped by all stack users, or the binary api's svm region. The latter is
1325 * assumed to be already mapped. NOTE that this assumption DOES NOT hold if
1326 * api clients bootstrap shm api over sockets (i.e. use memfd segments) and
1327 * vpp uses api svm region for event queues.
1328 */
1329void
1330session_vpp_event_queues_allocate (session_main_t * smm)
1331{
1332  u32 evt_q_length = 2048, evt_size = sizeof (session_event_t);
1333  ssvm_private_t *eqs = &smm->evt_qs_segment;
1334  api_main_t *am = &api_main;
1335  uword eqs_size = 64 << 20;
1336  pid_t vpp_pid = getpid ();
1337  void *oldheap;
1338  int i;
1339
1340  if (smm->configured_event_queue_length)
1341    evt_q_length = smm->configured_event_queue_length;
1342
1343  if (smm->evt_qs_use_memfd_seg)
1344    {
1345      if (smm->evt_qs_segment_size)
1346	eqs_size = smm->evt_qs_segment_size;
1347
1348      eqs->ssvm_size = eqs_size;
1349      eqs->i_am_master = 1;
1350      eqs->my_pid = vpp_pid;
1351      eqs->name = format (0, "%s%c", "evt-qs-segment", 0);
1352      eqs->requested_va = smm->session_baseva;
1353
1354      if (ssvm_master_init (eqs, SSVM_SEGMENT_MEMFD))
1355	{
1356	  clib_warning ("failed to initialize queue segment");
1357	  return;
1358	}
1359    }
1360
1361  if (smm->evt_qs_use_memfd_seg)
1362    oldheap = ssvm_push_heap (eqs->sh);
1363  else
1364    oldheap = svm_push_data_heap (am->vlib_rp);
1365
1366  for (i = 0; i < vec_len (smm->wrk); i++)
1367    {
1368      svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1369      svm_msg_q_ring_cfg_t rc[SESSION_MQ_N_RINGS] = {
1370	{evt_q_length, evt_size, 0}
1371	,
1372	{evt_q_length >> 1, 256, 0}
1373      };
1374      cfg->consumer_pid = 0;
1375      cfg->n_rings = 2;
1376      cfg->q_nitems = evt_q_length;
1377      cfg->ring_cfgs = rc;
1378      smm->wrk[i].vpp_event_queue = svm_msg_q_alloc (cfg);
1379      if (smm->evt_qs_use_memfd_seg)
1380	{
1381	  if (svm_msg_q_alloc_consumer_eventfd (smm->wrk[i].vpp_event_queue))
1382	    clib_warning ("eventfd returned");
1383	}
1384    }
1385
1386  if (smm->evt_qs_use_memfd_seg)
1387    ssvm_pop_heap (oldheap);
1388  else
1389    svm_pop_heap (oldheap);
1390}
1391
1392ssvm_private_t *
1393session_main_get_evt_q_segment (void)
1394{
1395  session_main_t *smm = &session_main;
1396  if (smm->evt_qs_use_memfd_seg)
1397    return &smm->evt_qs_segment;
1398  return 0;
1399}
1400
1401u64
1402session_segment_handle (session_t * s)
1403{
1404  svm_fifo_t *f;
1405
1406  if (!s->rx_fifo)
1407    return SESSION_INVALID_HANDLE;
1408
1409  f = s->rx_fifo;
1410  return segment_manager_make_segment_handle (f->segment_manager,
1411					      f->segment_index);
1412}
1413
/* *INDENT-OFF* */
/* Tx fifo read functions, indexed by the transport vft's
 * transport_options.tx_type (see session_register_transport) */
static session_fifo_rx_fn *session_tx_fns[TRANSPORT_TX_N_FNS] = {
    session_tx_fifo_peek_and_snd,
    session_tx_fifo_dequeue_and_snd,
    session_tx_fifo_dequeue_internal,
    session_tx_fifo_dequeue_and_snd
};
/* *INDENT-ON* */
1422
1423/**
1424 * Initialize session layer for given transport proto and ip version
1425 *
1426 * Allocates per session type (transport proto + ip version) data structures
1427 * and adds arc from session queue node to session type output node.
1428 */
1429void
1430session_register_transport (transport_proto_t transport_proto,
1431			    const transport_proto_vft_t * vft, u8 is_ip4,
1432			    u32 output_node)
1433{
1434  session_main_t *smm = &session_main;
1435  session_type_t session_type;
1436  u32 next_index = ~0;
1437
1438  session_type = session_type_from_proto_and_ip (transport_proto, is_ip4);
1439
1440  vec_validate (smm->session_type_to_next, session_type);
1441  vec_validate (smm->session_tx_fns, session_type);
1442
1443  /* *INDENT-OFF* */
1444  if (output_node != ~0)
1445    {
1446      foreach_vlib_main (({
1447          next_index = vlib_node_add_next (this_vlib_main,
1448                                           session_queue_node.index,
1449                                           output_node);
1450      }));
1451    }
1452  /* *INDENT-ON* */
1453
1454  smm->session_type_to_next[session_type] = next_index;
1455  smm->session_tx_fns[session_type] =
1456    session_tx_fns[vft->transport_options.tx_type];
1457}
1458
1459transport_connection_t *
1460session_get_transport (session_t * s)
1461{
1462  if (s->session_state != SESSION_STATE_LISTENING)
1463    return transport_get_connection (session_get_transport_proto (s),
1464				     s->connection_index, s->thread_index);
1465  else
1466    return transport_get_listener (session_get_transport_proto (s),
1467				   s->connection_index);
1468}
1469
1470void
1471session_get_endpoint (session_t * s, transport_endpoint_t * tep, u8 is_lcl)
1472{
1473  if (s->session_state != SESSION_STATE_LISTENING)
1474    return transport_get_endpoint (session_get_transport_proto (s),
1475				   s->connection_index, s->thread_index, tep,
1476				   is_lcl);
1477  else
1478    return transport_get_listener_endpoint (session_get_transport_proto (s),
1479					    s->connection_index, tep, is_lcl);
1480}
1481
1482transport_connection_t *
1483listen_session_get_transport (session_t * s)
1484{
1485  return transport_get_listener (session_get_transport_proto (s),
1486				 s->connection_index);
1487}
1488
/**
 * Signal the session queue process to flush pending frames.
 *
 * Must be called from the main thread (asserted below), since the
 * session queue process is signaled there.
 */
void
session_flush_frames_main_thread (vlib_main_t * vm)
{
  ASSERT (vlib_get_thread_index () == 0);
  vlib_process_signal_event_mt (vm, session_queue_process_node.index,
				SESSION_Q_PROCESS_FLUSH_FRAMES, 0);
}
1496
/**
 * Bring up the session layer: per-thread worker contexts, event queues,
 * segment manager, lookup tables and transports.
 */
static clib_error_t *
session_manager_main_enable (vlib_main_t * vm)
{
  segment_manager_main_init_args_t _sm_args = { 0 }, *sm_args = &_sm_args;
  session_main_t *smm = &session_main;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  u32 num_threads, preallocated_sessions_per_worker;
  session_worker_t *wrk;
  int i;

  num_threads = 1 /* main thread */  + vtm->n_threads;

  if (num_threads < 1)
    return clib_error_return (0, "n_thread_stacks not set");

  /* Allocate cache line aligned worker contexts */
  vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);

  for (i = 0; i < num_threads; i++)
    {
      wrk = &smm->wrk[i];
      /* Per-worker event lists: ctrl, new and old events */
      wrk->ctrl_head = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
      wrk->vm = vlib_mains[i];
      wrk->last_vlib_time = vlib_time_now (vlib_mains[i]);
      wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;

      /* Peeker locks are only needed with multiple threads */
      if (num_threads > 1)
	clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
    }

  /* Allocate vpp event queues segment and queue */
  session_vpp_event_queues_allocate (smm);

  /* Initialize fifo segment main baseva and timeout */
  sm_args->baseva = smm->session_baseva + smm->evt_qs_segment_size;
  sm_args->size = smm->session_va_space_size;
  segment_manager_main_init (sm_args);

  /* Preallocate sessions */
  if (smm->preallocated_sessions)
    {
      if (num_threads == 1)
	{
	  pool_init_fixed (smm->wrk[0].sessions, smm->preallocated_sessions);
	}
      else
	{
	  int j;
	  /* Spread across workers with 10% slack per worker */
	  preallocated_sessions_per_worker =
	    (1.1 * (f64) smm->preallocated_sessions /
	     (f64) (num_threads - 1));

	  for (j = 1; j < num_threads; j++)
	    {
	      pool_init_fixed (smm->wrk[j].sessions,
			       preallocated_sessions_per_worker);
	    }
	}
    }

  session_lookup_init ();
  app_namespaces_init ();
  transport_init ();

  smm->is_enabled = 1;

  /* Enable transports */
  transport_enable_disable (vm, 1);
  return 0;
}
1569
/**
 * Enable/disable the session graph nodes on all threads.
 *
 * With workers present, the main thread drives the session queue process
 * node instead of the polling session queue node, which runs only on
 * workers.
 */
void
session_node_enable_disable (u8 is_en)
{
  u8 state = is_en ? VLIB_NODE_STATE_POLLING : VLIB_NODE_STATE_DISABLED;
  vlib_thread_main_t *vtm = vlib_get_thread_main ();
  u8 have_workers = vtm->n_threads != 0;

  /* *INDENT-OFF* */
  foreach_vlib_main (({
    if (have_workers && ii == 0)
      {
	vlib_node_set_state (this_vlib_main, session_queue_process_node.index,
	                     state);
	if (is_en)
	  {
	    vlib_node_t *n = vlib_get_node (this_vlib_main,
	                                    session_queue_process_node.index);
	    vlib_start_process (this_vlib_main, n->runtime_index);
	  }
	else
	  {
	    /* Ask the process node to stop itself */
	    vlib_process_signal_event_mt (this_vlib_main,
	                                  session_queue_process_node.index,
	                                  SESSION_Q_PROCESS_STOP, 0);
	  }

	continue;
      }
    vlib_node_set_state (this_vlib_main, session_queue_node.index,
                         state);
  }));
  /* *INDENT-ON* */
}
1603
1604clib_error_t *
1605vnet_session_enable_disable (vlib_main_t * vm, u8 is_en)
1606{
1607  clib_error_t *error = 0;
1608  if (is_en)
1609    {
1610      if (session_main.is_enabled)
1611	return 0;
1612
1613      error = session_manager_main_enable (vm);
1614      session_node_enable_disable (is_en);
1615    }
1616  else
1617    {
1618      session_main.is_enabled = 0;
1619      session_node_enable_disable (is_en);
1620    }
1621
1622  return error;
1623}
1624
/**
 * Set session layer defaults at init time; the layer itself is enabled
 * later, via vnet_session_enable_disable.
 */
clib_error_t *
session_manager_main_init (vlib_main_t * vm)
{
  session_main_t *smm = &session_main;
  smm->session_baseva = HIGH_SEGMENT_BASEVA;
#if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
  /* Large (64-bit) va space: generous defaults */
  smm->session_va_space_size = 128ULL << 30;
  smm->evt_qs_segment_size = 64 << 20;
#else
  /* Small va space: conservative defaults */
  smm->session_va_space_size = 128 << 20;
  smm->evt_qs_segment_size = 1 << 20;
#endif
  smm->is_enabled = 0;
  smm->session_enable_asap = 0;
  return 0;
}
1641
1642static clib_error_t *
1643session_main_init (vlib_main_t * vm)
1644{
1645  session_main_t *smm = &session_main;
1646  if (smm->session_enable_asap)
1647    {
1648      vlib_worker_thread_barrier_sync (vm);
1649      vnet_session_enable_disable (vm, 1 /* is_en */ );
1650      vlib_worker_thread_barrier_release (vm);
1651    }
1652  return 0;
1653}
1654
/* Run default initialization at vlib init time */
VLIB_INIT_FUNCTION (session_manager_main_init);
/* Optionally auto-enable the session layer when the main loop starts */
VLIB_MAIN_LOOP_ENTER_FUNCTION (session_main_init);
1657
/**
 * Parse the "session" stanza of the startup configuration.
 *
 * Table memory sizes are capped below 4GB; unknown keywords are errors.
 */
static clib_error_t *
session_config_fn (vlib_main_t * vm, unformat_input_t * input)
{
  session_main_t *smm = &session_main;
  u32 nitems;
  uword tmp;

  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (input, "event-queue-length %d", &nitems))
	{
	  /* 2048 is the built-in minimum/default queue length */
	  if (nitems >= 2048)
	    smm->configured_event_queue_length = nitems;
	  else
	    clib_warning ("event queue length %d too small, ignored", nitems);
	}
      else if (unformat (input, "preallocated-sessions %d",
			 &smm->preallocated_sessions))
	;
      else if (unformat (input, "v4-session-table-buckets %d",
			 &smm->configured_v4_session_table_buckets))
	;
      else if (unformat (input, "v4-halfopen-table-buckets %d",
			 &smm->configured_v4_halfopen_table_buckets))
	;
      else if (unformat (input, "v6-session-table-buckets %d",
			 &smm->configured_v6_session_table_buckets))
	;
      else if (unformat (input, "v6-halfopen-table-buckets %d",
			 &smm->configured_v6_halfopen_table_buckets))
	;
      else if (unformat (input, "v4-session-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->configured_v4_session_table_memory = tmp;
	}
      else if (unformat (input, "v4-halfopen-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->configured_v4_halfopen_table_memory = tmp;
	}
      else if (unformat (input, "v6-session-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->configured_v6_session_table_memory = tmp;
	}
      else if (unformat (input, "v6-halfopen-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->configured_v6_halfopen_table_memory = tmp;
	}
      else if (unformat (input, "local-endpoints-table-memory %U",
			 unformat_memory_size, &tmp))
	{
	  if (tmp >= 0x100000000)
	    return clib_error_return (0, "memory size %llx (%lld) too large",
				      tmp, tmp);
	  smm->local_endpoints_table_memory = tmp;
	}
      else if (unformat (input, "local-endpoints-table-buckets %d",
			 &smm->local_endpoints_table_buckets))
	;
      else if (unformat (input, "evt_qs_memfd_seg"))
	smm->evt_qs_use_memfd_seg = 1;
      else if (unformat (input, "evt_qs_seg_size %U", unformat_memory_size,
			 &smm->evt_qs_segment_size))
	;
      else if (unformat (input, "enable"))
	/* Enable the session layer as soon as the main loop starts */
	smm->session_enable_asap = 1;
      else
	return clib_error_return (0, "unknown input `%U'",
				  format_unformat_error, input);
    }
  return 0;
}

VLIB_CONFIG_FUNCTION (session_config_fn, "session");
1747
1748/*
1749 * fd.io coding-style-patch-verification: ON
1750 *
1751 * Local Variables:
1752 * eval: (c-set-style "gnu")
1753 * End:
1754 */
1755