/* ngx_stream_write_filter_module.c revision e18a033b */

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) Nginx, Inc.
 */
6
7
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_stream.h>
11
12
13typedef struct {
14    ngx_chain_t  *from_upstream;
15    ngx_chain_t  *from_downstream;
16} ngx_stream_write_filter_ctx_t;
17
18
19static ngx_int_t ngx_stream_write_filter(ngx_stream_session_t *s,
20    ngx_chain_t *in, ngx_uint_t from_upstream);
21static ngx_int_t ngx_stream_write_filter_init(ngx_conf_t *cf);
22
23
24static ngx_stream_module_t  ngx_stream_write_filter_module_ctx = {
25    NULL,                                  /* preconfiguration */
26    ngx_stream_write_filter_init,          /* postconfiguration */
27
28    NULL,                                  /* create main configuration */
29    NULL,                                  /* init main configuration */
30
31    NULL,                                  /* create server configuration */
32    NULL                                   /* merge server configuration */
33};
34
35
36ngx_module_t  ngx_stream_write_filter_module = {
37    NGX_MODULE_V1,
38    &ngx_stream_write_filter_module_ctx,   /* module context */
39    NULL,                                  /* module directives */
40    NGX_STREAM_MODULE,                     /* module type */
41    NULL,                                  /* init master */
42    NULL,                                  /* init module */
43    NULL,                                  /* init process */
44    NULL,                                  /* init thread */
45    NULL,                                  /* exit thread */
46    NULL,                                  /* exit process */
47    NULL,                                  /* exit master */
48    NGX_MODULE_V1_PADDING
49};
50
51
52static ngx_int_t
53ngx_stream_write_filter(ngx_stream_session_t *s, ngx_chain_t *in,
54    ngx_uint_t from_upstream)
55{
56    off_t                           size;
57    ngx_uint_t                      last, flush, sync;
58    ngx_chain_t                    *cl, *ln, **ll, **out, *chain;
59    ngx_connection_t               *c;
60    ngx_stream_write_filter_ctx_t  *ctx;
61
62    ctx = ngx_stream_get_module_ctx(s, ngx_stream_write_filter_module);
63
64    if (ctx == NULL) {
65        ctx = ngx_pcalloc(s->connection->pool,
66                          sizeof(ngx_stream_write_filter_ctx_t));
67        if (ctx == NULL) {
68            return NGX_ERROR;
69        }
70
71        ngx_stream_set_ctx(s, ctx, ngx_stream_write_filter_module);
72    }
73
74    if (from_upstream) {
75        c = s->connection;
76        out = &ctx->from_upstream;
77
78    } else {
79        c = s->upstream->peer.connection;
80        out = &ctx->from_downstream;
81    }
82
83    if (c->error) {
84        return NGX_ERROR;
85    }
86
87    size = 0;
88    flush = 0;
89    sync = 0;
90    last = 0;
91    ll = out;
92
93    /* find the size, the flush point and the last link of the saved chain */
94
95    for (cl = *out; cl; cl = cl->next) {
96        ll = &cl->next;
97
98        ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
99                       "write old buf t:%d f:%d %p, pos %p, size: %z "
100                       "file: %O, size: %O",
101                       cl->buf->temporary, cl->buf->in_file,
102                       cl->buf->start, cl->buf->pos,
103                       cl->buf->last - cl->buf->pos,
104                       cl->buf->file_pos,
105                       cl->buf->file_last - cl->buf->file_pos);
106
107#if 1
108        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
109            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
110                          "zero size buf in writer "
111                          "t:%d r:%d f:%d %p %p-%p %p %O-%O",
112                          cl->buf->temporary,
113                          cl->buf->recycled,
114                          cl->buf->in_file,
115                          cl->buf->start,
116                          cl->buf->pos,
117                          cl->buf->last,
118                          cl->buf->file,
119                          cl->buf->file_pos,
120                          cl->buf->file_last);
121
122            ngx_debug_point();
123            return NGX_ERROR;
124        }
125#endif
126
127        size += ngx_buf_size(cl->buf);
128
129        if (cl->buf->flush || cl->buf->recycled) {
130            flush = 1;
131        }
132
133        if (cl->buf->sync) {
134            sync = 1;
135        }
136
137        if (cl->buf->last_buf) {
138            last = 1;
139        }
140    }
141
142    /* add the new chain to the existent one */
143
144    for (ln = in; ln; ln = ln->next) {
145        cl = ngx_alloc_chain_link(c->pool);
146        if (cl == NULL) {
147            return NGX_ERROR;
148        }
149
150        cl->buf = ln->buf;
151        *ll = cl;
152        ll = &cl->next;
153
154        ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
155                       "write new buf t:%d f:%d %p, pos %p, size: %z "
156                       "file: %O, size: %O",
157                       cl->buf->temporary, cl->buf->in_file,
158                       cl->buf->start, cl->buf->pos,
159                       cl->buf->last - cl->buf->pos,
160                       cl->buf->file_pos,
161                       cl->buf->file_last - cl->buf->file_pos);
162
163#if 1
164        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
165            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
166                          "zero size buf in writer "
167                          "t:%d r:%d f:%d %p %p-%p %p %O-%O",
168                          cl->buf->temporary,
169                          cl->buf->recycled,
170                          cl->buf->in_file,
171                          cl->buf->start,
172                          cl->buf->pos,
173                          cl->buf->last,
174                          cl->buf->file,
175                          cl->buf->file_pos,
176                          cl->buf->file_last);
177
178            ngx_debug_point();
179            return NGX_ERROR;
180        }
181#endif
182
183        size += ngx_buf_size(cl->buf);
184
185        if (cl->buf->flush || cl->buf->recycled) {
186            flush = 1;
187        }
188
189        if (cl->buf->sync) {
190            sync = 1;
191        }
192
193        if (cl->buf->last_buf) {
194            last = 1;
195        }
196    }
197
198    *ll = NULL;
199
200    ngx_log_debug3(NGX_LOG_DEBUG_STREAM, c->log, 0,
201                   "stream write filter: l:%ui f:%ui s:%O", last, flush, size);
202
203    if (size == 0
204        && !(c->buffered & NGX_LOWLEVEL_BUFFERED)
205        && !(last && c->need_last_buf))
206    {
207        if (last || flush || sync) {
208            for (cl = *out; cl; /* void */) {
209                ln = cl;
210                cl = cl->next;
211                ngx_free_chain(c->pool, ln);
212            }
213
214            *out = NULL;
215            c->buffered &= ~NGX_STREAM_WRITE_BUFFERED;
216
217            return NGX_OK;
218        }
219
220        ngx_log_error(NGX_LOG_ALERT, c->log, 0,
221                      "the stream output chain is empty");
222
223        ngx_debug_point();
224
225        return NGX_ERROR;
226    }
227
228    chain = c->send_chain(c, *out, 0);
229
230    ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
231                   "stream write filter %p", chain);
232
233    if (chain == NGX_CHAIN_ERROR) {
234        c->error = 1;
235        return NGX_ERROR;
236    }
237
238    for (cl = *out; cl && cl != chain; /* void */) {
239        ln = cl;
240        cl = cl->next;
241        ngx_free_chain(c->pool, ln);
242    }
243
244    *out = chain;
245
246    if (chain) {
247        if (c->shared) {
248            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
249                          "shared connection is busy");
250            return NGX_ERROR;
251        }
252
253        c->buffered |= NGX_STREAM_WRITE_BUFFERED;
254        return NGX_AGAIN;
255    }
256
257    c->buffered &= ~NGX_STREAM_WRITE_BUFFERED;
258
259    if (c->buffered & NGX_LOWLEVEL_BUFFERED) {
260        return NGX_AGAIN;
261    }
262
263    return NGX_OK;
264}
265
266
267static ngx_int_t
268ngx_stream_write_filter_init(ngx_conf_t *cf)
269{
270    ngx_stream_top_filter = ngx_stream_write_filter;
271
272    return NGX_OK;
273}
274