1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Nginx, Inc.
5 */
6
7
8#include <ngx_config.h>
9#include <ngx_core.h>
10#include <ngx_stream.h>
11
12
13static ngx_int_t ngx_stream_upstream_add_variables(ngx_conf_t *cf);
14static ngx_int_t ngx_stream_upstream_addr_variable(ngx_stream_session_t *s,
15    ngx_stream_variable_value_t *v, uintptr_t data);
16static ngx_int_t ngx_stream_upstream_response_time_variable(
17    ngx_stream_session_t *s, ngx_stream_variable_value_t *v, uintptr_t data);
18static ngx_int_t ngx_stream_upstream_bytes_variable(ngx_stream_session_t *s,
19    ngx_stream_variable_value_t *v, uintptr_t data);
20
21static char *ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd,
22    void *dummy);
23static char *ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd,
24    void *conf);
25static void *ngx_stream_upstream_create_main_conf(ngx_conf_t *cf);
26static char *ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf);
27
28
29static ngx_command_t  ngx_stream_upstream_commands[] = {
30
31    { ngx_string("upstream"),
32      NGX_STREAM_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_TAKE1,
33      ngx_stream_upstream,
34      0,
35      0,
36      NULL },
37
38    { ngx_string("server"),
39      NGX_STREAM_UPS_CONF|NGX_CONF_1MORE,
40      ngx_stream_upstream_server,
41      NGX_STREAM_SRV_CONF_OFFSET,
42      0,
43      NULL },
44
45      ngx_null_command
46};
47
48
49static ngx_stream_module_t  ngx_stream_upstream_module_ctx = {
50    ngx_stream_upstream_add_variables,     /* preconfiguration */
51    NULL,                                  /* postconfiguration */
52
53    ngx_stream_upstream_create_main_conf,  /* create main configuration */
54    ngx_stream_upstream_init_main_conf,    /* init main configuration */
55
56    NULL,                                  /* create server configuration */
57    NULL                                   /* merge server configuration */
58};
59
60
61ngx_module_t  ngx_stream_upstream_module = {
62    NGX_MODULE_V1,
63    &ngx_stream_upstream_module_ctx,       /* module context */
64    ngx_stream_upstream_commands,          /* module directives */
65    NGX_STREAM_MODULE,                     /* module type */
66    NULL,                                  /* init master */
67    NULL,                                  /* init module */
68    NULL,                                  /* init process */
69    NULL,                                  /* init thread */
70    NULL,                                  /* exit thread */
71    NULL,                                  /* exit process */
72    NULL,                                  /* exit master */
73    NGX_MODULE_V1_PADDING
74};
75
76
77static ngx_stream_variable_t  ngx_stream_upstream_vars[] = {
78
79    { ngx_string("upstream_addr"), NULL,
80      ngx_stream_upstream_addr_variable, 0,
81      NGX_STREAM_VAR_NOCACHEABLE, 0 },
82
83    { ngx_string("upstream_bytes_sent"), NULL,
84      ngx_stream_upstream_bytes_variable, 0,
85      NGX_STREAM_VAR_NOCACHEABLE, 0 },
86
87    { ngx_string("upstream_connect_time"), NULL,
88      ngx_stream_upstream_response_time_variable, 2,
89      NGX_STREAM_VAR_NOCACHEABLE, 0 },
90
91    { ngx_string("upstream_first_byte_time"), NULL,
92      ngx_stream_upstream_response_time_variable, 1,
93      NGX_STREAM_VAR_NOCACHEABLE, 0 },
94
95    { ngx_string("upstream_session_time"), NULL,
96      ngx_stream_upstream_response_time_variable, 0,
97      NGX_STREAM_VAR_NOCACHEABLE, 0 },
98
99    { ngx_string("upstream_bytes_received"), NULL,
100      ngx_stream_upstream_bytes_variable, 1,
101      NGX_STREAM_VAR_NOCACHEABLE, 0 },
102
103    { ngx_null_string, NULL, NULL, 0, 0, 0 }
104};
105
106
107static ngx_int_t
108ngx_stream_upstream_add_variables(ngx_conf_t *cf)
109{
110    ngx_stream_variable_t  *var, *v;
111
112    for (v = ngx_stream_upstream_vars; v->name.len; v++) {
113        var = ngx_stream_add_variable(cf, &v->name, v->flags);
114        if (var == NULL) {
115            return NGX_ERROR;
116        }
117
118        var->get_handler = v->get_handler;
119        var->data = v->data;
120    }
121
122    return NGX_OK;
123}
124
125
126static ngx_int_t
127ngx_stream_upstream_addr_variable(ngx_stream_session_t *s,
128    ngx_stream_variable_value_t *v, uintptr_t data)
129{
130    u_char                       *p;
131    size_t                        len;
132    ngx_uint_t                    i;
133    ngx_stream_upstream_state_t  *state;
134
135    v->valid = 1;
136    v->no_cacheable = 0;
137    v->not_found = 0;
138
139    if (s->upstream_states == NULL || s->upstream_states->nelts == 0) {
140        v->not_found = 1;
141        return NGX_OK;
142    }
143
144    len = 0;
145    state = s->upstream_states->elts;
146
147    for (i = 0; i < s->upstream_states->nelts; i++) {
148        if (state[i].peer) {
149            len += state[i].peer->len;
150        }
151
152        len += 2;
153    }
154
155    p = ngx_pnalloc(s->connection->pool, len);
156    if (p == NULL) {
157        return NGX_ERROR;
158    }
159
160    v->data = p;
161
162    i = 0;
163
164    for ( ;; ) {
165        if (state[i].peer) {
166            p = ngx_cpymem(p, state[i].peer->data, state[i].peer->len);
167        }
168
169        if (++i == s->upstream_states->nelts) {
170            break;
171        }
172
173        *p++ = ',';
174        *p++ = ' ';
175    }
176
177    v->len = p - v->data;
178
179    return NGX_OK;
180}
181
182
183static ngx_int_t
184ngx_stream_upstream_bytes_variable(ngx_stream_session_t *s,
185    ngx_stream_variable_value_t *v, uintptr_t data)
186{
187    u_char                       *p;
188    size_t                        len;
189    ngx_uint_t                    i;
190    ngx_stream_upstream_state_t  *state;
191
192    v->valid = 1;
193    v->no_cacheable = 0;
194    v->not_found = 0;
195
196    if (s->upstream_states == NULL || s->upstream_states->nelts == 0) {
197        v->not_found = 1;
198        return NGX_OK;
199    }
200
201    len = s->upstream_states->nelts * (NGX_OFF_T_LEN + 2);
202
203    p = ngx_pnalloc(s->connection->pool, len);
204    if (p == NULL) {
205        return NGX_ERROR;
206    }
207
208    v->data = p;
209
210    i = 0;
211    state = s->upstream_states->elts;
212
213    for ( ;; ) {
214
215        if (data == 1) {
216            p = ngx_sprintf(p, "%O", state[i].bytes_received);
217
218        } else {
219            p = ngx_sprintf(p, "%O", state[i].bytes_sent);
220        }
221
222        if (++i == s->upstream_states->nelts) {
223            break;
224        }
225
226        *p++ = ',';
227        *p++ = ' ';
228    }
229
230    v->len = p - v->data;
231
232    return NGX_OK;
233}
234
235
236static ngx_int_t
237ngx_stream_upstream_response_time_variable(ngx_stream_session_t *s,
238    ngx_stream_variable_value_t *v, uintptr_t data)
239{
240    u_char                       *p;
241    size_t                        len;
242    ngx_uint_t                    i;
243    ngx_msec_int_t                ms;
244    ngx_stream_upstream_state_t  *state;
245
246    v->valid = 1;
247    v->no_cacheable = 0;
248    v->not_found = 0;
249
250    if (s->upstream_states == NULL || s->upstream_states->nelts == 0) {
251        v->not_found = 1;
252        return NGX_OK;
253    }
254
255    len = s->upstream_states->nelts * (NGX_TIME_T_LEN + 4 + 2);
256
257    p = ngx_pnalloc(s->connection->pool, len);
258    if (p == NULL) {
259        return NGX_ERROR;
260    }
261
262    v->data = p;
263
264    i = 0;
265    state = s->upstream_states->elts;
266
267    for ( ;; ) {
268
269        if (data == 1) {
270            if (state[i].first_byte_time == (ngx_msec_t) -1) {
271                *p++ = '-';
272                goto next;
273            }
274
275            ms = state[i].first_byte_time;
276
277        } else if (data == 2 && state[i].connect_time != (ngx_msec_t) -1) {
278            ms = state[i].connect_time;
279
280        } else {
281            ms = state[i].response_time;
282        }
283
284        ms = ngx_max(ms, 0);
285        p = ngx_sprintf(p, "%T.%03M", (time_t) ms / 1000, ms % 1000);
286
287    next:
288
289        if (++i == s->upstream_states->nelts) {
290            break;
291        }
292
293        *p++ = ',';
294        *p++ = ' ';
295    }
296
297    v->len = p - v->data;
298
299    return NGX_OK;
300}
301
302
303static char *
304ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)
305{
306    char                            *rv;
307    void                            *mconf;
308    ngx_str_t                       *value;
309    ngx_url_t                        u;
310    ngx_uint_t                       m;
311    ngx_conf_t                       pcf;
312    ngx_stream_module_t             *module;
313    ngx_stream_conf_ctx_t           *ctx, *stream_ctx;
314    ngx_stream_upstream_srv_conf_t  *uscf;
315
316    ngx_memzero(&u, sizeof(ngx_url_t));
317
318    value = cf->args->elts;
319    u.host = value[1];
320    u.no_resolve = 1;
321    u.no_port = 1;
322
323    uscf = ngx_stream_upstream_add(cf, &u, NGX_STREAM_UPSTREAM_CREATE
324                                           |NGX_STREAM_UPSTREAM_WEIGHT
325                                           |NGX_STREAM_UPSTREAM_MAX_CONNS
326                                           |NGX_STREAM_UPSTREAM_MAX_FAILS
327                                           |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
328                                           |NGX_STREAM_UPSTREAM_DOWN
329                                           |NGX_STREAM_UPSTREAM_BACKUP);
330    if (uscf == NULL) {
331        return NGX_CONF_ERROR;
332    }
333
334
335    ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
336    if (ctx == NULL) {
337        return NGX_CONF_ERROR;
338    }
339
340    stream_ctx = cf->ctx;
341    ctx->main_conf = stream_ctx->main_conf;
342
343    /* the upstream{}'s srv_conf */
344
345    ctx->srv_conf = ngx_pcalloc(cf->pool,
346                                sizeof(void *) * ngx_stream_max_module);
347    if (ctx->srv_conf == NULL) {
348        return NGX_CONF_ERROR;
349    }
350
351    ctx->srv_conf[ngx_stream_upstream_module.ctx_index] = uscf;
352
353    uscf->srv_conf = ctx->srv_conf;
354
355    for (m = 0; cf->cycle->modules[m]; m++) {
356        if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
357            continue;
358        }
359
360        module = cf->cycle->modules[m]->ctx;
361
362        if (module->create_srv_conf) {
363            mconf = module->create_srv_conf(cf);
364            if (mconf == NULL) {
365                return NGX_CONF_ERROR;
366            }
367
368            ctx->srv_conf[cf->cycle->modules[m]->ctx_index] = mconf;
369        }
370    }
371
372    uscf->servers = ngx_array_create(cf->pool, 4,
373                                     sizeof(ngx_stream_upstream_server_t));
374    if (uscf->servers == NULL) {
375        return NGX_CONF_ERROR;
376    }
377
378
379    /* parse inside upstream{} */
380
381    pcf = *cf;
382    cf->ctx = ctx;
383    cf->cmd_type = NGX_STREAM_UPS_CONF;
384
385    rv = ngx_conf_parse(cf, NULL);
386
387    *cf = pcf;
388
389    if (rv != NGX_CONF_OK) {
390        return rv;
391    }
392
393    if (uscf->servers->nelts == 0) {
394        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
395                           "no servers are inside upstream");
396        return NGX_CONF_ERROR;
397    }
398
399    return rv;
400}
401
402
403static char *
404ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
405{
406    ngx_stream_upstream_srv_conf_t  *uscf = conf;
407
408    time_t                         fail_timeout;
409    ngx_str_t                     *value, s;
410    ngx_url_t                      u;
411    ngx_int_t                      weight, max_conns, max_fails;
412    ngx_uint_t                     i;
413    ngx_stream_upstream_server_t  *us;
414
415    us = ngx_array_push(uscf->servers);
416    if (us == NULL) {
417        return NGX_CONF_ERROR;
418    }
419
420    ngx_memzero(us, sizeof(ngx_stream_upstream_server_t));
421
422    value = cf->args->elts;
423
424    weight = 1;
425    max_conns = 0;
426    max_fails = 1;
427    fail_timeout = 10;
428
429    for (i = 2; i < cf->args->nelts; i++) {
430
431        if (ngx_strncmp(value[i].data, "weight=", 7) == 0) {
432
433            if (!(uscf->flags & NGX_STREAM_UPSTREAM_WEIGHT)) {
434                goto not_supported;
435            }
436
437            weight = ngx_atoi(&value[i].data[7], value[i].len - 7);
438
439            if (weight == NGX_ERROR || weight == 0) {
440                goto invalid;
441            }
442
443            continue;
444        }
445
446        if (ngx_strncmp(value[i].data, "max_conns=", 10) == 0) {
447
448            if (!(uscf->flags & NGX_STREAM_UPSTREAM_MAX_CONNS)) {
449                goto not_supported;
450            }
451
452            max_conns = ngx_atoi(&value[i].data[10], value[i].len - 10);
453
454            if (max_conns == NGX_ERROR) {
455                goto invalid;
456            }
457
458            continue;
459        }
460
461        if (ngx_strncmp(value[i].data, "max_fails=", 10) == 0) {
462
463            if (!(uscf->flags & NGX_STREAM_UPSTREAM_MAX_FAILS)) {
464                goto not_supported;
465            }
466
467            max_fails = ngx_atoi(&value[i].data[10], value[i].len - 10);
468
469            if (max_fails == NGX_ERROR) {
470                goto invalid;
471            }
472
473            continue;
474        }
475
476        if (ngx_strncmp(value[i].data, "fail_timeout=", 13) == 0) {
477
478            if (!(uscf->flags & NGX_STREAM_UPSTREAM_FAIL_TIMEOUT)) {
479                goto not_supported;
480            }
481
482            s.len = value[i].len - 13;
483            s.data = &value[i].data[13];
484
485            fail_timeout = ngx_parse_time(&s, 1);
486
487            if (fail_timeout == (time_t) NGX_ERROR) {
488                goto invalid;
489            }
490
491            continue;
492        }
493
494        if (ngx_strcmp(value[i].data, "backup") == 0) {
495
496            if (!(uscf->flags & NGX_STREAM_UPSTREAM_BACKUP)) {
497                goto not_supported;
498            }
499
500            us->backup = 1;
501
502            continue;
503        }
504
505        if (ngx_strcmp(value[i].data, "down") == 0) {
506
507            if (!(uscf->flags & NGX_STREAM_UPSTREAM_DOWN)) {
508                goto not_supported;
509            }
510
511            us->down = 1;
512
513            continue;
514        }
515
516        goto invalid;
517    }
518
519    ngx_memzero(&u, sizeof(ngx_url_t));
520
521    u.url = value[1];
522
523    if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
524        if (u.err) {
525            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
526                               "%s in upstream \"%V\"", u.err, &u.url);
527        }
528
529        return NGX_CONF_ERROR;
530    }
531
532    if (u.no_port) {
533        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
534                           "no port in upstream \"%V\"", &u.url);
535        return NGX_CONF_ERROR;
536    }
537
538    us->name = u.url;
539    us->addrs = u.addrs;
540    us->naddrs = u.naddrs;
541    us->weight = weight;
542    us->max_conns = max_conns;
543    us->max_fails = max_fails;
544    us->fail_timeout = fail_timeout;
545
546    return NGX_CONF_OK;
547
548invalid:
549
550    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
551                       "invalid parameter \"%V\"", &value[i]);
552
553    return NGX_CONF_ERROR;
554
555not_supported:
556
557    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
558                       "balancing method does not support parameter \"%V\"",
559                       &value[i]);
560
561    return NGX_CONF_ERROR;
562}
563
564
565ngx_stream_upstream_srv_conf_t *
566ngx_stream_upstream_add(ngx_conf_t *cf, ngx_url_t *u, ngx_uint_t flags)
567{
568    ngx_uint_t                        i;
569    ngx_stream_upstream_server_t     *us;
570    ngx_stream_upstream_srv_conf_t   *uscf, **uscfp;
571    ngx_stream_upstream_main_conf_t  *umcf;
572
573    if (!(flags & NGX_STREAM_UPSTREAM_CREATE)) {
574
575        if (ngx_parse_url(cf->pool, u) != NGX_OK) {
576            if (u->err) {
577                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
578                                   "%s in upstream \"%V\"", u->err, &u->url);
579            }
580
581            return NULL;
582        }
583    }
584
585    umcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_upstream_module);
586
587    uscfp = umcf->upstreams.elts;
588
589    for (i = 0; i < umcf->upstreams.nelts; i++) {
590
591        if (uscfp[i]->host.len != u->host.len
592            || ngx_strncasecmp(uscfp[i]->host.data, u->host.data, u->host.len)
593               != 0)
594        {
595            continue;
596        }
597
598        if ((flags & NGX_STREAM_UPSTREAM_CREATE)
599             && (uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE))
600        {
601            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
602                               "duplicate upstream \"%V\"", &u->host);
603            return NULL;
604        }
605
606        if ((uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE) && !u->no_port) {
607            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
608                               "upstream \"%V\" may not have port %d",
609                               &u->host, u->port);
610            return NULL;
611        }
612
613        if ((flags & NGX_STREAM_UPSTREAM_CREATE) && !uscfp[i]->no_port) {
614            ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
615                          "upstream \"%V\" may not have port %d in %s:%ui",
616                          &u->host, uscfp[i]->port,
617                          uscfp[i]->file_name, uscfp[i]->line);
618            return NULL;
619        }
620
621        if (uscfp[i]->port != u->port) {
622            continue;
623        }
624
625        if (flags & NGX_STREAM_UPSTREAM_CREATE) {
626            uscfp[i]->flags = flags;
627        }
628
629        return uscfp[i];
630    }
631
632    uscf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_srv_conf_t));
633    if (uscf == NULL) {
634        return NULL;
635    }
636
637    uscf->flags = flags;
638    uscf->host = u->host;
639    uscf->file_name = cf->conf_file->file.name.data;
640    uscf->line = cf->conf_file->line;
641    uscf->port = u->port;
642    uscf->no_port = u->no_port;
643
644    if (u->naddrs == 1 && (u->port || u->family == AF_UNIX)) {
645        uscf->servers = ngx_array_create(cf->pool, 1,
646                                         sizeof(ngx_stream_upstream_server_t));
647        if (uscf->servers == NULL) {
648            return NULL;
649        }
650
651        us = ngx_array_push(uscf->servers);
652        if (us == NULL) {
653            return NULL;
654        }
655
656        ngx_memzero(us, sizeof(ngx_stream_upstream_server_t));
657
658        us->addrs = u->addrs;
659        us->naddrs = 1;
660    }
661
662    uscfp = ngx_array_push(&umcf->upstreams);
663    if (uscfp == NULL) {
664        return NULL;
665    }
666
667    *uscfp = uscf;
668
669    return uscf;
670}
671
672
673static void *
674ngx_stream_upstream_create_main_conf(ngx_conf_t *cf)
675{
676    ngx_stream_upstream_main_conf_t  *umcf;
677
678    umcf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_main_conf_t));
679    if (umcf == NULL) {
680        return NULL;
681    }
682
683    if (ngx_array_init(&umcf->upstreams, cf->pool, 4,
684                       sizeof(ngx_stream_upstream_srv_conf_t *))
685        != NGX_OK)
686    {
687        return NULL;
688    }
689
690    return umcf;
691}
692
693
694static char *
695ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf)
696{
697    ngx_stream_upstream_main_conf_t *umcf = conf;
698
699    ngx_uint_t                        i;
700    ngx_stream_upstream_init_pt       init;
701    ngx_stream_upstream_srv_conf_t  **uscfp;
702
703    uscfp = umcf->upstreams.elts;
704
705    for (i = 0; i < umcf->upstreams.nelts; i++) {
706
707        init = uscfp[i]->peer.init_upstream
708                                         ? uscfp[i]->peer.init_upstream
709                                         : ngx_stream_upstream_init_round_robin;
710
711        if (init(cf, uscfp[i]) != NGX_OK) {
712            return NGX_CONF_ERROR;
713        }
714    }
715
716    return NGX_CONF_OK;
717}
718