@@ -106,6 +106,8 @@ class SharedRocksDBState {
106106 rocksdb::Options getOptions () const { return rocksdb::Options (this ->dbOptions , this ->cfOptions ); }
107107 rocksdb::ReadOptions getReadOptions () { return this ->readOptions ; }
108108 rocksdb::FlushOptions getFlushOptions () { return this ->flushOptions ; }
109+ double getLastFlushTime () const { return this ->lastFlushTime_ ; }
110+ void setLastFlushTime (double lastFlushTime) { this ->lastFlushTime_ = lastFlushTime; }
109111
110112private:
111113 const UID id;
@@ -119,6 +121,7 @@ class SharedRocksDBState {
119121 rocksdb::ColumnFamilyOptions cfOptions;
120122 rocksdb::ReadOptions readOptions;
121123 rocksdb::FlushOptions flushOptions;
124+ std::atomic<double > lastFlushTime_;
122125};
123126
124127SharedRocksDBState::SharedRocksDBState (UID id)
@@ -373,12 +376,14 @@ class RocksDBErrorListener : public rocksdb::EventListener {
373376
374377class RocksDBEventListener : public rocksdb ::EventListener {
375378public:
376- RocksDBEventListener (std::shared_ptr<double > lastFlushTime ) : lastFlushTime(lastFlushTime ){};
379+ RocksDBEventListener (std::shared_ptr<SharedRocksDBState> sharedState ) : sharedState(sharedState ){};
377380
378- void OnFlushCompleted (rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { *lastFlushTime = now (); }
381+ void OnFlushCompleted (rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override {
382+ sharedState->setLastFlushTime (now ());
383+ }
379384
380385private:
381- std::shared_ptr<double > lastFlushTime ;
386+ std::shared_ptr<SharedRocksDBState> sharedState ;
382387};
383388
384389using DB = rocksdb::DB*;
@@ -985,19 +990,23 @@ ACTOR Future<Void> flowLockLogger(UID id, const FlowLock* readLock, const FlowLo
985990 }
986991}
987992
988- ACTOR Future<Void> manualFlush (UID id,
989- rocksdb::DB* db,
990- std::shared_ptr<SharedRocksDBState> sharedState,
991- std::shared_ptr<double > lastFlushTime,
992- CF cf) {
993+ ACTOR Future<Void> manualFlush (UID id, rocksdb::DB* db, std::shared_ptr<SharedRocksDBState> sharedState, CF cf) {
993994 if (SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL ) {
994995 state rocksdb::FlushOptions fOptions = sharedState->getFlushOptions ();
996+ state double waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL ;
997+ state double currTime = 0 ;
998+ state int timeElapsedAfterLastFlush = 0 ;
995999 loop {
996- wait (delay (SERVER_KNOBS-> ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL ));
1000+ wait (delay (waitTime ));
9971001
998- if ((now () - *lastFlushTime) > SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL ) {
1002+ currTime = now ();
1003+ timeElapsedAfterLastFlush = currTime - sharedState->getLastFlushTime ();
1004+ if (timeElapsedAfterLastFlush >= SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL ) {
9991005 db->Flush (fOptions , cf);
1000- TraceEvent e (" RocksDBManualFlush" , id);
1006+ waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL ;
1007+ TraceEvent (" RocksDBManualFlush" , id).detail (" TimeElapsedAfterLastFlush" , timeElapsedAfterLastFlush);
1008+ } else {
1009+ waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL - timeElapsedAfterLastFlush;
10011010 }
10021011 }
10031012 }
@@ -1288,11 +1297,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
12881297 const FlowLock* fetchLock,
12891298 std::shared_ptr<RocksDBErrorListener> errorListener,
12901299 std::shared_ptr<RocksDBEventListener> eventListener,
1291- std::shared_ptr<double > lastFlushTime,
12921300 Counters& counters)
12931301 : path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock),
1294- errorListener (errorListener), eventListener(eventListener), lastFlushTime(lastFlushTime),
1295- counters(counters) {}
1302+ errorListener (errorListener), eventListener(eventListener), counters(counters) {}
12961303
12971304 double getTimeEstimate () const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE ; }
12981305 };
@@ -1361,7 +1368,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
13611368 rocksDBMetricLogger (
13621369 id, sharedState, options.statistics , perfContextMetrics, db, readIterPool, &a.counters , cf) &&
13631370 flowLockLogger (id, a.readLock , a.fetchLock ) && refreshReadIteratorPool (readIterPool) &&
1364- manualFlush (id, db, sharedState, a. lastFlushTime , cf);
1371+ manualFlush (id, db, sharedState, cf);
13651372 } else {
13661373 onMainThread ([&] {
13671374 a.metrics = rocksDBMetricLogger (id,
@@ -1373,7 +1380,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
13731380 &a.counters ,
13741381 cf) &&
13751382 flowLockLogger (id, a.readLock , a.fetchLock ) && refreshReadIteratorPool (readIterPool) &&
1376- manualFlush (id, db, sharedState, a. lastFlushTime , cf);
1383+ manualFlush (id, db, sharedState, cf);
13771384 return Future<bool >(true );
13781385 }).blockUntilReady ();
13791386 }
@@ -1887,8 +1894,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
18871894 numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
18881895 numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
18891896 errorListener(std::make_shared<RocksDBErrorListener>(id)), errorFuture(errorListener->getFuture ()) {
1890- lastFlushTime = std::make_shared<double >(now ());
1891- eventListener = std::make_shared<RocksDBEventListener>(lastFlushTime);
1897+ eventListener = std::make_shared<RocksDBEventListener>(sharedState);
18921898 // In simluation, run the reader/writer threads as Coro threads (i.e. in the network thread. The storage engine
18931899 // is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
18941900 // block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
@@ -2082,7 +2088,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
20822088 return openFuture;
20832089 }
20842090 auto a = std::make_unique<Writer::OpenAction>(
2085- path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, lastFlushTime, counters);
2091+ path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, counters);
20862092 openFuture = a->done .getFuture ();
20872093 writeThread->post (a.release ());
20882094 return openFuture;
@@ -2400,7 +2406,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
24002406 Reference<IThreadPool> readThreads;
24012407 std::shared_ptr<RocksDBErrorListener> errorListener;
24022408 std::shared_ptr<RocksDBEventListener> eventListener;
2403- std::shared_ptr<double > lastFlushTime;
24042409 Future<Void> errorFuture;
24052410 Promise<Void> closePromise;
24062411 Future<Void> openFuture;
0 commit comments