diff --git a/src/networking.c b/src/networking.c index d3d9256642b..f2e15e9782a 100644 --- a/src/networking.c +++ b/src/networking.c @@ -27,6 +27,9 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ +__thread sds thread_reusable_qb = NULL; +__thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query + * buffer due to nested command execution. */ /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -144,7 +147,7 @@ client *createClient(connection *conn) { c->ref_repl_buf_node = NULL; c->ref_block_pos = 0; c->qb_pos = 0; - c->querybuf = sdsempty(); + c->querybuf = NULL; c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; @@ -1575,6 +1578,28 @@ void deauthenticateAndCloseClient(client *c) { } } +/* Resets the reusable query buffer used by the given client. + * If any data remained in the buffer, the client will take ownership of the buffer + * and a new empty buffer will be allocated for the reusable buffer. */ +static void resetReusableQueryBuf(client *c) { + serverAssert(c->flags & CLIENT_REUSABLE_QUERYBUFFER); + if (c->querybuf != thread_reusable_qb || sdslen(c->querybuf) > c->qb_pos) { + /* If querybuf has been reallocated or there is still data left, + * let the client take ownership of the reusable buffer. */ + thread_reusable_qb = NULL; + } else { + /* It is safe to dereference and reuse the reusable query buffer. */ + c->querybuf = NULL; + c->qb_pos = 0; + sdsclear(thread_reusable_qb); + } + + /* Mark that the client is no longer using the reusable query buffer + * and indicate that it is no longer used by any client. */ + c->flags &= ~CLIENT_REUSABLE_QUERYBUFFER; + thread_reusable_qb_used = 0; +} + void freeClient(client *c) { listNode *ln; @@ -1629,6 +1654,8 @@ void freeClient(client *c) { } /* Free the query buffer */ + if (c->flags & CLIENT_REUSABLE_QUERYBUFFER) + resetReusableQueryBuf(c); sdsfree(c->querybuf); c->querybuf = NULL; @@ -2674,6 +2701,11 @@ void readQueryFromClient(connection *conn) { if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { + /* For big argv, the client always uses its private query buffer. + * Using the reusable query buffer would eventually expand it beyond 32k, + * causing the client to take ownership of the reusable query buffer. */ + if (!c->querybuf) c->querybuf = sdsempty(); + ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); big_arg = 1; @@ -2685,6 +2717,26 @@ void readQueryFromClient(connection *conn) { * but doesn't need align to the next arg, we can read more data. */ if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; + } else if (c->querybuf == NULL) { + if (unlikely(thread_reusable_qb_used)) { + /* The reusable query buffer is already used by another client, + * switch to using the client's private query buffer. This only + * occurs when commands are executed nested via processEventsWhileBlocked(). */ + c->querybuf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(c->querybuf); + } else { + /* Create the reusable query buffer if it doesn't exist. */ + if (!thread_reusable_qb) { + thread_reusable_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(thread_reusable_qb); + } + + /* Assign the reusable query buffer to the client and mark it as in use. */ + serverAssert(sdslen(thread_reusable_qb) == 0); + c->querybuf = thread_reusable_qb; + c->flags |= CLIENT_REUSABLE_QUERYBUFFER; + thread_reusable_qb_used = 1; + } } qblen = sdslen(c->querybuf); @@ -2708,7 +2760,7 @@ void readQueryFromClient(connection *conn) { nread = connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { - return; + goto done; } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); @@ -2760,6 +2812,10 @@ void readQueryFromClient(connection *conn) { c = NULL; done: + if (c && (c->flags & CLIENT_REUSABLE_QUERYBUFFER)) { + serverAssert(c->qb_pos == 0); /* Ensure the client's query buffer is trimmed in processInputBuffer */ + resetReusableQueryBuf(c); + } beforeNextClient(c); } @@ -2875,8 +2931,8 @@ sds catClientInfoString(sds s, client *client) { " ssub=%i", (int) dictSize(client->pubsubshard_channels), " multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, " watch=%i", (int) listLength(client->watched_keys), - " qbuf=%U", (unsigned long long) sdslen(client->querybuf), - " qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf), + " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, + " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, " argv-mem=%U", (unsigned long long) client->argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, @@ -3856,9 +3912,10 @@ size_t getClientOutputBufferMemoryUsage(client *c) { * the client output buffer memory usage portion of the total. */ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { size_t mem = getClientOutputBufferMemoryUsage(c); + if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem; - mem += sdsZmallocSize(c->querybuf); + mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory diff --git a/src/replication.c b/src/replication.c index 52e901e88ff..054cdacb753 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3,13 +3,15 @@ * Copyright (c) 2009-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). * * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ - #include "server.h" #include "cluster.h" #include "bio.h" @@ -1765,6 +1767,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) { * connection. */ server.master->flags |= CLIENT_MASTER; + /* Allocate a private query buffer for the master client instead of using the reusable query buffer. + * This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */ + server.master->querybuf = sdsempty(); server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; server.master->read_reploff = server.master->reploff; diff --git a/src/server.c b/src/server.c index 1a7bf27001e..381b6f63bba 100644 --- a/src/server.c +++ b/src/server.c @@ -2,8 +2,13 @@ * Copyright (c) 2009-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ #include "server.h" @@ -734,6 +739,8 @@ long long getInstantaneousMetric(int metric) { * * The function always returns 0 as it never terminates the client. */ int clientsCronResizeQueryBuffer(client *c) { + /* If the client query buffer is NULL, it is using the reusable query buffer and there is nothing to do. */ + if (c->querybuf == NULL) return 0; size_t querybuf_size = sdsalloc(c->querybuf); time_t idletime = server.unixtime - c->lastinteraction; @@ -743,7 +750,18 @@ int clientsCronResizeQueryBuffer(client *c) { /* There are two conditions to resize the query buffer: */ if (idletime > 2) { /* 1) Query is idle for a long time. */ - c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + if (!(c->flags & CLIENT_MASTER) && !remaining) { + /* If the client is not a master and no data is pending, + * The client can safely use the reusable query buffer in the next read - free the client's querybuf. */ + sdsfree(c->querybuf); + /* By setting the querybuf to NULL, the client will use the reusable query buffer in the next read. + * We don't move the client to the reusable query buffer immediately, because if we allocated a private + * query buffer for the client, it's likely that the client will use it again soon. */ + c->querybuf = NULL; + } else { + c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + } } else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size/2 > c->querybuf_peak) { /* 2) Query buffer is too big for latest peak and is larger than * resize threshold. Trim excess space but only up to a limit, @@ -759,7 +777,7 @@ int clientsCronResizeQueryBuffer(client *c) { /* Reset the peak again to capture the peak memory usage in the next * cycle. */ - c->querybuf_peak = sdslen(c->querybuf); + c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0; /* We reset to either the current used, or currently processed bulk size, * which ever is bigger. */ if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2; @@ -834,8 +852,9 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; int clientsCronTrackExpansiveClients(client *c, int time_idx) { - size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum + - (c->argv ? zmalloc_size(c->argv) : 0); + size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; + size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; + size_t in_usage = qb_size + c->argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -6567,7 +6586,7 @@ void dismissMemory(void* ptr, size_t size_hint) { void dismissClientMemory(client *c) { /* Dismiss client query buffer and static reply buffer. */ dismissMemory(c->buf, c->buf_usable_size); - dismissSds(c->querybuf); + if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { diff --git a/src/server.h b/src/server.h index 459d5b9744e..70ef070a8c4 100644 --- a/src/server.h +++ b/src/server.h @@ -388,6 +388,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */ #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */ #define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ +#define CLIENT_REUSABLE_QUERYBUFFER (1ULL<<51) /* The client is using the reusable query buffer. */ /* Any flag that does not let optimize FLUSH SYNC to run it in bg as blocking client ASYNC */ #define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE) diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index f4859dd2782..d0591115645 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -1,3 +1,15 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2024-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of the Redis Source Available License 2.0 +# (RSALv2) or the Server Side Public License v1 (SSPLv1). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# proc client_idle_sec {name} { set clients [split [r client list] "\r\n"] set c [lsearch -inline $clients *name=$name*] @@ -24,16 +36,45 @@ start_server {tags {"querybuf slow"}} { # The test will run at least 2s to check if client query # buffer will be resized when client idle 2s. test "query buffer resized correctly" { - set rd [redis_client] + + set rd [redis_deferring_client] + $rd client setname test_client + $rd read + + # Make sure query buff has size of 0 bytes at start as the client uses the reusable qb. + assert {[client_query_buffer test_client] == 0} + + # Pause cron to prevent premature shrinking (timing issue). + r debug pause-cron 1 + + # Send partial command to client to make sure it doesn't use the reusable qb. + $rd write "*3\r\n\$3\r\nset\r\n\$2\r\na" + $rd flush + # Wait for the client to start using a private query buffer. + wait_for_condition 1000 10 { + [client_query_buffer test_client] > 0 + } else { + fail "client should start using a private query buffer" + } + + # send the rest of the command + $rd write "a\r\n\$1\r\nb\r\n" + $rd flush + assert_equal {OK} [$rd read] + set orig_test_client_qbuf [client_query_buffer test_client] # Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k # but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k - assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf < 32768} + set MAX_QUERY_BUFFER_SIZE [expr 32768 + 2] ; # 32k + 2, allowing for potential greedy allocation of (16k + 1) * 2 bytes for the query buffer. + assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf <= $MAX_QUERY_BUFFER_SIZE} + + # Allow shrinking to occur + r debug pause-cron 0 # Check that the initial query buffer is resized after 2 sec wait_for_condition 1000 10 { - [client_idle_sec test_client] >= 3 && [client_query_buffer test_client] == 0 + [client_idle_sec test_client] >= 3 && [client_query_buffer test_client] < $orig_test_client_qbuf } else { fail "query buffer was not resized" } @@ -75,10 +116,27 @@ start_server {tags {"querybuf slow"}} { test "query buffer resized correctly with fat argv" { set rd [redis_client] $rd client setname test_client + + # Pause cron to prevent premature shrinking (timing issue). + r debug pause-cron 1 + $rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n" $rd flush + + # Wait for the client to start using a private query buffer of > 1000000 size. + wait_for_condition 1000 10 { + [client_query_buffer test_client] > 1000000 + } else { + fail "client should start using a private query buffer" + } - after 20 + # Send the start of the arg and make sure the client is not using reusable qb for it rather a private buf of > 1000000 size. + $rd write "a" + $rd flush + + r debug pause-cron 0 + + after 120 if {[client_query_buffer test_client] < 1000000} { fail "query buffer should not be resized when client idle time smaller than 2s" } @@ -92,5 +150,24 @@ start_server {tags {"querybuf slow"}} { $rd close } +} + +start_server {tags {"querybuf"}} { + test "Client executes small argv commands using reusable query buffer" { + set rd [redis_deferring_client] + $rd client setname test_client + $rd read + set res [r client list] + + # Verify that the client does not create a private query buffer after + # executing a small parameter command. + assert_match {*name=test_client * qbuf=0 qbuf-free=0 * cmd=client|setname *} $res + # The client executing the command is currently using the reusable query buffer, + # so the size shown is that of the reusable query buffer. It will be returned + # to the reusable query buffer after command execution. + assert_match {*qbuf=26 qbuf-free=* cmd=client|list *} $res + + $rd close + } }