vpp_echo_proto_quic.c revision 5bb23ecd
1/*
2 * Copyright (c) 2019 Cisco and/or its affiliates.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at:
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <stdio.h>
17#include <signal.h>
18
19#include <hs_apps/sapi/vpp_echo_common.h>
20
21typedef struct _quic_echo_cb_vft
22{
23  void (*quic_connected_cb) (session_connected_msg_t * mp, u32 session_index);
24  void (*client_stream_connected_cb) (session_connected_msg_t * mp,
25				      u32 session_index);
26  void (*server_stream_connected_cb) (session_connected_msg_t * mp,
27				      u32 session_index);
28  void (*quic_accepted_cb) (session_accepted_msg_t * mp, u32 session_index);
29  void (*client_stream_accepted_cb) (session_accepted_msg_t * mp,
30				     u32 session_index);
31  void (*server_stream_accepted_cb) (session_accepted_msg_t * mp,
32				     u32 session_index);
33} quic_echo_cb_vft_t;
34
35typedef struct
36{
37  quic_echo_cb_vft_t cb_vft;	/* cb vft for QUIC scenarios */
38  u8 send_quic_disconnects;	/* actively send disconnect */
39  u32 n_stream_clients;		/* Target Number of STREAM sessions per QUIC session */
40  volatile u32 n_quic_clients_connected;	/* Number of connected QUIC sessions */
41} quic_echo_proto_main_t;
42
43quic_echo_proto_main_t quic_echo_proto_main;
44
45/*
46 *
47 *  ECHO Callback definitions
48 *
49 */
50
51static void
52quic_echo_on_connected_connect (session_connected_msg_t * mp,
53				u32 session_index)
54{
55  echo_main_t *em = &echo_main;
56  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
57  u8 *uri = format (0, "quic://session/%lu", mp->handle);
58  u64 i;
59
60  echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
61  for (i = 0; i < eqm->n_stream_clients; i++)
62    echo_send_rpc (em, echo_send_connect, (void *) uri, session_index);
63
64  ECHO_LOG (0, "Qsession 0x%llx connected to %U:%d",
65	    mp->handle, format_ip46_address, &mp->lcl.ip,
66	    mp->lcl.is_ip4, clib_net_to_host_u16 (mp->lcl.port));
67}
68
69static void
70quic_echo_on_connected_send (session_connected_msg_t * mp, u32 session_index)
71{
72  static u32 client_index = 0;
73  echo_main_t *em = &echo_main;
74  echo_session_t *session;
75
76  session = pool_elt_at_index (em->sessions, session_index);
77  session->bytes_to_send = em->bytes_to_send;
78  session->bytes_to_receive = em->bytes_to_receive;
79  session->session_state = ECHO_SESSION_STATE_READY;
80  em->data_thread_args[client_index++] = session->session_index;
81}
82
83static void
84quic_echo_on_connected_error (session_connected_msg_t * mp, u32 session_index)
85{
86  ECHO_FAIL ("Got a wrong connected on session %u [%lx]", session_index,
87	     mp->handle);
88}
89
90static void
91quic_echo_on_accept_recv (session_accepted_msg_t * mp, u32 session_index)
92{
93  static u32 client_index = 0;
94  echo_main_t *em = &echo_main;
95  echo_session_t *session;
96
97  session = pool_elt_at_index (em->sessions, session_index);
98  session->bytes_to_send = em->bytes_to_send;
99  session->bytes_to_receive = em->bytes_to_receive;
100  em->data_thread_args[client_index++] = session->session_index;
101  session->session_state = ECHO_SESSION_STATE_READY;
102}
103
104static void
105quic_echo_on_accept_connect (session_accepted_msg_t * mp, u32 session_index)
106{
107  echo_main_t *em = &echo_main;
108  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
109  ECHO_LOG (1, "Accept on QSession 0x%lx %u", mp->handle);
110  u8 *uri = format (0, "quic://session/%lu", mp->handle);
111  u32 i;
112
113  echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
114  for (i = 0; i < eqm->n_stream_clients; i++)
115    echo_send_rpc (em, echo_send_connect, (void *) uri, session_index);
116}
117
118static void
119quic_echo_on_accept_error (session_accepted_msg_t * mp, u32 session_index)
120{
121  ECHO_FAIL ("Got a wrong accept on session %u [%lx]", session_index,
122	     mp->handle);
123}
124
125static void
126quic_echo_on_accept_log_ip (session_accepted_msg_t * mp, u32 session_index)
127{
128  u8 *ip_str;
129  ip_str = format (0, "%U", format_ip46_address, &mp->rmt.ip, mp->rmt.is_ip4);
130  ECHO_LOG (0, "Accepted session from: %s:%d", ip_str,
131	    clib_net_to_host_u16 (mp->rmt.port));
132
133}
134
135static const quic_echo_cb_vft_t default_cb_vft = {
136  /* Qsessions */
137  .quic_accepted_cb = quic_echo_on_accept_log_ip,
138  .quic_connected_cb = quic_echo_on_connected_connect,
139  /* client initiated streams */
140  .server_stream_accepted_cb = quic_echo_on_accept_recv,
141  .client_stream_connected_cb = quic_echo_on_connected_send,
142  /* server initiated streams */
143  .client_stream_accepted_cb = quic_echo_on_accept_error,
144  .server_stream_connected_cb = quic_echo_on_connected_error,
145};
146
147static const quic_echo_cb_vft_t server_stream_cb_vft = {
148  /* Qsessions */
149  .quic_accepted_cb = quic_echo_on_accept_connect,
150  .quic_connected_cb = NULL,
151  /* client initiated streams */
152  .server_stream_accepted_cb = quic_echo_on_accept_error,
153  .client_stream_connected_cb = quic_echo_on_connected_error,
154  /* server initiated streams */
155  .client_stream_accepted_cb = quic_echo_on_accept_recv,
156  .server_stream_connected_cb = quic_echo_on_connected_send,
157};
158
159static void quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died);
160
161static inline void
162quic_echo_cleanup_listener (u32 listener_index, echo_main_t * em,
163			    quic_echo_proto_main_t * eqm)
164{
165  echo_session_t *ls;
166  ls = pool_elt_at_index (em->sessions, listener_index);
167  ASSERT (ls->session_type == ECHO_SESSION_TYPE_QUIC);
168  if (!clib_atomic_sub_fetch (&ls->accepted_session_count, 1))
169    {
170      if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
171	{
172	  echo_send_rpc (em, echo_send_disconnect_session,
173			 (void *) ls->vpp_session_handle, 0);
174	  clib_atomic_fetch_add (&em->stats.active_count.q, 1);
175	}
176      else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE)
177	{
178	  quic_echo_cleanup_cb (ls, 0 /* parent_died */ );
179	  clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
180	}
181    }
182}
183
184static void
185quic_echo_cleanup_cb (echo_session_t * s, u8 parent_died)
186{
187  echo_main_t *em = &echo_main;
188  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
189  ASSERT (s->session_state < ECHO_SESSION_STATE_CLOSED);
190  if (s->session_type == ECHO_SESSION_TYPE_QUIC)
191    {
192      if (parent_died)
193	clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
194      /* Don't cleanup listener as it's handled by main() */
195      clib_atomic_sub_fetch (&eqm->n_quic_clients_connected, 1);
196    }
197  else if (s->session_type == ECHO_SESSION_TYPE_STREAM)
198    {
199      if (parent_died)
200	clib_atomic_fetch_add (&em->stats.clean_count.s, 1);
201      else
202	quic_echo_cleanup_listener (s->listener_index, em, eqm);
203      clib_atomic_sub_fetch (&em->n_clients_connected, 1);
204    }
205
206  ECHO_LOG (1, "Cleanup sessions (still %uQ %uS)",
207	    eqm->n_quic_clients_connected, em->n_clients_connected);
208  s->session_state = ECHO_SESSION_STATE_CLOSED;
209  if (!em->n_clients_connected && !eqm->n_quic_clients_connected)
210    em->state = STATE_DATA_DONE;
211}
212
213static void
214quic_echo_initiate_qsession_close_no_stream (echo_main_t * em)
215{
216  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
217  ECHO_LOG (1, "Closing Qsessions");
218  /* Close Quic session without streams */
219  echo_session_t *s;
220
221  /* *INDENT-OFF* */
222  pool_foreach (s, em->sessions,
223  ({
224    if (s->session_type == ECHO_SESSION_TYPE_QUIC)
225      {
226        if (eqm->send_quic_disconnects == ECHO_CLOSE_F_ACTIVE)
227          {
228            ECHO_LOG (1,"ACTIVE close 0x%lx", s->vpp_session_handle);
229            echo_send_rpc (em, echo_send_disconnect_session, (void *) s->vpp_session_handle, 0);
230            clib_atomic_fetch_add (&em->stats.active_count.q, 1);
231          }
232        else if (eqm->send_quic_disconnects == ECHO_CLOSE_F_NONE)
233          {
234            ECHO_LOG (1,"Discard close 0x%lx", s->vpp_session_handle);
235            quic_echo_cleanup_cb (s, 0 /* parent_died */);
236            clib_atomic_fetch_add (&em->stats.clean_count.q, 1);
237          }
238        else
239          ECHO_LOG (1,"Passive close 0x%lx", s->vpp_session_handle);
240      }
241  }));
242  /* *INDENT-ON* */
243}
244
245static void
246quic_echo_on_connected (session_connected_msg_t * mp, u32 session_index)
247{
248  echo_main_t *em = &echo_main;
249  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
250  echo_session_t *listen_session;
251  echo_session_t *session = pool_elt_at_index (em->sessions, session_index);
252  if (session->listener_index == SESSION_INVALID_INDEX)
253    {
254      ECHO_LOG (1, "Connected session 0x%lx -> URI", mp->handle);
255      session->session_type = ECHO_SESSION_TYPE_QUIC;
256      session->accepted_session_count = 0;
257      if (eqm->cb_vft.quic_connected_cb)
258	eqm->cb_vft.quic_connected_cb (mp, session->session_index);
259      clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1);
260    }
261  else
262    {
263      listen_session =
264	pool_elt_at_index (em->sessions, session->listener_index);
265      ECHO_LOG (1, "Connected session 0x%lx -> 0x%lx", mp->handle,
266		listen_session->vpp_session_handle);
267      session->session_type = ECHO_SESSION_TYPE_STREAM;
268      clib_atomic_fetch_add (&listen_session->accepted_session_count, 1);
269      if (em->i_am_master && eqm->cb_vft.server_stream_connected_cb)
270	eqm->cb_vft.server_stream_connected_cb (mp, session->session_index);
271      if (!em->i_am_master && eqm->cb_vft.client_stream_connected_cb)
272	eqm->cb_vft.client_stream_connected_cb (mp, session->session_index);
273      clib_atomic_fetch_add (&em->n_clients_connected, 1);
274    }
275
276  if (em->n_clients_connected == em->n_clients
277      && em->n_clients_connected != 0)
278    echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
279
280  if (eqm->n_quic_clients_connected == em->n_connects
281      && em->state < STATE_READY)
282    {
283      echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
284      em->state = STATE_READY;
285      if (eqm->n_stream_clients == 0)
286	quic_echo_initiate_qsession_close_no_stream (em);
287    }
288}
289
290static void
291quic_echo_retry_connect (u32 session_index)
292{
293  /* retry connect */
294  echo_session_t *session;
295  echo_main_t *em = &echo_main;
296  u8 *uri;
297  if (session_index == SESSION_INVALID_INDEX)
298    {
299      ECHO_LOG (1, "Retrying connect %s", em->uri);
300      echo_send_rpc (em, echo_send_connect, (void *) em->uri,
301		     SESSION_INVALID_INDEX);
302    }
303  else
304    {
305      session = pool_elt_at_index (em->sessions, session_index);
306      uri = format (0, "quic://session/%lu", session->vpp_session_handle);
307      ECHO_LOG (1, "Retrying connect %s", uri);
308      echo_send_rpc (em, echo_send_connect, (void *) uri, session_index);
309    }
310}
311
312static void
313quic_echo_connected_cb (session_connected_bundled_msg_t * mp,
314			u32 session_index, u8 is_failed)
315{
316  if (is_failed)
317    return quic_echo_retry_connect (session_index);
318  return quic_echo_on_connected ((session_connected_msg_t *) mp,
319				 session_index);
320}
321
322static void
323quic_echo_accepted_cb (session_accepted_msg_t * mp, echo_session_t * session)
324{
325  echo_main_t *em = &echo_main;
326  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
327  echo_session_t *ls;
328  ls = pool_elt_at_index (em->sessions, session->listener_index);
329  if (ls->session_type == ECHO_SESSION_TYPE_LISTEN)
330    {
331      echo_notify_event (em, ECHO_EVT_FIRST_QCONNECT);
332      session->session_type = ECHO_SESSION_TYPE_QUIC;
333      session->accepted_session_count = 0;
334      if (eqm->cb_vft.quic_accepted_cb)
335	eqm->cb_vft.quic_accepted_cb (mp, session->session_index);
336      clib_atomic_fetch_add (&eqm->n_quic_clients_connected, 1);
337    }
338  else
339    {
340      session->session_type = ECHO_SESSION_TYPE_STREAM;
341      echo_notify_event (em, ECHO_EVT_FIRST_SCONNECT);
342      clib_atomic_fetch_add (&ls->accepted_session_count, 1);
343      if (em->i_am_master && eqm->cb_vft.server_stream_accepted_cb)
344	eqm->cb_vft.server_stream_accepted_cb (mp, session->session_index);
345      if (!em->i_am_master && eqm->cb_vft.client_stream_accepted_cb)
346	eqm->cb_vft.client_stream_accepted_cb (mp, session->session_index);
347      clib_atomic_fetch_add (&em->n_clients_connected, 1);
348    }
349
350  if (em->n_clients_connected == em->n_clients
351      && em->n_clients_connected != 0)
352    echo_notify_event (em, ECHO_EVT_LAST_SCONNECTED);
353
354  if (eqm->n_quic_clients_connected == em->n_connects
355      && em->state < STATE_READY)
356    {
357      echo_notify_event (em, ECHO_EVT_LAST_QCONNECTED);
358      em->state = STATE_READY;
359      if (eqm->n_stream_clients == 0)
360	quic_echo_initiate_qsession_close_no_stream (em);
361    }
362}
363
364static void
365quic_echo_disconnected_reply_cb (echo_session_t * s)
366{
367  if (s->session_type == ECHO_SESSION_TYPE_STREAM)
368    s->session_state = ECHO_SESSION_STATE_CLOSING;
369  else
370    quic_echo_cleanup_cb (s, 0 /* parent_died */ );	/* We can clean Q/Lsessions right away */
371}
372
373static void
374quic_echo_disconnected_cb (session_disconnected_msg_t * mp,
375			   echo_session_t * s)
376{
377  echo_main_t *em = &echo_main;
378  if (s->session_type == ECHO_SESSION_TYPE_STREAM)
379    {
380      echo_session_print_stats (em, s);
381      if (s->bytes_to_receive || s->bytes_to_send)
382	s->session_state = ECHO_SESSION_STATE_AWAIT_DATA;
383      else
384	s->session_state = ECHO_SESSION_STATE_CLOSING;
385      clib_atomic_fetch_add (&em->stats.close_count.s, 1);
386    }
387  else
388    {
389      quic_echo_cleanup_cb (s, 0 /* parent_died */ );	/* We can clean Q/Lsessions right away */
390      clib_atomic_fetch_add (&em->stats.close_count.q, 1);
391    }
392}
393
394static void
395quic_echo_reset_cb (session_reset_msg_t * mp, echo_session_t * s)
396{
397  echo_main_t *em = &echo_main;
398  if (s->session_type == ECHO_SESSION_TYPE_STREAM)
399    {
400      clib_atomic_fetch_add (&em->stats.reset_count.s, 1);
401      s->session_state = ECHO_SESSION_STATE_CLOSING;
402    }
403  else
404    {
405      clib_atomic_fetch_add (&em->stats.reset_count.q, 1);
406      quic_echo_cleanup_cb (s, 0 /* parent_died */ );	/* We can clean Q/Lsessions right away */
407    }
408}
409
410static uword
411quic_echo_unformat_setup_vft (unformat_input_t * input, va_list * args)
412{
413  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
414  if (unformat (input, "serverstream"))
415    eqm->cb_vft = server_stream_cb_vft;
416  else if (unformat (input, "default"))
417    ;
418  else
419    return 0;
420  return 1;
421}
422
423static int
424quic_echo_process_opts_cb (unformat_input_t * a)
425{
426  echo_main_t *em = &echo_main;
427  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
428  if (unformat (a, "nclients %d/%d", &em->n_clients, &eqm->n_stream_clients))
429    ;
430  else if (unformat (a, "quic-setup %U", quic_echo_unformat_setup_vft))
431    ;
432  else if (unformat (a, "qclose=%U",
433		     echo_unformat_close, &eqm->send_quic_disconnects))
434    ;
435  else
436    return 0;
437  return 1;
438}
439
440static void
441quic_echo_set_defaults_before_opts_cb ()
442{
443  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
444  eqm->cb_vft = default_cb_vft;
445  eqm->n_stream_clients = 1;
446}
447
448static void
449quic_echo_set_defaults_after_opts_cb ()
450{
451  quic_echo_proto_main_t *eqm = &quic_echo_proto_main;
452  echo_main_t *em = &echo_main;
453  u8 default_f_active;
454
455  em->n_connects = em->n_clients;
456  em->n_sessions =
457    clib_max (1,
458	      eqm->n_stream_clients) * em->n_clients + eqm->n_stream_clients +
459    1;
460  em->n_clients = eqm->n_stream_clients * em->n_clients;
461
462  if (em->i_am_master)
463    default_f_active =
464      em->bytes_to_send == 0 ? ECHO_CLOSE_F_ACTIVE : ECHO_CLOSE_F_PASSIVE;
465  else
466    default_f_active =
467      em->bytes_to_receive == 0 ? ECHO_CLOSE_F_PASSIVE : ECHO_CLOSE_F_ACTIVE;
468  if (eqm->send_quic_disconnects == ECHO_CLOSE_F_INVALID)
469    eqm->send_quic_disconnects = default_f_active;
470}
471
472static void
473quic_echo_print_usage_cb ()
474{
475  fprintf (stderr,
476	   "-- QUIC specific options -- \n"
477	   "  quic-setup OPT      OPT=serverstream : Client open N connections. \n"
478	   "                       On each one server opens M streams\n"
479	   "                      OPT=default : Client open N connections.\n"
480	   "                       On each one client opens M streams\n"
481	   "  qclose=[Y|N|W]      When a connection is done pass[N] send[Y] or wait[W] for close\n"
482	   "\n"
483	   "  nclients N[/M]      Open N QUIC connections, each one with M streams (M defaults to 1)\n");
484}
485
486echo_proto_cb_vft_t quic_echo_proto_cb_vft = {
487  .disconnected_cb = quic_echo_disconnected_cb,
488  .connected_cb = quic_echo_connected_cb,
489  .accepted_cb = quic_echo_accepted_cb,
490  .reset_cb = quic_echo_reset_cb,
491  .disconnected_reply_cb = quic_echo_disconnected_reply_cb,
492  .cleanup_cb = quic_echo_cleanup_cb,
493  .process_opts_cb = quic_echo_process_opts_cb,
494  .print_usage_cb = quic_echo_print_usage_cb,
495  .set_defaults_before_opts_cb = quic_echo_set_defaults_before_opts_cb,
496  .set_defaults_after_opts_cb = quic_echo_set_defaults_after_opts_cb,
497};
498
499ECHO_REGISTER_PROTO (TRANSPORT_PROTO_QUIC, quic_echo_proto_cb_vft);
500
501/*
502 * fd.io coding-style-patch-verification: ON
503 *
504 * Local Variables:
505 * eval: (c-set-style "gnu")
506 * End:
507 */
508