Skip to content

Commit dc9d40f

Browse files
committed
use polymorphism for queue entries
1 parent a247654 commit dc9d40f

File tree

2 files changed

+101
-109
lines changed

2 files changed

+101
-109
lines changed

lib/icingadb/icingadb-worker.cpp

Lines changed: 66 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,23 @@
66

77
using namespace icinga;
88

9-
PendingQueueItem::PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits)
10-
: DirtyBits{dirtyBits & DirtyBitsAll}, ID{std::move(id)}, EnqueueTime{std::chrono::steady_clock::now()}
11-
{
12-
}
13-
149
PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits)
15-
: PendingQueueItem{std::make_pair(obj, nullptr), bits}, Object{obj}
10+
: Object(obj), DirtyBits(bits & DirtyBitsAll)
1611
{
1712
}
13+
1814
PendingDependencyGroupStateItem::PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup)
19-
: PendingQueueItem{std::make_pair(nullptr, depGroup), 0}, DepGroup{depGroup}
15+
: DepGroup(depGroup)
2016
{
2117
}
2218

2319
PendingDependencyEdgeItem::PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child)
24-
: PendingQueueItem{std::make_pair(child, depGroup), 0}, DepGroup{depGroup}, Child{child}
20+
: DepGroup(depGroup), Child(child)
2521
{
2622
}
2723

2824
RelationsDeletionItem::RelationsDeletionItem(const String& id, RelationsKeyMap relations)
29-
: PendingQueueItem{id, 0}, Relations{std::move(relations)}
25+
: ID(id), Relations(relations)
3026
{
3127
}
3228

@@ -118,42 +114,39 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
118114
auto& seqView = m_PendingItems.get<1>();
119115
for (auto it(seqView.begin()); it != seqView.end(); ++it) {
120116
if (it != seqView.begin()) {
121-
if (std::holds_alternative<RelationsDeletionItem>(*it)) {
117+
if (dynamic_cast<const RelationsDeletionItem*>(it->get())) {
122118
// We don't know whether the previous items are related to this deletion item or not,
123119
// thus we can't just process this right now when there are older items in the queue.
124120
// Otherwise, we might delete something that is going to be updated/created.
125121
break;
126122
}
127123
}
128124

129-
auto age = now - std::visit([](const auto& item) { return item.EnqueueTime; }, *it);
125+
auto age = now - (*it)->EnqueueTime;
130126
if (GetActive() && 1000ms > age) {
131127
if (it == seqView.begin()) {
132128
retryAfter = 1000ms - age;
133129
}
134130
break;
135131
}
136132

137-
ConfigObject::Ptr cobj;
138-
if (auto* citem = std::get_if<PendingConfigItem>(&*it); citem) {
139-
cobj = citem->Object;
140-
}
141-
133+
ConfigObject::Ptr cobj = (*it)->GetObjectToLock();
142134
ObjectLock olock(cobj, std::defer_lock);
143135
if (cobj && !olock.TryLock()) {
144136
continue; // Can't lock the object right now, try the next one.
145137
}
146138

147-
PendingItemVariant itemToProcess = *it;
139+
140+
std::shared_ptr<PendingQueueItem> itemToProcess = *it;
148141
seqView.erase(it);
149142
madeProgress = true;
150143

151144
lock.unlock();
152145
try {
153-
std::visit([this](const auto& item) { ProcessPendingItem(item); }, std::move(itemToProcess));
146+
itemToProcess->Execute(*this);
154147
} catch (const std::exception& ex) {
155148
Log(LogCritical, "IcingaDB")
156-
<< "Exception while processing pending item of type index '" << itemToProcess.index() << "': "
149+
<< "Exception while processing pending item of type '" << typeid(*itemToProcess.get()).name() << "': "
157150
<< DiagnosticInformation(ex, GetActive());
158151
}
159152
lock.lock();
@@ -167,6 +160,12 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
167160
return retryAfter;
168161
}
169162

163+
ConfigObject::Ptr PendingConfigItem::GetObjectToLock() const
164+
{
165+
return Object;
166+
}
167+
168+
170169
/**
171170
* Process a single pending object.
172171
*
@@ -177,14 +176,13 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
177176
*
178177
* @param item The pending item containing the object and its dirty bits.
179178
*/
180-
void IcingaDB::ProcessPendingItem(const PendingConfigItem& item)
181-
{
182-
if (item.DirtyBits & ConfigDelete) {
183-
String typeName = GetLowerCaseTypeNameDB(item.Object);
184-
m_RconWorker->FireAndForgetQueries(
179+
void PendingConfigItem::Execute(IcingaDB& icingadb) const {
180+
if (DirtyBits & ConfigDelete) {
181+
String typeName = icingadb.GetLowerCaseTypeNameDB(Object);
182+
icingadb.m_RconWorker->FireAndForgetQueries(
185183
{
186-
{"HDEL", m_PrefixConfigObject + typeName, GetObjectIdentifier(item.Object)},
187-
{"HDEL", m_PrefixConfigCheckSum + typeName, GetObjectIdentifier(item.Object)},
184+
{"HDEL", icingadb.m_PrefixConfigObject + typeName, icingadb.GetObjectIdentifier(Object)},
185+
{"HDEL", icingadb.m_PrefixConfigCheckSum + typeName, icingadb.GetObjectIdentifier(Object)},
188186
{
189187
"XADD",
190188
"icinga:runtime",
@@ -193,29 +191,29 @@ void IcingaDB::ProcessPendingItem(const PendingConfigItem& item)
193191
"1000000",
194192
"*",
195193
"redis_key",
196-
m_PrefixConfigObject + typeName,
194+
icingadb.m_PrefixConfigObject + typeName,
197195
"id",
198-
GetObjectIdentifier(item.Object),
196+
icingadb.GetObjectIdentifier(Object),
199197
"runtime_type",
200198
"delete"
201199
}
202200
}
203201
);
204202
}
205203

206-
if (item.DirtyBits & ConfigUpdate) {
204+
if (DirtyBits & ConfigUpdate) {
207205
std::map<String, std::vector<String>> hMSets;
208206
std::vector<Dictionary::Ptr> runtimeUpdates;
209-
CreateConfigUpdate(item.Object, GetLowerCaseTypeNameDB(item.Object), hMSets, runtimeUpdates, true);
210-
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
207+
icingadb.CreateConfigUpdate(Object, icingadb.GetLowerCaseTypeNameDB(Object), hMSets, runtimeUpdates, true);
208+
icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates);
211209
}
212210

213-
if (auto checkable = dynamic_pointer_cast<Checkable>(item.Object); checkable) {
214-
if (item.DirtyBits & FullState) {
215-
UpdateState(checkable, item.DirtyBits);
211+
if (auto checkable = dynamic_pointer_cast<Checkable>(Object); checkable) {
212+
if (DirtyBits & FullState) {
213+
icingadb.UpdateState(checkable, DirtyBits);
216214
}
217-
if (item.DirtyBits & NextUpdate) {
218-
SendNextUpdate(checkable);
215+
if (DirtyBits & NextUpdate) {
216+
icingadb.SendNextUpdate(checkable);
219217
}
220218
}
221219
}
@@ -229,13 +227,13 @@ void IcingaDB::ProcessPendingItem(const PendingConfigItem& item)
229227
*
230228
* @param item The pending dependency group state item containing the dependency group.
231229
*/
232-
void IcingaDB::ProcessPendingItem(const PendingDependencyGroupStateItem& item) const
230+
void PendingDependencyGroupStateItem::Execute(IcingaDB& icingadb) const
233231
{
234232
// For dependency group state updates, we don't actually care which child triggered the update,
235233
// since all children share the same dependency group state. Thus, we can just pick any child to
236234
// start the update from.
237-
if (auto child = item.DepGroup->GetAnyChild(); child) {
238-
UpdateDependenciesState(child, item.DepGroup);
235+
if (auto child = DepGroup->GetAnyChild(); child) {
236+
icingadb.UpdateDependenciesState(child, DepGroup);
239237
}
240238
}
241239

@@ -248,12 +246,12 @@ void IcingaDB::ProcessPendingItem(const PendingDependencyGroupStateItem& item) c
248246
*
249247
* @param item The pending dependency edge item containing the dependency group and child checkable.
250248
*/
251-
void IcingaDB::ProcessPendingItem(const PendingDependencyEdgeItem& item)
249+
void PendingDependencyEdgeItem::Execute(IcingaDB& icingadb) const
252250
{
253251
std::vector<Dictionary::Ptr> runtimeUpdates;
254252
std::map<String, RedisConnection::Query> hMSets;
255-
InsertCheckableDependencies(item.Child, hMSets, &runtimeUpdates, item.DepGroup);
256-
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
253+
icingadb.InsertCheckableDependencies(Child, hMSets, &runtimeUpdates, DepGroup);
254+
icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates);
257255
}
258256

259257
/**
@@ -265,16 +263,13 @@ void IcingaDB::ProcessPendingItem(const PendingDependencyEdgeItem& item)
265263
*
266264
* @param item The pending deletion item containing the ID and deletion keys map.
267265
*/
268-
void IcingaDB::ProcessPendingItem(const RelationsDeletionItem& item)
266+
void RelationsDeletionItem::Execute(IcingaDB& icingadb) const
269267
{
270-
ASSERT(std::holds_alternative<std::string>(item.ID)); // Relation deletion items must have real IDs.
271-
272-
auto id = std::get<std::string>(item.ID);
273-
for (auto [redisKey, hasChecksum] : item.Relations) {
274-
if (IsStateKey(redisKey)) {
275-
DeleteState(id, redisKey, hasChecksum);
268+
for (auto [redisKey, hasChecksum] : Relations) {
269+
if (icingadb.IsStateKey(redisKey)) {
270+
icingadb.DeleteState(ID, redisKey, hasChecksum);
276271
} else {
277-
DeleteRelationship(id, redisKey, hasChecksum);
272+
icingadb.DeleteRelationship(ID, redisKey, hasChecksum);
278273
}
279274
}
280275
}
@@ -293,24 +288,20 @@ void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bit
293288

294289
{
295290
std::lock_guard lock(m_PendingItemsMutex);
296-
if (auto [it, inserted] = m_PendingItems.insert(PendingConfigItem{object, bits}); !inserted) {
297-
m_PendingItems.modify(it, [bits](PendingItemVariant& itemToProcess) mutable {
298-
std::visit(
299-
[&bits](auto& item) {
300-
if (bits & ConfigDelete) {
301-
// A config delete and config update cancel each other out, and we don't need
302-
// to keep any state updates either, as the object is being deleted.
303-
item.DirtyBits &= ~(ConfigUpdate | FullState);
304-
bits &= ~(ConfigUpdate | FullState); // Must not add these bits either.
305-
} else if (bits & ConfigUpdate) {
306-
// A new config update cancels any pending config deletion for the same object.
307-
item.DirtyBits &= ~ConfigDelete;
308-
bits &= ~ConfigDelete;
309-
}
310-
item.DirtyBits |= bits & DirtyBitsAll;
311-
},
312-
itemToProcess
313-
);
291+
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<PendingConfigItem>(object, bits)); !inserted) {
292+
m_PendingItems.modify(it, [bits](const std::shared_ptr<PendingQueueItem>& item) mutable {
293+
auto configItem = dynamic_cast<PendingConfigItem*>(item.get());
294+
if (bits & ConfigDelete) {
295+
// A config delete and config update cancel each other out, and we don't need
296+
// to keep any state updates either, as the object is being deleted.
297+
configItem->DirtyBits &= ~(ConfigUpdate | FullState);
298+
bits &= ~(ConfigUpdate | FullState); // Must not add these bits either.
299+
} else if (bits & ConfigUpdate) {
300+
// A new config update cancels any pending config deletion for the same object.
301+
configItem->DirtyBits &= ~ConfigDelete;
302+
bits &= ~ConfigDelete;
303+
}
304+
configItem->DirtyBits |= bits & DirtyBitsAll;
314305
});
315306
}
316307
}
@@ -322,7 +313,7 @@ void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& dep
322313
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
323314
{
324315
std::lock_guard lock(m_PendingItemsMutex);
325-
m_PendingItems.insert(PendingDependencyGroupStateItem{depGroup});
316+
m_PendingItems.insert(std::make_shared<PendingDependencyGroupStateItem>(depGroup));
326317
}
327318
m_PendingItemsCV.notify_one();
328319
}
@@ -342,7 +333,7 @@ void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depG
342333
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
343334
{
344335
std::lock_guard lock(m_PendingItemsMutex);
345-
m_PendingItems.insert(PendingDependencyEdgeItem{depGroup, child});
336+
m_PendingItems.insert(std::make_shared<PendingDependencyEdgeItem>(depGroup, child));
346337
}
347338
m_PendingItemsCV.notify_one();
348339
}
@@ -484,18 +475,18 @@ void IcingaDB::EnqueueDependencyChildRemoved(
484475
* @param id The ID of the relation to be deleted.
485476
* @param relations A map of Redis keys from which to delete the relation.
486477
*/
487-
void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsKeyMap& relations)
478+
void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsDeletionItem::RelationsKeyMap& relations)
488479
{
489480
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
490481
return; // No need to enqueue anything if we're not connected.
491482
}
492483

493484
{
494485
std::lock_guard lock(m_PendingItemsMutex);
495-
if (auto [it, inserted] = m_PendingItems.insert(RelationsDeletionItem{id, relations}); !inserted) {
496-
m_PendingItems.modify(it, [&relations](PendingItemVariant& val) {
497-
auto& item = std::get<RelationsDeletionItem>(val);
498-
item.Relations.insert(relations.begin(), relations.end());
486+
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<RelationsDeletionItem>(id, relations)); !inserted) {
487+
m_PendingItems.modify(it, [&relations](std::shared_ptr<PendingQueueItem>& val) {
488+
auto item = dynamic_cast<RelationsDeletionItem*>(val.get());
489+
item->Relations.insert(relations.begin(), relations.end());
499490
});
500491
}
501492
}

0 commit comments

Comments
 (0)