vpp_echo.c revision ff5a9b6e
1/*
2 * Copyright (c) 2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <stdio.h>
17#include <signal.h>
18
19#include <vlibmemory/api.h>
20#include <svm/fifo_segment.h>
21
22#include <hs_apps/sapi/vpp_echo_common.h>
23
24echo_main_t echo_main;
25
26static void
27echo_session_prealloc (echo_main_t * em)
28{
29  /* We need to prealloc to avoid vec resize in threads */
30  echo_session_t *session;
31  int i;
32  for (i = 0; i < em->n_sessions; i++)
33    {
34      pool_get (em->sessions, session);
35      clib_memset (session, 0, sizeof (*session));
36      session->session_index = session - em->sessions;
37      session->listener_index = SESSION_INVALID_INDEX;
38      session->session_state = ECHO_SESSION_STATE_INITIAL;
39    }
40}
41
42static void
43echo_assert_test_suceeded (echo_main_t * em)
44{
45  if (em->rx_results_diff)
46    CHECK_DIFF (ECHO_FAIL_TEST_ASSERT_RX_TOTAL,
47		em->n_clients * em->bytes_to_receive, em->stats.rx_total,
48		"Invalid amount of data received");
49  else
50    CHECK_SAME (ECHO_FAIL_TEST_ASSERT_RX_TOTAL,
51		em->n_clients * em->bytes_to_receive, em->stats.rx_total,
52		"Invalid amount of data received");
53
54  if (em->tx_results_diff)
55    CHECK_DIFF (ECHO_FAIL_TEST_ASSERT_TX_TOTAL,
56		em->n_clients * em->bytes_to_send, em->stats.tx_total,
57		"Invalid amount of data sent");
58  else
59    CHECK_SAME (ECHO_FAIL_TEST_ASSERT_TX_TOTAL,
60		em->n_clients * em->bytes_to_send, em->stats.tx_total,
61		"Invalid amount of data sent");
62
63  clib_spinlock_lock (&em->sid_vpp_handles_lock);
64  CHECK_SAME (ECHO_FAIL_TEST_ASSERT_ALL_SESSIONS_CLOSED,
65	      0, hash_elts (em->session_index_by_vpp_handles),
66	      "Some sessions are still open");
67  clib_spinlock_unlock (&em->sid_vpp_handles_lock);
68}
69
70always_inline void
71echo_session_dequeue_notify (echo_session_t * s)
72{
73  int rv;
74  if (!svm_fifo_set_event (s->rx_fifo))
75    return;
76  if ((rv =
77       app_send_io_evt_to_vpp (s->vpp_evt_q, s->rx_fifo->master_session_index,
78			       SESSION_IO_EVT_RX, SVM_Q_WAIT)))
79    ECHO_FAIL (ECHO_FAIL_SEND_IO_EVT, "app_send_io_evt_to_vpp errored %d",
80	       rv);
81  svm_fifo_clear_deq_ntf (s->rx_fifo);
82}
83
84static void
85stop_signal (int signum)
86{
87  echo_main_t *em = &echo_main;
88  em->time_to_stop = 1;
89}
90
91int
92connect_to_vpp (char *name)
93{
94  echo_main_t *em = &echo_main;
95  api_main_t *am = &api_main;
96
97  if (em->use_sock_api)
98    {
99      if (vl_socket_client_connect ((char *) em->socket_name, name,
100				    0 /* default rx, tx buffer */ ))
101	{
102	  ECHO_FAIL (ECHO_FAIL_SOCKET_CONNECT, "socket connect failed");
103	  return -1;
104	}
105
106      if (vl_socket_client_init_shm (0, 1 /* want_pthread */ ))
107	{
108	  ECHO_FAIL (ECHO_FAIL_INIT_SHM_API, "init shm api failed");
109	  return -1;
110	}
111    }
112  else
113    {
114      if (vl_client_connect_to_vlib ("/vpe-api", name, 32) < 0)
115	{
116	  ECHO_FAIL (ECHO_FAIL_SHMEM_CONNECT, "shmem connect failed");
117	  return -1;
118	}
119    }
120  em->vl_input_queue = am->shmem_hdr->vl_input_queue;
121  em->my_client_index = am->my_client_index;
122  return 0;
123}
124
125static void
126print_global_json_stats (echo_main_t * em)
127{
128  u8 *start_evt =
129    format (0, "%U", echo_format_timing_event, em->timing.start_event);
130  u8 *end_evt =
131    format (0, "%U", echo_format_timing_event, em->timing.end_event);
132  u8 start_evt_missing = !(em->timing.events_sent & em->timing.start_event);
133  u8 end_evt_missing = (em->rx_results_diff || em->tx_results_diff) ? 0 :
134    !(em->timing.events_sent & em->timing.end_event);
135  f64 deltat = start_evt_missing || end_evt_missing ? 0 :
136    em->timing.end_time - em->timing.start_time;
137
138  if (start_evt_missing)
139    ECHO_FAIL (ECHO_FAIL_MISSING_START_EVENT,
140	       "Expected event %v to happen, but it did not!", start_evt);
141
142  if (end_evt_missing)
143    ECHO_FAIL (ECHO_FAIL_MISSING_END_EVENT,
144	       "Expected event %v to happen, but it did not!", end_evt);
145
146  fformat (stdout, "vpp_echo JSON stats:\n{\n");
147  fformat (stdout, "  \"role\": \"%s\",\n",
148	   em->i_am_master ? "server" : "client");
149  fformat (stdout, "  \"time\": \"%.9f\",\n", deltat);
150  fformat (stdout, "  \"start_evt\": \"%v\",\n", start_evt);
151  fformat (stdout, "  \"start_evt_missing\": \"%s\",\n",
152	   start_evt_missing ? "True" : "False");
153  fformat (stdout, "  \"end_evt\": \"%v\",\n", end_evt);
154  fformat (stdout, "  \"end_evt_missing\": \"%s\",\n",
155	   end_evt_missing ? "True" : "False");
156  fformat (stdout, "  \"rx_data\": %lld,\n", em->stats.rx_total);
157  fformat (stdout, "  \"tx_rx\": %lld,\n", em->stats.tx_total);
158  fformat (stdout, "  \"closing\": {\n");
159  fformat (stdout, "    \"reset\": { \"q\": %d, \"s\": %d },\n",
160	   em->stats.reset_count.q, em->stats.reset_count.s);
161  fformat (stdout, "    \"close\": { \"q\": %d, \"s\": %d },\n",
162	   em->stats.close_count.q, em->stats.close_count.s);
163  fformat (stdout, "    \"active\": { \"q\": %d, \"s\": %d },\n",
164	   em->stats.active_count.q, em->stats.active_count.s);
165  fformat (stdout, "    \"clean\": { \"q\": %d, \"s\": %d }\n",
166	   em->stats.clean_count.q, em->stats.clean_count.s);
167  fformat (stdout, "  }\n");
168  fformat (stdout, "  \"results\": {\n");
169  fformat (stdout, "    \"has_failed\": \"%d\"\n", em->has_failed);
170  fformat (stdout, "    \"fail_descr\": \"%v\"\n", em->fail_descr);
171  fformat (stdout, "  }\n");
172  fformat (stdout, "}\n");
173  fflush (stdout);
174  vec_free (start_evt);
175  vec_free (end_evt);
176}
177
178static void
179print_global_stats (echo_main_t * em)
180{
181  u8 *start_evt =
182    format (0, "%U", echo_format_timing_event, em->timing.start_event);
183  u8 *end_evt =
184    format (0, "%U", echo_format_timing_event, em->timing.end_event);
185  u8 start_evt_missing = !(em->timing.events_sent & em->timing.start_event);
186  u8 end_evt_missing = (em->rx_results_diff || em->tx_results_diff) ? 0 :
187    !(em->timing.events_sent & em->timing.end_event);
188  f64 deltat = start_evt_missing || end_evt_missing ? 0 :
189    em->timing.end_time - em->timing.start_time;
190
191  if (start_evt_missing)
192    ECHO_FAIL (ECHO_FAIL_MISSING_START_EVENT,
193	       "Expected event %v to happen, but it did not!", start_evt);
194
195  if (end_evt_missing)
196    ECHO_FAIL (ECHO_FAIL_MISSING_END_EVENT,
197	       "Expected event %v to happen, but it did not!", end_evt);
198
199  fformat (stdout, "Timing %v:%v\n", start_evt, end_evt);
200  if (start_evt_missing)
201    fformat (stdout, "Missing Start Timing Event (%v)!\n", start_evt);
202  if (end_evt_missing)
203    fformat (stdout, "Missing End Timing Event (%v)!\n", end_evt);
204  fformat (stdout, "-------- TX --------\n");
205  fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.6f seconds\n",
206	   em->stats.tx_total, em->stats.tx_total / (1ULL << 20),
207	   em->stats.tx_total / (1ULL << 30), deltat);
208  if (deltat)
209    fformat (stdout, "%.4f Gbit/second\n",
210	     (em->stats.tx_total * 8.0) / deltat / 1e9);
211  fformat (stdout, "-------- RX --------\n");
212  fformat (stdout, "%lld bytes (%lld mbytes, %lld gbytes) in %.6f seconds\n",
213	   em->stats.rx_total, em->stats.rx_total / (1ULL << 20),
214	   em->stats.rx_total / (1ULL << 30), deltat);
215  if (deltat)
216    fformat (stdout, "%.4f Gbit/second\n",
217	     (em->stats.rx_total * 8.0) / deltat / 1e9);
218  fformat (stdout, "--------------------\n");
219  fformat (stdout, "Received close on %d streams (and %d Quic conn)\n",
220	   em->stats.close_count.s, em->stats.close_count.q);
221  fformat (stdout, "Received reset on %d streams (and %d Quic conn)\n",
222	   em->stats.reset_count.s, em->stats.reset_count.q);
223  fformat (stdout, "Sent close on     %d streams (and %d Quic conn)\n",
224	   em->stats.active_count.s, em->stats.active_count.q);
225  fformat (stdout, "Discarded         %d streams (and %d Quic conn)\n",
226	   em->stats.clean_count.s, em->stats.clean_count.q);
227  if (em->has_failed)
228    fformat (stdout, "\nFailure Return Status: %d\n%v", em->has_failed,
229	     em->fail_descr);
230  vec_free (start_evt);
231  vec_free (end_evt);
232}
233
234void
235echo_update_count_on_session_close (echo_main_t * em, echo_session_t * s)
236{
237
238  ECHO_LOG (1, "[%lu/%lu] -> %U -> [%lu/%lu]",
239	    s->bytes_received, s->bytes_received + s->bytes_to_receive,
240	    echo_format_session, s, s->bytes_sent,
241	    s->bytes_sent + s->bytes_to_send);
242  clib_atomic_fetch_add (&em->stats.tx_total, s->bytes_sent);
243  clib_atomic_fetch_add (&em->stats.rx_total, s->bytes_received);
244
245  if (PREDICT_FALSE (em->stats.rx_total ==
246		     em->n_clients * em->bytes_to_receive))
247    echo_notify_event (em, ECHO_EVT_LAST_BYTE);
248}
249
250static void
251echo_free_sessions (echo_main_t * em)
252{
253  /* Free marked sessions */
254  echo_session_t *s;
255  u32 *session_indexes = 0, *session_index;
256
257  /* *INDENT-OFF* */
258  pool_foreach (s, em->sessions,
259  ({
260    if (s->session_state == ECHO_SESSION_STATE_CLOSED)
261      vec_add1 (session_indexes, s->session_index);}
262  ));
263  /* *INDENT-ON* */
264  vec_foreach (session_index, session_indexes)
265  {
266    /* Free session */
267    s = pool_elt_at_index (em->sessions, *session_index);
268    echo_session_handle_add_del (em, s->vpp_session_handle,
269				 SESSION_INVALID_INDEX);
270    clib_memset (s, 0xfe, sizeof (*s));
271    pool_put (em->sessions, s);
272  }
273}
274
275static void
276test_recv_bytes (echo_main_t * em, echo_session_t * s, u8 * rx_buf,
277		 u32 n_read)
278{
279  u32 i;
280  u8 expected;
281  for (i = 0; i < n_read; i++)
282    {
283      expected = (s->bytes_received + i) & 0xff;
284      if (rx_buf[i] == expected || em->max_test_msg > 0)
285	continue;
286      ECHO_LOG (0, "Session 0x%lx byte %lld was 0x%x expected 0x%x",
287		s->vpp_session_handle, s->bytes_received + i, rx_buf[i],
288		expected);
289      em->max_test_msg--;
290      if (em->max_test_msg == 0)
291	ECHO_LOG (0, "Too many errors, hiding next ones");
292      if (em->test_return_packets == RETURN_PACKETS_ASSERT)
293	ECHO_FAIL (ECHO_FAIL_TEST_BYTES_ERR, "test-bytes errored");
294    }
295}
296
297static int
298recv_data_chunk (echo_main_t * em, echo_session_t * s, u8 * rx_buf)
299{
300  int n_read;
301  n_read = app_recv ((app_session_t *) s, rx_buf, vec_len (rx_buf));
302  if (n_read <= 0)
303    return 0;
304  if (svm_fifo_needs_deq_ntf (s->rx_fifo, n_read))
305    echo_session_dequeue_notify (s);
306
307  if (em->test_return_packets)
308    test_recv_bytes (em, s, rx_buf, n_read);
309
310  s->bytes_received += n_read;
311  s->bytes_to_receive -= n_read;
312  return n_read;
313}
314
315static int
316send_data_chunk (echo_session_t * s, u8 * tx_buf, int offset, int len)
317{
318  int n_sent;
319  int bytes_this_chunk = clib_min (s->bytes_to_send, len - offset);
320  if (!bytes_this_chunk)
321    return 0;
322  n_sent = app_send ((app_session_t *) s, tx_buf + offset,
323		     bytes_this_chunk, SVM_Q_WAIT);
324  if (n_sent < 0)
325    return 0;
326  s->bytes_to_send -= n_sent;
327  s->bytes_sent += n_sent;
328  return n_sent;
329}
330
331static int
332mirror_data_chunk (echo_main_t * em, echo_session_t * s, u8 * tx_buf, u64 len)
333{
334  u64 n_sent = 0;
335  while (n_sent < len && !em->time_to_stop)
336    n_sent += send_data_chunk (s, tx_buf, n_sent, len);
337  return n_sent;
338}
339
340static inline void
341echo_check_closed_listener (echo_main_t * em, echo_session_t * s)
342{
343  echo_session_t *ls;
344  /* if parent has died, terminate gracefully */
345  if (s->listener_index == SESSION_INVALID_INDEX)
346    {
347      ECHO_LOG (2, "%U: listener_index == SESSION_INVALID_INDEX",
348		echo_format_session, s);
349      return;
350    }
351  ls = pool_elt_at_index (em->sessions, s->listener_index);
352  if (ls->session_state < ECHO_SESSION_STATE_CLOSING)
353    {
354      ECHO_LOG (3, "%U: ls->session_state (%d) < "
355		"ECHO_SESSION_STATE_CLOSING (%d)",
356		echo_format_session, ls, ls->session_state,
357		ECHO_SESSION_STATE_CLOSING);
358      return;
359    }
360
361  ECHO_LOG (2, "%U died, close child %U", echo_format_session, ls,
362	    echo_format_session, s);
363  echo_update_count_on_session_close (em, s);
364  em->proto_cb_vft->cleanup_cb (s, 1 /* parent_died */ );
365}
366
367/*
368 * Rx/Tx polling thread per connection
369 */
370static void
371echo_handle_data (echo_main_t * em, echo_session_t * s, u8 * rx_buf)
372{
373  int n_read, n_sent = 0;
374
375  n_read = recv_data_chunk (em, s, rx_buf);
376  if ((em->data_source == ECHO_TEST_DATA_SOURCE) && s->bytes_to_send)
377    n_sent = send_data_chunk (s, em->connect_test_data,
378			      s->bytes_sent % em->tx_buf_size,
379			      em->tx_buf_size);
380  else if (em->data_source == ECHO_RX_DATA_SOURCE)
381    n_sent = mirror_data_chunk (em, s, rx_buf, n_read);
382  if (!s->bytes_to_send && !s->bytes_to_receive)
383    {
384      /* Session is done, need to close */
385      if (s->session_state == ECHO_SESSION_STATE_AWAIT_DATA)
386	s->session_state = ECHO_SESSION_STATE_CLOSING;
387      else
388	{
389	  s->session_state = ECHO_SESSION_STATE_AWAIT_CLOSING;
390	  if (em->send_stream_disconnects == ECHO_CLOSE_F_ACTIVE)
391	    {
392	      echo_send_rpc (em, echo_send_disconnect_session,
393			     (void *) s->vpp_session_handle, 0);
394	      clib_atomic_fetch_add (&em->stats.active_count.s, 1);
395	    }
396	  else if (em->send_stream_disconnects == ECHO_CLOSE_F_NONE)
397	    {
398	      s->session_state = ECHO_SESSION_STATE_CLOSING;
399	      clib_atomic_fetch_add (&em->stats.clean_count.s, 1);
400	    }
401	}
402      return;
403    }
404
405  /* Check for idle clients */
406  if (em->log_lvl > 1)
407    {
408      if (n_sent || n_read)
409	s->idle_cycles = 0;
410      else if (s->idle_cycles++ == 1e7)
411	{
412	  s->idle_cycles = 0;
413	  ECHO_LOG (1, "Idle client TX:%dB RX:%dB", s->bytes_to_send,
414		    s->bytes_to_receive);
415	  ECHO_LOG (1, "Idle FIFOs TX:%dB RX:%dB",
416		    svm_fifo_max_dequeue (s->tx_fifo),
417		    svm_fifo_max_dequeue (s->rx_fifo));
418	  ECHO_LOG (1, "Session 0x%lx state %U", s->vpp_session_handle,
419		    echo_format_session_state, s->session_state);
420	}
421    }
422}
423
424static void *
425echo_data_thread_fn (void *arg)
426{
427  clib_mem_set_thread_index ();	/* First thing to do in client thread */
428  echo_main_t *em = &echo_main;
429  u32 N = em->n_clients;
430  u32 n = (N + em->n_rx_threads - 1) / em->n_rx_threads;
431  u32 idx = (u64) arg;
432  if (n * idx >= N)
433    {
434      ECHO_LOG (1, "Thread %u exiting, no sessions to care for", idx);
435      pthread_exit (0);
436    }
437  u32 thread_n_sessions = clib_min (n, N - n * idx);
438
439  u32 i = 0;
440  u32 n_closed_sessions = 0;
441  u32 session_index;
442  u8 *rx_buf = 0;
443  echo_session_t *s;
444  vec_validate (rx_buf, em->rx_buf_size);
445
446  for (i = 0; !em->time_to_stop; i = (i + 1) % thread_n_sessions)
447    {
448      n_closed_sessions = i == 0 ? 0 : n_closed_sessions;
449      session_index = em->data_thread_args[n * idx + i];
450      if (session_index == SESSION_INVALID_INDEX)
451	continue;
452      s = pool_elt_at_index (em->sessions, session_index);
453      switch (s->session_state)
454	{
455	case ECHO_SESSION_STATE_READY:
456	case ECHO_SESSION_STATE_AWAIT_DATA:
457	  echo_handle_data (em, s, rx_buf);
458	  echo_check_closed_listener (em, s);
459	  break;
460	case ECHO_SESSION_STATE_AWAIT_CLOSING:
461	  ECHO_LOG (3, "%U: %U", echo_format_session, s,
462		    echo_format_session_state, s->session_state);
463	  echo_check_closed_listener (em, s);
464	  break;
465	case ECHO_SESSION_STATE_CLOSING:
466	  ECHO_LOG (2, "%U: %U", echo_format_session, s,
467		    echo_format_session_state, s->session_state);
468	  echo_update_count_on_session_close (em, s);
469	  em->proto_cb_vft->cleanup_cb (s, 0 /* parent_died */ );
470	  break;
471	case ECHO_SESSION_STATE_CLOSED:
472	  ECHO_LOG (2, "%U: %U", echo_format_session, s,
473		    echo_format_session_state, s->session_state);
474	  n_closed_sessions++;
475	  break;
476	}
477      if (n_closed_sessions == thread_n_sessions)
478	break;
479    }
480  ECHO_LOG (1, "Mission accomplished!");
481  pthread_exit (0);
482}
483
484static void
485session_unlisten_handler (session_unlisten_msg_t * mp)
486{
487  echo_session_t *listen_session;
488  echo_main_t *em = &echo_main;
489  listen_session = pool_elt_at_index (em->sessions, em->listen_session_index);
490  em->proto_cb_vft->cleanup_cb (listen_session, 0 /* parent_died */ );
491  listen_session->session_state = ECHO_SESSION_STATE_CLOSED;
492  em->state = STATE_DISCONNECTED;
493}
494
495static void
496session_bound_handler (session_bound_msg_t * mp)
497{
498  echo_main_t *em = &echo_main;
499  echo_session_t *listen_session;
500  if (mp->retval)
501    {
502      ECHO_FAIL (ECHO_FAIL_BIND, "bind failed: %U", format_api_error,
503		 clib_net_to_host_u32 (mp->retval));
504      return;
505    }
506  ECHO_LOG (0, "listening on %U:%u", format_ip46_address, mp->lcl_ip,
507	    mp->lcl_is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
508	    clib_net_to_host_u16 (mp->lcl_port));
509
510  /* Allocate local session and set it up */
511  listen_session = echo_session_new (em);
512  listen_session->session_type = ECHO_SESSION_TYPE_LISTEN;
513  listen_session->vpp_session_handle = mp->handle;
514  echo_session_handle_add_del (em, mp->handle, listen_session->session_index);
515  em->state = STATE_LISTEN;
516  em->listen_session_index = listen_session->session_index;
517  if (em->proto_cb_vft->bound_uri_cb)
518    em->proto_cb_vft->bound_uri_cb (mp, listen_session);
519}
520
521static void
522session_accepted_handler (session_accepted_msg_t * mp)
523{
524  app_session_evt_t _app_evt, *app_evt = &_app_evt;
525  session_accepted_reply_msg_t *rmp;
526  svm_fifo_t *rx_fifo, *tx_fifo;
527  echo_main_t *em = &echo_main;
528  echo_session_t *session, *ls;
529
530  if (!(ls = echo_get_session_from_handle (em, mp->listener_handle)))
531    {
532      ECHO_FAIL (ECHO_FAIL_SESSION_ACCEPTED_BAD_LISTENER,
533		 "Unknown listener handle 0x%lx", mp->listener_handle);
534      return;
535    }
536  if (wait_for_segment_allocation (mp->segment_handle))
537    {
538      ECHO_FAIL (ECHO_FAIL_ACCEPTED_WAIT_FOR_SEG_ALLOC,
539		 "accepted wait_for_segment_allocation errored");
540      return;
541    }
542
543  /* Allocate local session and set it up */
544  session = echo_session_new (em);
545  session->vpp_session_handle = mp->handle;
546
547  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
548  rx_fifo->client_session_index = session->session_index;
549  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
550  tx_fifo->client_session_index = session->session_index;
551
552  session->rx_fifo = rx_fifo;
553  session->tx_fifo = tx_fifo;
554
555  /* session->transport needed by app_send_dgram */
556  clib_memcpy_fast (&session->transport.rmt_ip, &mp->rmt.ip,
557		    sizeof (ip46_address_t));
558  session->transport.is_ip4 = mp->rmt.is_ip4;
559  session->transport.rmt_port = mp->rmt.port;
560  clib_memcpy_fast (&session->transport.lcl_ip, &em->uri_elts.ip,
561		    sizeof (ip46_address_t));
562  session->transport.lcl_port = em->uri_elts.port;
563
564  session->vpp_session_handle = mp->handle;
565  session->start = clib_time_now (&em->clib_time);
566  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
567					 svm_msg_q_t *);
568  session->listener_index = ls->session_index;
569
570  /* Add it to lookup table */
571  ECHO_LOG (1, "Accepted session 0x%lx S[%u] -> 0x%lx S[%u]",
572	    mp->handle, session->session_index,
573	    mp->listener_handle, session->listener_index);
574  echo_session_handle_add_del (em, mp->handle, session->session_index);
575
576  app_alloc_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt,
577			     SESSION_CTRL_EVT_ACCEPTED_REPLY);
578  rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
579  rmp->handle = mp->handle;
580  rmp->context = mp->context;
581  app_send_ctrl_evt_to_vpp (session->vpp_evt_q, app_evt);
582  em->proto_cb_vft->accepted_cb (mp, session);
583}
584
585static void
586session_connected_handler (session_connected_msg_t * mp)
587{
588  echo_main_t *em = &echo_main;
589  echo_session_t *session;
590  u32 listener_index = htonl (mp->context);
591  svm_fifo_t *rx_fifo, *tx_fifo;
592
593  if (mp->retval)
594    {
595      if (em->proto_cb_vft->connected_cb)
596	em->
597	  proto_cb_vft->connected_cb ((session_connected_bundled_msg_t *) mp,
598				      listener_index, 1 /* is_failed */ );
599      return;
600    }
601
602  session = echo_session_new (em);
603  if (wait_for_segment_allocation (mp->segment_handle))
604    {
605      ECHO_FAIL (ECHO_FAIL_CONNECTED_WAIT_FOR_SEG_ALLOC,
606		 "connected wait_for_segment_allocation errored");
607      return;
608    }
609
610  rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
611  rx_fifo->client_session_index = session->session_index;
612  tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
613  tx_fifo->client_session_index = session->session_index;
614
615  session->rx_fifo = rx_fifo;
616  session->tx_fifo = tx_fifo;
617  session->vpp_session_handle = mp->handle;
618  session->start = clib_time_now (&em->clib_time);
619  session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
620					 svm_msg_q_t *);
621  session->listener_index = listener_index;
622  /* session->transport needed by app_send_dgram */
623  clib_memcpy_fast (&session->transport.lcl_ip, &mp->lcl.ip,
624		    sizeof (ip46_address_t));
625  session->transport.is_ip4 = mp->lcl.is_ip4;
626  session->transport.lcl_port = mp->lcl.port;
627  clib_memcpy_fast (&session->transport.rmt_ip, &em->uri_elts.ip,
628		    sizeof (ip46_address_t));
629  session->transport.rmt_port = em->uri_elts.port;
630
631  echo_session_handle_add_del (em, mp->handle, session->session_index);
632  em->proto_cb_vft->connected_cb ((session_connected_bundled_msg_t *) mp,
633				  session->session_index, 0 /* is_failed */ );
634}
635
636/*
637 *
638 *  End of ECHO callback definitions
639 *
640 */
641
642static void
643session_disconnected_handler (session_disconnected_msg_t * mp)
644{
645  app_session_evt_t _app_evt, *app_evt = &_app_evt;
646  session_disconnected_reply_msg_t *rmp;
647  echo_main_t *em = &echo_main;
648  echo_session_t *s;
649  if (!(s = echo_get_session_from_handle (em, mp->handle)))
650    {
651      ECHO_LOG (0, "Invalid vpp_session_handle: 0x%lx", mp->handle);
652      return;
653    }
654  if (s->session_state == ECHO_SESSION_STATE_CLOSED)
655    {
656      ECHO_LOG (1, "%U: already in ECHO_SESSION_STATE_CLOSED",
657		echo_format_session, s);
658    }
659  else
660    {
661      ECHO_LOG (1, "%U: passive close", echo_format_session, s);
662      em->proto_cb_vft->disconnected_cb (mp, s);
663    }
664  app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
665			     SESSION_CTRL_EVT_DISCONNECTED_REPLY);
666  rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
667  rmp->retval = 0;
668  rmp->handle = mp->handle;
669  rmp->context = mp->context;
670  app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
671}
672
673static void
674session_reset_handler (session_reset_msg_t * mp)
675{
676  app_session_evt_t _app_evt, *app_evt = &_app_evt;
677  echo_main_t *em = &echo_main;
678  session_reset_reply_msg_t *rmp;
679  echo_session_t *s = 0;
680  if (!(s = echo_get_session_from_handle (em, mp->handle)))
681    {
682      ECHO_LOG (0, "Invalid vpp_session_handle: 0x%lx", mp->handle);
683      return;
684    }
685  ECHO_LOG (1, "%U: session reset", echo_format_session, s);
686  em->proto_cb_vft->reset_cb (mp, s);
687
688  app_alloc_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt,
689			     SESSION_CTRL_EVT_RESET_REPLY);
690  rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
691  rmp->retval = 0;
692  rmp->handle = mp->handle;
693  app_send_ctrl_evt_to_vpp (s->vpp_evt_q, app_evt);
694}
695
696static void
697handle_mq_event (session_event_t * e)
698{
699  switch (e->event_type)
700    {
701    case SESSION_CTRL_EVT_BOUND:
702      return session_bound_handler ((session_bound_msg_t *) e->data);
703    case SESSION_CTRL_EVT_ACCEPTED:
704      return session_accepted_handler ((session_accepted_msg_t *) e->data);
705    case SESSION_CTRL_EVT_CONNECTED:
706      return session_connected_handler ((session_connected_msg_t *) e->data);
707    case SESSION_CTRL_EVT_DISCONNECTED:
708      return session_disconnected_handler ((session_disconnected_msg_t *)
709					   e->data);
710    case SESSION_CTRL_EVT_RESET:
711      return session_reset_handler ((session_reset_msg_t *) e->data);
712    case SESSION_CTRL_EVT_UNLISTEN_REPLY:
713      return session_unlisten_handler ((session_unlisten_msg_t *) e->data);
714    case SESSION_IO_EVT_RX:
715      break;
716    default:
717      ECHO_LOG (0, "unhandled event %u", e->event_type);
718    }
719}
720
721static void
722echo_process_rpcs (echo_main_t * em)
723{
724  echo_rpc_msg_t *rpc;
725  svm_msg_q_msg_t msg;
726  svm_msg_q_t *mq = em->rpc_msq_queue;
727
728  while (em->state < STATE_DATA_DONE && !em->time_to_stop)
729    {
730      svm_msg_q_lock (mq);
731      if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 1))
732	{
733	  svm_msg_q_unlock (mq);
734	  continue;
735	}
736      svm_msg_q_sub_w_lock (mq, &msg);
737      rpc = svm_msg_q_msg_data (mq, &msg);
738      svm_msg_q_unlock (mq);
739      ((echo_rpc_t) rpc->fp) (rpc->arg, rpc->opaque);
740      svm_msg_q_free_msg (mq, &msg);
741    }
742}
743
744static inline int
745echo_mq_dequeue_batch (svm_msg_q_t * mq, svm_msg_q_msg_t * msg_vec,
746		       u32 n_max_msg)
747{
748  svm_msg_q_msg_t *msg;
749  u32 n_msgs;
750  int i;
751
752  n_msgs = clib_min (svm_msg_q_size (mq), n_max_msg);
753  for (i = 0; i < n_msgs; i++)
754    {
755      vec_add2 (msg_vec, msg, 1);
756      svm_msg_q_sub_w_lock (mq, msg);
757    }
758  return n_msgs;
759}
760
761static void *
762echo_mq_thread_fn (void *arg)
763{
764  clib_mem_set_thread_index ();	/* First thing to do in client thread */
765  svm_msg_q_msg_t *msg_vec = 0;
766  echo_main_t *em = &echo_main;
767  session_event_t *e;
768  svm_msg_q_msg_t *msg;
769  svm_msg_q_t *mq;
770  int i;
771
772  vec_validate (msg_vec, em->evt_q_size);
773  vec_reset_length (msg_vec);
774  wait_for_state_change (em, STATE_ATTACHED, 0);
775  mq = em->app_mq;
776  if (em->state < STATE_ATTACHED || !mq)
777    {
778      ECHO_FAIL (ECHO_FAIL_APP_ATTACH, "Application failed to attach");
779      pthread_exit (0);
780    }
781
782  while (em->state < STATE_DETACHED && !em->time_to_stop)
783    {
784      svm_msg_q_lock (mq);
785      if (svm_msg_q_is_empty (mq) && svm_msg_q_timedwait (mq, 1))
786	{
787	  svm_msg_q_unlock (mq);
788	  continue;
789	}
790      echo_mq_dequeue_batch (mq, msg_vec, ~0);
791      svm_msg_q_unlock (mq);
792
793      for (i = 0; i < vec_len (msg_vec); i++)
794	{
795	  msg = vec_elt_at_index (msg_vec, i);
796	  e = svm_msg_q_msg_data (mq, msg);
797	  handle_mq_event (e);
798	  svm_msg_q_free_msg (mq, msg);	/* No lock, single thread dequeuing */
799	}
800      vec_reset_length (msg_vec);
801    }
802  vec_free (msg_vec);
803  pthread_exit (0);
804}
805
806static void
807clients_run (echo_main_t * em)
808{
809  u64 i;
810  echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT);
811  for (i = 0; i < em->n_connects; i++)
812    echo_send_connect (SESSION_INVALID_HANDLE, SESSION_INVALID_INDEX);
813  wait_for_state_change (em, STATE_READY, 0);
814  ECHO_LOG (1, "App is ready");
815  echo_process_rpcs (em);
816}
817
818static void
819server_run (echo_main_t * em)
820{
821  echo_session_t *ls;
822  echo_send_listen (em);
823  wait_for_state_change (em, STATE_READY, 0);
824  ECHO_LOG (1, "App is ready");
825  echo_process_rpcs (em);
826  /* Cleanup */
827  ECHO_LOG (1, "Unbind listen port");
828  ls = pool_elt_at_index (em->sessions, em->listen_session_index);
829  echo_send_unbind (em, ls);
830  if (wait_for_state_change (em, STATE_DISCONNECTED, TIMEOUT))
831    {
832      ECHO_FAIL (ECHO_FAIL_SERVER_DISCONNECT_TIMEOUT,
833		 "Timeout waiting for state disconnected");
834      return;
835    }
836}
837
838static void
839print_usage_and_exit (void)
840{
841  echo_main_t *em = &echo_main;
842  int i;
843  fprintf (stderr,
844	   "Usage: vpp_echo [socket-name SOCKET] [client|server] [uri URI] [OPTIONS]\n"
845	   "Generates traffic and assert correct teardown of the QUIC hoststack\n"
846	   "\n"
847	   "  socket-name PATH    Specify the binary socket path to connect to VPP\n"
848	   "  use-svm-api         Use SVM API to connect to VPP\n"
849	   "  test-bytes[:assert] Check data correctness when receiving (assert fails on first error)\n"
850	   "  fifo-size N         Use N Kb fifos\n"
851	   "  mq-size N           Use N event slots for vpp_echo <-> vpp events\n"
852	   "  rx-buf N[Kb|Mb|GB]  Use N[Kb|Mb|GB] RX buffer\n"
853	   "  tx-buf N[Kb|Mb|GB]  Use N[Kb|Mb|GB] TX test buffer\n"
854	   "  appns NAMESPACE     Use the namespace NAMESPACE\n"
855	   "  all-scope           all-scope option\n"
856	   "  local-scope         local-scope option\n"
857	   "  global-scope        global-scope option\n"
858	   "  secret SECRET       set namespace secret\n"
859	   "  chroot prefix PATH  Use PATH as memory root path\n"
860	   "  sclose=[Y|N|W]      When a stream is done,    pass[N] send[Y] or wait[W] for close\n"
861	   "\n"
862	   "  time START:END      Time between evts START & END, events being :\n"
863	   "                       start - Start of the app\n"
864	   "                       qconnect    - first Connection connect sent\n"
865	   "                       qconnected  - last Connection connected\n"
866	   "                       sconnect    - first Stream connect sent\n"
867	   "                       sconnected  - last Stream got connected\n"
868	   "                       lastbyte    - Last expected byte received\n"
869	   "                       exit        - Exiting of the app\n"
870	   "  rx-results-diff     Rx results different to pass test\n"
871	   "  tx-results-diff     Tx results different to pass test\n"
872	   "  json                Output global stats in json\n"
873	   "  log=N               Set the log level to [0: no output, 1:errors, 2:log]\n"
874	   "\n"
875	   "  nclients N          Open N clients sending data\n"
876	   "  nthreads N          Use N busy loop threads for data [in addition to main & msg queue]\n"
877	   "  TX=1337[Kb|Mb|GB]   Send 1337 [K|M|G]bytes, use TX=RX to reflect the data\n"
878	   "  RX=1337[Kb|Mb|GB]   Expect 1337 [K|M|G]bytes\n" "\n");
879  for (i = 0; i < TRANSPORT_N_PROTO; i++)
880    {
881      echo_proto_cb_vft_t *vft = em->available_proto_cb_vft[i];
882      if (vft && vft->print_usage_cb)
883	vft->print_usage_cb ();
884    }
885  fprintf (stderr, "\nDefault configuration is :\n"
886	   " server nclients 1/1 RX=64Kb TX=RX\n"
887	   " client nclients 1/1 RX=64Kb TX=64Kb\n");
888  exit (ECHO_FAIL_USAGE);
889}
890
891static int
892echo_process_each_proto_opts (unformat_input_t * a)
893{
894  echo_main_t *em = &echo_main;
895  int i, rv;
896  for (i = 0; i < TRANSPORT_N_PROTO; i++)
897    {
898      echo_proto_cb_vft_t *vft = em->available_proto_cb_vft[i];
899      if (vft && vft->process_opts_cb)
900	if ((rv = vft->process_opts_cb (a)))
901	  return rv;
902    }
903  return 0;
904}
905
906static void
907echo_set_each_proto_defaults_before_opts (echo_main_t * em)
908{
909  int i;
910  for (i = 0; i < TRANSPORT_N_PROTO; i++)
911    {
912      echo_proto_cb_vft_t *vft = em->available_proto_cb_vft[i];
913      if (vft && vft->set_defaults_before_opts_cb)
914	vft->set_defaults_before_opts_cb ();
915    }
916}
917
918void
919echo_process_opts (int argc, char **argv)
920{
921  echo_main_t *em = &echo_main;
922  unformat_input_t _argv, *a = &_argv;
923  u32 tmp;
924  u8 *chroot_prefix;
925  u8 *uri = 0;
926  u8 default_f_active;
927
928  unformat_init_command_line (a, argv);
929  while (unformat_check_input (a) != UNFORMAT_END_OF_INPUT)
930    {
931      if (echo_process_each_proto_opts (a))
932	;
933      else if (unformat (a, "chroot prefix %s", &chroot_prefix))
934	vl_set_memory_root_path ((char *) chroot_prefix);
935      else if (unformat (a, "uri %s", &uri))
936	em->uri = format (0, "%s%c", uri, 0);
937      else if (unformat (a, "server"))
938	em->i_am_master = 1;
939      else if (unformat (a, "client"))
940	em->i_am_master = 0;
941      else if (unformat (a, "test-bytes:assert"))
942	em->test_return_packets = RETURN_PACKETS_ASSERT;
943      else if (unformat (a, "test-bytes"))
944	em->test_return_packets = RETURN_PACKETS_LOG_WRONG;
945      else if (unformat (a, "socket-name %s", &em->socket_name))
946	;
947      else if (unformat (a, "use-svm-api"))
948	em->use_sock_api = 0;
949      else if (unformat (a, "fifo-size %d", &tmp))
950	em->fifo_size = tmp << 10;
951      else if (unformat (a, "prealloc-fifos %u", &em->prealloc_fifo_pairs))
952	;
953      else if (unformat (a, "rx-buf %U", unformat_data, &em->rx_buf_size))
954	;
955      else if (unformat (a, "tx-buf %U", unformat_data, &em->tx_buf_size))
956	;
957      else if (unformat (a, "mq-size %d", &em->evt_q_size))
958	;
959      else if (unformat (a, "nclients %d", &em->n_clients))
960	{
961	  em->n_sessions = em->n_clients + 1;
962	  em->n_connects = em->n_clients;
963	}
964      else if (unformat (a, "nthreads %d", &em->n_rx_threads))
965	;
966      else
967	if (unformat
968	    (a, "crypto %U", echo_unformat_crypto_engine,
969	     &em->crypto_ctx_engine))
970	;
971      else if (unformat (a, "appns %_%v%_", &em->appns_id))
972	;
973      else if (unformat (a, "all-scope"))
974	em->appns_flags |= (APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE
975			    | APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE);
976      else if (unformat (a, "local-scope"))
977	em->appns_flags = APP_OPTIONS_FLAGS_USE_LOCAL_SCOPE;
978      else if (unformat (a, "global-scope"))
979	em->appns_flags = APP_OPTIONS_FLAGS_USE_GLOBAL_SCOPE;
980      else if (unformat (a, "secret %lu", &em->appns_secret))
981	;
982      else if (unformat (a, "TX=RX"))
983	em->data_source = ECHO_RX_DATA_SOURCE;
984      else if (unformat (a, "TX=%U", unformat_data, &em->bytes_to_send))
985	;
986      else if (unformat (a, "RX=%U", unformat_data, &em->bytes_to_receive))
987	;
988      else if (unformat (a, "rx-results-diff"))
989	em->rx_results_diff = 1;
990      else if (unformat (a, "tx-results-diff"))
991	em->tx_results_diff = 1;
992      else if (unformat (a, "json"))
993	em->output_json = 1;
994      else if (unformat (a, "log=%d", &em->log_lvl))
995	;
996      else if (unformat (a, "sclose=%U",
997			 echo_unformat_close, &em->send_stream_disconnects))
998	;
999      else if (unformat (a, "time %U:%U",
1000			 echo_unformat_timing_event, &em->timing.start_event,
1001			 echo_unformat_timing_event, &em->timing.end_event))
1002	;
1003      else
1004	print_usage_and_exit ();
1005    }
1006
1007  /* setting default for unset values
1008   *
1009   * bytes_to_send / bytes_to_receive & data_source  */
1010  if (em->bytes_to_receive == (u64) ~ 0)
1011    em->bytes_to_receive = 64 << 10;	/* default */
1012  if (em->bytes_to_send == (u64) ~ 0)
1013    em->bytes_to_send = 64 << 10;	/* default */
1014  else if (em->bytes_to_send == 0)
1015    em->data_source = ECHO_NO_DATA_SOURCE;
1016  else
1017    em->data_source = ECHO_TEST_DATA_SOURCE;
1018
1019  if (em->data_source == ECHO_INVALID_DATA_SOURCE)
1020    em->data_source =
1021      em->i_am_master ? ECHO_RX_DATA_SOURCE : ECHO_TEST_DATA_SOURCE;
1022  if (em->data_source == ECHO_RX_DATA_SOURCE)
1023    em->bytes_to_send = em->bytes_to_receive;
1024
1025  /* disconnect flags  */
1026  if (em->i_am_master)
1027    default_f_active =
1028      em->bytes_to_send == 0 ? ECHO_CLOSE_F_ACTIVE : ECHO_CLOSE_F_PASSIVE;
1029  else
1030    default_f_active =
1031      em->bytes_to_receive == 0 ? ECHO_CLOSE_F_PASSIVE : ECHO_CLOSE_F_ACTIVE;
1032  if (em->send_stream_disconnects == ECHO_CLOSE_F_INVALID)
1033    em->send_stream_disconnects = default_f_active;
1034}
1035
1036void
1037echo_process_uri (echo_main_t * em)
1038{
1039  unformat_input_t _input, *input = &_input;
1040  u32 port;
1041  unformat_init_string (input, (char *) em->uri, strlen ((char *) em->uri));
1042  if (unformat
1043      (input, "%U://%U/%d", unformat_transport_proto,
1044       &em->uri_elts.transport_proto, unformat_ip4_address,
1045       &em->uri_elts.ip.ip4, &port))
1046    em->uri_elts.is_ip4 = 1;
1047  else
1048    if (unformat
1049	(input, "%U://%U/%d", unformat_transport_proto,
1050	 &em->uri_elts.transport_proto, unformat_ip6_address,
1051	 &em->uri_elts.ip.ip6, &port))
1052    em->uri_elts.is_ip4 = 0;
1053  else
1054    ECHO_FAIL (ECHO_FAIL_INVALID_URI, "Unable to process uri");
1055  em->uri_elts.port = clib_host_to_net_u16 (port);
1056  unformat_free (input);
1057}
1058
1059static void __clib_constructor
1060vpp_echo_init ()
1061{
1062  /* init memory before proto register themselves */
1063  echo_main_t *em = &echo_main;
1064  clib_mem_init_thread_safe (0, 256 << 20);
1065  clib_memset (em, 0, sizeof (*em));
1066}
1067
1068int
1069main (int argc, char **argv)
1070{
1071  echo_main_t *em = &echo_main;
1072  fifo_segment_main_t *sm = &em->segment_main;
1073  char *app_name;
1074  u64 i;
1075  svm_msg_q_cfg_t _cfg, *cfg = &_cfg;
1076  u32 rpc_queue_size = 64 << 10;
1077
1078  em->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
1079  clib_spinlock_init (&em->sid_vpp_handles_lock);
1080  em->shared_segment_handles = hash_create (0, sizeof (uword));
1081  clib_spinlock_init (&em->segment_handles_lock);
1082  em->socket_name = format (0, "%s%c", API_SOCKET_FILE, 0);
1083  em->use_sock_api = 1;
1084  em->fifo_size = 64 << 10;
1085  em->prealloc_fifo_pairs = 16;
1086  em->n_clients = 1;
1087  em->n_connects = 1;
1088  em->n_sessions = 2;
1089  em->max_test_msg = 50;
1090  em->time_to_stop = 0;
1091  em->i_am_master = 1;
1092  em->n_rx_threads = 4;
1093  em->evt_q_size = 256;
1094  em->test_return_packets = RETURN_PACKETS_NOTEST;
1095  em->timing.start_event = ECHO_EVT_FIRST_QCONNECT;
1096  em->timing.end_event = ECHO_EVT_LAST_BYTE;
1097  em->bytes_to_receive = ~0;	/* defaulted when we know if server/client */
1098  em->bytes_to_send = ~0;	/* defaulted when we know if server/client */
1099  em->rx_buf_size = 1 << 20;
1100  em->tx_buf_size = 1 << 20;
1101  em->data_source = ECHO_INVALID_DATA_SOURCE;
1102  em->uri = format (0, "%s%c", "tcp://0.0.0.0/1234", 0);
1103  em->crypto_ctx_engine = TLS_ENGINE_NONE;
1104  echo_set_each_proto_defaults_before_opts (em);
1105  echo_process_opts (argc, argv);
1106  echo_process_uri (em);
1107  em->proto_cb_vft = em->available_proto_cb_vft[em->uri_elts.transport_proto];
1108  if (!em->proto_cb_vft)
1109    {
1110      ECHO_FAIL (ECHO_FAIL_PROTOCOL_NOT_SUPPORTED,
1111		 "Protocol %U is not supported",
1112		 format_transport_proto, em->uri_elts.transport_proto);
1113      goto exit_on_error;
1114    }
1115  if (em->proto_cb_vft->set_defaults_after_opts_cb)
1116    em->proto_cb_vft->set_defaults_after_opts_cb ();
1117
1118  vec_validate (em->data_thread_handles, em->n_rx_threads);
1119  vec_validate (em->data_thread_args, em->n_clients);
1120  for (i = 0; i < em->n_clients; i++)
1121    em->data_thread_args[i] = SESSION_INVALID_INDEX;
1122  clib_time_init (&em->clib_time);
1123  init_error_string_table ();
1124  fifo_segment_main_init (sm, HIGH_SEGMENT_BASEVA, 20);
1125  vec_validate (em->connect_test_data, em->tx_buf_size);
1126  for (i = 0; i < em->tx_buf_size; i++)
1127    em->connect_test_data[i] = i & 0xff;
1128
1129  /* *INDENT-OFF* */
1130  svm_msg_q_ring_cfg_t rc[1] = {
1131    {rpc_queue_size, sizeof (echo_rpc_msg_t), 0},
1132  };
1133  /* *INDENT-ON* */
1134  cfg->consumer_pid = getpid ();
1135  cfg->n_rings = 1;
1136  cfg->q_nitems = rpc_queue_size;
1137  cfg->ring_cfgs = rc;
1138  em->rpc_msq_queue = svm_msg_q_alloc (cfg);
1139
1140  signal (SIGINT, stop_signal);
1141  signal (SIGQUIT, stop_signal);
1142  signal (SIGTERM, stop_signal);
1143  echo_api_hookup (em);
1144
1145  app_name = em->i_am_master ? "echo_server" : "echo_client";
1146  if (connect_to_vpp (app_name))
1147    {
1148      svm_region_exit ();
1149      ECHO_FAIL (ECHO_FAIL_CONNECT_TO_VPP, "Couldn't connect to vpp");
1150      goto exit_on_error;
1151    }
1152
1153  echo_session_prealloc (em);
1154  echo_notify_event (em, ECHO_EVT_START);
1155
1156  echo_send_attach (em);
1157  if (wait_for_state_change (em, STATE_ATTACHED_NO_CERT, TIMEOUT))
1158    {
1159      ECHO_FAIL (ECHO_FAIL_ATTACH_TO_VPP,
1160		 "Couldn't attach to vpp, did you run <session enable> ?");
1161      goto exit_on_error;
1162    }
1163
1164  if (em->crypto_ctx_engine == TLS_ENGINE_NONE)
1165    /* when no crypto engine specified, dont expect crypto ctx */
1166    em->state = STATE_ATTACHED;
1167  else
1168    {
1169      ECHO_LOG (1, "Adding crypto context %U", echo_format_crypto_engine,
1170		em->crypto_ctx_engine);
1171      echo_send_add_crypto_ctx (em);
1172      if (wait_for_state_change (em, STATE_ATTACHED, TIMEOUT))
1173	{
1174	  ECHO_FAIL (ECHO_FAIL_APP_ATTACH,
1175		     "Couldn't add crypto context to vpp\n");
1176	  exit (1);
1177	}
1178    }
1179
1180  if (pthread_create (&em->mq_thread_handle,
1181		      NULL /*attr */ , echo_mq_thread_fn, 0))
1182    {
1183      ECHO_FAIL (ECHO_FAIL_PTHREAD_CREATE, "pthread create errored");
1184      goto exit_on_error;
1185    }
1186
1187  for (i = 0; i < em->n_rx_threads; i++)
1188    if (pthread_create (&em->data_thread_handles[i],
1189			NULL /*attr */ , echo_data_thread_fn, (void *) i))
1190      {
1191	ECHO_FAIL (ECHO_FAIL_PTHREAD_CREATE,
1192		   "pthread create errored (index %d)", i);
1193	goto exit_on_error;
1194      }
1195  if (em->i_am_master)
1196    server_run (em);
1197  else
1198    clients_run (em);
1199  echo_notify_event (em, ECHO_EVT_EXIT);
1200  echo_free_sessions (em);
1201  echo_send_detach (em);
1202  if (wait_for_state_change (em, STATE_DETACHED, TIMEOUT))
1203    {
1204      ECHO_FAIL (ECHO_FAIL_DETACH, "Couldn't detach from vpp");
1205      goto exit_on_error;
1206    }
1207  int *rv;
1208  pthread_join (em->mq_thread_handle, (void **) &rv);
1209  if (rv)
1210    {
1211      ECHO_FAIL (ECHO_FAIL_MQ_PTHREAD, "mq pthread errored %d", rv);
1212      goto exit_on_error;
1213    }
1214  if (em->use_sock_api)
1215    vl_socket_client_disconnect ();
1216  else
1217    vl_client_disconnect_from_vlib ();
1218  echo_assert_test_suceeded (em);
1219exit_on_error:
1220  ECHO_LOG (0, "Test complete !\n");
1221  if (em->output_json)
1222    print_global_json_stats (em);
1223  else
1224    print_global_stats (em);
1225  vec_free (em->fail_descr);
1226  exit (em->has_failed);
1227}
1228
1229/*
1230 * fd.io coding-style-patch-verification: ON
1231 *
1232 * Local Variables:
1233 * eval: (c-set-style "gnu")
1234 * End:
1235 */
1236