Skip to content

Commit 59c5c3a

Browse files
committed
RedisConnection: simplify query prioritization logic
As opposed to the previous version which used a complex data structure to correctly manage the query priorities, this version uses a `std::deque` for the write queue and a simple mechanism to insert high-priority items at the front. By default, items are processed in FIFO order, but if someone wants to immediately send a high-priority query it will be placed at the front of the queue (remember std::deque allows efficient insertion at both ends), an will overtake any normal priority items already queued. However, if there are already high-priority items in the queue, the new high-priority item will be inserted after them but still before any normal priority items, ensuring that all high-priority items are processed in the order they were enqueued.
1 parent 6ec40d6 commit 59c5c3a

File tree

7 files changed

+119
-112
lines changed

7 files changed

+119
-112
lines changed

lib/icingadb/icingadb-objects.cpp

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@
4646

4747
using namespace icinga;
4848

49-
using Prio = RedisConnection::QueryPriority;
50-
5149
std::map<Type::Ptr, RedisKey> IcingaDB::m_CustomVarKeys;
5250
std::unordered_set<Type*> IcingaDB::m_IndexedTypes;
5351

@@ -197,7 +195,9 @@ void IcingaDB::ConfigStaticInitialize()
197195

198196
void IcingaDB::UpdateAllConfigObjects()
199197
{
200-
m_Rcon->FireAndForgetQuery({"XADD", "icinga:schema", "MAXLEN", "1", "*", "version", "6"}, Prio::Heartbeat);
198+
// This function performs an initial dump of all configuration objects into Redis, thus there are no
199+
// previously enqueued queries on m_Rcon that we need to wait for. So, no Sync() call is necessary here.
200+
m_Rcon->FireAndForgetQuery({"XADD", "icinga:schema", "MAXLEN", "1", "*", "version", "6"}, {}, true);
201201

202202
Log(LogInformation, "IcingaDB") << "Starting initial config/status dump";
203203
double startTime = Utility::GetTime();
@@ -215,7 +215,7 @@ void IcingaDB::UpdateAllConfigObjects()
215215
std::vector<Type::Ptr> types = GetTypes();
216216

217217
// Add a new type=* state=wip entry to the stream and remove all previous entries (MAXLEN 1).
218-
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "MAXLEN", "1", "*", "key", "*", "state", "wip"}, Prio::Config);
218+
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "MAXLEN", "1", "*", "key", "*", "state", "wip"});
219219

220220
const std::vector<String> globalKeys = {
221221
m_PrefixConfigObject + "customvar",
@@ -232,8 +232,8 @@ void IcingaDB::UpdateAllConfigObjects()
232232
m_PrefixConfigObject + "redundancygroup",
233233
m_PrefixConfigObject + "redundancygroup:state",
234234
};
235-
DeleteKeys(m_Rcon, globalKeys, Prio::Config);
236-
DeleteKeys(m_Rcon, {"icinga:nextupdate:host", "icinga:nextupdate:service"}, Prio::Config);
235+
DeleteKeys(m_Rcon, globalKeys);
236+
DeleteKeys(m_Rcon, {"icinga:nextupdate:host", "icinga:nextupdate:service"});
237237
m_Rcon->Sync();
238238

239239
Defer resetDumpedGlobals ([this]() {
@@ -253,7 +253,7 @@ void IcingaDB::UpdateAllConfigObjects()
253253
auto& rcon (m_Rcons.at(ctype));
254254

255255
std::vector<String> keys = GetTypeOverwriteKeys(lcType);
256-
DeleteKeys(rcon, keys, Prio::Config);
256+
DeleteKeys(rcon, keys);
257257

258258
WorkQueue upqObjectType(25000, Configuration::Concurrency, LogNotice);
259259
upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType);
@@ -265,9 +265,7 @@ void IcingaDB::UpdateAllConfigObjects()
265265
String cursor = "0";
266266

267267
do {
268-
Array::Ptr res = rcon->GetResultOfQuery({
269-
"HSCAN", configCheckSum, cursor, "COUNT", "1000"
270-
}, Prio::Config);
268+
Array::Ptr res = rcon->GetResultOfQuery({"HSCAN", configCheckSum, cursor, "COUNT", "1000"});
271269

272270
AddKvsToMap(res->Get(1), redisCheckSums);
273271

@@ -410,7 +408,7 @@ void IcingaDB::UpdateAllConfigObjects()
410408
setChecksum.clear();
411409
setObject.clear();
412410

413-
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig});
411+
rcon->FireAndForgetQueries(std::move(transaction), {affectedConfig});
414412
});
415413

416414
auto flushDels ([&]() {
@@ -429,7 +427,7 @@ void IcingaDB::UpdateAllConfigObjects()
429427
delChecksum.clear();
430428
delObject.clear();
431429

432-
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig});
430+
rcon->FireAndForgetQueries(std::move(transaction), {affectedConfig});
433431
});
434432

435433
auto setOne ([&]() {
@@ -490,7 +488,7 @@ void IcingaDB::UpdateAllConfigObjects()
490488
}
491489

492490
for (auto& key : GetTypeDumpSignalKeys(type)) {
493-
rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config);
491+
rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"});
494492
}
495493
rcon->Sync();
496494
rcon->Disconnect(); // We're done with this connection, so close it.
@@ -513,14 +511,14 @@ void IcingaDB::UpdateAllConfigObjects()
513511
}
514512

515513
for (auto& key : globalKeys) {
516-
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config);
514+
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"});
517515
}
518516

519-
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", "*", "state", "done"}, Prio::Config);
517+
m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", "*", "state", "done"});
520518

521519
// enqueue a callback that will notify us once all previous queries were executed and wait for this event
522520
std::promise<void> p;
523-
m_Rcon->EnqueueCallback([&p](boost::asio::yield_context& yc) { p.set_value(); }, Prio::Config);
521+
m_Rcon->EnqueueCallback([&p](boost::asio::yield_context&) { p.set_value(); });
524522
p.get_future().wait();
525523

526524
auto endTime (Utility::GetTime());
@@ -553,13 +551,13 @@ std::vector<std::vector<intrusive_ptr<ConfigObject>>> IcingaDB::ChunkObjects(std
553551
return chunks;
554552
}
555553

556-
void IcingaDB::DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<String>& keys, RedisConnection::QueryPriority priority) {
554+
void IcingaDB::DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<String>& keys) {
557555
std::vector<String> query = {"DEL"};
558556
for (auto& key : keys) {
559557
query.emplace_back(key);
560558
}
561559

562-
conn->FireAndForgetQuery(std::move(query), priority);
560+
conn->FireAndForgetQuery(std::move(query));
563561
}
564562

565563
std::vector<String> IcingaDB::GetTypeOverwriteKeys(const String& type)
@@ -1318,7 +1316,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, int mode)
13181316
m_Rcon->FireAndForgetQueries({
13191317
{"HSET", redisStateKey, objectKey, JsonEncode(stateAttrs)},
13201318
{"HSET", redisChecksumKey, objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))},
1321-
}, Prio::RuntimeStateSync);
1319+
});
13221320
}
13231321

13241322
if (mode & RuntimeState) {
@@ -1336,7 +1334,7 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, int mode)
13361334
streamadd.emplace_back(IcingaToStreamValue(kv.second));
13371335
}
13381336

1339-
m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::RuntimeStateStream, {0, 1});
1337+
m_Rcon->FireAndForgetQuery(std::move(streamadd), {0, 1});
13401338
}
13411339
}
13421340

@@ -1410,8 +1408,8 @@ void IcingaDB::UpdateDependenciesState(const Checkable::Ptr& checkable, const De
14101408
queries.emplace_back(std::move(query));
14111409
}
14121410

1413-
m_Rcon->FireAndForgetQueries(std::move(queries), Prio::RuntimeStateSync);
1414-
m_Rcon->FireAndForgetQueries(std::move(streamStates), Prio::RuntimeStateStream, {0, 1});
1411+
m_Rcon->FireAndForgetQueries(std::move(queries));
1412+
m_Rcon->FireAndForgetQueries(std::move(streamStates), {0, 1});
14151413
}
14161414
}
14171415

@@ -2416,17 +2414,15 @@ void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable)
24162414
dynamic_pointer_cast<Service>(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host",
24172415
Convert::ToString(checkable->GetNextUpdate()),
24182416
GetObjectIdentifier(checkable)
2419-
},
2420-
Prio::CheckResult
2417+
}
24212418
);
24222419
} else if (!checkable->GetEnableActiveChecks() || checkable->GetExtension("ConfigObjectDeleted")) {
24232420
m_Rcon->FireAndForgetQuery(
24242421
{
24252422
"ZREM",
24262423
dynamic_pointer_cast<Service>(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host",
24272424
GetObjectIdentifier(checkable)
2428-
},
2429-
Prio::CheckResult
2425+
}
24302426
);
24312427
}
24322428
}
@@ -2592,7 +2588,7 @@ void IcingaDB::ForwardHistoryEntries()
25922588

25932589
if (m_HistoryCon && m_HistoryCon->IsConnected()) {
25942590
try {
2595-
m_HistoryCon->GetResultsOfQueries(haystack, Prio::History, {0, 0, haystack.size()});
2591+
m_HistoryCon->GetResultsOfQueries(haystack, {0, 0, haystack.size()});
25962592
break;
25972593
} catch (const std::exception& ex) {
25982594
logFailure(ex.what());
@@ -3225,7 +3221,7 @@ void IcingaDB::DeleteRelationship(const String& id, const String& redisKeyWithou
32253221
"redis_key", redisKey, "id", id, "runtime_type", "delete"
32263222
});
32273223

3228-
m_Rcon->FireAndForgetQueries(queries, Prio::Config);
3224+
m_Rcon->FireAndForgetQueries(queries);
32293225
}
32303226

32313227
void IcingaDB::DeleteRelationship(const String& id, RedisKey redisKey, bool hasChecksum)
@@ -3352,7 +3348,7 @@ void IcingaDB::DeleteState(const String& id, RedisKey redisKey, bool hasChecksum
33523348
}
33533349
hdels.emplace_back(RedisConnection::Query{"HDEL", m_PrefixConfigObject + redisKeyWithoutPrefix, id});
33543350

3355-
m_Rcon->FireAndForgetQueries(std::move(hdels), Prio::RuntimeStateSync);
3351+
m_Rcon->FireAndForgetQueries(std::move(hdels));
33563352
// TODO: This is currently purposefully commented out due to how Icinga DB (Go) handles runtime state
33573353
// upsert and delete events. See https://github.com/Icinga/icingadb/pull/894 for more details.
33583354
/*m_Rcon->FireAndForgetQueries({{
@@ -3439,11 +3435,11 @@ void IcingaDB::ExecuteRedisTransaction(const RedisConnection::Ptr& rcon, std::ma
34393435
if (transaction.size() > 1) {
34403436
transaction.emplace_back(RedisConnection::Query{"EXEC"});
34413437
if (!runtimeUpdates.empty()) {
3442-
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1});
3438+
rcon->FireAndForgetQueries(std::move(transaction), {1});
34433439
} else {
34443440
// This is likely triggered by the initial Redis config dump, so a) we don't need to record the number of
34453441
// affected objects and b) we don't really know how many objects are going to be affected by this tx.
3446-
rcon->FireAndForgetQueries(std::move(transaction), Prio::Config);
3442+
rcon->FireAndForgetQueries(std::move(transaction));
34473443
}
34483444
}
34493445
}

lib/icingadb/icingadb-worker.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,7 @@ void IcingaDB::ProcessPendingItem(const PendingConfigItem& citem)
216216
"runtime_type",
217217
"delete"
218218
}
219-
},
220-
RedisConnection::QueryPriority::Config
219+
}
221220
);
222221
}
223222

lib/icingadb/icingadb.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ using namespace icinga;
2222

2323
#define MAX_EVENTS_DEFAULT 5000
2424

25-
using Prio = RedisConnection::QueryPriority;
26-
2725
String IcingaDB::m_EnvironmentId;
2826
std::mutex IcingaDB::m_EnvironmentIdInitMutex;
2927

@@ -184,7 +182,7 @@ void IcingaDB::PublishStats()
184182
}
185183
}
186184

187-
m_HistoryCon->FireAndForgetQuery(std::move(query), Prio::Heartbeat);
185+
m_HistoryCon->FireAndForgetQuery(std::move(query), {}, true /* high priority */);
188186
}
189187

190188
void IcingaDB::Stop(bool runtimeRemoved)

lib/icingadb/icingadb.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ class IcingaDB : public ObjectImpl<IcingaDB>
275275
/* config & status dump */
276276
void UpdateAllConfigObjects();
277277
std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize);
278-
void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<String>& keys, RedisConnection::QueryPriority priority);
278+
void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector<String>& keys);
279279
std::vector<String> GetTypeOverwriteKeys(const String& type);
280280
std::vector<String> GetTypeDumpSignalKeys(const Type::Ptr& type);
281281
void InsertCheckableDependencies(const Checkable::Ptr& checkable, std::map<String, RedisConnection::Query>& hMSets,

lib/icingadb/icingadbchecktask.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ void IcingadbCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckR
126126
"0-0", "0-0", "0-0", "0-0", "0-0", "0-0",
127127
}
128128
},
129-
RedisConnection::QueryPriority::Heartbeat
129+
{},
130+
true /* high priority */
130131
));
131132

132133
redisTime = std::move(replies.at(0));

0 commit comments

Comments
 (0)