Skip to content

Commit c77b12b

Browse files
committed
feat: rehash
1 parent d30e49e commit c77b12b

File tree

7 files changed

+165
-47
lines changed

7 files changed

+165
-47
lines changed

client/client_main.cc

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,18 @@
11
#include <iostream>
22
#include "kvclient.h"
3+
#define DEBUG
4+
5+
#ifdef DEBUG
6+
#include "test.h"
7+
#endif
38

4-
void testExpire(KVClient* c) {
5-
int res = c->setKV("yunfei", "23", MiniKV_STRING);
6-
if (res == MiniKV_SET_SUCCESS) {
7-
fprintf(stdout, "set key success!\n");
8-
} else {
9-
fprintf(stderr, "set key failed!\n");
10-
exit(1);
11-
}
12-
GetRes ans = c->getK("yunfei");
13-
std::cout << ans.data[0] << std::endl;
14-
res = c->setExpires("yunfei", 1000);
15-
if (res == MiniKV_SET_EXPIRE_SUCCESS) {
16-
fprintf(stdout, "set expire success!\n");
17-
} else {
18-
fprintf(stderr, "set expire failed!\n");
19-
exit(1);
20-
}
21-
sleep(3);
22-
ans = c->getK("yunfei");
23-
std::cout << (ans.data.empty() ? "expired" : "expire doesn't work") << std::endl;
24-
}
259

2610
int main(int argc, char** argv) {
2711
KVClient* kvclient = new KVClient("127.0.0.1", 6789);
28-
testExpire(kvclient);
29-
// int res = kvclient->setKV("yunfei", "23", MiniKV_STRING);
30-
// if (res == MiniKV_SET_SUCCESS) {
31-
// fprintf(stdout, "set key success!\n");
32-
// } else {
33-
// fprintf(stderr, "set key failed!\n");
34-
// exit(1);
35-
// }
36-
// GetRes ans = kvclient->getK("yunfei");
37-
// if (ans.encoding == MiniKV_GET_SUCCESS) {
38-
// fprintf(stdout, "get key success!\n");
39-
// } else {
40-
// fprintf(stderr, "get key failed!\n");
41-
// exit(1);
42-
// }
43-
// for (int i = 0; i < ans.data.size(); ++i) {
44-
// std::cout << ans.data[i] << " ";
45-
// }std::cout << std::endl;
12+
// testExpire(kvclient);
13+
testSetLists(kvclient);
14+
15+
4616

4717
// res = kvclient->setKV("qjx", "23", MiniKV_LIST);
4818
// if (res == MiniKV_SET_SUCCESS) {

client/test.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#ifndef TEST_H
2+
#define TEST_H
3+
#include "kvclient.h"
4+
5+
void testExpire(KVClient* c) {
6+
int res = c->setKV("yunfei", "23", MiniKV_STRING);
7+
if (res == MiniKV_SET_SUCCESS) {
8+
fprintf(stdout, "set key success!\n");
9+
} else {
10+
fprintf(stderr, "set key failed!\n");
11+
exit(1);
12+
}
13+
GetRes ans = c->getK("yunfei");
14+
std::cout << ans.data[0] << std::endl;
15+
res = c->setExpires("yunfei", 1000);
16+
if (res == MiniKV_SET_EXPIRE_SUCCESS) {
17+
fprintf(stdout, "set expire success!\n");
18+
} else {
19+
fprintf(stderr, "set expire failed!\n");
20+
exit(1);
21+
}
22+
sleep(3);
23+
ans = c->getK("yunfei");
24+
std::cout << (ans.data.empty() ? "expired" : "expire doesn't work") << std::endl;
25+
}
26+
27+
void testSetLists(KVClient* c) {
28+
for (int i = 0; i < 100; ++i) {
29+
int res = c->setKV("yunfei", "23", MiniKV_LIST);
30+
if (res == MiniKV_SET_SUCCESS) {
31+
fprintf(stdout, "set key success!\n");
32+
} else {
33+
fprintf(stderr, "set key failed!\n");
34+
exit(1);
35+
}
36+
}
37+
GetRes ans = c->getK("yunfei");
38+
if (ans.encoding == MiniKV_GET_SUCCESS) {
39+
fprintf(stdout, "get key success!\n");
40+
} else {
41+
fprintf(stderr, "get key failed!\n");
42+
exit(1);
43+
}
44+
for (int i = 0; i < ans.data.size(); ++i) {
45+
std::cout << ans.data[i] << " ";
46+
} std::cout << std::endl;
47+
}
48+
49+
#endif

minikv.rdb

594 Bytes
Binary file not shown.

server/db.cc

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@ static constexpr uint32_t sizeofChar = sizeof(char);
88

99
MiniKVDB::MiniKVDB() {
1010
hashSize_ = HASH_SIZE_INIT;
11-
hash1_ = std::unique_ptr<HashTable>(new HashTable(hashSize_));
11+
hash1_ = std::shared_ptr<HashTable>(new HashTable(hashSize_));
12+
hash2_ = std::shared_ptr<HashTable>(new HashTable(hashSize_));
1213
expires_ = std::unique_ptr<HashTable>(new HashTable(hashSize_));
1314
io_ = std::unique_ptr<KVio>(new KVio(RDB_FILE_NAME));
1415
rdbFileReadInitDB();
1516
rdbe_ = new rdbEntry();
1617
timer_ = std::shared_ptr<KVTimer>(new KVTimer());
1718
timer_->start(RDB_INTERVAL, std::bind(&MiniKVDB::rdbSave, this));
19+
timer_->start(REHASH_DETECT_INTERVAL, std::bind(&MiniKVDB::rehash, this));
20+
rehashFlag_ = false;
1821
}
1922

2023
MiniKVDB::~MiniKVDB() {
@@ -24,7 +27,16 @@ MiniKVDB::~MiniKVDB() {
2427
}
2528

2629
void MiniKVDB::insert(std::string key, std::string val, uint32_t encoding) {
27-
hash1_->insert(key, val, encoding);
30+
if (!rehashFlag_) {
31+
hash1_->insert(key, val, encoding);
32+
return;
33+
}
34+
if (hash1_->exist(key)) {
35+
hash1_->insert(key, val, encoding);
36+
return;
37+
}
38+
hash2_->insert(key, val, encoding);
39+
progressiveRehash(hash2_);
2840
}
2941

3042
void MiniKVDB::get(std::string key, std::vector<std::string>& res) {
@@ -44,7 +56,16 @@ void MiniKVDB::get(std::string key, std::vector<std::string>& res) {
4456
return;
4557
}
4658
}
47-
hash1_->get(key, res);
59+
if (!rehashFlag_) {
60+
hash1_->get(key, res);
61+
return;
62+
}
63+
if (hash1_->exist(key)) {
64+
hash1_->get(key, res);
65+
return;
66+
}
67+
hash2_->get(key, res);
68+
progressiveRehash(hash2_);
4869
}
4970

5071
int MiniKVDB::del(std::string key) {
@@ -196,4 +217,23 @@ void MiniKVDB::rdbFileReadInitDB() {
196217
rdbLoadEntry();
197218
}
198219
kvlogi("Rdb file loaded.");
220+
}
221+
222+
void MiniKVDB::rehash() {
223+
if (rehashFlag_) return;
224+
std::shared_lock<std::shared_mutex> lk(smutex_);
225+
if (!hash1_->needRehash()) return;
226+
lk.unlock();
227+
rehashFlag_ = true;
228+
hash2_->resize(hash1_->size() * 2);
229+
}
230+
231+
void MiniKVDB::progressiveRehash(std::shared_ptr<HashTable> h2) {
232+
if (!rehashFlag_) return;
233+
bool completed = hash1_->progressiveRehash(h2, REHASH_MAX_EMPTY_VISITS);
234+
if (completed) {
235+
rehashFlag_ = false;
236+
std::swap(hash1_, hash2_);
237+
hash2_->clear();
238+
}
199239
}

server/db.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
#include <mutex>
99
#include <shared_mutex>
1010
#define HASH_SIZE_INIT 512
11-
#define THREAD_POOL_NUM 4
11+
#define THREAD_POOL_NUM 2
1212
#define RDB_INTERVAL 5000 // 5000ms rdb save triggered
13+
#define REHASH_DETECT_INTERVAL 5000
14+
#define REHASH_MAX_EMPTY_VISITS 50
1315
#define RDB_FILE_NAME "minikv.rdb"
1416

1517

@@ -33,13 +35,14 @@ typedef struct rdbEntry {
3335

3436
typedef class MiniKVDB {
3537
private:
36-
std::unique_ptr<HashTable> hash1_;
37-
std::unique_ptr<HashTable> hash2_;
38+
std::shared_ptr<HashTable> hash1_;
39+
std::shared_ptr<HashTable> hash2_;
3840
std::unique_ptr<HashTable> expires_;
3941
uint64_t hashSize_;
4042
std::unique_ptr<KVio> io_;
4143
std::shared_ptr<KVTimer> timer_;
4244
rdbEntry* rdbe_;
45+
bool rehashFlag_;
4346

4447
void saveKVWithEncoding(std::string key, uint32_t encoding, void* data);
4548
void makeStringObject2RDBEntry(rdbEntry* rdbe, std::string key, uint32_t encoding, void* data);
@@ -48,6 +51,8 @@ typedef class MiniKVDB {
4851
void rdbFileReadInitDB();
4952
void rdbLoadEntry();
5053
void rdbSave();
54+
void rehash();
55+
void progressiveRehash(std::shared_ptr<HashTable> hash2);
5156
// void fixedTimeDeleteExpiredKey();
5257
public:
5358
void insert(std::string key, std::string val, uint32_t encoding);

server/hash.h

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
#include <vector>
88
#include "../type/encoding.h"
99
#include "../log/log.h"
10-
1110
typedef struct kvString {
1211
uint32_t len;
1312
std::shared_ptr<char[]> data;
@@ -23,6 +22,9 @@ typedef struct Entry {
2322
class HashTable {
2423
public:
2524
std::vector<std::list<std::shared_ptr<Entry>>> hash_;
25+
std::size_t keyNum_;
26+
int size_;
27+
int rehashIndex_;
2628
private:
2729
int hash(std::string key) {
2830
unsigned int seed = 131; // 31 131 1313 13131 131313 etc..
@@ -63,6 +65,8 @@ class HashTable {
6365
HashTable() = delete;
6466
HashTable(int size) {
6567
hash_.resize(size);
68+
keyNum_ = rehashIndex_ = 0;
69+
size_ = size;
6670
}
6771
void insert(std::string key, std::string val, uint32_t encoding) {
6872
int slot = hash(key);
@@ -82,6 +86,13 @@ class HashTable {
8286
Entry* entry = new Entry;
8387
insertWithEncoding(entry, key, val, encoding);
8488
hash_[slot].push_front(std::shared_ptr<Entry>(entry));
89+
keyNum_++;
90+
}
91+
92+
void insertEntry(std::shared_ptr<Entry> entry, std::string key) {
93+
int slot = hash(key);
94+
hash_[slot].push_front(entry);
95+
keyNum_++;
8596
}
8697

8798
void get(std::string key, std::vector<std::string>& res) {
@@ -126,6 +137,49 @@ class HashTable {
126137
});
127138
return flag ? MiniKV_DEL_SUCCESS : MiniKV_DEL_FAIL;
128139
}
140+
141+
bool needRehash() {
142+
if ((double)keyNum_ / size_ > 1.5) {
143+
return true;
144+
}
145+
return false;
146+
}
147+
148+
void resize(int size) {
149+
hash_.resize(size);
150+
size_ = size;
151+
}
152+
153+
size_t size() {
154+
return size_;
155+
}
156+
157+
void clear() {
158+
keyNum_ = rehashIndex_ = 0;
159+
hash_.clear();
160+
}
161+
162+
bool progressiveRehash(std::shared_ptr<HashTable> h2, int emptyVisits) {
163+
auto& d2 = h2.get()->hash_;
164+
auto& d1 = hash_;
165+
for (int i = rehashIndex_; i < d1.size() && emptyVisits > 0; ++i) {
166+
if (d1[i].empty()) {
167+
emptyVisits--;
168+
rehashIndex_ = i + 1;
169+
continue;
170+
}
171+
for (auto it = d1[i].begin(); it != d1[i].end(); ++it) {
172+
std::string key = (*it).get()->key;
173+
h2->insertEntry(*it, key);
174+
}
175+
d1[i].clear();
176+
rehashIndex_ = i + 1;
177+
}
178+
if (rehashIndex_ == d1.size()) {
179+
return true;
180+
}
181+
return false;
182+
}
129183
};
130184

131185
#endif

server/io.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class KVio {
3939
if (feof(fp_)) {
4040
return true; // empty file
4141
}
42-
rewind(fp_);
42+
fseek(fp_,-1,SEEK_CUR);
4343
return false;
4444
}
4545
void clearFile() {

0 commit comments

Comments
 (0)