1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Nginx, Inc.
5 */
6
7
8#include <ngx_config.h>
9#include <ngx_core.h>
10#include <ngx_event.h>
11#include <ngx_event_pipe.h>
12
13
/* the two halves of one pipe pass; both are defined later in this file */
static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);

/* file-local buffer and chain helpers; all are defined later in this file */
static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
20
21
22ngx_int_t
23ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
24{
25    ngx_int_t     rc;
26    ngx_uint_t    flags;
27    ngx_event_t  *rev, *wev;
28
29    for ( ;; ) {
30        if (do_write) {
31            p->log->action = "sending to client";
32
33            rc = ngx_event_pipe_write_to_downstream(p);
34
35            if (rc == NGX_ABORT) {
36                return NGX_ABORT;
37            }
38
39            if (rc == NGX_BUSY) {
40                return NGX_OK;
41            }
42        }
43
44        p->read = 0;
45        p->upstream_blocked = 0;
46
47        p->log->action = "reading upstream";
48
49        if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
50            return NGX_ABORT;
51        }
52
53        if (!p->read && !p->upstream_blocked) {
54            break;
55        }
56
57        do_write = 1;
58    }
59
60    if (p->upstream->fd != (ngx_socket_t) -1) {
61        rev = p->upstream->read;
62
63        flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
64
65        if (ngx_handle_read_event(rev, flags) != NGX_OK) {
66            return NGX_ABORT;
67        }
68
69        if (!rev->delayed) {
70            if (rev->active && !rev->ready) {
71                ngx_add_timer(rev, p->read_timeout);
72
73            } else if (rev->timer_set) {
74                ngx_del_timer(rev);
75            }
76        }
77    }
78
79    if (p->downstream->fd != (ngx_socket_t) -1
80        && p->downstream->data == p->output_ctx)
81    {
82        wev = p->downstream->write;
83        if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
84            return NGX_ABORT;
85        }
86
87        if (!wev->delayed) {
88            if (wev->active && !wev->ready) {
89                ngx_add_timer(wev, p->send_timeout);
90
91            } else if (wev->timer_set) {
92                ngx_del_timer(wev);
93            }
94        }
95    }
96
97    return NGX_OK;
98}
99
100
/*
 * Reads from p->upstream into raw buffers and hands every filled buffer
 * to p->input_filter().
 *
 * A buffer chain for each read is taken, in this order, from:
 * p->preread_bufs, p->free_raw_bufs, a new allocation (while
 * p->allocated < p->bufs.num), or from buffers released by writing
 * p->in to the temp file.  If none is available and the downstream is
 * writable, p->upstream_blocked is set so that the caller writes first.
 *
 * p->read is set when data was consumed or the end of the upstream
 * stream was detected.
 *
 * Returns NGX_OK, NGX_ABORT on an allocation or input filter failure,
 * NGX_AGAIN (NGX_THREADS only) while p->aio is set, or any non-OK code
 * propagated from ngx_event_pipe_write_chain_to_temp_file().
 */
static ngx_int_t
ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{
    off_t         limit;
    ssize_t       n, size;
    ngx_int_t     rc;
    ngx_buf_t    *b;
    ngx_msec_t    delay;
    ngx_chain_t  *chain, *cl, *ln;

    /* nothing to read once the upstream has ended, failed or is done */

    if (p->upstream_eof || p->upstream_error || p->upstream_done) {
        return NGX_OK;
    }

#if (NGX_THREADS)

    if (p->aio) {
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe read upstream: aio");
        return NGX_AGAIN;
    }

    /* finish a pending temp file write before reading more */

    if (p->writing) {
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe read upstream: writing");

        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc != NGX_OK) {
            return rc;
        }
    }

#endif

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe read upstream: %d", p->upstream->read->ready);

    for ( ;; ) {

        if (p->upstream_eof || p->upstream_error || p->upstream_done) {
            break;
        }

        if (p->preread_bufs == NULL && !p->upstream->read->ready) {
            break;
        }

        if (p->preread_bufs) {

            /* use the pre-read bufs if they exist */

            chain = p->preread_bufs;
            p->preread_bufs = NULL;
            n = p->preread_size;

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe preread: %z", n);

            if (n) {
                p->read = 1;
            }

        } else {

#if (NGX_HAVE_KQUEUE)

            /*
             * kqueue notifies about the end of file or a pending error.
             * This test allows not to allocate a buf on these conditions
             * and not to call c->recv_chain().
             */

            if (p->upstream->read->available == 0
                && p->upstream->read->pending_eof)
            {
                p->upstream->read->ready = 0;
                p->upstream->read->eof = 1;
                p->upstream_eof = 1;
                p->read = 1;

                if (p->upstream->read->kq_errno) {
                    p->upstream->read->error = 1;
                    p->upstream_error = 1;
                    p->upstream_eof = 0;

                    ngx_log_error(NGX_LOG_ERR, p->log,
                                  p->upstream->read->kq_errno,
                                  "kevent() reported that upstream "
                                  "closed connection");
                }

                break;
            }
#endif

            if (p->limit_rate) {
                if (p->upstream->read->delayed) {
                    break;
                }

                /*
                 * the byte budget: what limit_rate allows since
                 * p->start_sec minus what has already been read
                 */

                limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
                        - p->read_length;

                if (limit <= 0) {

                    /* over budget: delay by the time the excess takes */

                    p->upstream->read->delayed = 1;
                    delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
                    ngx_add_timer(p->upstream->read, delay);
                    break;
                }

            } else {
                limit = 0;
            }

            if (p->free_raw_bufs) {

                /* use the free bufs if they exist */

                chain = p->free_raw_bufs;
                if (p->single_buf) {
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;
                } else {
                    p->free_raw_bufs = NULL;
                }

            } else if (p->allocated < p->bufs.num) {

                /* allocate a new buf if it's still allowed */

                b = ngx_create_temp_buf(p->pool, p->bufs.size);
                if (b == NULL) {
                    return NGX_ABORT;
                }

                p->allocated++;

                chain = ngx_alloc_chain_link(p->pool);
                if (chain == NULL) {
                    return NGX_ABORT;
                }

                chain->buf = b;
                chain->next = NULL;

            } else if (!p->cacheable
                       && p->downstream->data == p->output_ctx
                       && p->downstream->write->ready
                       && !p->downstream->write->delayed)
            {
                /*
                 * if the bufs are not needed to be saved in a cache and
                 * a downstream is ready then write the bufs to a downstream
                 */

                p->upstream_blocked = 1;

                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe downstream ready");

                break;

            } else if (p->cacheable
                       || p->temp_file->offset < p->max_temp_file_size)
            {

                /*
                 * if it is allowed, then save some bufs from p->in
                 * to a temporary file, and add them to a p->out chain
                 */

                rc = ngx_event_pipe_write_chain_to_temp_file(p);

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe temp offset: %O", p->temp_file->offset);

                if (rc == NGX_BUSY) {
                    break;
                }

                if (rc != NGX_OK) {
                    return rc;
                }

                /* the write above released raw bufs to p->free_raw_bufs */

                chain = p->free_raw_bufs;
                if (p->single_buf) {
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;
                } else {
                    p->free_raw_bufs = NULL;
                }

            } else {

                /* there are no bufs to read in */

                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "no pipe bufs to read in");

                break;
            }

            n = p->upstream->recv_chain(p->upstream, chain, limit);

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe recv chain: %z", n);

            /*
             * put the chain back at the head of p->free_raw_bufs, so that
             * the bufs stay reachable on the early exits below
             */

            if (p->free_raw_bufs) {
                chain->next = p->free_raw_bufs;
            }
            p->free_raw_bufs = chain;

            if (n == NGX_ERROR) {
                p->upstream_error = 1;
                break;
            }

            if (n == NGX_AGAIN) {
                if (p->single_buf) {
                    ngx_event_pipe_remove_shadow_links(chain->buf);
                }

                break;
            }

            p->read = 1;

            if (n == 0) {
                p->upstream_eof = 1;
                break;
            }
        }

        /* the time the n bytes just read are worth at p->limit_rate */

        delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;

        p->read_length += n;
        cl = chain;
        p->free_raw_bufs = NULL;

        /*
         * spread the n bytes over the chain: each completely filled buf
         * is passed to the input filter and its chain link is freed;
         * a partially filled buf just gets its "last" pointer advanced
         */

        while (cl && n > 0) {

            ngx_event_pipe_remove_shadow_links(cl->buf);

            size = cl->buf->end - cl->buf->last;

            if (n >= size) {
                cl->buf->last = cl->buf->end;

                /* STUB */ cl->buf->num = p->num++;

                if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                    return NGX_ABORT;
                }

                n -= size;
                ln = cl;
                cl = cl->next;
                ngx_free_chain(p->pool, ln);

            } else {
                cl->buf->last += n;
                n = 0;
            }
        }

        /* the bufs that were not completely filled become free again */

        if (cl) {
            for (ln = cl; ln->next; ln = ln->next) { /* void */ }

            ln->next = p->free_raw_bufs;
            p->free_raw_bufs = cl;
        }

        if (delay > 0) {
            p->upstream->read->delayed = 1;
            ngx_add_timer(p->upstream->read, delay);
            break;
        }
    }

#if (NGX_DEBUG)

    for (cl = p->busy; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf busy s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    for (cl = p->out; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf out  s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    for (cl = p->in; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf in   s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    for (cl = p->free_raw_bufs; cl; cl = cl->next) {
        ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe buf free s:%d t:%d f:%d "
                       "%p, pos %p, size: %z "
                       "file: %O, size: %O",
                       (cl->buf->shadow ? 1 : 0),
                       cl->buf->temporary, cl->buf->in_file,
                       cl->buf->start, cl->buf->pos,
                       cl->buf->last - cl->buf->pos,
                       cl->buf->file_pos,
                       cl->buf->file_last - cl->buf->file_pos);
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe length: %O", p->length);

#endif

    /*
     * p->length == -1 means that the remaining length is not tracked;
     * otherwise, if the first free raw buf already holds at least
     * p->length bytes, pass it to the input filter without waiting
     * for the buf to become full
     */

    if (p->free_raw_bufs && p->length != -1) {
        cl = p->free_raw_bufs;

        if (cl->buf->last - cl->buf->pos >= p->length) {

            p->free_raw_bufs = cl->next;

            /* STUB */ cl->buf->num = p->num++;

            if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                return NGX_ABORT;
            }

            ngx_free_chain(p->pool, cl);
        }
    }

    if (p->length == 0) {
        p->upstream_done = 1;
        p->read = 1;
    }

    /*
     * on eof or error pass the first free raw buf to the input filter
     * and, if p->free_bufs is set and no buf_to_file is pending, free
     * the memory of the remaining raw bufs that have no shadow
     */

    if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {

        /* STUB */ p->free_raw_bufs->buf->num = p->num++;

        if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
            return NGX_ABORT;
        }

        p->free_raw_bufs = p->free_raw_bufs->next;

        if (p->free_bufs && p->buf_to_file == NULL) {
            for (cl = p->free_raw_bufs; cl; cl = cl->next) {
                if (cl->buf->shadow == NULL) {
                    ngx_pfree(p->pool, cl->buf->start);
                }
            }
        }
    }

    /* a cacheable response is always written to the temp file */

    if (p->cacheable && (p->in || p->buf_to_file)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write chain");

        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc != NGX_OK) {
            return rc;
        }
    }

    return NGX_OK;
}
495
496
/*
 * Passes buffered data to p->output_filter(): bufs from p->out first,
 * then bufs from p->in (only when the pipe is not cacheable and no temp
 * file write is pending), keeping the size of busy recycled bufs within
 * p->busy_size.  Once the upstream has ended, failed or is done, the
 * whole p->out and p->in chains are flushed and p->downstream_done is set.
 *
 * Returns NGX_OK, NGX_ABORT on failure, NGX_BUSY when the "flushed"
 * counter of empty flush attempts exceeds 10, or the result of
 * ngx_event_pipe_drain_chains() after a downstream error.
 */
static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
    u_char            *prev;
    size_t             bsize;
    ngx_int_t          rc;
    ngx_uint_t         flush, flushed, prev_last_shadow;
    ngx_chain_t       *out, **ll, *cl;
    ngx_connection_t  *downstream;

    downstream = p->downstream;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe write downstream: %d", downstream->write->ready);

#if (NGX_THREADS)

    /* only NGX_ABORT is fatal here; other results fall through */

    if (p->writing) {
        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc == NGX_ABORT) {
            return NGX_ABORT;
        }
    }

#endif

    flushed = 0;

    for ( ;; ) {
        if (p->downstream_error) {
            return ngx_event_pipe_drain_chains(p);
        }

        if (p->upstream_eof || p->upstream_error || p->upstream_done) {

            /* pass the p->out and p->in chains to the output filter */

            for (cl = p->busy; cl; cl = cl->next) {
                cl->buf->recycled = 0;
            }

            if (p->out) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush out");

                for (cl = p->out; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }

                rc = p->output_filter(p->output_ctx, p->out);

                if (rc == NGX_ERROR) {
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);
                }

                p->out = NULL;
            }

            /* p->in is not flushed while a temp file write is pending */

            if (p->writing) {
                break;
            }

            if (p->in) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush in");

                for (cl = p->in; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }

                rc = p->output_filter(p->output_ctx, p->in);

                if (rc == NGX_ERROR) {
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);
                }

                p->in = NULL;
            }

            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe write downstream done");

            /* TODO: free unused bufs */

            p->downstream_done = 1;
            break;
        }

        /* stop if the pipe does not own the connection or it cannot write */

        if (downstream->data != p->output_ctx
            || !downstream->write->ready
            || downstream->write->delayed)
        {
            break;
        }

        /* bsize is the size of the busy recycled bufs */

        prev = NULL;
        bsize = 0;

        for (cl = p->busy; cl; cl = cl->next) {

            if (cl->buf->recycled) {

                /* consecutive bufs with the same start are counted once */

                if (prev == cl->buf->start) {
                    continue;
                }

                bsize += cl->buf->end - cl->buf->start;
                prev = cl->buf->start;
            }
        }

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write busy: %uz", bsize);

        out = NULL;

        /* the busy limit is reached: flush without adding new bufs */

        if (bsize >= (size_t) p->busy_size) {
            flush = 1;
            goto flush;
        }

        flush = 0;
        ll = NULL;
        prev_last_shadow = 1;

        /* move bufs from p->out, then from p->in, to the "out" chain */

        for ( ;; ) {
            if (p->out) {
                cl = p->out;

                if (cl->buf->recycled) {
                    ngx_log_error(NGX_LOG_ALERT, p->log, 0,
                                  "recycled buffer in pipe out chain");
                }

                p->out = p->out->next;

            } else if (!p->cacheable && !p->writing && p->in) {
                cl = p->in;

                ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write buf ls:%d %p %z",
                               cl->buf->last_shadow,
                               cl->buf->pos,
                               cl->buf->last - cl->buf->pos);

                /*
                 * the busy size is checked only for a buf that follows
                 * a last_shadow buf (or is the first one taken)
                 */

                if (cl->buf->recycled && prev_last_shadow) {
                    if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
                        flush = 1;
                        break;
                    }

                    bsize += cl->buf->end - cl->buf->start;
                }

                prev_last_shadow = cl->buf->last_shadow;

                p->in = p->in->next;

            } else {
                break;
            }

            cl->next = NULL;

            if (out) {
                *ll = cl;
            } else {
                out = cl;
            }
            ll = &cl->next;
        }

    flush:

        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write: out:%p, f:%ui", out, flush);

        if (out == NULL) {

            if (!flush) {
                break;
            }

            /* a workaround for AIO */
            if (flushed++ > 10) {
                return NGX_BUSY;
            }
        }

        rc = p->output_filter(p->output_ctx, out);

        ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);

        if (rc == NGX_ERROR) {
            p->downstream_error = 1;
            return ngx_event_pipe_drain_chains(p);
        }

        for (cl = p->free; cl; cl = cl->next) {

            if (cl->buf->temp_file) {
                if (p->cacheable || !p->cyclic_temp_file) {
                    continue;
                }

                /* reset p->temp_offset if all bufs had been sent */

                if (cl->buf->file_last == p->temp_file->offset) {
                    p->temp_file->offset = 0;
                }
            }

            /* TODO: free buf if p->free_bufs && upstream done */

            /* add the free shadow raw buf to p->free_raw_bufs */

            if (cl->buf->last_shadow) {
                if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
                    return NGX_ABORT;
                }

                cl->buf->last_shadow = 0;
            }

            cl->buf->shadow = NULL;
        }
    }

    return NGX_OK;
}
731
732
/*
 * Writes p->buf_to_file (if set) followed by bufs from p->in to
 * p->temp_file, describes the written range with a file buf in p->out
 * (extending the last p->out buf when the range is contiguous with it),
 * moves the written chain links to p->free and returns the raw bufs
 * of last_shadow bufs to the tail of p->free_raw_bufs.
 *
 * For a non-cacheable pipe the amount written in one call is limited by
 * p->temp_file_write_size and p->max_temp_file_size.
 *
 * Returns NGX_OK, NGX_BUSY if not even the first buf fits these limits,
 * NGX_AGAIN (NGX_THREADS only) while a write is still in progress, or
 * NGX_ABORT on a write or allocation failure.
 */
static ngx_int_t
ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
{
    ssize_t       size, bsize, n;
    ngx_buf_t    *b;
    ngx_uint_t    prev_last_shadow;
    ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free;

#if (NGX_THREADS)

    if (p->writing) {

        if (p->aio) {
            return NGX_AGAIN;
        }

        /* pick up the chain saved in p->writing by an earlier call */

        out = p->writing;
        p->writing = NULL;

        n = ngx_write_chain_to_temp_file(p->temp_file, NULL);

        if (n == NGX_ERROR) {
            return NGX_ABORT;
        }

        goto done;
    }

#endif

    /* p->buf_to_file, if any, is written in front of the p->in chain */

    if (p->buf_to_file) {
        out = ngx_alloc_chain_link(p->pool);
        if (out == NULL) {
            return NGX_ABORT;
        }

        out->buf = p->buf_to_file;
        out->next = p->in;

    } else {
        out = p->in;
    }

    if (!p->cacheable) {

        size = 0;
        cl = out;
        ll = NULL;
        prev_last_shadow = 1;

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe offset: %O", p->temp_file->offset);

        /*
         * select the leading part of the chain that fits the limits;
         * the limits are tested only right after a last_shadow buf
         * (or at the very first buf)
         */

        do {
            bsize = cl->buf->last - cl->buf->pos;

            ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe buf ls:%d %p, pos %p, size: %z",
                           cl->buf->last_shadow, cl->buf->start,
                           cl->buf->pos, bsize);

            if (prev_last_shadow
                && ((size + bsize > p->temp_file_write_size)
                    || (p->temp_file->offset + size + bsize
                        > p->max_temp_file_size)))
            {
                break;
            }

            prev_last_shadow = cl->buf->last_shadow;

            size += bsize;
            ll = &cl->next;
            cl = cl->next;

        } while (cl);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);

        /* ll is still NULL if even the first buf did not fit */

        if (ll == NULL) {
            return NGX_BUSY;
        }

        /* cut the chain: the rest stays in p->in, "out" gets the head */

        if (cl) {
            p->in = cl;
            *ll = NULL;

        } else {
            p->in = NULL;
            p->last_in = &p->in;
        }

    } else {
        p->in = NULL;
        p->last_in = &p->in;
    }

#if (NGX_THREADS)
    if (p->thread_handler) {
        p->temp_file->thread_write = 1;
        p->temp_file->file.thread_task = p->thread_task;
        p->temp_file->file.thread_handler = p->thread_handler;
        p->temp_file->file.thread_ctx = p->thread_ctx;
    }
#endif

    n = ngx_write_chain_to_temp_file(p->temp_file, out);

    if (n == NGX_ERROR) {
        return NGX_ABORT;
    }

#if (NGX_THREADS)

    /* the write is not complete yet: save the chain for a later call */

    if (n == NGX_AGAIN) {
        p->writing = out;
        p->thread_task = p->temp_file->file.thread_task;
        return NGX_AGAIN;
    }

done:

#endif

    /*
     * the bytes of p->buf_to_file set the temp file offset and are not
     * counted in n; its chain link is skipped
     */

    if (p->buf_to_file) {
        p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
        n -= p->buf_to_file->last - p->buf_to_file->pos;
        p->buf_to_file = NULL;
        out = out->next;
    }

    if (n > 0) {
        /* update previous buffer or add new buffer */

        if (p->out) {
            for (cl = p->out; cl->next; cl = cl->next) { /* void */ }

            b = cl->buf;

            if (b->file_last == p->temp_file->offset) {
                p->temp_file->offset += n;
                b->file_last = p->temp_file->offset;
                goto free;
            }

            last_out = &cl->next;

        } else {
            last_out = &p->out;
        }

        cl = ngx_chain_get_free_buf(p->pool, &p->free);
        if (cl == NULL) {
            return NGX_ABORT;
        }

        b = cl->buf;

        ngx_memzero(b, sizeof(ngx_buf_t));

        b->tag = p->tag;

        b->file = &p->temp_file->file;
        b->file_pos = p->temp_file->offset;
        p->temp_file->offset += n;
        b->file_last = p->temp_file->offset;

        b->in_file = 1;
        b->temp_file = 1;

        *last_out = cl;
    }

free:

    /* find the tail of p->free_raw_bufs to append released raw bufs */

    for (last_free = &p->free_raw_bufs;
         *last_free != NULL;
         last_free = &(*last_free)->next)
    {
        /* void */
    }

    /*
     * every written chain link goes to p->free; the raw buf behind
     * a last_shadow buf is reset and becomes free for reading again
     */

    for (cl = out; cl; cl = next) {
        next = cl->next;

        cl->next = p->free;
        p->free = cl;

        b = cl->buf;

        if (b->last_shadow) {

            tl = ngx_alloc_chain_link(p->pool);
            if (tl == NULL) {
                return NGX_ABORT;
            }

            tl->buf = b->shadow;
            tl->next = NULL;

            *last_free = tl;
            last_free = &tl->next;

            b->shadow->pos = b->shadow->start;
            b->shadow->last = b->shadow->start;

            ngx_event_pipe_remove_shadow_links(b->shadow);
        }
    }

    return NGX_OK;
}
945
946
947/* the copy input filter */
948
949ngx_int_t
950ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
951{
952    ngx_buf_t    *b;
953    ngx_chain_t  *cl;
954
955    if (buf->pos == buf->last) {
956        return NGX_OK;
957    }
958
959    cl = ngx_chain_get_free_buf(p->pool, &p->free);
960    if (cl == NULL) {
961        return NGX_ERROR;
962    }
963
964    b = cl->buf;
965
966    ngx_memcpy(b, buf, sizeof(ngx_buf_t));
967    b->shadow = buf;
968    b->tag = p->tag;
969    b->last_shadow = 1;
970    b->recycled = 1;
971    buf->shadow = b;
972
973    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
974
975    if (p->in) {
976        *p->last_in = cl;
977    } else {
978        p->in = cl;
979    }
980    p->last_in = &cl->next;
981
982    if (p->length == -1) {
983        return NGX_OK;
984    }
985
986    p->length -= b->last - b->pos;
987
988    return NGX_OK;
989}
990
991
992static ngx_inline void
993ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
994{
995    ngx_buf_t  *b, *next;
996
997    b = buf->shadow;
998
999    if (b == NULL) {
1000        return;
1001    }
1002
1003    while (!b->last_shadow) {
1004        next = b->shadow;
1005
1006        b->temporary = 0;
1007        b->recycled = 0;
1008
1009        b->shadow = NULL;
1010        b = next;
1011    }
1012
1013    b->temporary = 0;
1014    b->recycled = 0;
1015    b->last_shadow = 0;
1016
1017    b->shadow = NULL;
1018
1019    buf->shadow = NULL;
1020}
1021
1022
1023ngx_int_t
1024ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
1025{
1026    ngx_chain_t  *cl;
1027
1028    cl = ngx_alloc_chain_link(p->pool);
1029    if (cl == NULL) {
1030        return NGX_ERROR;
1031    }
1032
1033    if (p->buf_to_file && b->start == p->buf_to_file->start) {
1034        b->pos = p->buf_to_file->last;
1035        b->last = p->buf_to_file->last;
1036
1037    } else {
1038        b->pos = b->start;
1039        b->last = b->start;
1040    }
1041
1042    b->shadow = NULL;
1043
1044    cl->buf = b;
1045
1046    if (p->free_raw_bufs == NULL) {
1047        p->free_raw_bufs = cl;
1048        cl->next = NULL;
1049
1050        return NGX_OK;
1051    }
1052
1053    if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
1054
1055        /* add the free buf to the list start */
1056
1057        cl->next = p->free_raw_bufs;
1058        p->free_raw_bufs = cl;
1059
1060        return NGX_OK;
1061    }
1062
1063    /* the first free buf is partially filled, thus add the free buf after it */
1064
1065    cl->next = p->free_raw_bufs->next;
1066    p->free_raw_bufs->next = cl;
1067
1068    return NGX_OK;
1069}
1070
1071
1072static ngx_int_t
1073ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
1074{
1075    ngx_chain_t  *cl, *tl;
1076
1077    for ( ;; ) {
1078        if (p->busy) {
1079            cl = p->busy;
1080            p->busy = NULL;
1081
1082        } else if (p->out) {
1083            cl = p->out;
1084            p->out = NULL;
1085
1086        } else if (p->in) {
1087            cl = p->in;
1088            p->in = NULL;
1089
1090        } else {
1091            return NGX_OK;
1092        }
1093
1094        while (cl) {
1095            if (cl->buf->last_shadow) {
1096                if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
1097                    return NGX_ABORT;
1098                }
1099
1100                cl->buf->last_shadow = 0;
1101            }
1102
1103            cl->buf->shadow = NULL;
1104            tl = cl->next;
1105            cl->next = p->free;
1106            p->free = cl;
1107            cl = tl;
1108        }
1109    }
1110}
1111