Skip to content

Add function to calculate average TTL #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: redesign-expire
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 191 additions & 1 deletion src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ estore *estoreCreate(EbucketsType *type, int num_buckets_bits) {
}

es->count = 0;
es->sum_ttl = 0;
return es;
}

Expand All @@ -73,6 +74,7 @@ void estoreEmpty(estore *es) {
}

es->count = 0;
es->sum_ttl = 0;
}

/* Release an expiration store (free all memory) */
Expand All @@ -91,9 +93,14 @@ void estoreRelease(estore *es) {
void estoreRemove(estore *es, int slot, kvobj *kv) {
if (es == NULL) return;

long long expire_time = kvobjGetExpire(kv);
ebuckets *bucket = estoreGetBucket(es, slot);
if (ebRemove(bucket, es->bucket_type, kv)) {
es->count--;
if(es->count == 0) /* Reset sum_ttl if no items left */
es->sum_ttl = 0;
else /* Reduce the sum_ttl by the expired time, but not below zero */
es->sum_ttl -= expire_time - server.mstime > 0 ? expire_time - server.mstime : 0;
}
}

Expand All @@ -102,8 +109,10 @@ void estoreAdd(estore *es, kvobj *kv, int slot, long long when) {
if (es == NULL || kv == NULL) return;

ebuckets *bucket = estoreGetBucket(es, slot);
if (ebAdd(bucket, es->bucket_type, kv, when) == 0)
if (ebAdd(bucket, es->bucket_type, kv, when) == 0) {
es->count++;
es->sum_ttl += when - server.mstime > 0 ? when - server.mstime : 0;
}
}

void estoreIncrementalCascade(estore *es, uint64_t now, uint64_t maxCascade) {
Expand Down Expand Up @@ -341,11 +350,35 @@ static ExpireAction keyExpireCallback(eItem item, void *ctx) {
return ACT_REMOVE_EXP_ITEM;
}

void estoreReduceTTLSum(estore *es, long long elapsed) {
if (es == NULL || es->sum_ttl == 0 || elapsed <= 0) return;

/* Reduce the TTL sum by the elapsed time, but not below zero */
long long total_elapsed = elapsed * es->count;
if (es->sum_ttl > total_elapsed) {
es->sum_ttl -= total_elapsed;
} else {
es->sum_ttl = 0;
}

/* If there are no items left, reset the sum */
if (es->count == 0) {
es->sum_ttl = 0;
}
}

/* Perform active expiration on keys using ebuckets */
unsigned int estoreActiveExpire(redisDb *db, unsigned int max_keys) {
if (db == NULL || db->expiresNew == NULL) return 0;
if (estoreSize(db->expiresNew) == 0) return 0;

static mstime_t last_run_time = 0;
if (last_run_time == 0)
last_run_time = server.mstime;

mstime_t ttl_elapsed = server.mstime - last_run_time;
last_run_time = server.mstime;

unsigned int keys_expired = 0;
mstime_t now = mstime();

Expand Down Expand Up @@ -399,6 +432,10 @@ unsigned int estoreActiveExpire(redisDb *db, unsigned int max_keys) {
keys_expired = info.itemsExpired;
}

db->expiresNew->count -= keys_expired;
estoreReduceTTLSum(db->expiresNew, ttl_elapsed);
db->avg_ttl = estoreGetAvgTTL(db->expiresNew);

return keys_expired;
}

Expand All @@ -421,6 +458,12 @@ size_t estoreMemUsage(estore *es) {
return 0;
}

long long estoreGetAvgTTL(estore *es) {
if (es == NULL || es->count == 0) return 0;

/* Return the average TTL in milliseconds */
return es->sum_ttl / es->count;
}

void activeExpireCycle(int type) {
/* Adjust the running parameters according to the configured expire
Expand Down Expand Up @@ -1107,3 +1150,150 @@ void touchCommand(client *c) {
if (lookupKeyRead(c->db,c->argv[j]) != NULL) touched++;
addReplyLongLong(c,touched);
}

#ifdef REDIS_TEST
#include <stdio.h>
#include "testhelp.h"

#define TEST(name) printf("test — %s\n", name);
typedef struct TestKVObj {
kvobj obj;
ExpireMeta mexpire;
} TestKVObj;

ExpireMeta *getTestKVObjExpireMeta(const void *item) {
return &((TestKVObj *)item)->mexpire;
}

void deleteTestKVObjCallback(eItem item, void *ctx) {
UNUSED(ctx);
UNUSED(item);
}

EbucketsType testEbType = {
.getExpireMeta = getTestKVObjExpireMeta,
.onDeleteItem = deleteTestKVObjCallback,
.itemsAddrAreOdd = 0,
.ebp.precision = 0,
.ebp.keySize = EB_PRECISION2KEYSIZE(0 /*.precision*/),
};

/* ./redis-server test expire */
int expireTest(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);

TEST("estoreGetAvgTTL computes average TTL correctly") {
server.cluster_enabled = 0;

estore *es = estoreCreate(&testEbType, 1); /* 2 buckets */

TestKVObj *kv1 = zmalloc(sizeof(TestKVObj));
TestKVObj *kv2 = zmalloc(sizeof(TestKVObj));
TestKVObj *kv3 = zmalloc(sizeof(TestKVObj));

/* Add 3 objects with TTLs: 10, 20, 30 */
mstime_t now = mstime();
server.mstime = now; /* Set current time for consistency */
estoreAdd(es, (kvobj*)kv1, 0, now + 10 * 1000); /* 10 seconds */
estoreAdd(es, (kvobj*)kv2, 0, now + 20 * 1000); /* 20 seconds */
estoreAdd(es, (kvobj*)kv3, 0, now + 30 * 1000); /* 30 seconds */

/* Average TTL should be (10 + 20 + 30) / 3 = 20 seconds */
uint64_t expected = 20 * 1000;
uint64_t actual = estoreGetAvgTTL(es);

assert(actual == expected);

zfree(kv1);
zfree(kv2);
zfree(kv3);
estoreRelease(es);
}

TEST("estoreGetAvgTTL after removing some objects with mstime-based TTLs") {
server.cluster_enabled = 0;

estore *es = estoreCreate(&testEbType, 1); /* 2 buckets */

TestKVObj *kv1 = zmalloc(sizeof(TestKVObj));
TestKVObj *kv2 = zmalloc(sizeof(TestKVObj));
TestKVObj *kv3 = zmalloc(sizeof(TestKVObj));
TestKVObj *kv4 = zmalloc(sizeof(TestKVObj));

mstime_t now = mstime();
server.mstime = now; /* Set current time for consistency */

/* Add 4 objects with TTLs: 10, 20, 30, 40 seconds */
estoreAdd(es, (kvobj*)kv1, 0, now + 10 * 1000);
estoreAdd(es, (kvobj*)kv2, 0, now + 20 * 1000);
estoreAdd(es, (kvobj*)kv3, 0, now + 30 * 1000);
estoreAdd(es, (kvobj*)kv4, 0, now + 40 * 1000);

/* Remove kv1 and kv3 */
((kvobj*)kv1)->expirable = 1; /* Set expirable flag */
((kvobj*)kv3)->expirable = 1; /* Set expirable flag */
estoreRemove(es, 0, (kvobj*)kv1);
estoreRemove(es, 0, (kvobj*)kv3);

/* Remaining TTLs: 20 and 40 seconds → avg = (20 + 40) / 2 = 30 seconds */
uint64_t expected = 30 * 1000;
uint64_t actual = estoreGetAvgTTL(es);

assert(actual == expected);

zfree(kv1);
zfree(kv2);
zfree(kv3);
zfree(kv4);
estoreRelease(es);
}

TEST("estoreReduceTTLSum reduces TTL sum by elapsed time") {
server.cluster_enabled = 0;

estore *es = estoreCreate(&testEbType, 1); /* 2 buckets */

TestKVObj *kv1 = zmalloc(sizeof(TestKVObj));
TestKVObj *kv2 = zmalloc(sizeof(TestKVObj));
TestKVObj *kv3 = zmalloc(sizeof(TestKVObj));
TestKVObj *kv4 = zmalloc(sizeof(TestKVObj));
TestKVObj *kv5 = zmalloc(sizeof(TestKVObj));

mstime_t now = mstime();
server.mstime = now;

/* Add 5 objects with TTLs: 10, 20, 30, 40, 50 seconds */
estoreAdd(es, (kvobj*)kv1, 0, now + 10 * 1000);
estoreAdd(es, (kvobj*)kv2, 0, now + 20 * 1000);
estoreAdd(es, (kvobj*)kv3, 0, now + 30 * 1000);
estoreAdd(es, (kvobj*)kv4, 0, now + 40 * 1000);
estoreAdd(es, (kvobj*)kv5, 0, now + 50 * 1000);

/* Before reduction: average TTL = (10+20+30+40+50)/5 = 30s = 30000ms */
uint64_t expected_before = 30 * 1000;
uint64_t actual_before = estoreGetAvgTTL(es);
assert(actual_before == expected_before);

/* Simulate 2 seconds elapsed */
long long elapsed = 2 * 1000;
estoreReduceTTLSum(es, elapsed);

/* After reduction: TTL sum should drop by 2s per object → total 10s = 10000ms */
/* New average TTL = (150000 - 10000) / 5 = 28000 ms */
uint64_t expected = 28 * 1000;
uint64_t actual = estoreGetAvgTTL(es);
assert(actual == expected);

zfree(kv1);
zfree(kv2);
zfree(kv3);
zfree(kv4);
zfree(kv5);
estoreRelease(es);
}

return 0;
}
#endif
7 changes: 7 additions & 0 deletions src/expire.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ typedef struct _estore {
int num_buckets_bits; /* Log2 of the number of buckets */
int num_buckets; /* Number of buckets (1 << num_buckets_bits) */
unsigned long long count; /* Total number of kv's in this estore */
long long sum_ttl; /* Sum of TTLs of all kv's in this estore */
} estore;

extern EbucketsType estoreBucketsType;
Expand Down Expand Up @@ -60,4 +61,10 @@ ebuckets *estoreGetBucket(estore *es, int slot);

size_t estoreMemUsage(estore *es);

long long estoreGetAvgTTL(estore *es);

#ifdef REDIS_TEST
int expireTest(int argc, char *argv[], int flags);
#endif

#endif
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -7296,6 +7296,7 @@ struct redisTest {
{"listpack", listpackTest},
{"kvstore", kvstoreTest},
{"ebuckets", ebucketsTest},
{"expire", expireTest}
};
redisTestProc *getTestProcByName(const char *name) {
int numtests = sizeof(redisTests)/sizeof(struct redisTest);
Expand Down