Skip to content

Commit 64e166d

Browse files
authored
feat: add method to reset read buf (#47)
1 parent e309d53 commit 64e166d

File tree

3 files changed

+109
-2
lines changed

3 files changed

+109
-2
lines changed

lib/resty/apisix/stream/xrpc/socket.lua

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,16 @@ int
3939
ngx_stream_lua_ffi_socket_tcp_get_send_result(ngx_stream_lua_request_t *r,
4040
ngx_stream_lua_socket_tcp_upstream_t *u, u_char *errbuf,
4141
size_t *errbuf_size);
42+
43+
void
44+
ngx_stream_lua_ffi_socket_tcp_reset_read_buf(ngx_stream_lua_request_t *r,
45+
ngx_stream_lua_socket_tcp_upstream_t *u);
4246
]]
4347
local socket_tcp_read = C.ngx_stream_lua_ffi_socket_tcp_read_buf
4448
local socket_tcp_get_read_result = C.ngx_stream_lua_ffi_socket_tcp_get_read_buf_result
4549
local socket_tcp_move = C.ngx_stream_lua_ffi_socket_tcp_send_from_socket
4650
local socket_tcp_get_move_result = C.ngx_stream_lua_ffi_socket_tcp_get_send_result
51+
local socket_tcp_reset_read_buf = C.ngx_stream_lua_ffi_socket_tcp_reset_read_buf
4752

4853

4954
local ERR_BUF_SIZE = 256
@@ -191,6 +196,19 @@ local function move(dst, src)
191196
end
192197

193198

199+
-- reset buffer read from methods `read` or `drain`. Should be used when you don't
200+
-- want to forward some buffers
201+
local function reset_read_buf(cosocket)
202+
local r = get_request()
203+
if not r then
204+
error("no request found", 2)
205+
end
206+
207+
local u = get_tcp_socket(cosocket)
208+
socket_tcp_reset_read_buf(r, u)
209+
end
210+
211+
194212
local function patch_methods(sk)
195213
local methods = getmetatable(sk).__index
196214
local copy = tab_clone(methods)
@@ -202,6 +220,7 @@ local function patch_methods(sk)
202220
copy.read = read
203221
copy.drain = drain
204222
copy.move = move
223+
copy.reset_read_buf = reset_read_buf
205224

206225
return {__index = copy}
207226
end

patch/1.19.9/ngx_stream_lua-xrpc.patch

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
diff --git src/ngx_stream_lua_socket_tcp.c src/ngx_stream_lua_socket_tcp.c
2-
index 7fcfb45..13e942d 100644
2+
index 7fcfb45..007305b 100644
33
--- src/ngx_stream_lua_socket_tcp.c
44
+++ src/ngx_stream_lua_socket_tcp.c
55
@@ -234,6 +234,41 @@ enum {
@@ -44,7 +44,7 @@ index 7fcfb45..13e942d 100644
4444

4545
static char ngx_stream_lua_raw_req_socket_metatable_key;
4646
static char ngx_stream_lua_tcp_socket_metatable_key;
47-
@@ -6005,6 +6040,605 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
47+
@@ -6005,6 +6040,620 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
4848
}
4949

5050

@@ -55,6 +55,10 @@ index 7fcfb45..13e942d 100644
5555
+ ngx_chain_t *cl = u->bufs_in;
5656
+ ngx_chain_t **ll = NULL;
5757
+
58+
+ if (cl == NULL) {
59+
+ return;
60+
+ }
61+
+
5862
+ if (cl->next) {
5963
+ ll = &cl->next;
6064
+ }
@@ -77,6 +81,17 @@ index 7fcfb45..13e942d 100644
7781
+}
7882
+
7983
+
84+
+void
85+
+ngx_stream_lua_ffi_socket_tcp_reset_read_buf(ngx_stream_lua_request_t *r,
86+
+ ngx_stream_lua_socket_tcp_upstream_t *u)
87+
+{
88+
+ ngx_stream_lua_ctx_t *ctx;
89+
+
90+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
91+
+ ngx_stream_lua_ffi_socket_reset_buf(ctx, u);
92+
+}
93+
+
94+
+
8095
+static int
8196
+ngx_stream_lua_socket_tcp_dummy_retval_handler(ngx_stream_lua_request_t *r,
8297
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)

t/stream/xrpc/downstream.t

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,3 +299,76 @@ stream lua tcp socket allocate new new buf of size 144
299299
}
300300
--- stream_request
301301
hello world
302+
303+
304+
305+
=== TEST 11: read & drain & move & reset_read_buf
306+
--- stream_config
307+
server {
308+
listen 1995;
309+
content_by_lua_block {
310+
local sk = ngx.req.socket(true)
311+
local data = sk:receiveany(128)
312+
local exp = "rld"
313+
if data ~= exp then
314+
ngx.log(ngx.ERR, "actual: ", data, ", expected: ", exp)
315+
end
316+
}
317+
}
318+
--- stream_server_config
319+
content_by_lua_block {
320+
local ffi = require("ffi")
321+
local ds = require("resty.apisix.stream.xrpc.socket").downstream.socket()
322+
ds:settimeout(5)
323+
324+
local us = require("resty.apisix.stream.xrpc.socket").upstream.socket()
325+
us:settimeout(50)
326+
assert(us:connect("127.0.0.1", 1995))
327+
328+
assert(ds:read(4))
329+
assert(ds:drain(4))
330+
ds:reset_read_buf()
331+
assert(us:move(ds))
332+
assert(ds:drain(3))
333+
assert(us:move(ds))
334+
}
335+
--- stream_request
336+
hello world
337+
338+
339+
340+
=== TEST 12: move & reset_read_buf + read over buffer in the middle
341+
--- stream_config
342+
server {
343+
listen 1995;
344+
content_by_lua_block {
345+
local sk = ngx.req.socket(true)
346+
local data = sk:receive(9 * 8)
347+
local exp = ("123456789"):rep(8)
348+
if data ~= exp then
349+
ngx.log(ngx.ERR, "actual: ", data, ", expected: ", exp)
350+
end
351+
}
352+
}
353+
--- stream_server_config
354+
lua_socket_buffer_size 128;
355+
content_by_lua_block {
356+
local ffi = require("ffi")
357+
local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket()
358+
sk:settimeout(5)
359+
360+
local us = require("resty.apisix.stream.xrpc.socket").upstream.socket()
361+
us:settimeout(50)
362+
assert(us:connect("127.0.0.1", 1995))
363+
364+
local len = 9 * 8
365+
local p = assert(sk:read(len))
366+
local len = 9 * 16
367+
local p = assert(sk:read(len))
368+
sk:reset_read_buf()
369+
local len = 9 * 8
370+
local p = assert(sk:read(len))
371+
assert(us:move(sk))
372+
}
373+
--- stream_request eval
374+
"123456789" x 32

0 commit comments

Comments
 (0)