Skip to content

Add support for module to defrag incremental #372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: active-defrag
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1583d60
Missing --memkeys and --keystats for some options in redis-cli help t…
yveslb Feb 13, 2025
87124a3
Fix wrongly updating fsynced_reploff_pending when appendfsync=everyse…
ShooterIT Feb 13, 2025
662cb2f
Don't send unnecessary PING to replicas (#13790)
ShooterIT Feb 13, 2025
d29f04a
Add support for module to defrag incremental
sundb Feb 13, 2025
53c1d99
Fix complaint
sundb Feb 13, 2025
7f5f588
AOF offset info (#13773)
ShooterIT Feb 13, 2025
57807cd
Memory Usage command LIST accuracy fix (#13783)
ofirluzon Feb 14, 2025
0fd9d74
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 14, 2025
e260847
Add HGETDEL, HGETEX and HSETEX hash commands (#13798)
tezc Feb 14, 2025
6c202f4
Remove DENYOOM flag from hexpire command (#13800)
tezc Feb 16, 2025
467ddbb
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 17, 2025
6155198
Unify RedisModuleDefragFunc and RedisModuleTypeDefragFunc
sundb Feb 17, 2025
e85ebc4
Sync branch DefragRedisModuleDictDict
sundb Feb 17, 2025
98e13b3
Add RedisModule_RegisterDefragFunc2 module api
sundb Feb 17, 2025
4f0f7a7
Cleanup
sundb Feb 17, 2025
0f41e19
Add defragtest_global_strings_pauses to check string pauses
sundb Feb 17, 2025
69879fa
Cleanup
sundb Feb 17, 2025
2e0ff33
defragModuleCtx no longer stores module, which may be unlaoded midway
sundb Feb 17, 2025
9dfcdbb
Add support for both v1 and v2
sundb Feb 17, 2025
61fb33d
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 18, 2025
b4f859f
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 18, 2025
a64e6cf
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 18, 2025
a202a99
Style
sundb Feb 18, 2025
4cbe954
Fix RedisModuleDefragCtx calloc
sundb Feb 18, 2025
051705f
Update src/module.c
sundb Feb 18, 2025
c5f91ab
Fix syntax issue in comments of src/module.c (#13802)
yunxiao3 Feb 19, 2025
b045fe4
Fix overflow on 32-bit systems when calculating idle time for evictio…
luozongle01 Feb 19, 2025
66df58f
Do not send NL if replica client is already closed (#13813)
guybe7 Feb 19, 2025
dd40ed1
Merge branch 'active-defrag' into incrementail_defrag_module
sundb Feb 19, 2025
725cd26
Refactor of ActiveDefrag to reduce latencies (#13814)
sundb Feb 19, 2025
0a11110
Merge branch 'unstable' into incrementail_defrag_module
sundb Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 105 additions & 32 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ aofManifest *aofLoadManifestFromFile(sds am_filepath);
void aofManifestFreeAndUpdate(aofManifest *am);
void aof_background_fsync_and_close(int fd);

/* When we call 'startAppendOnly', we will create a temp INCR AOF, and rename
* it to the real INCR AOF name when the AOFRW is done, so if we want to know the
* accurate start offset of the INCR AOF, we need to record it when we create
* the temp INCR AOF. This variable is used to record the start offset, and
* to set the start offset of the real INCR AOF when the AOFRW is done. */
static long long tempIncAofStartReplOffset = 0;

/* ----------------------------------------------------------------------------
* AOF Manifest file implementation.
*
Expand Down Expand Up @@ -73,10 +80,15 @@ void aof_background_fsync_and_close(int fd);
#define AOF_MANIFEST_KEY_FILE_NAME "file"
#define AOF_MANIFEST_KEY_FILE_SEQ "seq"
#define AOF_MANIFEST_KEY_FILE_TYPE "type"
#define AOF_MANIFEST_KEY_FILE_STARTOFFSET "startoffset"
#define AOF_MANIFEST_KEY_FILE_ENDOFFSET "endoffset"

/* Create an empty aofInfo. */
aofInfo *aofInfoCreate(void) {
return zcalloc(sizeof(aofInfo));
aofInfo *ai = zcalloc(sizeof(aofInfo));
ai->start_offset = -1;
ai->end_offset = -1;
return ai;
}

/* Free the aofInfo structure (pointed to by ai) and its embedded file_name. */
Expand All @@ -93,6 +105,8 @@ aofInfo *aofInfoDup(aofInfo *orig) {
ai->file_name = sdsdup(orig->file_name);
ai->file_seq = orig->file_seq;
ai->file_type = orig->file_type;
ai->start_offset = orig->start_offset;
ai->end_offset = orig->end_offset;
return ai;
}

Expand All @@ -105,10 +119,19 @@ sds aofInfoFormat(sds buf, aofInfo *ai) {
if (sdsneedsrepr(ai->file_name))
filename_repr = sdscatrepr(sdsempty(), ai->file_name, sdslen(ai->file_name));

sds ret = sdscatprintf(buf, "%s %s %s %lld %s %c\n",
sds ret = sdscatprintf(buf, "%s %s %s %lld %s %c",
AOF_MANIFEST_KEY_FILE_NAME, filename_repr ? filename_repr : ai->file_name,
AOF_MANIFEST_KEY_FILE_SEQ, ai->file_seq,
AOF_MANIFEST_KEY_FILE_TYPE, ai->file_type);

if (ai->start_offset != -1) {
ret = sdscatprintf(ret, " %s %lld", AOF_MANIFEST_KEY_FILE_STARTOFFSET, ai->start_offset);
if (ai->end_offset != -1) {
ret = sdscatprintf(ret, " %s %lld", AOF_MANIFEST_KEY_FILE_ENDOFFSET, ai->end_offset);
}
}

ret = sdscatlen(ret, "\n", 1);
sdsfree(filename_repr);

return ret;
Expand Down Expand Up @@ -304,6 +327,10 @@ aofManifest *aofLoadManifestFromFile(sds am_filepath) {
ai->file_seq = atoll(argv[i+1]);
} else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_TYPE)) {
ai->file_type = (argv[i+1])[0];
} else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_STARTOFFSET)) {
ai->start_offset = atoll(argv[i+1]);
} else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_FILE_ENDOFFSET)) {
ai->end_offset = atoll(argv[i+1]);
}
/* else if (!strcasecmp(argv[i], AOF_MANIFEST_KEY_OTHER)) {} */
}
Expand Down Expand Up @@ -433,12 +460,13 @@ sds getNewBaseFileNameAndMarkPreAsHistory(aofManifest *am) {
* for example:
* appendonly.aof.1.incr.aof
*/
sds getNewIncrAofName(aofManifest *am) {
sds getNewIncrAofName(aofManifest *am, long long start_reploff) {
aofInfo *ai = aofInfoCreate();
ai->file_type = AOF_FILE_TYPE_INCR;
ai->file_name = sdscatprintf(sdsempty(), "%s.%lld%s%s", server.aof_filename,
++am->curr_incr_file_seq, INCR_FILE_SUFFIX, AOF_FORMAT_SUFFIX);
ai->file_seq = am->curr_incr_file_seq;
ai->start_offset = start_reploff;
listAddNodeTail(am->incr_aof_list, ai);
am->dirty = 1;
return ai->file_name;
Expand All @@ -456,7 +484,7 @@ sds getLastIncrAofName(aofManifest *am) {

/* If 'incr_aof_list' is empty, just create a new one. */
if (!listLength(am->incr_aof_list)) {
return getNewIncrAofName(am);
return getNewIncrAofName(am, server.master_repl_offset);
}

/* Or return the last one. */
Expand Down Expand Up @@ -781,10 +809,11 @@ int openNewIncrAofForAppend(void) {
if (server.aof_state == AOF_WAIT_REWRITE) {
/* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */
new_aof_name = getTempIncrAofName();
tempIncAofStartReplOffset = server.master_repl_offset;
} else {
/* Dup a temp aof_manifest to modify. */
temp_am = aofManifestDup(server.aof_manifest);
new_aof_name = sdsdup(getNewIncrAofName(temp_am));
new_aof_name = sdsdup(getNewIncrAofName(temp_am, server.master_repl_offset));
}
sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name);
newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
Expand Down Expand Up @@ -833,6 +862,50 @@ int openNewIncrAofForAppend(void) {
return C_ERR;
}

/* Record the current replication offset as the end offset of the last INCR
 * AOF listed in the manifest, then try to persist the manifest. This gives
 * us a chance to save the end offset when the AOF file is closed gracefully.
 *
 * Nothing is done unless the AOF state is AOF_ON and the manifest lists at
 * least one INCR AOF. */
void updateCurIncrAofEndOffset(void) {
    if (server.aof_state == AOF_ON) {
        serverAssert(server.aof_manifest != NULL);

        if (listLength(server.aof_manifest->incr_aof_list) != 0) {
            /* The tail of the list is the current INCR AOF. */
            aofInfo *ai = listNodeValue(listLast(server.aof_manifest->incr_aof_list));
            ai->end_offset = server.master_repl_offset;
            server.aof_manifest->dirty = 1;

            /* The result is intentionally ignored: the end offset is not
             * critical, since an approximate value can be derived from the
             * start offset plus the file size. */
            persistAofManifest(server.aof_manifest);
        }
    }
}

/* Restore `server.master_repl_offset` once the AOF data has been loaded,
 * using the offsets recorded for the last INCR AOF in the manifest, so that
 * the start offset of a new INCR AOF does not roll back.
 *
 * Nothing is done unless the AOF state is AOF_ON and the manifest lists at
 * least one INCR AOF. If a recorded end offset is consumed and the manifest
 * cannot be persisted afterwards, the process exits with status 1. */
void updateReplOffsetAndResetEndOffset(void) {
    if (server.aof_state != AOF_ON) return;
    serverAssert(server.aof_manifest != NULL);
    if (listLength(server.aof_manifest->incr_aof_list) == 0) return;

    aofInfo *ai = listNodeValue(listLast(server.aof_manifest->incr_aof_list));

    if (ai->end_offset == -1) {
        /* No end offset was recorded for the INCR file: compute the
         * replication offset as the start offset (0 when it is not set)
         * plus the size of the file. */
        server.master_repl_offset = (ai->start_offset != -1 ? ai->start_offset : 0) +
                                    getAppendOnlyFileSize(ai->file_name, NULL);
        return;
    }

    /* An end offset was recorded: use it directly, then clear it. Otherwise
     * the next load of the manifest would reuse the same offset even though
     * the real offset may have advanced. */
    server.master_repl_offset = ai->end_offset;
    ai->end_offset = -1;
    server.aof_manifest->dirty = 1;

    /* The cleared end offset must reach the manifest file: data keeps being
     * appended to this same INCR file, so a stale value left there would be
     * wrong information. */
    if (persistAofManifest(server.aof_manifest) != AOF_OK) {
        exit(1);
    }
}

/* Whether to limit the execution of Background AOF rewrite.
*
* At present, if AOFRW fails, redis will automatically retry. If it continues
Expand Down Expand Up @@ -938,6 +1011,7 @@ void stopAppendOnly(void) {
server.aof_last_fsync = server.mstime;
}
close(server.aof_fd);
updateCurIncrAofEndOffset();

server.aof_fd = -1;
server.aof_selected_db = -1;
Expand Down Expand Up @@ -1071,35 +1145,34 @@ void flushAppendOnlyFile(int force) {
mstime_t latency;

if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_last_incr_fsync_offset != server.aof_last_incr_size &&
server.mstime - server.aof_last_fsync >= 1000 &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;

/* Check if we need to do fsync even the aof buffer is empty,
* the reason is described in the previous AOF_FSYNC_EVERYSEC block,
* and AOF_FSYNC_ALWAYS is also checked here to handle a case where
* aof_fsync is changed from everysec to always. */
} else if (server.aof_fsync == AOF_FSYNC_ALWAYS &&
server.aof_last_incr_fsync_offset != server.aof_last_incr_size)
{
goto try_fsync;
} else {
if (server.aof_last_incr_fsync_offset == server.aof_last_incr_size) {
/* All data is fsync'd already: Update fsynced_reploff_pending just in case.
* This is needed to avoid a WAITAOF hang in case a module used RM_Call with the NO_AOF flag,
* in which case master_repl_offset will increase but fsynced_reploff_pending won't be updated
* (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on
* the higher offset (which contains data that was only propagated to replicas, and not to AOF) */
if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO)
* This is needed to avoid a WAITAOF hang in case a module used RM_Call
* with the NO_AOF flag, in which case master_repl_offset will increase but
* fsynced_reploff_pending won't be updated (because there's no reason, from
* the AOF POV, to call fsync) and then WAITAOF may wait on the higher offset
* (which contains data that was only propagated to replicas, and not to AOF) */
if (!aofFsyncInProgress())
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
return;
} else {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.mstime - server.aof_last_fsync >= 1000 &&
!(sync_in_progress = aofFsyncInProgress()))
goto try_fsync;

/* Check if we need to do fsync even the aof buffer is empty,
* the reason is described in the previous AOF_FSYNC_EVERYSEC block,
* and AOF_FSYNC_ALWAYS is also checked here to handle a case where
* aof_fsync is changed from everysec to always. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS)
goto try_fsync;
}
return;
}

if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
Expand Down Expand Up @@ -2665,7 +2738,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
sds temp_incr_aof_name = getTempIncrAofName();
sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name);
/* Get next new incr aof name. */
sds new_incr_filename = getNewIncrAofName(temp_am);
sds new_incr_filename = getNewIncrAofName(temp_am, tempIncAofStartReplOffset);
new_incr_filepath = makePath(server.aof_dirname, new_incr_filename);
latencyStartMonitor(latency);
if (rename(temp_incr_filepath, new_incr_filepath) == -1) {
Expand Down
Loading
Loading