From 9ce10d34ad3359d22205b1942c08075b7f55416d Mon Sep 17 00:00:00 2001 From: uriyage <78144248+uriyage@users.noreply.github.com> Date: Tue, 28 May 2024 21:09:37 +0300 Subject: [PATCH 01/20] Introduce shared query buffer for client reads (#258) This PR optimizes client query buffer handling in Valkey by introducing a shared query buffer that is used by default for client reads. This reduces memory usage by ~20KB per client by avoiding allocations for most clients using short (<16KB) complete commands. For larger or partial commands, the client still gets its own private buffer. The primary changes are: * Adding a shared query buffer `shared_qb` that clients use by default * Modifying client querybuf initialization and reset logic * Copying any partial query from shared to private buffer before command execution * Freeing idle client query buffers when empty to allow reuse of shared buffer * Master client query buffers are kept private as their contents need to be preserved for replication stream In addition to the memory savings, this change shows a 3% improvement in latency and throughput when running with 1000 active clients. The memory reduction may also help reduce the need to evict clients when reaching max memory limit, as the query buffer is the main memory consumer per client. --------- Signed-off-by: Uri Yagelnik Signed-off-by: Madelyn Olson Co-authored-by: Madelyn Olson --- src/networking.c | 104 ++++++++++++++++++++++++++++------- src/replication.c | 3 + src/server.c | 25 +++++++-- src/server.h | 1 + tests/unit/introspection.tcl | 4 +- tests/unit/querybuf.tcl | 23 +++++++- 6 files changed, 133 insertions(+), 27 deletions(-) diff --git a/src/networking.c b/src/networking.c index d3d9256642b..0694c865335 100644 --- a/src/networking.c +++ b/src/networking.c @@ -27,6 +27,7 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ +__thread sds thread_shared_qb = NULL; /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -144,7 +145,7 @@ client *createClient(connection *conn) { c->ref_repl_buf_node = NULL; c->ref_block_pos = 0; c->qb_pos = 0; - c->querybuf = sdsempty(); + c->querybuf = NULL; c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; @@ -1629,7 +1630,11 @@ void freeClient(client *c) { } /* Free the query buffer */ - sdsfree(c->querybuf); + if (c->querybuf && c->querybuf == thread_shared_qb) { + sdsclear(c->querybuf); + } else { + sdsfree(c->querybuf); + } c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. */ @@ -2132,6 +2137,48 @@ void resetClient(client *c) { } } +/* Initializes the shared query buffer to a new sds with the default capacity */ +void initSharedQueryBuf(void) { + thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(thread_shared_qb); +} + +/* Resets the shared query buffer used by the given client. + * If any data remained in the buffer, the client will take ownership of the buffer + * and a new empty buffer will be allocated for the shared buffer. */ +void resetSharedQueryBuf(client *c) { + serverAssert(c->querybuf == thread_shared_qb); + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + + if (remaining > 0) { + /* Let the client take ownership of the shared buffer. */ + initSharedQueryBuf(); + return; + } + + c->querybuf = NULL; + sdsclear(thread_shared_qb); + c->qb_pos = 0; +} + +/* Trims the client query buffer to the current position. */ +void trimClientQueryBuffer(client *c) { + if (c->querybuf == thread_shared_qb) { + resetSharedQueryBuf(c); + } + + if (c->querybuf == NULL) { + return; + } + + serverAssert(c->qb_pos <= sdslen(c->querybuf)); + + if (c->qb_pos > 0) { + sdsrange(c->querybuf, c->qb_pos, -1); + c->qb_pos = 0; + } +} + /* This function is used when we want to re-enter the event loop but there * is the risk that the client we are dealing with will be freed in some * way. This happens for instance in: @@ -2394,6 +2441,10 @@ int processMultibulkBuffer(client *c) { * ll+2, trimming querybuf is just a waste of time, because * at this time the querybuf contains not only our bulk. */ if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) { + if (c->querybuf == thread_shared_qb) { + /* Let the client take the ownership of the shared buffer. */ + initSharedQueryBuf(); + } sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; /* Hint the sds library about the amount of bytes this string is @@ -2558,7 +2609,7 @@ int processPendingCommandAndInputBuffer(client *c) { * return C_ERR in case the client was freed during the processing */ int processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ - while(c->qb_pos < sdslen(c->querybuf)) { + while (c->querybuf && c->qb_pos < sdslen(c->querybuf)) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; @@ -2609,6 +2660,13 @@ int processInputBuffer(client *c) { break; } + if (c->querybuf == thread_shared_qb) { + /* Before processing the command, reset the shared query buffer to its default state. + * This avoids unintentionally modifying the shared qb during processCommand as we may use + * the shared qb for other clients during processEventsWhileBlocked */ + resetSharedQueryBuf(c); + } + /* We are finally ready to execute the command. */ if (processCommandAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid exiting this @@ -2637,10 +2695,8 @@ int processInputBuffer(client *c) { c->qb_pos -= c->repl_applied; c->repl_applied = 0; } - } else if (c->qb_pos) { - /* Trim to pos */ - sdsrange(c->querybuf,c->qb_pos,-1); - c->qb_pos = 0; + } else { + trimClientQueryBuffer(c); } /* Update client memory usage after processing the query buffer, this is @@ -2665,16 +2721,16 @@ void readQueryFromClient(connection *conn) { atomicIncr(server.stat_total_reads_processed, 1); readlen = PROTO_IOBUF_LEN; + qblen = c->querybuf ? sdslen(c->querybuf) : 0; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the - * Redis Object representing the argument. */ - if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 - && c->bulklen >= PROTO_MBULK_BIG_ARG) - { - ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); + * robj representing the argument. */ + + if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { + ssize_t remaining = (size_t)(c->bulklen + 2) - (qblen - c->qb_pos); big_arg = 1; /* Note that the 'remaining' variable may be zero in some edge case, @@ -2687,7 +2743,12 @@ void readQueryFromClient(connection *conn) { readlen = PROTO_IOBUF_LEN; } - qblen = sdslen(c->querybuf); + if (c->querybuf == NULL) { + serverAssert(sdslen(thread_shared_qb) == 0); + c->querybuf = big_arg ? sdsempty() : thread_shared_qb; + qblen = sdslen(c->querybuf); + } + if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy. (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg @@ -2708,7 +2769,7 @@ void readQueryFromClient(connection *conn) { nread = connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { - return; + goto done; } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); @@ -2760,6 +2821,10 @@ void readQueryFromClient(connection *conn) { c = NULL; done: + if (c && c->querybuf == thread_shared_qb) { + sdsclear(thread_shared_qb); + c->querybuf = NULL; + } beforeNextClient(c); } @@ -2875,8 +2940,8 @@ sds catClientInfoString(sds s, client *client) { " ssub=%i", (int) dictSize(client->pubsubshard_channels), " multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, " watch=%i", (int) listLength(client->watched_keys), - " qbuf=%U", (unsigned long long) sdslen(client->querybuf), - " qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf), + " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, + " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, " argv-mem=%U", (unsigned long long) client->argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, @@ -3856,9 +3921,9 @@ size_t getClientOutputBufferMemoryUsage(client *c) { * the client output buffer memory usage portion of the total. */ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { size_t mem = getClientOutputBufferMemoryUsage(c); - if (output_buffer_mem_usage != NULL) - *output_buffer_mem_usage = mem; - mem += sdsZmallocSize(c->querybuf); + + if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem; + mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory @@ -4248,6 +4313,7 @@ void *IOThreadMain(void *myid) { redis_set_thread_title(thdname); redisSetCpuAffinity(server.server_cpulist); makeThreadKillable(); + initSharedQueryBuf(); while(1) { /* Wait for start */ diff --git a/src/replication.c b/src/replication.c index 52e901e88ff..c64ce239141 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1765,6 +1765,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) { * connection. */ server.master->flags |= CLIENT_MASTER; + /* Allocate a private query buffer for the master client instead of using the shared query buffer. + * This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */ + server.master->querybuf = sdsempty(); server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; server.master->read_reploff = server.master->reploff; diff --git a/src/server.c b/src/server.c index 1a7bf27001e..57ed3a8a85a 100644 --- a/src/server.c +++ b/src/server.c @@ -734,6 +734,8 @@ long long getInstantaneousMetric(int metric) { * * The function always returns 0 as it never terminates the client. */ int clientsCronResizeQueryBuffer(client *c) { + /* If the client query buffer is NULL, it is using the shared query buffer and there is nothing to do. */ + if (c->querybuf == NULL) return 0; size_t querybuf_size = sdsalloc(c->querybuf); time_t idletime = server.unixtime - c->lastinteraction; @@ -743,7 +745,18 @@ int clientsCronResizeQueryBuffer(client *c) { /* There are two conditions to resize the query buffer: */ if (idletime > 2) { /* 1) Query is idle for a long time. */ - c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + if (!(c->flags & CLIENT_MASTER) && !remaining) { + /* If the client is not a master and no data is pending, + * The client can safely use the shared query buffer in the next read - free the client's querybuf. */ + sdsfree(c->querybuf); + /* By setting the querybuf to NULL, the client will use the shared query buffer in the next read. + * We don't move the client to the shared query buffer immediately, because if we allocated a private + * query buffer for the client, it's likely that the client will use it again soon. */ + c->querybuf = NULL; + } else { + c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + } } else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size/2 > c->querybuf_peak) { /* 2) Query buffer is too big for latest peak and is larger than * resize threshold. Trim excess space but only up to a limit, @@ -759,7 +772,7 @@ int clientsCronResizeQueryBuffer(client *c) { /* Reset the peak again to capture the peak memory usage in the next * cycle. */ - c->querybuf_peak = sdslen(c->querybuf); + c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0; /* We reset to either the current used, or currently processed bulk size, * which ever is bigger. */ if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2; @@ -834,8 +847,9 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; int clientsCronTrackExpansiveClients(client *c, int time_idx) { - size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum + - (c->argv ? zmalloc_size(c->argv) : 0); + size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; + size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; + size_t in_usage = qb_size + c->argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -2790,6 +2804,7 @@ void initServer(void) { } slowlogInit(); latencyMonitorInit(); + initSharedQueryBuf(); /* Initialize ACL default password if it exists */ ACLUpdateDefaultUserPassword(server.requirepass); @@ -6567,7 +6582,7 @@ void dismissMemory(void* ptr, size_t size_hint) { void dismissClientMemory(client *c) { /* Dismiss client query buffer and static reply buffer. */ dismissMemory(c->buf, c->buf_usable_size); - dismissSds(c->querybuf); + if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { diff --git a/src/server.h b/src/server.h index 459d5b9744e..f67c3c7f27b 100644 --- a/src/server.h +++ b/src/server.h @@ -2687,6 +2687,7 @@ void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); void initThreadedIO(void); +void initSharedQueryBuf(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index fbd1d14fefa..be84a78b279 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=*} test {CLIENT KILL with illegal arguments} { assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill} diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index f4859dd2782..e964098bbae 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -24,8 +24,24 @@ start_server {tags {"querybuf slow"}} { # The test will run at least 2s to check if client query # buffer will be resized when client idle 2s. test "query buffer resized correctly" { - set rd [redis_client] + + set rd [redis_deferring_client] + $rd client setname test_client + $rd read + + # Make sure query buff has size of 0 bytes at start as the client uses the shared qb. + assert {[client_query_buffer test_client] == 0} + + # Send partial command to client to make sure it doesn't use the shared qb. + $rd write "*3\r\n\$3\r\nset\r\n\$2\r\na" + $rd flush + after 100 + # send the rest of the command + $rd write "a\r\n\$1\r\nb\r\n" + $rd flush + assert_equal {OK} [$rd read] + set orig_test_client_qbuf [client_query_buffer test_client] # Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k # but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k @@ -78,6 +94,11 @@ start_server {tags {"querybuf slow"}} { $rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n" $rd flush + after 200 + # Send the start of the arg and make sure the client is not using shared qb for it rather a private buf of > 1000000 size. + $rd write "a" + $rd flush + after 20 if {[client_query_buffer test_client] < 1000000} { fail "query buffer should not be resized when client idle time smaller than 2s" From 14cf2d1d4662f2157c1222a10085b56d09361ccb Mon Sep 17 00:00:00 2001 From: uriyage <78144248+uriyage@users.noreply.github.com> Date: Mon, 3 Jun 2024 21:15:28 +0300 Subject: [PATCH 02/20] Adjust query buffer resized correctly test to non-jemalloc allocators. (#593) Test `query buffer resized correctly` start to fail (https://github.com/valkey-io/valkey/actions/runs/9278013807) with non-jemalloc allocators after https://github.com/valkey-io/valkey/pull/258 PR. With Jemalloc we allocate ~20K for the query buffer, in the test we read 1 byte in the first read, in the second read we make sure we have at least 16KB free place in the query buffer and we have as Jemalloc allocated 20KB, But with non jemalloc we allocate in the first read exactly 16KB. in the second read we check and see that we don't have 16KB free space as we already read 1 byte hence we reallocate this time greedly (*2 of the requested size of 16KB+1) hence the test condition that the querybuf size is < 32KB is no longer true The `query buffer resized correctly test` starts [failing](https://github.com/valkey-io/valkey/actions/runs/9278013807) with non-jemalloc allocators after PR #258 . With jemalloc, we allocate ~20KB for the query buffer. In the test, we read 1 byte initially and then ensure there is at least 16KB of free space in the buffer for the second read, which is satisfied by jemalloc's 20KB allocation. However, with non-jemalloc allocators, the first read allocates exactly 16KB. When we check again, we don't have 16KB free due to the 1 byte already read. This triggers a greedy reallocation (doubling the requested size of 16KB+1), causing the query buffer size to exceed the 32KB limit, thus failing the test condition. This PR adjusted the test query buffer upper limit to be 32KB +2. Signed-off-by: Uri Yagelnik --- tests/unit/querybuf.tcl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index e964098bbae..1d8bffca5bd 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -45,7 +45,8 @@ start_server {tags {"querybuf slow"}} { set orig_test_client_qbuf [client_query_buffer test_client] # Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k # but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k - assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf < 32768} + set MAX_QUERY_BUFFER_SIZE [expr 32768 + 2] ; # 32k + 2, allowing for potential greedy allocation of (16k + 1) * 2 bytes for the query buffer. + assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf <= $MAX_QUERY_BUFFER_SIZE} # Check that the initial query buffer is resized after 2 sec wait_for_condition 1000 10 { From 4723b0104794b7562e8ed03e5156cb2fafa80832 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Thu, 22 Aug 2024 17:00:24 +0800 Subject: [PATCH 03/20] New way to use shared query buffer --- src/networking.c | 135 ++++++++++++++++------------------- src/server.c | 1 - src/server.h | 2 +- tests/unit/introspection.tcl | 4 +- tests/unit/querybuf.tcl | 19 +++++ 5 files changed, 82 insertions(+), 79 deletions(-) diff --git a/src/networking.c b/src/networking.c index 0694c865335..57323a93e7b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -28,6 +28,8 @@ int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ __thread sds thread_shared_qb = NULL; +__thread int thread_shared_qb_used = 0; /* Avoid multiple clients using shared query + * buffer due to nested command execution. */ /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -1576,6 +1578,28 @@ void deauthenticateAndCloseClient(client *c) { } } +/* Resets the shared query buffer used by the given client. + * If any data remained in the buffer, the client will take ownership of the buffer + * and a new empty buffer will be allocated for the shared buffer. */ +static void resetSharedQueryBuf(client *c) { + serverAssert(c->flags & CLIENT_SHARED_QUERYBUFFER); + if (c->querybuf != thread_shared_qb || sdslen(c->querybuf) > c->qb_pos) { + /* If querybuf has been reallocated or there is still data left, + * let the client take ownership of the shared buffer. */ + thread_shared_qb = NULL; + } else { + /* It is safe to dereference and reuse the shared query buffer. */ + c->querybuf = NULL; + c->qb_pos = 0; + sdsclear(thread_shared_qb); + } + + /* Mark that the client is no longer using the shared query buffer + * and indicate that it is no longer used by any client. */ + c->flags &= ~CLIENT_SHARED_QUERYBUFFER; + thread_shared_qb_used = 0; +} + void freeClient(client *c) { listNode *ln; @@ -1630,11 +1654,9 @@ void freeClient(client *c) { } /* Free the query buffer */ - if (c->querybuf && c->querybuf == thread_shared_qb) { - sdsclear(c->querybuf); - } else { - sdsfree(c->querybuf); - } + if (c->flags & CLIENT_SHARED_QUERYBUFFER) + resetSharedQueryBuf(c); + sdsfree(c->querybuf); c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. */ @@ -2137,48 +2159,6 @@ void resetClient(client *c) { } } -/* Initializes the shared query buffer to a new sds with the default capacity */ -void initSharedQueryBuf(void) { - thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); - sdsclear(thread_shared_qb); -} - -/* Resets the shared query buffer used by the given client. - * If any data remained in the buffer, the client will take ownership of the buffer - * and a new empty buffer will be allocated for the shared buffer. */ -void resetSharedQueryBuf(client *c) { - serverAssert(c->querybuf == thread_shared_qb); - size_t remaining = sdslen(c->querybuf) - c->qb_pos; - - if (remaining > 0) { - /* Let the client take ownership of the shared buffer. */ - initSharedQueryBuf(); - return; - } - - c->querybuf = NULL; - sdsclear(thread_shared_qb); - c->qb_pos = 0; -} - -/* Trims the client query buffer to the current position. */ -void trimClientQueryBuffer(client *c) { - if (c->querybuf == thread_shared_qb) { - resetSharedQueryBuf(c); - } - - if (c->querybuf == NULL) { - return; - } - - serverAssert(c->qb_pos <= sdslen(c->querybuf)); - - if (c->qb_pos > 0) { - sdsrange(c->querybuf, c->qb_pos, -1); - c->qb_pos = 0; - } -} - /* This function is used when we want to re-enter the event loop but there * is the risk that the client we are dealing with will be freed in some * way. This happens for instance in: @@ -2441,10 +2421,6 @@ int processMultibulkBuffer(client *c) { * ll+2, trimming querybuf is just a waste of time, because * at this time the querybuf contains not only our bulk. */ if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) { - if (c->querybuf == thread_shared_qb) { - /* Let the client take the ownership of the shared buffer. */ - initSharedQueryBuf(); - } sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; /* Hint the sds library about the amount of bytes this string is @@ -2609,7 +2585,7 @@ int processPendingCommandAndInputBuffer(client *c) { * return C_ERR in case the client was freed during the processing */ int processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ - while (c->querybuf && c->qb_pos < sdslen(c->querybuf)) { + while(c->qb_pos < sdslen(c->querybuf)) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; @@ -2660,13 +2636,6 @@ int processInputBuffer(client *c) { break; } - if (c->querybuf == thread_shared_qb) { - /* Before processing the command, reset the shared query buffer to its default state. - * This avoids unintentionally modifying the shared qb during processCommand as we may use - * the shared qb for other clients during processEventsWhileBlocked */ - resetSharedQueryBuf(c); - } - /* We are finally ready to execute the command. */ if (processCommandAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid exiting this @@ -2695,8 +2664,10 @@ int processInputBuffer(client *c) { c->qb_pos -= c->repl_applied; c->repl_applied = 0; } - } else { - trimClientQueryBuffer(c); + } else if (c->qb_pos) { + /* Trim to pos */ + sdsrange(c->querybuf,c->qb_pos,-1); + c->qb_pos = 0; } /* Update client memory usage after processing the query buffer, this is @@ -2721,16 +2692,17 @@ void readQueryFromClient(connection *conn) { atomicIncr(server.stat_total_reads_processed, 1); readlen = PROTO_IOBUF_LEN; - qblen = c->querybuf ? sdslen(c->querybuf) : 0; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the - * robj representing the argument. */ - - if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { - ssize_t remaining = (size_t)(c->bulklen + 2) - (qblen - c->qb_pos); + * Redis Object representing the argument. */ + if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 + && c->bulklen >= PROTO_MBULK_BIG_ARG) + { + if (!c->querybuf) c->querybuf = sdsempty(); + ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); big_arg = 1; /* Note that the 'remaining' variable may be zero in some edge case, @@ -2741,14 +2713,28 @@ void readQueryFromClient(connection *conn) { * but doesn't need align to the next arg, we can read more data. */ if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; - } + } else if (c->querybuf == NULL) { + if (unlikely(thread_shared_qb_used)) { + /* The shared query buffer is already used by another client, + * switch to using the client's private query buffer. This only + * occurs when commands are executed nested via processEventsWhileBlocked(). */ + c->querybuf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + } else { + /* Create the shared query buffer if it doesn't exist. */ + if (!thread_shared_qb) { + thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(thread_shared_qb); + } - if (c->querybuf == NULL) { - serverAssert(sdslen(thread_shared_qb) == 0); - c->querybuf = big_arg ? sdsempty() : thread_shared_qb; - qblen = sdslen(c->querybuf); + /* Assign the shared query buffer to the client and mark it as in use. */ + serverAssert(sdslen(thread_shared_qb) == 0); + c->querybuf = thread_shared_qb; + c->flags |= CLIENT_SHARED_QUERYBUFFER; + thread_shared_qb_used = 1; + } } + qblen = sdslen(c->querybuf); if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy. (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg @@ -2821,9 +2807,9 @@ void readQueryFromClient(connection *conn) { c = NULL; done: - if (c && c->querybuf == thread_shared_qb) { - sdsclear(thread_shared_qb); - c->querybuf = NULL; + if (c && (c->flags & CLIENT_SHARED_QUERYBUFFER)) { + serverAssert(c->qb_pos == 0); /* Ensure the client's query buffer is trimmed in processInputBuffer */ + resetSharedQueryBuf(c); } beforeNextClient(c); } @@ -4313,7 +4299,6 @@ void *IOThreadMain(void *myid) { redis_set_thread_title(thdname); redisSetCpuAffinity(server.server_cpulist); makeThreadKillable(); - initSharedQueryBuf(); while(1) { /* Wait for start */ diff --git a/src/server.c b/src/server.c index 57ed3a8a85a..a3fef3afd15 100644 --- a/src/server.c +++ b/src/server.c @@ -2804,7 +2804,6 @@ void initServer(void) { } slowlogInit(); latencyMonitorInit(); - initSharedQueryBuf(); /* Initialize ACL default password if it exists */ ACLUpdateDefaultUserPassword(server.requirepass); diff --git a/src/server.h b/src/server.h index f67c3c7f27b..59d421b628a 100644 --- a/src/server.h +++ b/src/server.h @@ -388,6 +388,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */ #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */ #define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ +#define CLIENT_SHARED_QUERYBUFFER (1ULL<<51) /* The client is using the shared query buffer. */ /* Any flag that does not let optimize FLUSH SYNC to run it in bg as blocking client ASYNC */ #define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE) @@ -2687,7 +2688,6 @@ void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); void initThreadedIO(void); -void initSharedQueryBuf(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index be84a78b279..fbd1d14fefa 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=* lib-name=* lib-ver=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=* lib-name=* lib-ver=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} test {CLIENT KILL with illegal arguments} { assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill} diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index 1d8bffca5bd..860b1046443 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -116,3 +116,22 @@ start_server {tags {"querybuf slow"}} { } } + +start_server {tags {"querybuf"}} { + test "Client executes small argv commands using shared query buffer" { + set rd [redis_deferring_client] + $rd client setname test_client + set res [r client list] + + # Verify that the client does not create a private query buffer after + # executing a small parameter command. + assert_match {*name=test_client * qbuf=0 qbuf-free=0 * cmd=client|setname *} $res + + # The client executing the command is currently using the shared query buffer, + # so the size shown is that of the shared query buffer. It will be returned + # to the shared query buffer after command execution. + assert_match {*qbuf=26 qbuf-free=* cmd=client|list *} $res + + $rd close + } +} From c4b3090563c883a7ce59fba9a37602346d3bd35c Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Thu, 22 Aug 2024 17:04:33 +0800 Subject: [PATCH 04/20] Add license --- src/replication.c | 4 +++- src/server.c | 5 +++++ tests/unit/querybuf.tcl | 13 +++++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index c64ce239141..7f1c12f1245 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3,13 +3,15 @@ * Copyright (c) 2009-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). * * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ - #include "server.h" #include "cluster.h" #include "bio.h" diff --git a/src/server.c b/src/server.c index a3fef3afd15..3e63e1a9f29 100644 --- a/src/server.c +++ b/src/server.c @@ -2,8 +2,13 @@ * Copyright (c) 2009-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ #include "server.h" diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index 860b1046443..f4352855752 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -1,3 +1,16 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2024-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of the Redis Source Available License 2.0 +# (RSALv2) or the Server Side Public License v1 (SSPLv1). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + proc client_idle_sec {name} { set clients [split [r client list] "\r\n"] set c [lsearch -inline $clients *name=$name*] From 908268db1a831e00bb7bfe6bd94062c7d28777a4 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Thu, 22 Aug 2024 18:06:38 +0800 Subject: [PATCH 05/20] Fix missing clear c->querybuf --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 57323a93e7b..aa7e0e06b77 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2718,7 +2718,7 @@ void readQueryFromClient(connection *conn) { /* The shared query buffer is already used by another client, * switch to using the client's private query buffer. This only * occurs when commands are executed nested via processEventsWhileBlocked(). */ - c->querybuf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + c->querybuf = sdsempty(); } else { /* Create the shared query buffer if it doesn't exist. */ if (!thread_shared_qb) { From ebf45de5094293f975ecbb63527673060d4a73e1 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Thu, 22 Aug 2024 21:57:57 +0800 Subject: [PATCH 06/20] Skip shared query buffer for client list info --- src/networking.c | 6 +++--- tests/unit/introspection.tcl | 4 ++-- tests/unit/querybuf.tcl | 19 ------------------- 3 files changed, 5 insertions(+), 24 deletions(-) diff --git a/src/networking.c b/src/networking.c index aa7e0e06b77..42e9adc6d4f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2926,8 +2926,8 @@ sds catClientInfoString(sds s, client *client) { " ssub=%i", (int) dictSize(client->pubsubshard_channels), " multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, " watch=%i", (int) listLength(client->watched_keys), - " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, - " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, + " qbuf=%U", (client->querybuf && !(client->flags&CLIENT_SHARED_QUERYBUFFER)) ? (unsigned long long) sdslen(client->querybuf) : 0, + " qbuf-free=%U", (client->querybuf && !(client->flags&CLIENT_SHARED_QUERYBUFFER)) ? (unsigned long long) sdsavail(client->querybuf) : 0, " argv-mem=%U", (unsigned long long) client->argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, @@ -3909,7 +3909,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { size_t mem = getClientOutputBufferMemoryUsage(c); if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem; - mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; + mem += (c->querybuf && !(c->flags&CLIENT_SHARED_QUERYBUFFER)) ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index fbd1d14fefa..365408567ac 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} test {CLIENT KILL with illegal arguments} { assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill} diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index f4352855752..9a4088bf1dc 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -129,22 +129,3 @@ start_server {tags {"querybuf slow"}} { } } - -start_server {tags {"querybuf"}} { - test "Client executes small argv commands using shared query buffer" { - set rd [redis_deferring_client] - $rd client setname test_client - set res [r client list] - - # Verify that the client does not create a private query buffer after - # executing a small parameter command. - assert_match {*name=test_client * qbuf=0 qbuf-free=0 * cmd=client|setname *} $res - - # The client executing the command is currently using the shared query buffer, - # so the size shown is that of the shared query buffer. It will be returned - # to the shared query buffer after command execution. - assert_match {*qbuf=26 qbuf-free=* cmd=client|list *} $res - - $rd close - } -} From b50f5cc57c725e250a2997a4ec03fa764be266a8 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Thu, 22 Aug 2024 22:04:03 +0800 Subject: [PATCH 07/20] Revert code style --- src/networking.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 42e9adc6d4f..3d07deb0fef 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3908,7 +3908,8 @@ size_t getClientOutputBufferMemoryUsage(client *c) { size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { size_t mem = getClientOutputBufferMemoryUsage(c); - if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem; + if (output_buffer_mem_usage != NULL) + *output_buffer_mem_usage = mem; mem += (c->querybuf && !(c->flags&CLIENT_SHARED_QUERYBUFFER)) ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; From 0569168ee9b9deeeac1bb599651eedae81551239 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Thu, 22 Aug 2024 23:15:23 +0800 Subject: [PATCH 08/20] Allocate PROTO_IOBUF_LEN for querybuf that can't use the shared qb --- src/networking.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 3d07deb0fef..fdf340ef0d1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2718,7 +2718,8 @@ void readQueryFromClient(connection *conn) { /* The shared query buffer is already used by another client, * switch to using the client's private query buffer. This only * occurs when commands are executed nested via processEventsWhileBlocked(). */ - c->querybuf = sdsempty(); + c->querybuf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(c->querybuf); } else { /* Create the shared query buffer if it doesn't exist. */ if (!thread_shared_qb) { From 7125bc6f2bd0d4e37f8b6629824ed1a96cc51918 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Fri, 23 Aug 2024 08:23:33 +0800 Subject: [PATCH 09/20] If the client is using shared qb, we also count it as the memory it is using --- src/networking.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/networking.c b/src/networking.c index fdf340ef0d1..39ab07cf5ce 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2927,8 +2927,8 @@ sds catClientInfoString(sds s, client *client) { " ssub=%i", (int) dictSize(client->pubsubshard_channels), " multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, " watch=%i", (int) listLength(client->watched_keys), - " qbuf=%U", (client->querybuf && !(client->flags&CLIENT_SHARED_QUERYBUFFER)) ? (unsigned long long) sdslen(client->querybuf) : 0, - " qbuf-free=%U", (client->querybuf && !(client->flags&CLIENT_SHARED_QUERYBUFFER)) ? (unsigned long long) sdsavail(client->querybuf) : 0, + " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, + " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, " argv-mem=%U", (unsigned long long) client->argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, @@ -3911,7 +3911,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem; - mem += (c->querybuf && !(c->flags&CLIENT_SHARED_QUERYBUFFER)) ? sdsZmallocSize(c->querybuf) : 0; + mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory From 1c37cc5ed17833f15b1db050402d689c21ac9433 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Fri, 23 Aug 2024 08:24:11 +0800 Subject: [PATCH 10/20] Stablize querybuffer test --- tests/unit/querybuf.tcl | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index 9a4088bf1dc..ebe9ab3b8f5 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -10,7 +10,6 @@ # # Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. # - proc client_idle_sec {name} { set clients [split [r client list] "\r\n"] set c [lsearch -inline $clients *name=$name*] @@ -46,10 +45,19 @@ start_server {tags {"querybuf slow"}} { # Make sure query buff has size of 0 bytes at start as the client uses the shared qb. assert {[client_query_buffer test_client] == 0} + # Pause cron to prevent premature shrinking (timing issue). + r debug pause-cron 1 + # Send partial command to client to make sure it doesn't use the shared qb. $rd write "*3\r\n\$3\r\nset\r\n\$2\r\na" $rd flush - after 100 + # Wait for the client to start using a private query buffer. + wait_for_condition 1000 10 { + [client_query_buffer test_client] > 0 + } else { + fail "client should start using a private query buffer" + } + # send the rest of the command $rd write "a\r\n\$1\r\nb\r\n" $rd flush @@ -61,9 +69,12 @@ start_server {tags {"querybuf slow"}} { set MAX_QUERY_BUFFER_SIZE [expr 32768 + 2] ; # 32k + 2, allowing for potential greedy allocation of (16k + 1) * 2 bytes for the query buffer. assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf <= $MAX_QUERY_BUFFER_SIZE} + # Allow shrinking to occur + r debug pause-cron 0 + # Check that the initial query buffer is resized after 2 sec wait_for_condition 1000 10 { - [client_idle_sec test_client] >= 3 && [client_query_buffer test_client] == 0 + [client_idle_sec test_client] >= 3 && [client_query_buffer test_client] < $orig_test_client_qbuf } else { fail "query buffer was not resized" } @@ -93,7 +104,7 @@ start_server {tags {"querybuf slow"}} { # Write something smaller, so query buf peak can shrink $rd set x [string repeat A 100] set new_test_client_qbuf [client_query_buffer test_client] - if {$new_test_client_qbuf < $orig_test_client_qbuf} { break } + if {$new_test_client_qbuf < $orig_test_client_qbuf && $new_test_client_qbuf > 0} { break } if {[expr [clock milliseconds] - $t] > 1000} { break } after 10 } @@ -127,5 +138,4 @@ start_server {tags {"querybuf slow"}} { $rd close } - } From 9b8d07e6e645ce71c915cc82e54d06982167c990 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Fri, 23 Aug 2024 08:29:45 +0800 Subject: [PATCH 11/20] Add test --- tests/unit/introspection.tcl | 4 ++-- tests/unit/querybuf.tcl | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 365408567ac..fbd1d14fefa 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} test {CLIENT KILL with illegal arguments} { assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill} diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index ebe9ab3b8f5..7177f9e6dcb 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -139,3 +139,22 @@ start_server {tags {"querybuf slow"}} { $rd close } } + +start_server {tags {"querybuf"}} { + test "Client executes small argv commands using shared query buffer" { + set rd [redis_deferring_client] + $rd client setname test_client + set res [r client list] + + # Verify that the client does not create a private query buffer after + # executing a small parameter command. + assert_match {*name=test_client * qbuf=0 qbuf-free=0 * cmd=client|setname *} $res + + # The client executing the command is currently using the shared query buffer, + # so the size shown is that of the shared query buffer. It will be returned + # to the shared query buffer after command execution. + assert_match {*qbuf=26 qbuf-free=* cmd=client|list *} $res + + $rd close + } +} From 9d70fa33c74283ed2126919583ed8b334cfa5ba3 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Fri, 23 Aug 2024 08:33:05 +0800 Subject: [PATCH 12/20] Use ProcessingEventsWhileBlocked instead adding thread_shared_qb_used --- src/networking.c | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index 39ab07cf5ce..dd4a807b30c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -28,8 +28,6 @@ int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ __thread sds thread_shared_qb = NULL; -__thread int thread_shared_qb_used = 0; /* Avoid multiple clients using shared query - * buffer due to nested command execution. */ /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -1597,7 +1595,6 @@ static void resetSharedQueryBuf(client *c) { /* Mark that the client is no longer using the shared query buffer * and indicate that it is no longer used by any client. */ c->flags &= ~CLIENT_SHARED_QUERYBUFFER; - thread_shared_qb_used = 0; } void freeClient(client *c) { @@ -2714,7 +2711,7 @@ void readQueryFromClient(connection *conn) { if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; } else if (c->querybuf == NULL) { - if (unlikely(thread_shared_qb_used)) { + if (unlikely(ProcessingEventsWhileBlocked)) { /* The shared query buffer is already used by another client, * switch to using the client's private query buffer. This only * occurs when commands are executed nested via processEventsWhileBlocked(). */ @@ -2731,7 +2728,6 @@ void readQueryFromClient(connection *conn) { serverAssert(sdslen(thread_shared_qb) == 0); c->querybuf = thread_shared_qb; c->flags |= CLIENT_SHARED_QUERYBUFFER; - thread_shared_qb_used = 1; } } From 0b64a5070b1d0684445230ec1d6d5528123afe11 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Fri, 23 Aug 2024 20:44:33 +0800 Subject: [PATCH 13/20] Revert "Use ProcessingEventsWhileBlocked instead adding thread_shared_qb_used" ProcessingEventsWhileBlocked is not thread safe This reverts commit 9d70fa33c74283ed2126919583ed8b334cfa5ba3. --- src/networking.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index dd4a807b30c..39ab07cf5ce 100644 --- a/src/networking.c +++ b/src/networking.c @@ -28,6 +28,8 @@ int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ __thread sds thread_shared_qb = NULL; +__thread int thread_shared_qb_used = 0; /* Avoid multiple clients using shared query + * buffer due to nested command execution. */ /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -1595,6 +1597,7 @@ static void resetSharedQueryBuf(client *c) { /* Mark that the client is no longer using the shared query buffer * and indicate that it is no longer used by any client. */ c->flags &= ~CLIENT_SHARED_QUERYBUFFER; + thread_shared_qb_used = 0; } void freeClient(client *c) { @@ -2711,7 +2714,7 @@ void readQueryFromClient(connection *conn) { if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; } else if (c->querybuf == NULL) { - if (unlikely(ProcessingEventsWhileBlocked)) { + if (unlikely(thread_shared_qb_used)) { /* The shared query buffer is already used by another client, * switch to using the client's private query buffer. This only * occurs when commands are executed nested via processEventsWhileBlocked(). */ @@ -2728,6 +2731,7 @@ void readQueryFromClient(connection *conn) { serverAssert(sdslen(thread_shared_qb) == 0); c->querybuf = thread_shared_qb; c->flags |= CLIENT_SHARED_QUERYBUFFER; + thread_shared_qb_used = 1; } } From e5a4a67f1273da817d3029296f35adc79d6369c6 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 26 Aug 2024 13:05:24 +0800 Subject: [PATCH 14/20] Improve tests Co-authored-by: oranagra --- tests/unit/querybuf.tcl | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index 7177f9e6dcb..22d59d0ffb2 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -104,7 +104,7 @@ start_server {tags {"querybuf slow"}} { # Write something smaller, so query buf peak can shrink $rd set x [string repeat A 100] set new_test_client_qbuf [client_query_buffer test_client] - if {$new_test_client_qbuf < $orig_test_client_qbuf && $new_test_client_qbuf > 0} { break } + if {$new_test_client_qbuf < $orig_test_client_qbuf} { break } if {[expr [clock milliseconds] - $t] > 1000} { break } after 10 } @@ -116,15 +116,27 @@ start_server {tags {"querybuf slow"}} { test "query buffer resized correctly with fat argv" { set rd [redis_client] $rd client setname test_client + + # Pause cron to prevent premature shrinking (timing issue). + r debug pause-cron 1 + $rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n" $rd flush + + # Wait for the client to start using a private query buffer of > 1000000 size. + wait_for_condition 1000 10 { + [client_query_buffer test_client] > 1000000 + } else { + fail "client should start using a private query buffer" + } - after 200 # Send the start of the arg and make sure the client is not using shared qb for it rather a private buf of > 1000000 size. $rd write "a" $rd flush - after 20 + r debug pause-cron 0 + + after 120 if {[client_query_buffer test_client] < 1000000} { fail "query buffer should not be resized when client idle time smaller than 2s" } @@ -144,6 +156,7 @@ start_server {tags {"querybuf"}} { test "Client executes small argv commands using shared query buffer" { set rd [redis_deferring_client] $rd client setname test_client + $rd read set res [r client list] # Verify that the client does not create a private query buffer after From 3ebb1b36cba8a97f7b5972f7eb59d8a976145975 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 26 Aug 2024 13:12:08 +0800 Subject: [PATCH 15/20] Add comment for big argv Co-authored-by: oranagra --- src/networking.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/networking.c b/src/networking.c index 39ab07cf5ce..1417fbf78c3 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2701,7 +2701,11 @@ void readQueryFromClient(connection *conn) { if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { + /* For big argv, the client always uses its private query buffer. + * Using the shared query buffer would eventually expand it beyond 32k, + * causing the client to take ownership of the shared query buffer. */ if (!c->querybuf) c->querybuf = sdsempty(); + ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); big_arg = 1; From 060cbb9d3668ac95aaaca75ad4057caad99c1648 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Wed, 12 Jun 2024 14:27:42 -0700 Subject: [PATCH 16/20] Improve reliability of querybuf test (#639) We've been seeing some pretty consistent failures from `test-valgrind-test` and `test-sanitizer-address` because of the querybuf test periodically failing. I tracked it down to the test periodically taking too long and the client cron getting triggered. A simple solution is to just disable the cron during the key race condition. I was able to run this locally for 100 iterations without seeing a failure. Example: https://github.com/valkey-io/valkey/actions/runs/9474458354/job/26104103514 and https://github.com/valkey-io/valkey/actions/runs/9474458354/job/26104106830. Signed-off-by: Madelyn Olson From e4727803e835b070e82853ea97c336af03fa29b5 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 26 Aug 2024 16:01:45 +0800 Subject: [PATCH 17/20] Replace shared concept with reusable --- src/networking.c | 34 +++++++++++++++++----------------- src/replication.c | 2 +- src/server.c | 8 ++++---- src/server.h | 2 +- tests/unit/querybuf.tcl | 8 ++++---- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/networking.c b/src/networking.c index 1417fbf78c3..df66fcaa082 100644 --- a/src/networking.c +++ b/src/networking.c @@ -28,7 +28,7 @@ int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ __thread sds thread_shared_qb = NULL; -__thread int thread_shared_qb_used = 0; /* Avoid multiple clients using shared query +__thread int thread_shared_qb_used = 0; /* Avoid multiple clients using reusable query * buffer due to nested command execution. */ /* Return the size consumed from the allocator, for the specified SDS string, @@ -1578,25 +1578,25 @@ void deauthenticateAndCloseClient(client *c) { } } -/* Resets the shared query buffer used by the given client. +/* Resets the reusable query buffer used by the given client. * If any data remained in the buffer, the client will take ownership of the buffer * and a new empty buffer will be allocated for the shared buffer. */ -static void resetSharedQueryBuf(client *c) { - serverAssert(c->flags & CLIENT_SHARED_QUERYBUFFER); +static void resetReusableQueryBuf(client *c) { + serverAssert(c->flags & CLIENT_REUSABLE_QUERYBUFFER); if (c->querybuf != thread_shared_qb || sdslen(c->querybuf) > c->qb_pos) { /* If querybuf has been reallocated or there is still data left, * let the client take ownership of the shared buffer. */ thread_shared_qb = NULL; } else { - /* It is safe to dereference and reuse the shared query buffer. */ + /* It is safe to dereference and reuse the reusable query buffer. */ c->querybuf = NULL; c->qb_pos = 0; sdsclear(thread_shared_qb); } - /* Mark that the client is no longer using the shared query buffer + /* Mark that the client is no longer using the reusable query buffer * and indicate that it is no longer used by any client. */ - c->flags &= ~CLIENT_SHARED_QUERYBUFFER; + c->flags &= ~CLIENT_REUSABLE_QUERYBUFFER; thread_shared_qb_used = 0; } @@ -1654,8 +1654,8 @@ void freeClient(client *c) { } /* Free the query buffer */ - if (c->flags & CLIENT_SHARED_QUERYBUFFER) - resetSharedQueryBuf(c); + if (c->flags & CLIENT_REUSABLE_QUERYBUFFER) + resetReusableQueryBuf(c); sdsfree(c->querybuf); c->querybuf = NULL; @@ -2702,8 +2702,8 @@ void readQueryFromClient(connection *conn) { && c->bulklen >= PROTO_MBULK_BIG_ARG) { /* For big argv, the client always uses its private query buffer. - * Using the shared query buffer would eventually expand it beyond 32k, - * causing the client to take ownership of the shared query buffer. */ + * Using the reusable query buffer would eventually expand it beyond 32k, + * causing the client to take ownership of the reusable query buffer. */ if (!c->querybuf) c->querybuf = sdsempty(); ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); @@ -2719,22 +2719,22 @@ void readQueryFromClient(connection *conn) { readlen = PROTO_IOBUF_LEN; } else if (c->querybuf == NULL) { if (unlikely(thread_shared_qb_used)) { - /* The shared query buffer is already used by another client, + /* The reusable query buffer is already used by another client, * switch to using the client's private query buffer. This only * occurs when commands are executed nested via processEventsWhileBlocked(). */ c->querybuf = sdsnewlen(NULL, PROTO_IOBUF_LEN); sdsclear(c->querybuf); } else { - /* Create the shared query buffer if it doesn't exist. */ + /* Create the reusable query buffer if it doesn't exist. */ if (!thread_shared_qb) { thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); sdsclear(thread_shared_qb); } - /* Assign the shared query buffer to the client and mark it as in use. */ + /* Assign the reusable query buffer to the client and mark it as in use. */ serverAssert(sdslen(thread_shared_qb) == 0); c->querybuf = thread_shared_qb; - c->flags |= CLIENT_SHARED_QUERYBUFFER; + c->flags |= CLIENT_REUSABLE_QUERYBUFFER; thread_shared_qb_used = 1; } } @@ -2812,9 +2812,9 @@ void readQueryFromClient(connection *conn) { c = NULL; done: - if (c && (c->flags & CLIENT_SHARED_QUERYBUFFER)) { + if (c && (c->flags & CLIENT_REUSABLE_QUERYBUFFER)) { serverAssert(c->qb_pos == 0); /* Ensure the client's query buffer is trimmed in processInputBuffer */ - resetSharedQueryBuf(c); + resetReusableQueryBuf(c); } beforeNextClient(c); } diff --git a/src/replication.c b/src/replication.c index 7f1c12f1245..054cdacb753 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1767,7 +1767,7 @@ void replicationCreateMasterClient(connection *conn, int dbid) { * connection. */ server.master->flags |= CLIENT_MASTER; - /* Allocate a private query buffer for the master client instead of using the shared query buffer. + /* Allocate a private query buffer for the master client instead of using the reusable query buffer. * This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */ server.master->querybuf = sdsempty(); server.master->authenticated = 1; diff --git a/src/server.c b/src/server.c index 3e63e1a9f29..381b6f63bba 100644 --- a/src/server.c +++ b/src/server.c @@ -739,7 +739,7 @@ long long getInstantaneousMetric(int metric) { * * The function always returns 0 as it never terminates the client. */ int clientsCronResizeQueryBuffer(client *c) { - /* If the client query buffer is NULL, it is using the shared query buffer and there is nothing to do. */ + /* If the client query buffer is NULL, it is using the reusable query buffer and there is nothing to do. */ if (c->querybuf == NULL) return 0; size_t querybuf_size = sdsalloc(c->querybuf); time_t idletime = server.unixtime - c->lastinteraction; @@ -753,10 +753,10 @@ int clientsCronResizeQueryBuffer(client *c) { size_t remaining = sdslen(c->querybuf) - c->qb_pos; if (!(c->flags & CLIENT_MASTER) && !remaining) { /* If the client is not a master and no data is pending, - * The client can safely use the shared query buffer in the next read - free the client's querybuf. */ + * The client can safely use the reusable query buffer in the next read - free the client's querybuf. */ sdsfree(c->querybuf); - /* By setting the querybuf to NULL, the client will use the shared query buffer in the next read. - * We don't move the client to the shared query buffer immediately, because if we allocated a private + /* By setting the querybuf to NULL, the client will use the reusable query buffer in the next read. + * We don't move the client to the reusable query buffer immediately, because if we allocated a private * query buffer for the client, it's likely that the client will use it again soon. */ c->querybuf = NULL; } else { diff --git a/src/server.h b/src/server.h index 59d421b628a..70ef070a8c4 100644 --- a/src/server.h +++ b/src/server.h @@ -388,7 +388,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */ #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */ #define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ -#define CLIENT_SHARED_QUERYBUFFER (1ULL<<51) /* The client is using the shared query buffer. */ +#define CLIENT_REUSABLE_QUERYBUFFER (1ULL<<51) /* The client is using the reusable query buffer. */ /* Any flag that does not let optimize FLUSH SYNC to run it in bg as blocking client ASYNC */ #define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE) diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index 22d59d0ffb2..9e5efb8d77b 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -153,7 +153,7 @@ start_server {tags {"querybuf slow"}} { } start_server {tags {"querybuf"}} { - test "Client executes small argv commands using shared query buffer" { + test "Client executes small argv commands using reusable query buffer" { set rd [redis_deferring_client] $rd client setname test_client $rd read @@ -163,9 +163,9 @@ start_server {tags {"querybuf"}} { # executing a small parameter command. assert_match {*name=test_client * qbuf=0 qbuf-free=0 * cmd=client|setname *} $res - # The client executing the command is currently using the shared query buffer, - # so the size shown is that of the shared query buffer. It will be returned - # to the shared query buffer after command execution. + # The client executing the command is currently using the reusable query buffer, + # so the size shown is that of the reusable query buffer. It will be returned + # to the reusable query buffer after command execution. assert_match {*qbuf=26 qbuf-free=* cmd=client|list *} $res $rd close From 630c739478161f316ba6cf025315d62430ac2708 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 26 Aug 2024 16:03:57 +0800 Subject: [PATCH 18/20] Rename remain shared to reusable in comments --- src/networking.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index df66fcaa082..1b63e0a2061 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1580,12 +1580,12 @@ void deauthenticateAndCloseClient(client *c) { /* Resets the reusable query buffer used by the given client. * If any data remained in the buffer, the client will take ownership of the buffer - * and a new empty buffer will be allocated for the shared buffer. */ + * and a new empty buffer will be allocated for the reusable buffer. */ static void resetReusableQueryBuf(client *c) { serverAssert(c->flags & CLIENT_REUSABLE_QUERYBUFFER); if (c->querybuf != thread_shared_qb || sdslen(c->querybuf) > c->qb_pos) { /* If querybuf has been reallocated or there is still data left, - * let the client take ownership of the shared buffer. */ + * let the client take ownership of the reusable buffer. */ thread_shared_qb = NULL; } else { /* It is safe to dereference and reuse the reusable query buffer. */ From 5ea62d0170c59c374df643718ae3d5396bb0105a Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 26 Aug 2024 16:05:20 +0800 Subject: [PATCH 19/20] Rename remain shared to reusable in comments --- tests/unit/querybuf.tcl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index 9e5efb8d77b..d0591115645 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -42,13 +42,13 @@ start_server {tags {"querybuf slow"}} { $rd client setname test_client $rd read - # Make sure query buff has size of 0 bytes at start as the client uses the shared qb. + # Make sure query buff has size of 0 bytes at start as the client uses the reusable qb. assert {[client_query_buffer test_client] == 0} # Pause cron to prevent premature shrinking (timing issue). r debug pause-cron 1 - # Send partial command to client to make sure it doesn't use the shared qb. + # Send partial command to client to make sure it doesn't use the reusable qb. $rd write "*3\r\n\$3\r\nset\r\n\$2\r\na" $rd flush # Wait for the client to start using a private query buffer. @@ -130,7 +130,7 @@ start_server {tags {"querybuf slow"}} { fail "client should start using a private query buffer" } - # Send the start of the arg and make sure the client is not using shared qb for it rather a private buf of > 1000000 size. + # Send the start of the arg and make sure the client is not using reusable qb for it rather a private buf of > 1000000 size. $rd write "a" $rd flush From 95a5d9efd1dab4761be4f18b6a1035d8b158a346 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Mon, 26 Aug 2024 21:48:54 +0800 Subject: [PATCH 20/20] Rename thread_shared_qb* to thread_reusable_qb* --- src/networking.c | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/networking.c b/src/networking.c index 1b63e0a2061..f2e15e9782a 100644 --- a/src/networking.c +++ b/src/networking.c @@ -27,8 +27,8 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ -__thread sds thread_shared_qb = NULL; -__thread int thread_shared_qb_used = 0; /* Avoid multiple clients using reusable query +__thread sds thread_reusable_qb = NULL; +__thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query * buffer due to nested command execution. */ /* Return the size consumed from the allocator, for the specified SDS string, @@ -1583,21 +1583,21 @@ void deauthenticateAndCloseClient(client *c) { * and a new empty buffer will be allocated for the reusable buffer. */ static void resetReusableQueryBuf(client *c) { serverAssert(c->flags & CLIENT_REUSABLE_QUERYBUFFER); - if (c->querybuf != thread_shared_qb || sdslen(c->querybuf) > c->qb_pos) { + if (c->querybuf != thread_reusable_qb || sdslen(c->querybuf) > c->qb_pos) { /* If querybuf has been reallocated or there is still data left, * let the client take ownership of the reusable buffer. */ - thread_shared_qb = NULL; + thread_reusable_qb = NULL; } else { /* It is safe to dereference and reuse the reusable query buffer. */ c->querybuf = NULL; c->qb_pos = 0; - sdsclear(thread_shared_qb); + sdsclear(thread_reusable_qb); } /* Mark that the client is no longer using the reusable query buffer * and indicate that it is no longer used by any client. */ c->flags &= ~CLIENT_REUSABLE_QUERYBUFFER; - thread_shared_qb_used = 0; + thread_reusable_qb_used = 0; } void freeClient(client *c) { @@ -2718,7 +2718,7 @@ void readQueryFromClient(connection *conn) { if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; } else if (c->querybuf == NULL) { - if (unlikely(thread_shared_qb_used)) { + if (unlikely(thread_reusable_qb_used)) { /* The reusable query buffer is already used by another client, * switch to using the client's private query buffer. This only * occurs when commands are executed nested via processEventsWhileBlocked(). */ @@ -2726,16 +2726,16 @@ void readQueryFromClient(connection *conn) { sdsclear(c->querybuf); } else { /* Create the reusable query buffer if it doesn't exist. */ - if (!thread_shared_qb) { - thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); - sdsclear(thread_shared_qb); + if (!thread_reusable_qb) { + thread_reusable_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(thread_reusable_qb); } /* Assign the reusable query buffer to the client and mark it as in use. */ - serverAssert(sdslen(thread_shared_qb) == 0); - c->querybuf = thread_shared_qb; + serverAssert(sdslen(thread_reusable_qb) == 0); + c->querybuf = thread_reusable_qb; c->flags |= CLIENT_REUSABLE_QUERYBUFFER; - thread_shared_qb_used = 1; + thread_reusable_qb_used = 1; } }