Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1583d60
Missing --memkeys and --keystats for some options in redis-cli help t…
yveslb Feb 13, 2025
87124a3
Fix wrongly updating fsynced_reploff_pending when appendfsync=everyse…
ShooterIT Feb 13, 2025
662cb2f
Don't send unnecessary PING to replicas (#13790)
ShooterIT Feb 13, 2025
d29f04a
Add support for module to defrag incremental
sundb Feb 13, 2025
53c1d99
Fix complaint
sundb Feb 13, 2025
7f5f588
AOF offset info (#13773)
ShooterIT Feb 13, 2025
57807cd
Memory Usage command LIST accuracy fix (#13783)
ofirluzon Feb 14, 2025
0fd9d74
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 14, 2025
e260847
Add HGETDEL, HGETEX and HSETEX hash commands (#13798)
tezc Feb 14, 2025
6c202f4
Remove DENYOOM flag from hexpire command (#13800)
tezc Feb 16, 2025
467ddbb
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 17, 2025
6155198
Unify RedisModuleDefragFunc and RedisModuleTypeDefragFunc
sundb Feb 17, 2025
e85ebc4
Sync branch DefragRedisModuleDictDict
sundb Feb 17, 2025
98e13b3
Add RedisModule_RegisterDefragFunc2 module api
sundb Feb 17, 2025
4f0f7a7
Cleanup
sundb Feb 17, 2025
0f41e19
Add defragtest_global_strings_pauses to check string pauses
sundb Feb 17, 2025
69879fa
Cleanup
sundb Feb 17, 2025
2e0ff33
defragModuleCtx no longer stores module, which may be unlaoded midway
sundb Feb 17, 2025
9dfcdbb
Add support for both v1 and v2
sundb Feb 17, 2025
61fb33d
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 18, 2025
b4f859f
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 18, 2025
a64e6cf
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 18, 2025
a202a99
Style
sundb Feb 18, 2025
4cbe954
Fix RedisModuleDefragCtx calloc
sundb Feb 18, 2025
051705f
Update src/module.c
sundb Feb 18, 2025
c5f91ab
Fix syntax issue in comments of src/module.c (#13802)
yunxiao3 Feb 19, 2025
b045fe4
Fix overflow on 32-bit systems when calculating idle time for evictio…
luozongle01 Feb 19, 2025
66df58f
Do not send NL if replica client is already closed (#13813)
guybe7 Feb 19, 2025
dd40ed1
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 19, 2025
725cd26
Refactor of ActiveDefrag to reduce latencies (#13814)
sundb Feb 19, 2025
0a11110
Merge branch 'unstable' into incrementail_defrag_module
sundb Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ typedef struct {
} defragPubSubCtx;
static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");

typedef struct {
RedisModule *module;
RedisModuleDefragCtx module_ctx;
unsigned long cursor;
} defragModuleCtx;

/* When scanning a main kvstore, large elements are queued for later handling rather than
* causing a large latency spike while processing a hash table bucket. This list is only used
* for stage: "defragStageDbKeys". It will only contain values for the current kvstore being
Expand Down Expand Up @@ -1208,10 +1214,20 @@ static doneStatus defragLuaScripts(monotime endtime, void *ctx) {
return DEFRAG_DONE;
}

/* Handles defragmentation of module global data. This is a stage function
* that gets called periodically during the active defragmentation process. */
static doneStatus defragModuleGlobals(monotime endtime, void *ctx) {
UNUSED(endtime);
UNUSED(ctx);
moduleDefragGlobals();
defragModuleCtx *defrag_module_ctx = ctx;

/* Set up context for the module's defrag callback. */
defrag_module_ctx->module_ctx.endtime = endtime;
defrag_module_ctx->module_ctx.cursor = &defrag_module_ctx->cursor;

/* Call the module's defrag callback function and check if more work remains. */
if (defrag_module_ctx->module->defrag_cb_2(&defrag_module_ctx->module_ctx) != 0)
return DEFRAG_NOT_DONE;

return DEFRAG_DONE;
}

Expand Down Expand Up @@ -1497,7 +1513,20 @@ static void beginDefragCycle(void) {
addDefragStage(defragStagePubsubKvstore, defrag_pubsubshard_ctx);

addDefragStage(defragLuaScripts, NULL);
addDefragStage(defragModuleGlobals, NULL);

/* Add stages for modules. */
dictIterator *di = dictGetIterator(modules);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
struct RedisModule *module = dictGetVal(de);
if (module->defrag_cb_2) {
defragModuleCtx *ctx = zmalloc(sizeof(defragModuleCtx));
ctx->module = module;
ctx->cursor = 0;
addDefragStage(defragModuleGlobals, ctx);
}
}
dictReleaseIterator(di);

defrag.current_stage = NULL;
defrag.start_cycle = getMonotonicUs();
Expand Down
30 changes: 10 additions & 20 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -2309,6 +2309,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api
module->options = 0;
module->info_cb = 0;
module->defrag_cb = 0;
module->defrag_cb_2 = 0;
module->defrag_start_cb = 0;
module->defrag_end_cb = 0;
module->loadmod = NULL;
Expand Down Expand Up @@ -13783,16 +13784,6 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) {
* ## Defrag API
* -------------------------------------------------------------------------- */

/* The defrag context, used to manage state during calls to the data type
* defrag callback.
*/
struct RedisModuleDefragCtx {
monotime endtime;
unsigned long *cursor;
struct redisObject *key; /* Optional name of key processed, NULL when unknown. */
int dbid; /* The dbid of the key being processed, -1 when unknown. */
};

/* Register a defrag callback for global data, i.e. anything that the module
* may allocate that is not tied to a specific data type.
*/
Expand All @@ -13801,6 +13792,14 @@ int RM_RegisterDefragFunc(RedisModuleCtx *ctx, RedisModuleDefragFunc cb) {
return REDISMODULE_OK;
}

/* Register a defrag callback for global data, i.e. anything that the module
* may allocate that is not tied to a specific data type.
*/
int RM_RegisterDefragFunc2(RedisModuleCtx *ctx, RedisModuleDefragFunc2 cb) {
ctx->module->defrag_cb_2 = cb;
return REDISMODULE_OK;
}

/* Register a defrag callbacks that will be called when defrag operation starts and ends.
*
* The callbacks are the same as `RM_RegisterDefragFunc` but the user
Expand Down Expand Up @@ -13992,16 +13991,6 @@ int moduleDefragValue(robj *key, robj *value, int dbid) {
return 1;
}

/* Call registered module API defrag functions */
void moduleDefragGlobals(void) {
dictForEach(modules, struct RedisModule, module,
if (module->defrag_cb) {
RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1};
module->defrag_cb(&defrag_ctx);
}
);
}

/* Call registered module API defrag start functions */
void moduleDefragStart(void) {
dictForEach(modules, struct RedisModule, module,
Expand Down Expand Up @@ -14381,6 +14370,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetCurrentCommandName);
REGISTER_API(GetTypeMethodVersion);
REGISTER_API(RegisterDefragFunc);
REGISTER_API(RegisterDefragFunc2);
REGISTER_API(RegisterDefragCallbacks);
REGISTER_API(DefragAlloc);
REGISTER_API(DefragAllocRaw);
Expand Down
4 changes: 4 additions & 0 deletions src/redismodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,8 @@ typedef struct RedisModuleDefragCtx RedisModuleDefragCtx;
* exposed since you can't cast a function pointer to (void *). */
typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report);
typedef void (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx);
typedef int (*RedisModuleDefragFunc2)(RedisModuleDefragCtx *ctx);
typedef void (*RedisModuleDefragEventFunc)(RedisModuleDefragCtx *ctx);
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);

/* ------------------------- End of common defines ------------------------ */
Expand Down Expand Up @@ -1305,6 +1307,7 @@ REDISMODULE_API int *(*RedisModule_GetCommandKeys)(RedisModuleCtx *ctx, RedisMod
REDISMODULE_API int *(*RedisModule_GetCommandKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int *num_keys, int **out_flags) REDISMODULE_ATTR;
REDISMODULE_API const char *(*RedisModule_GetCurrentCommandName)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RegisterDefragFunc)(RedisModuleCtx *ctx, RedisModuleDefragFunc func) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RegisterDefragFunc2)(RedisModuleCtx *ctx, RedisModuleDefragFunc2 func) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RegisterDefragCallbacks)(RedisModuleCtx *ctx, RedisModuleDefragFunc start, RedisModuleDefragFunc end) REDISMODULE_ATTR;
REDISMODULE_API void *(*RedisModule_DefragAlloc)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR;
REDISMODULE_API void *(*RedisModule_DefragAllocRaw)(RedisModuleDefragCtx *ctx, size_t size) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1678,6 +1681,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(GetCommandKeysWithFlags);
REDISMODULE_GET_API(GetCurrentCommandName);
REDISMODULE_GET_API(RegisterDefragFunc);
REDISMODULE_GET_API(RegisterDefragFunc2);
REDISMODULE_GET_API(RegisterDefragCallbacks);
REDISMODULE_GET_API(DefragAlloc);
REDISMODULE_GET_API(DefragAllocRaw);
Expand Down
12 changes: 11 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ struct RedisModule {
int blocked_clients; /* Count of RedisModuleBlockedClient in this module. */
RedisModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */
RedisModuleDefragFunc defrag_cb; /* Callback for global data defrag. */
RedisModuleDefragFunc2 defrag_cb_2; /* Callback for global data defrag. */
RedisModuleDefragFunc defrag_start_cb; /* Callback indicating defrag started. */
RedisModuleDefragFunc defrag_end_cb; /* Callback indicating defrag ended. */
struct moduleLoadQueueEntry *loadmod; /* Module load arguments for config rewrite. */
Expand All @@ -900,6 +901,16 @@ struct RedisModule {
};
typedef struct RedisModule RedisModule;

/* The defrag context, used to manage state during calls to the data type
* defrag callback.
*/
struct RedisModuleDefragCtx {
monotime endtime;
unsigned long *cursor;
struct redisObject *key; /* Optional name of key processed, NULL when unknown. */
int dbid; /* The dbid of the key being processed, -1 when unknown. */
};

/* This is a wrapper for the 'rio' streams used inside rdb.c in Redis, so that
* the user does not have to take the total count of the written bytes nor
* to care about error conditions. */
Expand Down Expand Up @@ -2673,7 +2684,6 @@ size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid);
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value);
int moduleDefragValue(robj *key, robj *obj, int dbid);
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid);
void moduleDefragGlobals(void);
void moduleDefragStart(void);
void moduleDefragEnd(void);
void *moduleGetHandleByName(char *modulename);
Expand Down
41 changes: 25 additions & 16 deletions tests/modules/defragtest.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,43 @@ unsigned long int datatype_defragged = 0;
unsigned long int datatype_raw_defragged = 0;
unsigned long int datatype_resumes = 0;
unsigned long int datatype_wrong_cursor = 0;
unsigned long int global_attempts = 0;
unsigned long int global_strings_attempts = 0;
unsigned long int defrag_started = 0;
unsigned long int defrag_ended = 0;
unsigned long int global_defragged = 0;
unsigned long int global_strings_defragged = 0;

int global_strings_len = 0;
unsigned long global_strings_len = 0;
RedisModuleString **global_strings = NULL;

static void createGlobalStrings(RedisModuleCtx *ctx, int count)
static void createGlobalStrings(RedisModuleCtx *ctx, unsigned long count)
{
global_strings_len = count;
global_strings = RedisModule_Alloc(sizeof(RedisModuleString *) * count);

for (int i = 0; i < count; i++) {
for (unsigned long i = 0; i < count; i++) {
global_strings[i] = RedisModule_CreateStringFromLongLong(ctx, i);
}
}

static void defragGlobalStrings(RedisModuleDefragCtx *ctx)
static int defragGlobalStrings(RedisModuleDefragCtx *ctx)
{
for (int i = 0; i < global_strings_len; i++) {
RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[i]);
global_attempts++;
unsigned long cursor = 0;
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oranagra i don't use a global index here, i think it's better to tell module developer another way to use the cursor.

RedisModule_DefragCursorGet(ctx, &cursor);
RedisModule_Assert(cursor < global_strings_len);
for (; cursor < global_strings_len; cursor++) {
RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, global_strings[cursor]);
global_strings_attempts++;
if (new != NULL) {
global_strings[i] = new;
global_defragged++;
global_strings[cursor] = new;
global_strings_defragged++;
}

if (RedisModule_DefragShouldStop(ctx)) {
RedisModule_DefragCursorSet(ctx, cursor);
return 1;
}
}
return 0;
}

static void defragStart(RedisModuleDefragCtx *ctx) {
Expand All @@ -70,8 +79,8 @@ static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) {
RedisModule_InfoAddFieldLongLong(ctx, "datatype_raw_defragged", datatype_raw_defragged);
RedisModule_InfoAddFieldLongLong(ctx, "datatype_resumes", datatype_resumes);
RedisModule_InfoAddFieldLongLong(ctx, "datatype_wrong_cursor", datatype_wrong_cursor);
RedisModule_InfoAddFieldLongLong(ctx, "global_attempts", global_attempts);
RedisModule_InfoAddFieldLongLong(ctx, "global_defragged", global_defragged);
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_attempts", global_strings_attempts);
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_defragged", global_strings_defragged);
RedisModule_InfoAddFieldLongLong(ctx, "defrag_started", defrag_started);
RedisModule_InfoAddFieldLongLong(ctx, "defrag_ended", defrag_ended);
}
Expand Down Expand Up @@ -99,8 +108,8 @@ static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
datatype_raw_defragged = 0;
datatype_resumes = 0;
datatype_wrong_cursor = 0;
global_attempts = 0;
global_defragged = 0;
global_strings_attempts = 0;
global_strings_defragged = 0;
defrag_started = 0;
defrag_ended = 0;

Expand Down Expand Up @@ -258,7 +267,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;

RedisModule_RegisterInfoFunc(ctx, FragInfo);
RedisModule_RegisterDefragFunc(ctx, defragGlobalStrings);
RedisModule_RegisterDefragFunc2(ctx, defragGlobalStrings);
RedisModule_RegisterDefragCallbacks(ctx, defragStart, defragEnd);

return REDISMODULE_OK;
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/moduleapi/defrag.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ start_server {tags {"modules"} overrides {{save ""}}} {

after 2000
set info [r info defragtest_stats]
assert {[getInfoProperty $info defragtest_global_attempts] > 0}
assert {[getInfoProperty $info defragtest_global_strings_attempts] > 0}
assert_morethan [getInfoProperty $info defragtest_defrag_started] 0
assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0
}
Expand Down
Loading