diff --git a/lib/base/objectlock.cpp b/lib/base/objectlock.cpp index 9ed9d1f215..1c0d53106c 100644 --- a/lib/base/objectlock.cpp +++ b/lib/base/objectlock.cpp @@ -37,6 +37,24 @@ ObjectLock::ObjectLock(const Object *object) Lock(); } +/** + * Tries to lock the object without blocking. + * + * @returns true if the lock was acquired, false otherwise. + */ +bool ObjectLock::TryLock() noexcept +{ + ASSERT(!m_Locked && m_Object); + + m_Locked = m_Object->m_Mutex.try_lock(); +#ifdef I2_DEBUG + if (m_Locked && ++m_Object->m_LockCount == 1u) { + m_Object->m_LockOwner.store(std::this_thread::get_id()); + } +#endif /* I2_DEBUG */ + return m_Locked; +} + void ObjectLock::Lock() { ASSERT(!m_Locked && m_Object); diff --git a/lib/base/objectlock.hpp b/lib/base/objectlock.hpp index 3288874257..459c079708 100644 --- a/lib/base/objectlock.hpp +++ b/lib/base/objectlock.hpp @@ -23,6 +23,7 @@ struct ObjectLock ~ObjectLock(); + bool TryLock() noexcept; void Lock(); void Unlock(); diff --git a/lib/checker/checkercomponent.cpp b/lib/checker/checkercomponent.cpp index 04583833a0..d1ecc26318 100644 --- a/lib/checker/checkercomponent.cpp +++ b/lib/checker/checkercomponent.cpp @@ -135,16 +135,13 @@ void CheckerComponent::CheckThreadProc() bool forced = checkable->GetForceNextCheck(); bool check = true; - bool notifyNextCheck = false; double nextCheck = -1; if (!forced) { if (!checkable->IsReachable(DependencyCheckExecution)) { Log(LogNotice, "CheckerComponent") << "Skipping check for object '" << checkable->GetName() << "': Dependency failed."; - check = false; - notifyNextCheck = true; } Host::Ptr host; @@ -181,7 +178,6 @@ void CheckerComponent::CheckThreadProc() << Utility::FormatDateTime("%Y-%m-%d %H:%M:%S %z", nextCheck); check = false; - notifyNextCheck = true; } } } @@ -200,11 +196,6 @@ void CheckerComponent::CheckThreadProc() checkable->UpdateNextCheck(); } - if (notifyNextCheck) { - // Trigger update event for Icinga DB - Checkable::OnNextCheckUpdated(checkable); - } - lock.lock(); continue; diff --git a/lib/icinga/dependency-group.cpp b/lib/icinga/dependency-group.cpp index d60fec7c1a..f29ed23f9f 100644 --- a/lib/icinga/dependency-group.cpp +++ b/lib/icinga/dependency-group.cpp @@ -143,6 +143,22 @@ void DependencyGroup::LoadParents(std::set& parents) const } } +/** + * Retrieve any child Checkable from the current dependency group. + * + * @return - Returns the first child Checkable found in this group, or nullptr if there are no children. + */ +Checkable::Ptr DependencyGroup::GetAnyChild() const +{ + std::lock_guard lock(m_Mutex); + for (auto& [_, children] : m_Members) { + if (!children.empty()) { + return children.begin()->second->GetChild(); + } + } + return nullptr; +} + /** * Retrieve the number of dependency objects in the current dependency group. * diff --git a/lib/icinga/dependency.hpp b/lib/icinga/dependency.hpp index c70e578b4c..da25c2b9bb 100644 --- a/lib/icinga/dependency.hpp +++ b/lib/icinga/dependency.hpp @@ -163,6 +163,7 @@ class DependencyGroup final : public SharedObject void RemoveDependency(const Dependency::Ptr& dependency); std::vector GetDependenciesForChild(const Checkable* child) const; void LoadParents(std::set& parents) const; + Checkable::Ptr GetAnyChild() const; size_t GetDependenciesCount() const; void SetIcingaDBIdentifier(const String& identifier); diff --git a/lib/icingadb/CMakeLists.txt b/lib/icingadb/CMakeLists.txt index 133fb7d6dc..1f1aaa7856 100644 --- a/lib/icingadb/CMakeLists.txt +++ b/lib/icingadb/CMakeLists.txt @@ -5,7 +5,7 @@ mkclass_target(icingadb.ti icingadb-ti.cpp icingadb-ti.hpp) mkembedconfig_target(icingadb-itl.conf icingadb-itl.cpp) set(icingadb_SOURCES - icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp redisconnection.cpp icingadb-ti.hpp + icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp icingadb-worker.cpp redisconnection.cpp icingadb-ti.hpp icingadbchecktask.cpp icingadb-itl.cpp ) diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index 0d568a3cc1..4cef701eeb 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -28,9 +28,11 @@ #include "icinga/checkcommand.hpp" #include "icinga/eventcommand.hpp" #include "icinga/notificationcommand.hpp" +#include "icinga/scheduleddowntime.hpp" #include "icinga/timeperiod.hpp" #include "icinga/pluginutility.hpp" #include "remote/zone.hpp" +#include #include #include #include @@ -45,9 +47,28 @@ using namespace icinga; -using Prio = RedisConnection::QueryPriority; - std::unordered_set IcingaDB::m_IndexedTypes; +/** + * Mapping of types to their custom variable Redis keys. + * + * This array defines the association between various configuration object types and their corresponding + * Redis keys used for storing custom variables. Each entry in the array consists of a pair, where the + * first element is a reference to the type instance and the second element is the associated Redis key. + */ +static constexpr std::pair l_CustomVarKeys[] = { + {CheckCommand::TypeInstance, RedisKey::CheckCmdCustomVar}, + {EventCommand::TypeInstance, RedisKey::EventCmdCustomVar}, + {Host::TypeInstance, RedisKey::HostCustomVar}, + {HostGroup::TypeInstance, RedisKey::HostGroupCustomVar}, + {Notification::TypeInstance, RedisKey::NotificationCustomVar}, + {NotificationCommand::TypeInstance, RedisKey::NotificationCmdCustomVar}, + {ScheduledDowntime::TypeInstance, RedisKey::ScheduledDowntimeCustomVar}, + {Service::TypeInstance, RedisKey::ServiceCustomVar}, + {ServiceGroup::TypeInstance, RedisKey::ServiceGroupCustomVar}, + {TimePeriod::TypeInstance, RedisKey::TimePeriodCustomVar}, + {User::TypeInstance, RedisKey::UserCustomVar}, + {UserGroup::TypeInstance, RedisKey::UserGroupCustomVar} +}; INITIALIZE_ONCE(&IcingaDB::ConfigStaticInitialize); @@ -135,8 +156,8 @@ void IcingaDB::ConfigStaticInitialize() IcingaDB::NewCheckResultHandler(checkable); }); - Checkable::OnNextCheckUpdated.connect([](const Checkable::Ptr& checkable) { - IcingaDB::NextCheckUpdatedHandler(checkable); + Checkable::OnNextCheckChanged.connect([](const Checkable::Ptr& checkable, const Value&) { + IcingaDB::NextCheckChangedHandler(checkable); }); Service::OnHostProblemChanged.connect([](const Service::Ptr& service, const CheckResult::Ptr&, const MessageOrigin::Ptr&) { @@ -167,10 +188,10 @@ void IcingaDB::ConfigStaticInitialize() Service::OnGroupsChangedWithOldValue.connect([](const Service::Ptr& service, const Value& oldValues, const Value& newValues) { IcingaDB::ServiceGroupsChangedHandler(service, oldValues, newValues); }); - Command::OnEnvChangedWithOldValue.connect([](const ConfigObject::Ptr& command, const Value& oldValues, const Value& newValues) { + Command::OnEnvChangedWithOldValue.connect([](const Command::Ptr& command, const Value& oldValues, const Value& newValues) { IcingaDB::CommandEnvChangedHandler(command, oldValues, newValues); }); - Command::OnArgumentsChangedWithOldValue.connect([](const ConfigObject::Ptr& command, const Value& oldValues, const Value& newValues) { + Command::OnArgumentsChangedWithOldValue.connect([](const Command::Ptr& command, const Value& oldValues, const Value& newValues) { IcingaDB::CommandArgumentsChangedHandler(command, oldValues, newValues); }); CustomVarObject::OnVarsChangedWithOldValue.connect([](const ConfigObject::Ptr& object, const Value& oldValues, const Value& newValues) { @@ -180,8 +201,9 @@ void IcingaDB::ConfigStaticInitialize() void IcingaDB::UpdateAllConfigObjects() { - m_Rcon->Sync(); - m_Rcon->FireAndForgetQuery({"XADD", "icinga:schema", "MAXLEN", "1", "*", "version", "6"}, Prio::Heartbeat); + // This function performs an initial dump of all configuration objects into Redis, thus there are no + // previously enqueued queries on m_Rcon that we need to wait for. So, no Sync() call is necessary here. + m_RconWorker->FireAndForgetQuery({"XADD", "icinga:schema", "MAXLEN", "1", "*", "version", "6"}, {}, true); Log(LogInformation, "IcingaDB") << "Starting initial config/status dump"; double startTime = Utility::GetTime(); @@ -198,16 +220,8 @@ void IcingaDB::UpdateAllConfigObjects() std::vector types = GetTypes(); - m_Rcon->SuppressQueryKind(Prio::CheckResult); - m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync); - - Defer unSuppress ([this]() { - m_Rcon->UnsuppressQueryKind(Prio::RuntimeStateSync); - m_Rcon->UnsuppressQueryKind(Prio::CheckResult); - }); - // Add a new type=* state=wip entry to the stream and remove all previous entries (MAXLEN 1). - m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "MAXLEN", "1", "*", "key", "*", "state", "wip"}, Prio::Config); + m_RconWorker->FireAndForgetQuery({"XADD", "icinga:dump", "MAXLEN", "1", "*", "key", "*", "state", "wip"}); const std::vector globalKeys = { m_PrefixConfigObject + "customvar", @@ -224,9 +238,9 @@ void IcingaDB::UpdateAllConfigObjects() m_PrefixConfigObject + "redundancygroup", m_PrefixConfigObject + "redundancygroup:state", }; - DeleteKeys(m_Rcon, globalKeys, Prio::Config); - DeleteKeys(m_Rcon, {"icinga:nextupdate:host", "icinga:nextupdate:service"}, Prio::Config); - m_Rcon->Sync(); + DeleteKeys(m_RconWorker, globalKeys); + DeleteKeys(m_RconWorker, {"icinga:nextupdate:host", "icinga:nextupdate:service"}); + m_RconWorker->Sync(); Defer resetDumpedGlobals ([this]() { m_DumpedGlobals.CustomVar.Reset(); @@ -245,7 +259,7 @@ void IcingaDB::UpdateAllConfigObjects() auto& rcon (m_Rcons.at(ctype)); std::vector keys = GetTypeOverwriteKeys(lcType); - DeleteKeys(rcon, keys, Prio::Config); + DeleteKeys(rcon, keys); WorkQueue upqObjectType(25000, Configuration::Concurrency, LogNotice); upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType); @@ -257,9 +271,7 @@ void IcingaDB::UpdateAllConfigObjects() String cursor = "0"; do { - Array::Ptr res = rcon->GetResultOfQuery({ - "HSCAN", configCheckSum, cursor, "COUNT", "1000" - }, Prio::Config); + Array::Ptr res = rcon->GetResultOfQuery({"HSCAN", configCheckSum, cursor, "COUNT", "1000"}); AddKvsToMap(res->Get(1), redisCheckSums); @@ -276,8 +288,6 @@ void IcingaDB::UpdateAllConfigObjects() upqObjectType.ParallelFor(objectChunks, [&](decltype(objectChunks)::const_reference chunk) { std::map> hMSets; - std::vector hostZAdds = {"ZADD", "icinga:nextupdate:host"}, serviceZAdds = {"ZADD", "icinga:nextupdate:service"}; - auto skimObjects ([&]() { std::lock_guard l (ourContentMutex); @@ -336,18 +346,7 @@ void IcingaDB::UpdateAllConfigObjects() auto checkable (dynamic_pointer_cast(object)); if (checkable && checkable->GetEnableActiveChecks()) { - auto zAdds (dynamic_pointer_cast(checkable) ? &serviceZAdds : &hostZAdds); - - zAdds->emplace_back(Convert::ToString(checkable->GetNextUpdate())); - zAdds->emplace_back(GetObjectIdentifier(checkable)); - - if (zAdds->size() >= 102u) { - std::vector header (zAdds->begin(), zAdds->begin() + 2u); - - rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult); - - *zAdds = std::move(header); - } + EnqueueConfigObject(checkable, NextUpdate); } } @@ -355,14 +354,8 @@ void IcingaDB::UpdateAllConfigObjects() ExecuteRedisTransaction(rcon, hMSets, {}); - for (auto zAdds : {&hostZAdds, &serviceZAdds}) { - if (zAdds->size() > 2u) { - rcon->FireAndForgetQuery(std::move(*zAdds), Prio::CheckResult); - } - } - Log(LogNotice, "IcingaDB") - << "Dumped " << bulkCounter << " objects of type " << lcType; + << "Dumped " << bulkCounter << " objects of type " << lcType; }); upqObjectType.Join(); @@ -421,7 +414,7 @@ void IcingaDB::UpdateAllConfigObjects() setChecksum.clear(); setObject.clear(); - rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig}); + rcon->FireAndForgetQueries(std::move(transaction), {affectedConfig}); }); auto flushDels ([&]() { @@ -440,7 +433,7 @@ void IcingaDB::UpdateAllConfigObjects() delChecksum.clear(); delObject.clear(); - rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {affectedConfig}); + rcon->FireAndForgetQueries(std::move(transaction), {affectedConfig}); }); auto setOne ([&]() { @@ -501,12 +494,14 @@ void IcingaDB::UpdateAllConfigObjects() } for (auto& key : GetTypeDumpSignalKeys(type)) { - rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config); + rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}); } rcon->Sync(); + rcon->Disconnect(); // We're done with this connection, so close it. }); upq.Join(); + m_Rcons.clear(); if (upq.HasExceptions()) { for (boost::exception_ptr exc : upq.GetExceptions()) { @@ -522,14 +517,14 @@ void IcingaDB::UpdateAllConfigObjects() } for (auto& key : globalKeys) { - m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}, Prio::Config); + m_RconWorker->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", key, "state", "done"}); } - m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", "*", "state", "done"}, Prio::Config); + m_RconWorker->FireAndForgetQuery({"XADD", "icinga:dump", "*", "key", "*", "state", "done"}); // enqueue a callback that will notify us once all previous queries were executed and wait for this event std::promise p; - m_Rcon->EnqueueCallback([&p](boost::asio::yield_context& yc) { p.set_value(); }, Prio::Config); + m_RconWorker->EnqueueCallback([&p](boost::asio::yield_context&) { p.set_value(); }); p.get_future().wait(); auto endTime (Utility::GetTime()); @@ -562,13 +557,13 @@ std::vector>> IcingaDB::ChunkObjects(std return chunks; } -void IcingaDB::DeleteKeys(const RedisConnection::Ptr& conn, const std::vector& keys, RedisConnection::QueryPriority priority) { +void IcingaDB::DeleteKeys(const RedisConnection::Ptr& conn, const std::vector& keys) { std::vector query = {"DEL"}; for (auto& key : keys) { query.emplace_back(key); } - conn->FireAndForgetQuery(std::move(query), priority); + conn->FireAndForgetQuery(std::move(query)); } std::vector IcingaDB::GetTypeOverwriteKeys(const String& type) @@ -1297,21 +1292,21 @@ void IcingaDB::InsertCheckableDependencies( * Update the state information of a checkable in Redis. * * What is updated exactly depends on the mode parameter: - * - Volatile: Update the volatile state information stored in icinga:host:state or icinga:service:state as well as - * the corresponding checksum stored in icinga:checksum:host:state or icinga:checksum:service:state. - * - RuntimeOnly: Write a runtime update to the icinga:runtime:state stream. It is up to the caller to ensure that + * - VolatileState: Update the volatile state information stored in icinga:host:state or icinga:service:state as well + * as the corresponding checksum stored in icinga:checksum:host:state or icinga:checksum:service:state. + * - RuntimeState: Write a runtime update to the icinga:runtime:state stream. It is up to the caller to ensure that * identical volatile state information was already written before to avoid inconsistencies. This mode is only * useful to upgrade a previous Volatile to a Full operation, otherwise Full should be used. - * - Full: Perform an update of all state information in Redis, that is updating the volatile information and sending - * a corresponding runtime update so that this state update gets written through to the persistent database by a - * running icingadb process. + * - FullState: Perform an update of all state information in Redis, that is updating the volatile information and + * sending a corresponding runtime update so that this state update gets written through to the persistent database + * by a running icingadb process. * * @param checkable State of this checkable is updated in Redis - * @param mode Mode of operation (StateUpdate::Volatile, StateUpdate::RuntimeOnly, or StateUpdate::Full) + * @param mode Mode of operation (DirtyBits:VolatileState, DirtyBits::RuntimeState, or DirtyBits::FullState) */ -void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode) +void IcingaDB::UpdateState(const Checkable::Ptr& checkable, uint32_t mode) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!m_RconWorker || !m_RconWorker->IsConnected()) return; String objectType = GetLowerCaseTypeNameDB(checkable); @@ -1323,14 +1318,14 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode) String redisChecksumKey = m_PrefixConfigCheckSum + objectType + ":state"; String checksum = HashValue(stateAttrs); - if (mode & StateUpdate::Volatile) { - m_Rcon->FireAndForgetQueries({ + if (mode & VolatileState) { + m_RconWorker->FireAndForgetQueries({ {"HSET", redisStateKey, objectKey, JsonEncode(stateAttrs)}, {"HSET", redisChecksumKey, objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))}, - }, Prio::RuntimeStateSync); + }); } - if (mode & StateUpdate::RuntimeOnly) { + if (mode & RuntimeState) { ObjectLock olock(stateAttrs); std::vector streamadd({ @@ -1345,35 +1340,23 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, StateUpdate mode) streamadd.emplace_back(IcingaToStreamValue(kv.second)); } - m_Rcon->FireAndForgetQuery(std::move(streamadd), Prio::RuntimeStateStream, {0, 1}); + m_RconWorker->FireAndForgetQuery(std::move(streamadd), {0, 1}); } } /** - * Send dependencies state information of the given Checkable to Redis. + * Update the dependency state information of the given checkable and its associated dependency groups in Redis. * - * If the dependencyGroup parameter is set, only the dependencies state of that group are sent. Otherwise, all - * dependency groups of the provided Checkable are processed. + * This function serializes the dependency state information of the provided Checkable object and its associated + * DependencyGroup into Redis HMSETs and streams the state updates to the runtime state stream. It's intended to + * be called by the background worker when processing runtime updates for Checkable objects that are part of some + * dependency graph. * - * @param checkable The Checkable you want to send the dependencies state update for - * @param onlyDependencyGroup If set, send state updates only for this dependency group and its dependencies. - * @param seenGroups A container to track already processed DependencyGroups to avoid duplicate state updates. + * @param checkable The checkable object whose dependency state is to be updated. + * @param depGroup The dependency group to process for the given checkable. */ -void IcingaDB::UpdateDependenciesState(const Checkable::Ptr& checkable, const DependencyGroup::Ptr& onlyDependencyGroup, - std::set* seenGroups) const +void IcingaDB::UpdateDependenciesState(const Checkable::Ptr& checkable, const DependencyGroup::Ptr& depGroup) const { - if (!m_Rcon || !m_Rcon->IsConnected()) { - return; - } - - std::vector dependencyGroups{onlyDependencyGroup}; - if (!onlyDependencyGroup) { - dependencyGroups = checkable->GetDependencyGroups(); - if (dependencyGroups.empty()) { - return; - } - } - RedisConnection::Queries streamStates; auto addDependencyStateToStream([&streamStates](const String& redisKey, const Dictionary::Ptr& stateAttrs) { RedisConnection::Query xAdd{ @@ -1389,61 +1372,39 @@ void IcingaDB::UpdateDependenciesState(const Checkable::Ptr& checkable, const De }); std::map hMSets; - for (auto& dependencyGroup : dependencyGroups) { - bool isRedundancyGroup(dependencyGroup->IsRedundancyGroup()); - if (isRedundancyGroup && dependencyGroup->GetIcingaDBIdentifier().IsEmpty()) { - // Way too soon! The Icinga DB hash will be set during the initial config dump, but this state - // update seems to occur way too early. So, we've to skip it for now and wait for the next one. - // The m_ConfigDumpInProgress flag is probably still set to true at this point! - continue; - } - - if (seenGroups && !seenGroups->insert(dependencyGroup.get()).second) { - // Usually, if the seenGroups set is provided, IcingaDB is triggering a runtime state update for ALL - // children of a given initiator Checkable (parent). In such cases, we may end up with lots of useless - // state updates as all the children of a non-redundant group a) share the same entry in the database b) - // it doesn't matter which child triggers the state update first all the subsequent updates are just useless. - // - // Likewise, for redundancy groups, all children of a redundancy group share the same set of parents - // and thus the resulting state information would be the same from each child Checkable perspective. - // So, serializing the redundancy group state information only once is sufficient. - continue; - } - - auto dependencies(dependencyGroup->GetDependenciesForChild(checkable.get())); - std::sort(dependencies.begin(), dependencies.end(), [](const Dependency::Ptr& lhs, const Dependency::Ptr& rhs) { - return lhs->GetParent() < rhs->GetParent(); - }); - for (auto it(dependencies.begin()); it != dependencies.end(); /* no increment */) { - const auto& dependency(*it); - - Dictionary::Ptr stateAttrs; - // Note: The following loop is intended to cover some possible special cases but may not occur in practice - // that often. That is, having two or more dependency objects that point to the same parent Checkable. - // So, traverse all those duplicates and merge their relevant state information into a single edge. - for (; it != dependencies.end() && (*it)->GetParent() == dependency->GetParent(); ++it) { - if (!stateAttrs || stateAttrs->Get("failed") == false) { - stateAttrs = SerializeDependencyEdgeState(dependencyGroup, *it); - } + auto dependencies(depGroup->GetDependenciesForChild(checkable.get())); + std::sort(dependencies.begin(), dependencies.end(), [](const Dependency::Ptr& lhs, const Dependency::Ptr& rhs) { + return lhs->GetParent() < rhs->GetParent(); + }); + for (auto it(dependencies.begin()); it != dependencies.end(); /* no increment */) { + const auto& dependency(*it); + + Dictionary::Ptr stateAttrs; + // Note: The following loop is intended to cover some possible special cases but may not occur in practice + // that often. That is, having two or more dependency objects that point to the same parent Checkable. + // So, traverse all those duplicates and merge their relevant state information into a single edge. + for (; it != dependencies.end() && (*it)->GetParent() == dependency->GetParent(); ++it) { + if (!stateAttrs || stateAttrs->Get("failed") == false) { + stateAttrs = SerializeDependencyEdgeState(depGroup, *it); } - - addDependencyStateToStream(m_PrefixConfigObject + "dependency:edge:state", stateAttrs); - AddDataToHmSets(hMSets, RedisKey::DependencyEdgeState, stateAttrs->Get("id"), stateAttrs); } - if (isRedundancyGroup) { - Dictionary::Ptr stateAttrs(SerializeRedundancyGroupState(checkable, dependencyGroup)); + addDependencyStateToStream(m_PrefixConfigObject + "dependency:edge:state", stateAttrs); + AddDataToHmSets(hMSets, RedisKey::DependencyEdgeState, stateAttrs->Get("id"), stateAttrs); + } - Dictionary::Ptr sharedGroupState(stateAttrs->ShallowClone()); - sharedGroupState->Remove("redundancy_group_id"); - sharedGroupState->Remove("is_reachable"); - sharedGroupState->Remove("last_state_change"); + if (depGroup->IsRedundancyGroup()) { + Dictionary::Ptr stateAttrs(SerializeRedundancyGroupState(checkable, depGroup)); - addDependencyStateToStream(m_PrefixConfigObject + "redundancygroup:state", stateAttrs); - addDependencyStateToStream(m_PrefixConfigObject + "dependency:edge:state", sharedGroupState); - AddDataToHmSets(hMSets, RedisKey::RedundancyGroupState, dependencyGroup->GetIcingaDBIdentifier(), stateAttrs); - AddDataToHmSets(hMSets, RedisKey::DependencyEdgeState, dependencyGroup->GetIcingaDBIdentifier(), sharedGroupState); - } + Dictionary::Ptr sharedGroupState(stateAttrs->ShallowClone()); + sharedGroupState->Remove("redundancy_group_id"); + sharedGroupState->Remove("is_reachable"); + sharedGroupState->Remove("last_state_change"); + + addDependencyStateToStream(m_PrefixConfigObject + "redundancygroup:state", stateAttrs); + addDependencyStateToStream(m_PrefixConfigObject + "dependency:edge:state", sharedGroupState); + AddDataToHmSets(hMSets, RedisKey::RedundancyGroupState, depGroup->GetIcingaDBIdentifier(), stateAttrs); + AddDataToHmSets(hMSets, RedisKey::DependencyEdgeState, depGroup->GetIcingaDBIdentifier(), sharedGroupState); } if (!streamStates.empty()) { @@ -1453,32 +1414,8 @@ void IcingaDB::UpdateDependenciesState(const Checkable::Ptr& checkable, const De queries.emplace_back(std::move(query)); } - m_Rcon->FireAndForgetQueries(std::move(queries), Prio::RuntimeStateSync); - m_Rcon->FireAndForgetQueries(std::move(streamStates), Prio::RuntimeStateStream, {0, 1}); - } -} - -// Used to update a single object, used for runtime updates -void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate) -{ - if (!m_Rcon || !m_Rcon->IsConnected()) - return; - - String typeName = GetLowerCaseTypeNameDB(object); - - std::map> hMSets; - std::vector runtimeUpdates; - - CreateConfigUpdate(object, typeName, hMSets, runtimeUpdates, runtimeUpdate); - Checkable::Ptr checkable = dynamic_pointer_cast(object); - if (checkable) { - UpdateState(checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile); - } - - ExecuteRedisTransaction(m_Rcon, hMSets, runtimeUpdates); - - if (checkable) { - SendNextUpdate(checkable); + m_RconWorker->FireAndForgetQueries(std::move(queries)); + m_RconWorker->FireAndForgetQueries(std::move(streamStates), {0, 1}); } } @@ -1814,7 +1751,7 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeN return; */ - if (m_Rcon == nullptr) + if (m_RconWorker == nullptr) return; Dictionary::Ptr attr = new Dictionary; @@ -1844,29 +1781,14 @@ IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeN void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!m_RconWorker || !m_RconWorker->IsConnected()) return; - Type::Ptr type = object->GetReflectionType(); - String typeName = type->GetName().ToLower(); - String objectKey = GetObjectIdentifier(object); - - m_Rcon->FireAndForgetQueries({ - {"HDEL", m_PrefixConfigObject + typeName, objectKey}, - {"HDEL", m_PrefixConfigCheckSum + typeName, objectKey}, - { - "XADD", "icinga:runtime", "MAXLEN", "~", "1000000", "*", - "redis_key", m_PrefixConfigObject + typeName, "id", objectKey, "runtime_type", "delete" - } - }, Prio::Config); - - CustomVarObject::Ptr customVarObject = dynamic_pointer_cast(object); - - if (customVarObject) { - Dictionary::Ptr vars = customVarObject->GetVars(); - SendCustomVarsChanged(object, vars, nullptr); + if (auto customVarObject = dynamic_pointer_cast(object); customVarObject) { + SendCustomVarsChanged(object, customVarObject->GetVars(), nullptr); } + Type::Ptr type = object->GetReflectionType(); if (type == Host::TypeInstance || type == Service::TypeInstance) { Checkable::Ptr checkable = static_pointer_cast(object); @@ -1874,16 +1796,11 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) Service::Ptr service; tie(host, service) = GetHostService(checkable); - m_Rcon->FireAndForgetQuery({ - "ZREM", - service ? "icinga:nextupdate:service" : "icinga:nextupdate:host", - GetObjectIdentifier(checkable) - }, Prio::CheckResult); - - m_Rcon->FireAndForgetQueries({ - {"HDEL", m_PrefixConfigObject + typeName + ":state", objectKey}, - {"HDEL", m_PrefixConfigCheckSum + typeName + ":state", objectKey} - }, Prio::RuntimeStateSync); + EnqueueRelationsDeletion( + GetObjectIdentifier(checkable), + {{service ? RedisKey::ServiceState : RedisKey::HostState, true}} + ); + EnqueueConfigObject(object, ConfigDelete | NextUpdate); // Send also ZREM for next update if (service) { SendGroupsChanged(checkable, service->GetGroups(), nullptr); @@ -1894,6 +1811,8 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) return; } + EnqueueConfigObject(object, ConfigDelete); + if (type == TimePeriod::TypeInstance) { TimePeriod::Ptr timeperiod = static_pointer_cast(object); SendTimePeriodRangesChanged(timeperiod, timeperiod->GetRanges(), nullptr); @@ -1917,8 +1836,9 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) if (type == CheckCommand::TypeInstance || type == NotificationCommand::TypeInstance || type == EventCommand::TypeInstance) { Command::Ptr command = static_pointer_cast(object); - SendCommandArgumentsChanged(command, command->GetArguments(), nullptr); - SendCommandEnvChanged(command, command->GetEnv(), nullptr); + auto [cmdEnvKey, cmdArgKey] = GetCmdEnvArgKeys(command); + SendCommandArgumentsChanged(command, cmdArgKey, command->GetArguments(), nullptr); + SendCommandEnvChanged(command, cmdEnvKey, command->GetEnv(), nullptr); return; } } @@ -1953,7 +1873,7 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul tie(host, service) = GetHostService(checkable); - UpdateState(checkable, StateUpdate::RuntimeOnly); + EnqueueConfigObject(checkable, RuntimeState); int hard_state; if (!cr) { @@ -2105,7 +2025,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) return; } - SendConfigUpdate(downtime, true); + EnqueueConfigObject(downtime, ConfigUpdate); auto checkable (downtime->GetCheckable()); auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); @@ -2115,7 +2035,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) tie(host, service) = GetHostService(checkable); /* Update checkable state as in_downtime may have changed. */ - UpdateState(checkable, StateUpdate::Full); + EnqueueConfigObject(checkable, FullState); std::vector xAdd ({ "XADD", "icinga:history:stream:downtime", "*", @@ -2204,7 +2124,7 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) return; /* Update checkable state as in_downtime may have changed. */ - UpdateState(checkable, StateUpdate::Full); + EnqueueConfigObject(checkable, FullState); std::vector xAdd ({ "XADD", "icinga:history:stream:downtime", "*", @@ -2292,6 +2212,9 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment) Service::Ptr service; tie(host, service) = GetHostService(checkable); + // Update the checkable state to so that the "last_comment_id" is correctly reflected. + EnqueueConfigObject(checkable, FullState); + std::vector xAdd ({ "XADD", "icinga:history:stream:comment", "*", "comment_id", GetObjectIdentifier(comment), @@ -2334,7 +2257,6 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment) } m_HistoryBulker.ProduceOne(std::move(xAdd)); - UpdateState(checkable, StateUpdate::Full); } void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) @@ -2364,6 +2286,9 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) Service::Ptr service; tie(host, service) = GetHostService(checkable); + // Update the checkable state to so that the "last_comment_id" is correctly reflected. + EnqueueConfigObject(checkable, FullState); + std::vector xAdd ({ "XADD", "icinga:history:stream:comment", "*", "comment_id", GetObjectIdentifier(comment), @@ -2414,7 +2339,6 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) } m_HistoryBulker.ProduceOne(std::move(xAdd)); - UpdateState(checkable, StateUpdate::Full); } void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double changeTime, double flappingLastChange) @@ -2486,27 +2410,25 @@ void IcingaDB::SendFlappingChange(const Checkable::Ptr& checkable, double change void IcingaDB::SendNextUpdate(const Checkable::Ptr& checkable) { - if (!m_Rcon || !m_Rcon->IsConnected()) + if (!m_RconWorker || !m_RconWorker->IsConnected()) return; - if (checkable->GetEnableActiveChecks()) { - m_Rcon->FireAndForgetQuery( + if (checkable->GetEnableActiveChecks() && checkable->IsActive()) { + m_RconWorker->FireAndForgetQuery( { "ZADD", dynamic_pointer_cast(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host", Convert::ToString(checkable->GetNextUpdate()), GetObjectIdentifier(checkable) - }, - Prio::CheckResult + } ); - } else { - m_Rcon->FireAndForgetQuery( + } else if (!checkable->GetEnableActiveChecks() || checkable->GetExtension("ConfigObjectDeleted")) { + m_RconWorker->FireAndForgetQuery( { "ZREM", dynamic_pointer_cast(checkable) ? "icinga:nextupdate:service" : "icinga:nextupdate:host", GetObjectIdentifier(checkable) - }, - Prio::CheckResult + } ); } } @@ -2522,7 +2444,7 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str tie(host, service) = GetHostService(checkable); /* Update checkable state as is_acknowledged may have changed. */ - UpdateState(checkable, StateUpdate::Full); + EnqueueConfigObject(checkable, FullState); std::vector xAdd ({ "XADD", "icinga:history:stream:acknowledgement", "*", @@ -2580,7 +2502,7 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const tie(host, service) = GetHostService(checkable); /* Update checkable state as is_acknowledged may have changed. */ - UpdateState(checkable, StateUpdate::Full); + EnqueueConfigObject(checkable, FullState); std::vector xAdd ({ "XADD", "icinga:history:stream:acknowledgement", "*", @@ -2672,7 +2594,7 @@ void IcingaDB::ForwardHistoryEntries() if (m_Rcon && m_Rcon->IsConnected()) { try { - m_Rcon->GetResultsOfQueries(haystack, Prio::History, {0, 0, haystack.size()}); + m_Rcon->GetResultsOfQueries(haystack, {0, 0, haystack.size()}); break; } catch (const std::exception& ex) { logFailure(ex.what()); @@ -2697,129 +2619,168 @@ void IcingaDB::ForwardHistoryEntries() } void IcingaDB::SendNotificationUsersChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) { - if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + if (!m_RconWorker || !m_RconWorker->IsConnected() || oldValues == newValues) { return; } - std::vector deletedUsers = GetArrayDeletedValues(oldValues, newValues); - - for (const auto& userName : deletedUsers) { - String id = HashValue(new Array({m_EnvironmentId, "user", userName, notification->GetName()})); - DeleteRelationship(id, "notification:user"); - DeleteRelationship(id, "notification:recipient"); + for (const auto& userName : GetArrayDeletedValues(oldValues, newValues)) { + EnqueueRelationsDeletion( + HashValue(new Array{m_EnvironmentId, "user", userName, notification->GetName()}), + { + {RedisKey::NotificationUser, false}, + {RedisKey::NotificationRecipient, false}, + } + ); } } void IcingaDB::SendNotificationUserGroupsChanged(const Notification::Ptr& notification, const Array::Ptr& oldValues, const Array::Ptr& newValues) { - if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + if (!m_RconWorker || !m_RconWorker->IsConnected() || oldValues == newValues) { return; } - std::vector deletedUserGroups = GetArrayDeletedValues(oldValues, newValues); - - for (const auto& userGroupName : deletedUserGroups) { + for (const auto& userGroupName : GetArrayDeletedValues(oldValues, newValues)) { UserGroup::Ptr userGroup = UserGroup::GetByName(userGroupName); - String id = HashValue(new Array({m_EnvironmentId, "usergroup", userGroupName, notification->GetName()})); - DeleteRelationship(id, "notification:usergroup"); - DeleteRelationship(id, "notification:recipient"); + EnqueueRelationsDeletion( + HashValue(new Array{m_EnvironmentId, "usergroup", userGroupName, notification->GetName()}), + {{RedisKey::NotificationUserGroup, false}, {RedisKey::NotificationRecipient, false}} + ); for (const User::Ptr& user : userGroup->GetMembers()) { - String userId = HashValue(new Array({m_EnvironmentId, "usergroupuser", user->GetName(), userGroupName, notification->GetName()})); - DeleteRelationship(userId, "notification:recipient"); + EnqueueRelationsDeletion( + HashValue( + new Array{ + m_EnvironmentId, + "usergroupuser", + user->GetName(), + userGroupName, + notification->GetName(), + } + ), + {{RedisKey::NotificationRecipient, false}} + ); } } } void IcingaDB::SendTimePeriodRangesChanged(const TimePeriod::Ptr& timeperiod, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { - if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + if (!m_RconWorker || !m_RconWorker->IsConnected() || oldValues == newValues) { return; } - std::vector deletedKeys = GetDictionaryDeletedKeys(oldValues, newValues); - String typeName = GetLowerCaseTypeNameDB(timeperiod); - - for (const auto& rangeKey : deletedKeys) { - String id = HashValue(new Array({m_EnvironmentId, rangeKey, oldValues->Get(rangeKey), timeperiod->GetName()})); - DeleteRelationship(id, "timeperiod:range"); + for (const auto& rangeKey : GetDictionaryDeletedKeys(oldValues, newValues)) { + EnqueueRelationsDeletion( + HashValue(new Array{m_EnvironmentId, rangeKey, oldValues->Get(rangeKey), timeperiod->GetName()}), + {{RedisKey::TimePeriodRange, false}} + ); } } void IcingaDB::SendTimePeriodIncludesChanged(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues) { - if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + if (!m_RconWorker || !m_RconWorker->IsConnected() || oldValues == newValues) { return; } - std::vector deletedIncludes = GetArrayDeletedValues(oldValues, newValues); - - for (const auto& includeName : deletedIncludes) { - String id = HashValue(new Array({m_EnvironmentId, includeName, timeperiod->GetName()})); - DeleteRelationship(id, "timeperiod:override:include"); + for (const auto& includeName : GetArrayDeletedValues(oldValues, newValues)) { + EnqueueRelationsDeletion( + HashValue(new Array{m_EnvironmentId, includeName, timeperiod->GetName()}), + {{RedisKey::TimePeriodInclude, false}} + ); } } void IcingaDB::SendTimePeriodExcludesChanged(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues) { - if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + if (!m_RconWorker || !m_RconWorker->IsConnected() || oldValues == newValues) { return; } - std::vector deletedExcludes = GetArrayDeletedValues(oldValues, newValues); - - for (const auto& excludeName : deletedExcludes) { - String id = HashValue(new Array({m_EnvironmentId, excludeName, timeperiod->GetName()})); - DeleteRelationship(id, "timeperiod:override:exclude"); + for (const auto& excludeName : GetArrayDeletedValues(oldValues, newValues)) { + EnqueueRelationsDeletion( + HashValue(new Array{m_EnvironmentId, excludeName, timeperiod->GetName()}), + {{RedisKey::TimePeriodExclude, false}} + ); } } template void IcingaDB::SendGroupsChanged(const ConfigObject::Ptr& object, const Array::Ptr& oldValues, const Array::Ptr& newValues) { - if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + if (!m_RconWorker || !m_RconWorker->IsConnected() || oldValues == newValues) { return; } - std::vector deletedGroups = GetArrayDeletedValues(oldValues, newValues); - String typeName = GetLowerCaseTypeNameDB(object); + RedisKey keyType; + if constexpr (std::is_same_v) { + keyType = RedisKey::UserGroupMember; + } else if constexpr (std::is_same_v) { + keyType = RedisKey::HostGroupMember; + } else { + static_assert(std::is_same_v, "IcingaDB::SendGroupsChanged: T must be UserGroup, HostGroup or ServiceGroup"); + keyType = RedisKey::ServiceGroupMember; + } - for (const auto& groupName : deletedGroups) { + for (const auto& groupName : GetArrayDeletedValues(oldValues, newValues)) { typename T::Ptr group = ConfigObject::GetObject(groupName); - String id = HashValue(new Array({m_EnvironmentId, group->GetName(), object->GetName()})); - DeleteRelationship(id, typeName + "group:member"); + EnqueueRelationsDeletion( + HashValue(new Array{m_EnvironmentId, group->GetName(), object->GetName()}), + {{keyType, false}} + ); - if (std::is_same::value) { + if constexpr (std::is_same_v) { UserGroup::Ptr userGroup = dynamic_pointer_cast(group); for (const auto& notification : userGroup->GetNotifications()) { - String userId = HashValue(new Array({m_EnvironmentId, "usergroupuser", object->GetName(), groupName, notification->GetName()})); - DeleteRelationship(userId, "notification:recipient"); + EnqueueRelationsDeletion( + HashValue( + new Array{ + m_EnvironmentId, + "usergroupuser", + object->GetName(), + groupName, + notification->GetName() + } + ), + {{RedisKey::NotificationRecipient, false}} + ); } } } } -void IcingaDB::SendCommandEnvChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { - if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { +void IcingaDB::SendCommandEnvChanged( + const ConfigObject::Ptr& command, + RedisKey keyType, + const Dictionary::Ptr& oldValues, + const Dictionary::Ptr& newValues +) +{ + if (!m_RconWorker || !m_RconWorker->IsConnected() || oldValues == newValues) { return; } - std::vector deletedKeys = GetDictionaryDeletedKeys(oldValues, newValues); - String typeName = GetLowerCaseTypeNameDB(command); - - for (const auto& envvarKey : deletedKeys) { - String id = HashValue(new Array({m_EnvironmentId, envvarKey, command->GetName()})); - DeleteRelationship(id, typeName + ":envvar", true); + for (const auto& envvarKey : GetDictionaryDeletedKeys(oldValues, newValues)) { + EnqueueRelationsDeletion( + HashValue(new Array{m_EnvironmentId, envvarKey, command->GetName()}), + {{keyType, true}} + ); } } -void IcingaDB::SendCommandArgumentsChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { - if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { +void IcingaDB::SendCommandArgumentsChanged( + const ConfigObject::Ptr& command, + RedisKey keyType, + const Dictionary::Ptr& oldValues, + const Dictionary::Ptr& newValues +) +{ + if (!m_RconWorker || !m_RconWorker->IsConnected() || oldValues == newValues) { return; } - std::vector deletedKeys = GetDictionaryDeletedKeys(oldValues, newValues); - String typeName = GetLowerCaseTypeNameDB(command); - - for (const auto& argumentKey : deletedKeys) { - String id = HashValue(new Array({m_EnvironmentId, argumentKey, command->GetName()})); - DeleteRelationship(id, typeName + ":argument", true); + for (const auto& argumentKey : GetDictionaryDeletedKeys(oldValues, newValues)) { + EnqueueRelationsDeletion( + HashValue(new Array{m_EnvironmentId, argumentKey, command->GetName()}), + {{keyType, true}} + ); } } @@ -2828,111 +2789,22 @@ void IcingaDB::SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dict return; } - if (!m_Rcon || !m_Rcon->IsConnected() || oldValues == newValues) { + if (!m_RconWorker || !m_RconWorker->IsConnected() || oldValues == newValues) { return; } Dictionary::Ptr oldVars = SerializeVars(oldValues); Dictionary::Ptr newVars = SerializeVars(newValues); - std::vector deletedVars = GetDictionaryDeletedKeys(oldVars, newVars); - String typeName = GetLowerCaseTypeNameDB(object); - - for (const auto& varId : deletedVars) { - String id = HashValue(new Array({m_EnvironmentId, varId, object->GetName()})); - DeleteRelationship(id, typeName + ":customvar"); - } -} - -void IcingaDB::SendDependencyGroupChildRegistered(const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup) -{ - if (!m_Rcon || !m_Rcon->IsConnected()) { - return; - } - - std::vector runtimeUpdates; - std::map hMSets; - InsertCheckableDependencies(child, hMSets, &runtimeUpdates, dependencyGroup); - ExecuteRedisTransaction(m_Rcon, hMSets, runtimeUpdates); - - UpdateState(child, StateUpdate::Full); - UpdateDependenciesState(child, dependencyGroup); - - std::set parents; - dependencyGroup->LoadParents(parents); - for (const auto& parent : parents) { - // The total_children and affects_children columns might now have different outcome, so update the parent - // Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's not - // worth traversing the whole tree way up and sending config updates for each one of them, as the next Redis - // config dump is going to fix it anyway. - SendConfigUpdate(parent, true); - } -} - -void IcingaDB::SendDependencyGroupChildRemoved( - const DependencyGroup::Ptr& dependencyGroup, - const std::vector& dependencies, - bool removeGroup -) -{ - if (!m_Rcon || !m_Rcon->IsConnected() || dependencies.empty()) { - return; - } - - Checkable::Ptr child; - std::set detachedParents; - for (const auto& dependency : dependencies) { - child = dependency->GetChild(); // All dependencies have the same child. - const auto& parent(dependency->GetParent()); - if (auto [_, inserted] = detachedParents.insert(dependency->GetParent().get()); inserted) { - String edgeId; - if (dependencyGroup->IsRedundancyGroup()) { - // If the redundancy group has no members left, it's going to be removed as well, so we need to - // delete dependency edges from that group to the parent Checkables. - if (removeGroup) { - auto id(HashValue(new Array{dependencyGroup->GetIcingaDBIdentifier(), GetObjectIdentifier(parent)})); - DeleteRelationship(id, RedisKey::DependencyEdge); - DeleteState(id, RedisKey::DependencyEdgeState); - } - - // Remove the connection from the child Checkable to the redundancy group. - edgeId = HashValue(new Array{GetObjectIdentifier(child), dependencyGroup->GetIcingaDBIdentifier()}); - } else { - // Remove the edge between the parent and child Checkable linked through the removed dependency. - edgeId = HashValue(new Array{GetObjectIdentifier(child), GetObjectIdentifier(parent)}); - } - - DeleteRelationship(edgeId, RedisKey::DependencyEdge); - - // The total_children and affects_children columns might now have different outcome, so update the parent - // Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's - // not worth traversing the whole tree way up and sending config updates for each one of them, as the next - // Redis config dump is going to fix it anyway. - SendConfigUpdate(parent, true); - - if (!parent->HasAnyDependencies()) { - // If the parent Checkable isn't part of any other dependency chain anymore, drop its dependency node entry. - DeleteRelationship(GetObjectIdentifier(parent), RedisKey::DependencyNode); - } - } - } - - if (removeGroup && dependencyGroup->IsRedundancyGroup()) { - String redundancyGroupId(dependencyGroup->GetIcingaDBIdentifier()); - DeleteRelationship(redundancyGroupId, RedisKey::DependencyNode); - DeleteRelationship(redundancyGroupId, RedisKey::RedundancyGroup); - - DeleteState(redundancyGroupId, RedisKey::RedundancyGroupState); - DeleteState(redundancyGroupId, RedisKey::DependencyEdgeState); - } else if (removeGroup) { - // Note: The Icinga DB identifier of a non-redundant dependency group is used as the edge state ID - // and shared by all of its dependency objects. See also SerializeDependencyEdgeState() for details. - DeleteState(dependencyGroup->GetIcingaDBIdentifier(), RedisKey::DependencyEdgeState); - } + const auto& typ = object->GetReflectionType(); + const auto& it = boost::range::find_if(l_CustomVarKeys, [&typ](const auto& pair) { return pair.first == typ; }); + VERIFY(it != boost::end(l_CustomVarKeys)); - if (!child->HasAnyDependencies()) { - // If the child Checkable has no parent and reverse dependencies, we can safely remove the dependency node. - DeleteRelationship(GetObjectIdentifier(child), RedisKey::DependencyNode); + for (const auto& varId : GetDictionaryDeletedKeys(oldVars, newVars)) { + EnqueueRelationsDeletion( + HashValue(new Array{m_EnvironmentId, varId, object->GetName()}), + {{it->second, false}} + ); } } @@ -3104,7 +2976,6 @@ IcingaDB::UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, typeName = typeNameOverride.ToLower(); return {GetObjectIdentifier(object), JsonEncode(attrs)}; - //m_Rcon->FireAndForgetQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)}); } void IcingaDB::StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type) @@ -3117,10 +2988,11 @@ void IcingaDB::StateChangeHandler(const ConfigObject::Ptr& object, const CheckRe void IcingaDB::ReachabilityChangeHandler(const std::set& children) { for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { - std::set seenGroups; for (auto& checkable : children) { - rw->UpdateState(checkable, StateUpdate::Full); - rw->UpdateDependenciesState(checkable, nullptr, &seenGroups); + rw->EnqueueConfigObject(checkable, FullState); + for (const auto& dependencyGroup : checkable->GetDependencyGroups()) { + rw->EnqueueDependencyGroupStateUpdate(dependencyGroup); + } } } } @@ -3134,17 +3006,13 @@ void IcingaDB::VersionChangedHandler(const ConfigObject::Ptr& object) } if (object->IsActive()) { - // Create or update the object config for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { - if (rw) - rw->SendConfigUpdate(object, true); + // A runtime config change triggers also a full state update as well as next update event. + rw->EnqueueConfigObject(object, ConfigUpdate | FullState | NextUpdate); } - } else if (!object->IsActive() && - object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp - // Delete object config + } else if (!object->IsActive() && object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { - if (rw) - rw->SendConfigDelete(object); + rw->SendConfigDelete(object); } } } @@ -3204,37 +3072,47 @@ void IcingaDB::FlappingChangeHandler(const Checkable::Ptr& checkable, double cha void IcingaDB::NewCheckResultHandler(const Checkable::Ptr& checkable) { for (auto& rw : ConfigType::GetObjectsByType()) { - rw->UpdateState(checkable, StateUpdate::Volatile); - rw->SendNextUpdate(checkable); + rw->EnqueueConfigObject(checkable, VolatileState); } } -void IcingaDB::NextCheckUpdatedHandler(const Checkable::Ptr& checkable) +void IcingaDB::NextCheckChangedHandler(const Checkable::Ptr& checkable) { for (auto& rw : ConfigType::GetObjectsByType()) { - rw->UpdateState(checkable, StateUpdate::Volatile); - rw->SendNextUpdate(checkable); + rw->EnqueueConfigObject(checkable, VolatileState | NextUpdate); } } void IcingaDB::DependencyGroupChildRegisteredHandler(const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup) { for (const auto& rw : ConfigType::GetObjectsByType()) { - rw->SendDependencyGroupChildRegistered(child, dependencyGroup); + rw->EnqueueConfigObject(child, FullState); // Child requires a full state update. + rw->EnqueueDependencyChildRegistered(dependencyGroup, child); + rw->EnqueueDependencyGroupStateUpdate(dependencyGroup); + + std::set parents; + dependencyGroup->LoadParents(parents); + for (const auto& parent : parents) { + // The total_children and affects_children columns might now have different outcome, so update the parent + // Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's + // not worth traversing the whole tree way up and sending config updates for each one of them, as the next + // Redis config dump is going to fix it anyway. + rw->EnqueueConfigObject(parent, ConfigUpdate | FullState); + } } } void IcingaDB::DependencyGroupChildRemovedHandler(const DependencyGroup::Ptr& dependencyGroup, const std::vector& dependencies, bool removeGroup) { for (const auto& rw : ConfigType::GetObjectsByType()) { - rw->SendDependencyGroupChildRemoved(dependencyGroup, dependencies, removeGroup); + rw->EnqueueDependencyChildRemoved(dependencyGroup, dependencies, removeGroup); } } void IcingaDB::HostProblemChangedHandler(const Service::Ptr& service) { for (auto& rw : ConfigType::GetObjectsByType()) { /* Host state changes affect is_handled and severity of services. */ - rw->UpdateState(service, StateUpdate::Full); + rw->EnqueueConfigObject(service, FullState); } } @@ -3311,15 +3189,17 @@ void IcingaDB::ServiceGroupsChangedHandler(const Service::Ptr& service, const Ar } } -void IcingaDB::CommandEnvChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { +void IcingaDB::CommandEnvChangedHandler(const Command::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { - rw->SendCommandEnvChanged(command, oldValues, newValues); + auto [cmdEnvKey, _] = GetCmdEnvArgKeys(command); + rw->SendCommandEnvChanged(command, cmdEnvKey, oldValues, newValues); } } -void IcingaDB::CommandArgumentsChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { +void IcingaDB::CommandArgumentsChangedHandler(const Command::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues) { for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { - rw->SendCommandArgumentsChanged(command, oldValues, newValues); + auto [_, cmdArgKey] = GetCmdEnvArgKeys(command); + rw->SendCommandArgumentsChanged(command, cmdArgKey, oldValues, newValues); } } @@ -3346,21 +3226,99 @@ void IcingaDB::DeleteRelationship(const String& id, const String& redisKeyWithou "redis_key", redisKey, "id", id, "runtime_type", "delete" }); - m_Rcon->FireAndForgetQueries(queries, Prio::Config); + m_RconWorker->FireAndForgetQueries(queries); } void IcingaDB::DeleteRelationship(const String& id, RedisKey redisKey, bool hasChecksum) { switch (redisKey) { - case RedisKey::RedundancyGroup: - DeleteRelationship(id, "redundancygroup", hasChecksum); + case RedisKey::CheckCmdArg: + DeleteRelationship(id, "checkcommand:argument", hasChecksum); break; - case RedisKey::DependencyNode: - DeleteRelationship(id, "dependency:node", hasChecksum); + case RedisKey::CheckCmdCustomVar: + DeleteRelationship(id, "checkcommand:customvar", hasChecksum); + break; + case RedisKey::CheckCmdEnvVar: + DeleteRelationship(id, "checkcommand:envvar", hasChecksum); break; case RedisKey::DependencyEdge: DeleteRelationship(id, "dependency:edge", hasChecksum); break; + case RedisKey::DependencyNode: + DeleteRelationship(id, "dependency:node", hasChecksum); + break; + case RedisKey::EventCmdArg: + DeleteRelationship(id, "eventcommand:argument", hasChecksum); + break; + case RedisKey::EventCmdCustomVar: + DeleteRelationship(id, "eventcommand:customvar", hasChecksum); + break; + case RedisKey::EventCmdEnvVar: + DeleteRelationship(id, "eventcommand:envvar", hasChecksum); + break; + case RedisKey::HostCustomVar: + DeleteRelationship(id, "host:customvar", hasChecksum); + break; + case RedisKey::HostGroupCustomVar: + DeleteRelationship(id, "hostgroup:customvar", hasChecksum); + break; + case RedisKey::HostGroupMember: + DeleteRelationship(id, "hostgroup:member", hasChecksum); + break; + case RedisKey::NotificationCmdArg: + DeleteRelationship(id, "notificationcommand:argument", hasChecksum); + break; + case RedisKey::NotificationCmdCustomVar: + DeleteRelationship(id, "notificationcommand:customvar", hasChecksum); + break; + case RedisKey::NotificationCmdEnvVar: + DeleteRelationship(id, "notificationcommand:envvar", hasChecksum); + break; + case RedisKey::NotificationCustomVar: + DeleteRelationship(id, "notification:customvar", hasChecksum); + break; + case RedisKey::NotificationRecipient: + DeleteRelationship(id, "notification:recipient", hasChecksum); + break; + case RedisKey::NotificationUser: + DeleteRelationship(id, "notification:user", hasChecksum); + break; + case RedisKey::NotificationUserGroup: + DeleteRelationship(id, "notification:usergroup", hasChecksum); + break; + case RedisKey::RedundancyGroup: + DeleteRelationship(id, "redundancygroup", hasChecksum); + break; + case RedisKey::ServiceCustomVar: + DeleteRelationship(id, "service:customvar", hasChecksum); + break; + case RedisKey::ServiceGroupCustomVar: + DeleteRelationship(id, "servicegroup:customvar", hasChecksum); + break; + case RedisKey::ServiceGroupMember: + DeleteRelationship(id, "servicegroup:member", hasChecksum); + break; + case RedisKey::TimePeriodCustomVar: + DeleteRelationship(id, "timeperiod:customvar", hasChecksum); + break; + case RedisKey::TimePeriodExclude: + DeleteRelationship(id, "timeperiod:override:exclude", hasChecksum); + break; + case RedisKey::TimePeriodInclude: + DeleteRelationship(id, "timeperiod:override:include", hasChecksum); + break; + case RedisKey::TimePeriodRange: + DeleteRelationship(id, "timeperiod:range", hasChecksum); + break; + case RedisKey::UserCustomVar: + DeleteRelationship(id, "user:customvar", hasChecksum); + break; + case RedisKey::UserGroupCustomVar: + DeleteRelationship(id, "usergroup:customvar", hasChecksum); + break; + case RedisKey::UserGroupMember: + DeleteRelationship(id, "usergroup:member", hasChecksum); + break; default: BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid RedisKey provided")); } @@ -3370,6 +3328,12 @@ void IcingaDB::DeleteState(const String& id, RedisKey redisKey, bool hasChecksum { String redisKeyWithoutPrefix; switch (redisKey) { + case RedisKey::HostState: + redisKeyWithoutPrefix = "host:state"; + break; + case RedisKey::ServiceState: + redisKeyWithoutPrefix = "service:state"; + break; case RedisKey::RedundancyGroupState: redisKeyWithoutPrefix = "redundancygroup:state"; break; @@ -3389,7 +3353,7 @@ void IcingaDB::DeleteState(const String& id, RedisKey redisKey, bool hasChecksum } hdels.emplace_back(RedisConnection::Query{"HDEL", m_PrefixConfigObject + redisKeyWithoutPrefix, id}); - m_Rcon->FireAndForgetQueries(std::move(hdels), Prio::RuntimeStateSync); + m_RconWorker->FireAndForgetQueries(std::move(hdels)); // TODO: This is currently purposefully commented out due to how Icinga DB (Go) handles runtime state // upsert and delete events. See https://github.com/Icinga/icingadb/pull/894 for more details. /*m_Rcon->FireAndForgetQueries({{ @@ -3476,11 +3440,11 @@ void IcingaDB::ExecuteRedisTransaction(const RedisConnection::Ptr& rcon, std::ma if (transaction.size() > 1) { transaction.emplace_back(RedisConnection::Query{"EXEC"}); if (!runtimeUpdates.empty()) { - rcon->FireAndForgetQueries(std::move(transaction), Prio::Config, {1}); + rcon->FireAndForgetQueries(std::move(transaction), {1}); } else { // This is likely triggered by the initial Redis config dump, so a) we don't need to record the number of // affected objects and b) we don't really know how many objects are going to be affected by this tx. - rcon->FireAndForgetQueries(std::move(transaction), Prio::Config); + rcon->FireAndForgetQueries(std::move(transaction)); } } } diff --git a/lib/icingadb/icingadb-utility.cpp b/lib/icingadb/icingadb-utility.cpp index 8fa0e338c6..aeb1e4c475 100644 --- a/lib/icingadb/icingadb-utility.cpp +++ b/lib/icingadb/icingadb-utility.cpp @@ -25,6 +25,18 @@ using namespace icinga; +/** + * Checks if the given Redis key is a state key. + * + * @param key The Redis key to check. + * + * @return true if the key is a state key, false otherwise. + */ +bool IcingaDB::IsStateKey(RedisKey key) +{ + return key > RedisKey::_state_keys_begin && key < RedisKey::_state_keys_end; +} + String IcingaDB::FormatCheckSumBinary(const String& str) { char output[20*2+1]; @@ -169,30 +181,8 @@ Dictionary::Ptr IcingaDB::SerializeVars(const Dictionary::Ptr& vars) */ Dictionary::Ptr IcingaDB::SerializeDependencyEdgeState(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep) { - String edgeStateId; - // The edge state ID is computed a bit differently depending on whether this is for a redundancy group or not. - // For redundancy groups, the state ID is supposed to represent the connection state between the redundancy group - // and the parent Checkable of the given dependency. Hence, the outcome will always be different for each parent - // Checkable of the redundancy group. - if (dependencyGroup->IsRedundancyGroup()) { - edgeStateId = HashValue(new Array{ - dependencyGroup->GetIcingaDBIdentifier(), - GetObjectIdentifier(dep->GetParent()), - }); - } else if (dependencyGroup->GetIcingaDBIdentifier().IsEmpty()) { - // For non-redundant dependency groups, on the other hand, all dependency objects within that group will - // always have the same parent Checkable. Likewise, the state ID will be always the same as well it doesn't - // matter which dependency object is used to compute it. Therefore, it's sufficient to compute it only once - // and all the other dependency objects can reuse the cached state ID. - edgeStateId = HashValue(new Array{dependencyGroup->GetCompositeKey(), GetObjectIdentifier(dep->GetParent())}); - dependencyGroup->SetIcingaDBIdentifier(edgeStateId); - } else { - // Use the already computed state ID for the dependency group. - edgeStateId = dependencyGroup->GetIcingaDBIdentifier(); - } - return new Dictionary{ - {"id", std::move(edgeStateId)}, + {"id", GetDependencyEdgeStateId(dependencyGroup, dep)}, {"environment_id", m_EnvironmentId}, {"failed", !dep->IsAvailable(DependencyState) || !dep->GetParent()->IsReachable()} }; @@ -219,6 +209,42 @@ Dictionary::Ptr IcingaDB::SerializeRedundancyGroupState(const Checkable::Ptr& ch }; } +/** + * Computes the dependency edge state ID for the given dependency object. + * + * The edge state ID is computed a bit differently depending on whether this is for a redundancy group or not. + * For redundancy groups, the state ID is supposed to represent the connection state between the redundancy group + * and the parent Checkable of the given dependency. Hence, the outcome will always be different for each parent + * Checkable of the redundancy group. + * + * For non-redundant dependency groups, on the other hand, all dependency objects within that group will + * always have the same parent Checkable. Likewise, the state ID will be always the same as well it doesn't + * matter which dependency object is used to compute it. Therefore, it's sufficient to compute it only once + * and all the other dependency objects can reuse the cached state ID. Thus, this function will cache the just + * computed state ID in the dependency group object itself for later reuse. + * + * @param dependencyGroup The dependency group the dependency is part of. + * @param dep The dependency object to compute the state ID for. + * + * @return The computed edge state ID. + */ +String IcingaDB::GetDependencyEdgeStateId(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep) +{ + if (dependencyGroup->IsRedundancyGroup()) { + return HashValue(new Array{ + dependencyGroup->GetIcingaDBIdentifier(), + GetObjectIdentifier(dep->GetParent()), + }); + } + if (dependencyGroup->GetIcingaDBIdentifier().IsEmpty()) { + auto edgeStateId = HashValue(new Array{dependencyGroup->GetCompositeKey(), GetObjectIdentifier(dep->GetParent())}); + dependencyGroup->SetIcingaDBIdentifier(edgeStateId); + return edgeStateId; + } + // Use the already computed state ID for the dependency group. + return dependencyGroup->GetIcingaDBIdentifier(); +} + /** * Converts the given filter to its Redis value representation. * @@ -349,6 +375,31 @@ String IcingaDB::GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj) return obj->GetReflectionType()->GetName().ToLower(); } +/** + * Determines the Redis keys for environment variables and arguments based on the command type. + * + * @param command The command object to get the keys for. + * @return The pair of Redis keys for environment variables and arguments. + */ +std::pair IcingaDB::GetCmdEnvArgKeys(const Command::Ptr& command) +{ + RedisKey envKey, argKey; + const auto& cmdType = command->GetReflectionType(); + if (CheckCommand::TypeInstance->IsAssignableFrom(cmdType)) { + envKey = RedisKey::CheckCmdEnvVar; + argKey = RedisKey::CheckCmdArg; + } else if (NotificationCommand::TypeInstance->IsAssignableFrom(cmdType)) { + envKey = RedisKey::NotificationCmdEnvVar; + argKey = RedisKey::NotificationCmdArg; + } else if (EventCommand::TypeInstance->IsAssignableFrom(cmdType)) { + envKey = RedisKey::EventCmdEnvVar; + argKey = RedisKey::EventCmdArg; + } else { + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid command type specified")); + } + return {envKey, argKey}; +} + long long IcingaDB::TimestampToMilliseconds(double timestamp) { // In addition to the limits of the Icinga DB MySQL (0 - 2^64) and PostgreSQL (0 - 2^63) schemata, // years not fitting in YYYY may cause problems, see e.g. https://github.com/golang/go/issues/4556. diff --git a/lib/icingadb/icingadb-worker.cpp b/lib/icingadb/icingadb-worker.cpp new file mode 100644 index 0000000000..4f3a21fcf3 --- /dev/null +++ b/lib/icingadb/icingadb-worker.cpp @@ -0,0 +1,503 @@ +/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadb.hpp" +#include "base/logger.hpp" +#include + +using namespace icinga; + +PendingQueueItem::PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits) + : DirtyBits{dirtyBits & DirtyBitsAll}, ID{std::move(id)}, EnqueueTime{std::chrono::steady_clock::now()} +{ +} + +PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits) + : PendingQueueItem{std::make_pair(obj, nullptr), bits}, Object{obj} +{ +} +PendingDependencyGroupStateItem::PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup) + : PendingQueueItem{std::make_pair(nullptr, depGroup), 0}, DepGroup{depGroup} +{ +} + +PendingDependencyEdgeItem::PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child) + : PendingQueueItem{std::make_pair(child, depGroup), 0}, DepGroup{depGroup}, Child{child} +{ +} + +RelationsDeletionItem::RelationsDeletionItem(const String& id, RelationsKeyMap relations) + : PendingQueueItem{id, 0}, Relations{std::move(relations)} +{ +} + +/** + * Background worker thread procedure for processing pending items. + * + * This function runs in a separate thread and continuously processes pending items that have been + * enqueued for Redis updates. It waits for new items to be added to the pending items container, + * and processes them one at a time, ensuring that the Redis connection is active and not overloaded + * with too many pending queries. The function also implements a delay mechanism to allow for potential + * additional changes to be merged into the same item before processing it. + */ +void IcingaDB::PendingItemsThreadProc() +{ + using namespace std::chrono_literals; + namespace ch = std::chrono; + + // Limits the number of pending queries the Rcon can have at any given time to reduce the memory overhead to + // the absolute minimum necessary, since the size of the pending queue items is much smaller than the size + // of the actual Redis queries. Thus, this will slow down the worker thread a bit from generating too many + // Redis queries when the Redis connection is saturated. + constexpr size_t maxPendingQueries = 128; + + std::unique_lock lock(m_PendingItemsMutex); + // Wait until the initial config dump is done. IcingaDB::OnConnectedHandler will notify us once it's finished. + while (GetActive() && !m_ConfigDumpDone) m_PendingItemsCV.wait(lock); + + // Predicate to determine whether the worker thread is allowed to process pending items. + auto canContinue = [this] { + if (!GetActive()) { + return true; + } + return !m_PendingItems.empty() && m_RconWorker && m_RconWorker->IsConnected() && m_RconWorker->GetPendingQueryCount() < maxPendingQueries; + }; + + while (GetActive()) { + // Even if someone notifies us, we still need to verify whether the precondition is actually fulfilled. + // However, in case we don't receive any notification, we still want to wake up periodically on our own + // to check whether we can proceed (e.g. the Redis connection might have become available again and there + // was no activity on the pending items queue to trigger a notification). Thus, we use a timed wait here. + while (!canContinue()) m_PendingItemsCV.wait_for(lock, 100ms); + + // If we're not active anymore (shutting down) and the Redis connection is gone, we can't process anything + // anymore, so just exit right away. If we still have a working connection, we want to flush all remaining + // items as fast as possible before exiting. + if (!GetActive() && (!m_RconWorker || !m_RconWorker->IsConnected())) { + break; + } + + ch::duration retryAfter; + do { + retryAfter = DequeueAndProcessOne(lock); + // Flush all remaining items as fast as possible during shutdown. + // At this point, there shouldn't be any new items being enqueued anymore, + // so we won't end up in an infinite loop. + } while (!m_PendingItems.empty() && !GetActive()); + + if (retryAfter > 0ms) { + m_PendingItemsCV.wait_for(lock, retryAfter); + } + } +} + +/** + * Dequeue and process a single pending item. + * + * This function processes a single pending item from the pending items container. It iterates over + * the items in insertion order and checks if the first item is old enough to be processed (at least + * 1000ms old) unless we're being shutting down. If the item can be processed, it attempts to acquire + * a lock on the associated config object (if applicable) and processes the item accordingly. + * + * If the item cannot be processed right now because it's too new, the function returns a duration + * indicating how long to wait before retrying. Also, if no progress was made during this iteration + * (i.e., no item was processed), it returns a short delay to avoid busy-looping. + * + * @param lock A unique lock on the pending items mutex (must be acquired before calling this function). + * + * @return A duration indicating how long to wait before retrying. + */ +std::chrono::duration IcingaDB::DequeueAndProcessOne(std::unique_lock& lock) +{ + using namespace std::chrono_literals; + namespace ch = std::chrono; + + bool madeProgress = false; // Did we make any progress in this iteration? + ch::duration retryAfter{0}; // If we can't process anything right now, how long to wait before retrying? + auto now = ch::steady_clock::now(); + + auto& seqView = m_PendingItems.get<1>(); + for (auto it(seqView.begin()); it != seqView.end(); ++it) { + if (it != seqView.begin()) { + if (std::holds_alternative(*it)) { + // We don't know whether the previous items are related to this deletion item or not, + // thus we can't just process this right now when there are older items in the queue. + // Otherwise, we might delete something that is going to be updated/created. + break; + } + } + + auto age = now - std::visit([](const auto& item) { return item.EnqueueTime; }, *it); + if (GetActive() && 1000ms > age) { + if (it == seqView.begin()) { + retryAfter = 1000ms - age; + } + break; + } + + ConfigObject::Ptr cobj; + if (auto* citem = std::get_if(&*it); citem) { + cobj = citem->Object; + } + + ObjectLock olock(cobj, std::defer_lock); + if (cobj && !olock.TryLock()) { + continue; // Can't lock the object right now, try the next one. + } + + PendingItemVariant itemToProcess = *it; + seqView.erase(it); + madeProgress = true; + + lock.unlock(); + try { + std::visit([this](const auto& item) { ProcessPendingItem(item); }, std::move(itemToProcess)); + } catch (const std::exception& ex) { + Log(LogCritical, "IcingaDB") + << "Exception while processing pending item of type index '" << itemToProcess.index() << "': " + << DiagnosticInformation(ex, GetActive()); + } + lock.lock(); + break; + } + + if (!madeProgress && retryAfter == 0ms) { + // We haven't made any progress, so give it a short delay before retrying. + retryAfter = 10ms; + } + return retryAfter; +} + +/** + * Process a single pending object. + * + * This function processes a single pending object based on its dirty bits. It checks if the object is a + * @c ConfigObject and performs the appropriate actions such as sending configuration updates, state updates, + * or deletions to the Redis connection. The function handles different types of objects, including @c Checkable + * objects, and ensures that the correct updates are sent based on the dirty bits set for the object. + * + * @param item The pending item containing the object and its dirty bits. + */ +void IcingaDB::ProcessPendingItem(const PendingConfigItem& item) +{ + if (item.DirtyBits & ConfigDelete) { + String typeName = GetLowerCaseTypeNameDB(item.Object); + m_RconWorker->FireAndForgetQueries( + { + {"HDEL", m_PrefixConfigObject + typeName, GetObjectIdentifier(item.Object)}, + {"HDEL", m_PrefixConfigCheckSum + typeName, GetObjectIdentifier(item.Object)}, + { + "XADD", + "icinga:runtime", + "MAXLEN", + "~", + "1000000", + "*", + "redis_key", + m_PrefixConfigObject + typeName, + "id", + GetObjectIdentifier(item.Object), + "runtime_type", + "delete" + } + } + ); + } + + if (item.DirtyBits & ConfigUpdate) { + std::map> hMSets; + std::vector runtimeUpdates; + CreateConfigUpdate(item.Object, GetLowerCaseTypeNameDB(item.Object), hMSets, runtimeUpdates, true); + ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates); + } + + if (auto checkable = dynamic_pointer_cast(item.Object); checkable) { + if (item.DirtyBits & FullState) { + UpdateState(checkable, item.DirtyBits); + } + if (item.DirtyBits & NextUpdate) { + SendNextUpdate(checkable); + } + } +} + +/** + * Process a single pending dependency group state item. + * + * This function processes a single pending dependency group state item by updating the dependencies + * state for the associated dependency group. It selects any child checkable from the dependency group + * to initiate the state update process. + * + * @param item The pending dependency group state item containing the dependency group. + */ +void IcingaDB::ProcessPendingItem(const PendingDependencyGroupStateItem& item) const +{ + // For dependency group state updates, we don't actually care which child triggered the update, + // since all children share the same dependency group state. Thus, we can just pick any child to + // start the update from. + if (auto child = item.DepGroup->GetAnyChild(); child) { + UpdateDependenciesState(child, item.DepGroup); + } +} + +/** + * Process a single pending dependency edge item. + * + * This function fully serializes a single pending dependency edge item (child registration) + * and sends all the resulting Redis queries in a single transaction. The dependencies (edges) + * to serialize are determined by the dependency group and child checkable the provided item represents. + * + * @param item The pending dependency edge item containing the dependency group and child checkable. + */ +void IcingaDB::ProcessPendingItem(const PendingDependencyEdgeItem& item) +{ + std::vector runtimeUpdates; + std::map hMSets; + InsertCheckableDependencies(item.Child, hMSets, &runtimeUpdates, item.DepGroup); + ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates); +} + +/** + * Process a single pending deletion item. + * + * This function processes a single pending deletion item by deleting the specified sub-keys + * from Redis based on the provided deletion keys map. It ensures that the object's ID is + * removed from the specified Redis keys and their corresponding checksum keys if indicated. + * + * @param item The pending deletion item containing the ID and deletion keys map. + */ +void IcingaDB::ProcessPendingItem(const RelationsDeletionItem& item) +{ + ASSERT(std::holds_alternative(item.ID)); // Relation deletion items must have real IDs. + + auto id = std::get(item.ID); + for (auto [redisKey, hasChecksum] : item.Relations) { + if (IsStateKey(redisKey)) { + DeleteState(id, redisKey, hasChecksum); + } else { + DeleteRelationship(id, redisKey, hasChecksum); + } + } +} + +/** + * Enqueue a configuration object for processing in the pending objects thread. + * + * @param object The configuration object to be enqueued for processing. + * @param bits The dirty bits indicating the type of changes to be processed for the object. + */ +void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits) +{ + if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { + return; // No need to enqueue anything if we're not connected. + } + + { + std::lock_guard lock(m_PendingItemsMutex); + if (auto [it, inserted] = m_PendingItems.insert(PendingConfigItem{object, bits}); !inserted) { + m_PendingItems.modify(it, [bits](PendingItemVariant& itemToProcess) mutable { + std::visit( + [&bits](auto& item) { + if (bits & ConfigDelete) { + // A config delete and config update cancel each other out, and we don't need + // to keep any state updates either, as the object is being deleted. + item.DirtyBits &= ~(ConfigUpdate | FullState); + bits &= ~(ConfigUpdate | FullState); // Must not add these bits either. + } else if (bits & ConfigUpdate) { + // A new config update cancels any pending config deletion for the same object. + item.DirtyBits &= ~ConfigDelete; + bits &= ~ConfigDelete; + } + item.DirtyBits |= bits & DirtyBitsAll; + }, + itemToProcess + ); + }); + } + } + m_PendingItemsCV.notify_one(); +} + +void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup) +{ + if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) { + { + std::lock_guard lock(m_PendingItemsMutex); + m_PendingItems.insert(PendingDependencyGroupStateItem{depGroup}); + } + m_PendingItemsCV.notify_one(); + } +} + +/** + * Enqueue the registration of a dependency child to a dependency group. + * + * This function adds a pending item to the queue for processing the registration of a child checkable + * to a dependency group. If there is no active Redis connection available, this function is a no-op. + * + * @param depGroup The dependency group to which the child is being registered. + * @param child The child checkable being registered to the dependency group. + */ +void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child) +{ + if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) { + { + std::lock_guard lock(m_PendingItemsMutex); + m_PendingItems.insert(PendingDependencyEdgeItem{depGroup, child}); + } + m_PendingItemsCV.notify_one(); + } +} + +/** + * Enqueue the removal of a dependency child from a dependency group. + * + * This function handles the removal of a child checkable from a dependency group by first checking if there + * are any pending registration items for the same child and dependency group. If such an item exists, it is + * removed from the pending items queue, effectively canceling the registration. If there is also a pending + * dependency group state update triggered by the same child, it is either removed or updated to use a different + * child if the group is not being removed entirely. If no pending registration exists, the function proceeds + * to enqueue the necessary deletions in Redis for the dependencies and related nodes and edges. + * + * @param depGroup The dependency group from which the child is being removed. + * @param dependencies The list of dependencies associated with the child being removed. + * @param removeGroup A flag indicating whether the entire dependency group should be removed. + */ +void IcingaDB::EnqueueDependencyChildRemoved( + const DependencyGroup::Ptr& depGroup, + const std::vector& dependencies, + bool removeGroup +) +{ + if (dependencies.empty() || !GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { + return; // No need to enqueue anything if we're not connected or there are no dependencies. + } + + Checkable::Ptr child(dependencies.front()->GetChild()); + bool hadPendingRegistration = false; // Whether we had a pending child registration to cancel. + + { + std::lock_guard lock(m_PendingItemsMutex); + if (auto it(m_PendingItems.find(std::make_pair(child, depGroup))); it != m_PendingItems.end()) { + hadPendingRegistration = true; + m_PendingItems.erase(it); // Cancel the pending child registration. + if (removeGroup) { + // If we're removing the entire group registration, we can also drop any pending dependency group + // state update triggered previously as it should no longer have any children left. + m_PendingItems.erase(std::make_pair(nullptr, depGroup)); + } + } + } + + if (!child->HasAnyDependencies()) { + // If the child Checkable has no parent and reverse dependencies, we can safely remove the dependency node. + // This might be a no-op in some cases (e.g. if the child's only dependency was the one that we just canceled + // above), but since we can't reliably determine whether the node exists in Redis or not, we just enqueue the + // deletion anyway. + EnqueueRelationsDeletion(GetObjectIdentifier(child), {{RedisKey::DependencyNode, false}}); + } + + if (hadPendingRegistration && depGroup->GetIcingaDBIdentifier().IsEmpty()) { + // If we had a pending registration that we just canceled above, and the dependency group has no + // IcingaDB identifier yet, then there's no need to proceed with any deletions, as the dependency + // group was never serialized to Redis in the first place. + return; + } + + if (depGroup->IsRedundancyGroup() && depGroup->GetIcingaDBIdentifier().IsEmpty()) { + // An empty IcingaDB identifier indicates that the worker thread has just picked up the registration of the + // first child (removed from the pending items queue) but hasn't yet entered the InsertCheckableDependencies() + // function to actually fill in the IcingaDB identifier. Thus, we need to generate and set it here to ensure + // that the relation deletions below use the correct identifier. + // + // Note: keep this with IcingaDB::InsertCheckableDependencies in sync! + depGroup->SetIcingaDBIdentifier(HashValue(new Array{m_EnvironmentId, depGroup->GetCompositeKey()})); + } + + std::set detachedParents; + for (const auto& dependency : dependencies) { + const auto& parent(dependency->GetParent()); + if (auto [_, inserted] = detachedParents.insert(dependency->GetParent().get()); inserted) { + String edgeId; + if (depGroup->IsRedundancyGroup()) { + // If the redundancy group has no members left, it's going to be removed as well, so we need to + // delete dependency edges from that group to the parent Checkables. + if (removeGroup) { + EnqueueRelationsDeletion( + GetDependencyEdgeStateId(depGroup, dependency), + { + {RedisKey::DependencyEdge, false}, + {RedisKey::DependencyEdgeState, false}, + } + ); + } + + // Remove the connection from the child Checkable to the redundancy group. + edgeId = HashValue(new Array{GetObjectIdentifier(child), depGroup->GetIcingaDBIdentifier()}); + } else { + // Remove the edge between the parent and child Checkable linked through the removed dependency. + edgeId = HashValue(new Array{GetObjectIdentifier(child), GetObjectIdentifier(parent)}); + if (depGroup->GetIcingaDBIdentifier().IsEmpty()) { + (void)GetDependencyEdgeStateId(depGroup, dependency); + } + } + + EnqueueRelationsDeletion(std::move(edgeId), {{RedisKey::DependencyEdge, false}}); + + // The total_children and affects_children columns might now have different outcome, so update the parent + // Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's + // not worth traversing the whole tree way up and sending config updates for each one of them, as the next + // Redis config dump is going to fix it anyway. + EnqueueConfigObject(parent, ConfigUpdate); + + if (!parent->HasAnyDependencies()) { + // If the parent Checkable isn't part of any other dependency chain anymore, drop its dependency node entry. + EnqueueRelationsDeletion(GetObjectIdentifier(parent), {{RedisKey::DependencyNode, false}}); + } + } + } + + if (removeGroup && depGroup->IsRedundancyGroup()) { + EnqueueRelationsDeletion( + depGroup->GetIcingaDBIdentifier(), + { + {RedisKey::DependencyNode, false}, + {RedisKey::RedundancyGroup, false}, + {RedisKey::RedundancyGroupState, false}, + {RedisKey::DependencyEdgeState, false} + } + ); + } else if (removeGroup) { + // Note: The Icinga DB identifier of a non-redundant dependency group is used as the edge state ID + // and shared by all of its dependency objects. See also SerializeDependencyEdgeState() for details. + EnqueueRelationsDeletion(depGroup->GetIcingaDBIdentifier(), {{RedisKey::DependencyEdgeState, false}}); + } +} + +/** + * Enqueue a relation deletion for processing in the pending objects thread. + * + * This function adds a relation deletion item to the set of pending items to be processed by the + * pending items worker thread. The relation deletion item contains the ID of the relation to be + * deleted and a map of Redis keys from which to delete the relation. If the relation deletion item + * is already in the set, it updates the deletion keys accordingly. + * + * @param id The ID of the relation to be deleted. + * @param relations A map of Redis keys from which to delete the relation. + */ +void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsKeyMap& relations) +{ + if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) { + return; // No need to enqueue anything if we're not connected. + } + + { + std::lock_guard lock(m_PendingItemsMutex); + if (auto [it, inserted] = m_PendingItems.insert(RelationsDeletionItem{id, relations}); !inserted) { + m_PendingItems.modify(it, [&relations](PendingItemVariant& val) { + auto& item = std::get(val); + item.Relations.insert(relations.begin(), relations.end()); + }); + } + } + m_PendingItemsCV.notify_one(); +} diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index 8d3b9099bd..f562bdd4ee 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -22,15 +22,13 @@ using namespace icinga; #define MAX_EVENTS_DEFAULT 5000 -using Prio = RedisConnection::QueryPriority; - String IcingaDB::m_EnvironmentId; std::mutex IcingaDB::m_EnvironmentIdInitMutex; REGISTER_TYPE(IcingaDB); IcingaDB::IcingaDB() - : m_Rcon(nullptr) + : m_RconWorker(nullptr) { m_RconLocked.store(nullptr); @@ -81,19 +79,19 @@ void IcingaDB::Start(bool runtimeCreated) m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); - m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetUsername(), GetPassword(), GetDbIndex(), - GetEnableTls(), GetInsecureNoverify(), GetCertPath(), GetKeyPath(), GetCaPath(), GetCrlPath(), - GetTlsProtocolmin(), GetCipherList(), GetConnectTimeout(), GetDebugInfo()); + RedisConnInfo::ConstPtr connInfo = GetRedisConnInfo(); + + m_Rcon = new RedisConnection(connInfo); m_RconLocked.store(m_Rcon); + m_RconWorker = new RedisConnection(connInfo, m_Rcon, true); + for (const Type::Ptr& type : GetTypes()) { auto ctype (dynamic_cast(type.get())); if (!ctype) continue; - RedisConnection::Ptr con = new RedisConnection(GetHost(), GetPort(), GetPath(), GetUsername(), GetPassword(), GetDbIndex(), - GetEnableTls(), GetInsecureNoverify(), GetCertPath(), GetKeyPath(), GetCaPath(), GetCrlPath(), - GetTlsProtocolmin(), GetCipherList(), GetConnectTimeout(), GetDebugInfo(), m_Rcon); + RedisConnection::Ptr con = new RedisConnection(connInfo, m_Rcon); con->SetConnectedCallback([this, con](boost::asio::yield_context& yc) { con->SetConnectedCallback(nullptr); @@ -109,10 +107,10 @@ void IcingaDB::Start(bool runtimeCreated) } m_PendingRcons = m_Rcons.size(); - - m_Rcon->SetConnectedCallback([this](boost::asio::yield_context& yc) { + m_Rcon->SetConnectedCallback([this](boost::asio::yield_context&) { m_Rcon->SetConnectedCallback(nullptr); + m_RconWorker->Start(); for (auto& kv : m_Rcons) { kv.second->Start(); } @@ -126,12 +124,10 @@ void IcingaDB::Start(bool runtimeCreated) m_WorkQueue.SetName("IcingaDB"); - m_Rcon->SuppressQueryKind(Prio::CheckResult); - m_Rcon->SuppressQueryKind(Prio::RuntimeStateSync); - Ptr keepAlive (this); m_HistoryThread = std::async(std::launch::async, [this, keepAlive]() { ForwardHistoryEntries(); }); + m_PendingItemsThread = std::thread([this, keepAlive] { PendingItemsThreadProc(); }); } void IcingaDB::ExceptionHandler(boost::exception_ptr exp) @@ -155,9 +151,9 @@ void IcingaDB::OnConnectedHandler() UpdateAllConfigObjects(); - m_ConfigDumpDone = true; - + m_ConfigDumpDone.store(true); m_ConfigDumpInProgress = false; + m_PendingItemsCV.notify_one(); // Wake up the pending items worker to start processing items. } void IcingaDB::PublishStatsTimerHandler(void) @@ -185,7 +181,7 @@ void IcingaDB::PublishStats() } } - m_Rcon->FireAndForgetQuery(std::move(query), Prio::Heartbeat); + m_RconWorker->FireAndForgetQuery(std::move(query), {}, true /* high priority */); } void IcingaDB::Stop(bool runtimeRemoved) @@ -201,6 +197,9 @@ void IcingaDB::Stop(bool runtimeRemoved) m_StatsTimer->Stop(true); + m_PendingItemsCV.notify_all(); // Wake up the pending items worker to let it exit cleanly. + m_PendingItemsThread.join(); + Log(LogInformation, "IcingaDB") << "'" << GetName() << "' stopped."; @@ -313,3 +312,30 @@ void IcingaDB::PersistEnvironmentId() Utility::SaveJsonFile(path, 0600, m_EnvironmentId); } } + +/** + * Constructs a RedisConnInfo object from the IcingaDB configuration. + * + * @return The RedisConnInfo object + */ +RedisConnInfo::ConstPtr IcingaDB::GetRedisConnInfo() const +{ + RedisConnInfo::Ptr connInfo = new RedisConnInfo(); + connInfo->Port = GetPort(); + connInfo->DbIndex = GetDbIndex(); + connInfo->Host = GetHost(); + connInfo->Path = GetPath(); + connInfo->User = GetUsername(); + connInfo->Password = GetPassword(); + connInfo->EnableTls = GetEnableTls(); + connInfo->TlsCertPath = GetCertPath(); + connInfo->TlsKeyPath = GetKeyPath(); + connInfo->TlsCaPath = GetCaPath(); + connInfo->TlsCrlPath = GetCrlPath(); + connInfo->TlsCipherList = GetCipherList(); + connInfo->TlsProtocolMin = GetTlsProtocolmin(); + connInfo->TlsInsecureNoverify = GetInsecureNoverify(); + connInfo->ConnectTimeout = GetConnectTimeout(); + connInfo->DbgInfo = GetDebugInfo(); + return connInfo; +} diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index 01fa6bbd61..eaaccfcefe 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -11,9 +11,13 @@ #include "base/workqueue.hpp" #include "icinga/customvarobject.hpp" #include "icinga/checkable.hpp" +#include "icinga/command.hpp" #include "icinga/service.hpp" #include "icinga/downtime.hpp" #include "remote/messageorigin.hpp" +#include +#include +#include #include #include #include @@ -27,6 +31,200 @@ namespace icinga { +/** + * RedisKey is the enumeration of all Redis keys used by IcingaDB. + * + * Each enum value represents a specific Redis key type from which the actual Redis key strings are derived. + * For instance, the `Host` enum value corresponds to the Redis key pattern `icinga:host`. These enums help + * in organizing and managing the various Redis keys in a transparent and consistent manner and avoid hardcoding + * key strings throughout the codebase. + * + * @ingroup icingadb + */ +enum class RedisKey : uint8_t +{ + /* Command-related keys */ + CheckCmdArg, + CheckCmdEnvVar, + CheckCmdCustomVar, + + EventCmdArg, + EventCmdEnvVar, + EventCmdCustomVar, + + NotificationCmdArg, + NotificationCmdEnvVar, + NotificationCmdCustomVar, + + /* Dependency related config */ + DependencyNode, + DependencyEdge, + RedundancyGroup, + + /* Hosts & Services */ + HostCustomVar, + HostGroupMember, + HostGroupCustomVar, + + ServiceCustomVar, + ServiceGroupMember, + ServiceGroupCustomVar, + + /* Users & Usergroups */ + UserCustomVar, + UserGroupMember, + UserGroupCustomVar, + + /* Notification */ + NotificationUser, + NotificationUserGroup, + NotificationRecipient, + NotificationCustomVar, + + /* Timeperiods */ + TimePeriodRange, + TimePeriodInclude, + TimePeriodExclude, + TimePeriodCustomVar, + + /* Downtimes */ + ScheduledDowntimeCustomVar, + + /* State keys marker and state entries */ + _state_keys_begin, + HostState, + ServiceState, + RedundancyGroupState, + DependencyEdgeState, + _state_keys_end, +}; + +/** + * Dirty bits for config/state changes. + * + * These are used to mark objects as "dirty" in order to trigger appropriate updates in Redis. + * Each bit represents a different type of change that requires a specific action to be taken. + * + * @ingroup icingadb + */ +enum DirtyBits : uint32_t +{ + ConfigUpdate = 1<<0, // Trigger a Redis config update for the object. + ConfigDelete = 1<<1, // Send a deletion command for the object to Redis. + VolatileState = 1<<2, // Send a volatile state update to Redis (affects only checkables). + RuntimeState = 1<<3, // Send a runtime state update to Redis (affects only checkables). + NextUpdate = 1<<4, // Update the `icinga:nextupdate:{host,service}` Redis keys (affects only checkables). + + FullState = VolatileState | RuntimeState, // A combination of all (non-dependency) state-related dirty bits. + + // All valid dirty bits combined used for masking input values. + DirtyBitsAll = ConfigUpdate | ConfigDelete | FullState | NextUpdate +}; + +/** + * A variant type representing the identifier of a pending item. + * + * This variant can hold either a string representing a real Redis hash key or a pair consisting of + * a configuration object pointer and a dependency group pointer. A pending item identified by the + * latter variant type operates primarily on the associated configuration object or dependency group, + * thus the pairs are used for uniqueness in the pending items container. + * + * @ingroup icingadb + */ +using PendingItemKey = std::variant>; + +/** + * A pending queue item. + * + * This struct represents a generic pending item in the queue that is associated with a unique identifier + * and dirty bits indicating the type of updates required in Redis. The @c EnqueueTime field records the + * time when the item was added to the queue, which can be useful for tracking how long an item waits before + * being processed. This base struct is extended by more specific pending item types that operate on different + * kinds of objects, such as configuration objects or dependency groups. + * + * @ingroup icingadb + */ +struct PendingQueueItem +{ + uint32_t DirtyBits; + PendingItemKey ID; + const std::chrono::steady_clock::time_point EnqueueTime; + + PendingQueueItem(PendingItemKey&& id, uint32_t dirtyBits); +}; + +/** + * A pending configuration object item. + * + * This struct represents a pending item in the queue that is associated with a configuration object. + * It contains a pointer to the configuration object and the dirty bits indicating the type of updates + * required for that object in Redis. A pending configuration item operates primarily on config objects, + * thus the @c ID field in the base struct is only used for uniqueness in the pending items container. + * + * @ingroup icingadb + */ +struct PendingConfigItem : PendingQueueItem +{ + ConfigObject::Ptr Object; + + PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits); +}; + +/** + * A pending dependency group state item. + * + * This struct represents a pending item in the queue that is associated with a dependency group. + * It contains a pointer to the dependency group for which state updates are required. The dirty bits + * in the base struct are not used for this item type, as the operation is specific to updating the + * state of the dependency group itself. + * + * @ingroup icingadb + */ +struct PendingDependencyGroupStateItem : PendingQueueItem +{ + DependencyGroup::Ptr DepGroup; + + explicit PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup); +}; + +/** + * A pending dependency edge item. + * + * This struct represents a pending dependency child registration into a dependency group. + * It contains a pointer to the dependency group and the checkable child being registered. + * The dirty bits in the base struct are not used for this item type, as the operation is specific + * to registering the child into the dependency group. + * + * @ingroup icingadb + */ +struct PendingDependencyEdgeItem : PendingQueueItem +{ + DependencyGroup::Ptr DepGroup; + Checkable::Ptr Child; + + PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child); +}; + +// Map of Redis keys to a boolean indicating whether to delete the checksum key as well. +using RelationsKeyMap = std::map; + +/** + * A pending relations deletion item. + * + * This struct represents a pending item in the queue that is associated with the deletion of relations + * in Redis. It contains a map of Redis keys from which the relation identified by the @c ID field should + * be deleted. The @c ID field represents the unique identifier of the relation to be deleted, and the + * @c Relations map specifies the Redis keys and whether to delete the corresponding checksum keys. + * + * @ingroup icingadb + */ +struct RelationsDeletionItem : PendingQueueItem +{ + RelationsKeyMap Relations; + + RelationsDeletionItem(const String& id, RelationsKeyMap relations); +}; + /** * @ingroup icingadb */ @@ -46,7 +244,7 @@ class IcingaDB : public ObjectImpl String GetEnvironmentId() const override; - inline RedisConnection::Ptr GetConnection() + inline RedisConnection::Ptr GetConnection() const { return m_RconLocked.load(); } @@ -70,6 +268,7 @@ class IcingaDB : public ObjectImpl protected: void ValidateTlsProtocolmin(const Lazy& lvalue, const ValidationUtils& utils) override; void ValidateConnectTimeout(const Lazy& lvalue, const ValidationUtils& utils) override; + RedisConnInfo::ConstPtr GetRedisConnInfo() const; private: class DumpedGlobals @@ -83,22 +282,6 @@ class IcingaDB : public ObjectImpl std::mutex m_Mutex; }; - enum StateUpdate - { - Volatile = 1ull << 0, - RuntimeOnly = 1ull << 1, - Full = Volatile | RuntimeOnly, - }; - - enum class RedisKey : uint8_t - { - RedundancyGroup, - DependencyNode, - DependencyEdge, - RedundancyGroupState, - DependencyEdgeState, - }; - void OnConnectedHandler(); void PublishStatsTimerHandler(); @@ -107,17 +290,15 @@ class IcingaDB : public ObjectImpl /* config & status dump */ void UpdateAllConfigObjects(); std::vector>> ChunkObjects(std::vector> objects, size_t chunkSize); - void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector& keys, RedisConnection::QueryPriority priority); + void DeleteKeys(const RedisConnection::Ptr& conn, const std::vector& keys); std::vector GetTypeOverwriteKeys(const String& type); std::vector GetTypeDumpSignalKeys(const Type::Ptr& type); void InsertCheckableDependencies(const Checkable::Ptr& checkable, std::map& hMSets, std::vector* runtimeUpdates, const DependencyGroup::Ptr& onlyDependencyGroup = nullptr); void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, std::vector& runtimeUpdates, bool runtimeUpdate); - void UpdateDependenciesState(const Checkable::Ptr& checkable, const DependencyGroup::Ptr& onlyDependencyGroup = nullptr, - std::set* seenGroups = nullptr) const; - void UpdateState(const Checkable::Ptr& checkable, StateUpdate mode); - void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate); + void UpdateState(const Checkable::Ptr& checkable, uint32_t mode); + void UpdateDependenciesState(const Checkable::Ptr& checkable, const DependencyGroup::Ptr& depGroup) const; void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map>& hMSets, std::vector& runtimeUpdates, bool runtimeUpdate); void SendConfigDelete(const ConfigObject::Ptr& object); @@ -149,11 +330,9 @@ class IcingaDB : public ObjectImpl void SendTimePeriodExcludesChanged(const TimePeriod::Ptr& timeperiod, const Array::Ptr& oldValues, const Array::Ptr& newValues); template void SendGroupsChanged(const ConfigObject::Ptr& command, const Array::Ptr& oldValues, const Array::Ptr& newValues); - void SendCommandEnvChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); - void SendCommandArgumentsChanged(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + void SendCommandEnvChanged(const ConfigObject::Ptr& command, RedisKey keyType, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + void SendCommandArgumentsChanged(const ConfigObject::Ptr& command, RedisKey keyType, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); void SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); - void SendDependencyGroupChildRegistered(const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup); - void SendDependencyGroupChildRemoved(const DependencyGroup::Ptr& dependencyGroup, const std::vector& dependencies, bool removeGroup); void ForwardHistoryEntries(); @@ -164,6 +343,7 @@ class IcingaDB : public ObjectImpl static Dictionary::Ptr GetStats(); /* utilities */ + static bool IsStateKey(RedisKey key); static String FormatCheckSumBinary(const String& str); static String FormatCommandLine(const Value& commandLine); static long long TimestampToMilliseconds(double timestamp); @@ -180,6 +360,8 @@ class IcingaDB : public ObjectImpl static Dictionary::Ptr SerializeVars(const Dictionary::Ptr& vars); static Dictionary::Ptr SerializeDependencyEdgeState(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep); static Dictionary::Ptr SerializeRedundancyGroupState(const Checkable::Ptr& child, const DependencyGroup::Ptr& redundancyGroup); + static String GetDependencyEdgeStateId(const DependencyGroup::Ptr& dependencyGroup, const Dependency::Ptr& dep); + static std::pair GetCmdEnvArgKeys(const Command::Ptr& command); static String HashValue(const Value& value); static String HashValue(const Value& value, const std::set& propertiesBlacklist, bool propertiesWhitelist = false); @@ -202,7 +384,7 @@ class IcingaDB : public ObjectImpl static void CommentRemovedHandler(const Comment::Ptr& comment); static void FlappingChangeHandler(const Checkable::Ptr& checkable, double changeTime); static void NewCheckResultHandler(const Checkable::Ptr& checkable); - static void NextCheckUpdatedHandler(const Checkable::Ptr& checkable); + static void NextCheckChangedHandler(const Checkable::Ptr& checkable); static void DependencyGroupChildRegisteredHandler(const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup); static void DependencyGroupChildRemovedHandler(const DependencyGroup::Ptr& dependencyGroup, const std::vector& dependencies, bool removeGroup); static void HostProblemChangedHandler(const Service::Ptr& service); @@ -216,8 +398,8 @@ class IcingaDB : public ObjectImpl static void UserGroupsChangedHandler(const User::Ptr& user, const Array::Ptr&, const Array::Ptr& newValues); static void HostGroupsChangedHandler(const Host::Ptr& host, const Array::Ptr& oldValues, const Array::Ptr& newValues); static void ServiceGroupsChangedHandler(const Service::Ptr& service, const Array::Ptr& oldValues, const Array::Ptr& newValues); - static void CommandEnvChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); - static void CommandArgumentsChangedHandler(const ConfigObject::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + static void CommandEnvChangedHandler(const Command::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); + static void CommandArgumentsChangedHandler(const Command::Ptr& command, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); static void CustomVarsChangedHandler(const ConfigObject::Ptr& object, const Dictionary::Ptr& oldValues, const Dictionary::Ptr& newValues); static void ExecuteRedisTransaction(const RedisConnection::Ptr& rcon, std::map& hMSets, @@ -242,13 +424,31 @@ class IcingaDB : public ObjectImpl String m_PrefixConfigCheckSum; bool m_ConfigDumpInProgress; - bool m_ConfigDumpDone; - + std::atomic_bool m_ConfigDumpDone; + + /** + * The primary Redis connection used to send history and heartbeat queries. + * + * This connection is used exclusively for sending history and heartbeat queries to Redis. It ensures that + * history and heartbeat operations do not interfere with other Redis operations. Also, it is the leader for + * all other Redis connections including @c m_RconWorker, and is the only source of truth for all IcingaDB Redis + * related connection statistics. + * + * Note: This will still be shared with the icingadb check command, as that command also sends + * only XREAD queries which are similar in nature to history/heartbeat queries. + */ RedisConnection::Ptr m_Rcon; - // m_RconLocked containes a copy of the value in m_Rcon where all accesses are guarded by a mutex to allow safe - // concurrent access like from the icingadb check command. It's a copy to still allow fast access without additional - // syncronization to m_Rcon within the IcingaDB feature itself. + // m_RconLocked contains a copy of the value in m_Rcon where all accesses are guarded by a mutex to + // allow safe concurrent access like from the icingadb check command. It's a copy to still allow fast access + // without additional synchronization to m_Rcon within the IcingaDB feature itself. Locked m_RconLocked; + /** + * A Redis connection for general queries. + * + * This connection is used for all non-history and non-heartbeat related queries to Redis. + * It is a child of @c m_Rcon, meaning it forwards all its connection stats to @c m_Rcon as well. + */ + RedisConnection::Ptr m_RconWorker; std::unordered_map m_Rcons; std::atomic_size_t m_PendingRcons; @@ -263,6 +463,55 @@ class IcingaDB : public ObjectImpl static std::mutex m_EnvironmentIdInitMutex; static std::unordered_set m_IndexedTypes; + + // A variant type that can hold any of the pending item types used in the pending items container. + using PendingItemVariant = std::variant< + PendingConfigItem, + PendingDependencyGroupStateItem, + PendingDependencyEdgeItem, + RelationsDeletionItem + >; + + struct PendingItemKeyExtractor + { + // The type of the key extracted from a pending item required by Boost.MultiIndex. + using result_type = const PendingItemKey&; + + result_type operator()(const PendingItemVariant& item) const + { + return std::visit([](const auto& pendingItem) -> result_type { return pendingItem.ID; }, item); + } + }; + + // A multi-index container for managing pending items with unique IDs and maintaining insertion order. + // The first index is an ordered unique index based on the pending item key, allowing for efficient + // lookups and ensuring uniqueness of items. The second index is a sequenced index that maintains the + // order of insertion, enabling FIFO processing of pending items. + using PendingItemsSet = boost::multi_index_container< + PendingItemVariant, + boost::multi_index::indexed_by< + boost::multi_index::ordered_unique, // std::variant has operator< defined. + boost::multi_index::sequenced<> + > + >; + + std::thread m_PendingItemsThread; // The background worker thread (consumer of m_PendingItems). + PendingItemsSet m_PendingItems; // Container for pending items with dirty bits (access protected by m_PendingItemsMutex). + std::mutex m_PendingItemsMutex; // Mutex to protect access to m_PendingItems. + std::condition_variable m_PendingItemsCV; // Condition variable to forcefully wake up the worker thread. + + void PendingItemsThreadProc(); + std::chrono::duration DequeueAndProcessOne(std::unique_lock& lock); + void ProcessPendingItem(const PendingConfigItem& item); + void ProcessPendingItem(const PendingDependencyGroupStateItem& item) const; + void ProcessPendingItem(const PendingDependencyEdgeItem& item); + void ProcessPendingItem(const RelationsDeletionItem& item); + + void EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits); + void EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& depGroup); + void EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child); + void EnqueueDependencyChildRemoved(const DependencyGroup::Ptr& depGroup, const std::vector& dependencies, bool removeGroup); + void EnqueueRelationsDeletion(const String& id, const RelationsKeyMap& relations); }; } diff --git a/lib/icingadb/icingadbchecktask.cpp b/lib/icingadb/icingadbchecktask.cpp index 54e76e8941..7d563ed12d 100644 --- a/lib/icingadb/icingadbchecktask.cpp +++ b/lib/icingadb/icingadbchecktask.cpp @@ -100,7 +100,7 @@ void IcingadbCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckR auto redis (conn->GetConnection()); - if (!redis || !redis->GetConnected()) { + if (!redis || !redis->IsConnected()) { ReportIcingadbCheck(checkable, commandObj, cr, producer, "Icinga DB CRITICAL: Not connected to Redis.", ServiceCritical); return; } @@ -126,7 +126,8 @@ void IcingadbCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckR "0-0", "0-0", "0-0", "0-0", "0-0", "0-0", } }, - RedisConnection::QueryPriority::Heartbeat + {}, + true /* high priority */ )); redisTime = std::move(replies.at(0)); diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp index 426567997e..4e6ac5bcb6 100644 --- a/lib/icingadb/redisconnection.cpp +++ b/lib/icingadb/redisconnection.cpp @@ -1,13 +1,11 @@ /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ #include "icingadb/redisconnection.hpp" -#include "base/array.hpp" #include "base/convert.hpp" #include "base/defer.hpp" #include "base/exception.hpp" #include "base/io-engine.hpp" #include "base/logger.hpp" -#include "base/objectlock.hpp" #include "base/string.hpp" #include "base/tcpsocket.hpp" #include "base/tlsutility.hpp" @@ -15,14 +13,11 @@ #include #include #include -#include -#include #include #include #include #include #include -#include #include using namespace icinga; @@ -30,56 +25,91 @@ namespace asio = boost::asio; boost::regex RedisConnection::m_ErrAuth ("\\AERR AUTH "); -RedisConnection::RedisConnection(const String& host, int port, const String& path, const String& username, const String& password, int db, - bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath, - const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent) - : RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, username, password, db, - useTls, insecure, certPath, keyPath, caPath, crlPath, tlsProtocolmin, cipherList, connectTimeout, std::move(di), parent) +RedisConnection::RedisConnection(const RedisConnInfo::ConstPtr& connInfo, const Ptr& parent, bool trackOwnPendingQueries) + : RedisConnection{IoEngine::Get().GetIoContext(), connInfo, parent, trackOwnPendingQueries} { } -RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String username, String password, - int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath, - String tlsProtocolmin, String cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent) - : m_Path(std::move(path)), m_Host(std::move(host)), m_Port(port), m_Username(std::move(username)), m_Password(std::move(password)), - m_DbIndex(db), m_CertPath(std::move(certPath)), m_KeyPath(std::move(keyPath)), m_Insecure(insecure), - m_CaPath(std::move(caPath)), m_CrlPath(std::move(crlPath)), m_TlsProtocolmin(std::move(tlsProtocolmin)), - m_CipherList(std::move(cipherList)), m_ConnectTimeout(connectTimeout), m_DebugInfo(std::move(di)), m_Strand(io), - m_Connecting(false), m_Connected(false), m_Started(false), m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent) +RedisConnection::RedisConnection(boost::asio::io_context& io, const RedisConnInfo::ConstPtr& connInfo, const Ptr& parent, bool trackOwnPendingQueries) + : m_ConnInfo{connInfo}, m_Strand(io), m_Connecting(false), m_Connected(false), m_Stopped(false), + m_QueuedWrites(io), m_QueuedReads(io), m_TrackOwnPendingQueries{trackOwnPendingQueries}, m_LogStatsTimer(io), + m_Parent(parent) { - if (useTls && m_Path.IsEmpty()) { + if (connInfo->EnableTls && connInfo->Path.IsEmpty()) { UpdateTLSContext(); } } void RedisConnection::UpdateTLSContext() { - m_TLSContext = SetupSslContext(m_CertPath, m_KeyPath, m_CaPath, - m_CrlPath, m_CipherList, m_TlsProtocolmin, m_DebugInfo); + m_TLSContext = SetupSslContext( + m_ConnInfo->TlsCertPath, + m_ConnInfo->TlsKeyPath, + m_ConnInfo->TlsCaPath, + m_ConnInfo->TlsCrlPath, + m_ConnInfo->TlsCipherList, + m_ConnInfo->TlsProtocolMin, + m_ConnInfo->DbgInfo + ); } void RedisConnection::Start() { - if (!m_Started.exchange(true)) { - Ptr keepAlive (this); + ASSERT(!m_Connected && !m_Connecting); - IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); - IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); + Ptr keepAlive (this); + m_Stopped.store(false); - if (!m_Parent) { - IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); }); - } - } + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); - if (!m_Connecting.exchange(true)) { - Ptr keepAlive (this); - - IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); + if (!m_Parent) { + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); }); } + + m_Connecting.store(true); + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); } -bool RedisConnection::IsConnected() { - return m_Connected.load(); +/** + * Disconnect the Redis connection gracefully. + * + * This function initiates a graceful disconnection of the Redis connection. It sets the stopped flag to true, + * and spawns a coroutine to handle the disconnection process. The coroutine waits for any ongoing read and write + * operations to complete before shutting down the connection. + */ +void RedisConnection::Disconnect() +{ + if (!m_Stopped.exchange(true)) { + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive = Ptr(this)](asio::yield_context yc) { + m_QueuedWrites.Set(); // Wake up write loop + m_QueuedReads.Set(); // Wake up read loop + + // Give the read and write loops some time to finish ongoing operations before disconnecting. + asio::deadline_timer waiter(m_Strand.context(), boost::posix_time::seconds(5)); + waiter.async_wait(yc); + + Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") + << "Disconnecting Redis connection."; + + boost::system::error_code ec; + if (m_TlsConn) { + m_TlsConn->GracefulDisconnect(m_Strand, yc); + m_TlsConn = nullptr; + } else if (m_TcpConn) { + m_TcpConn->lowest_layer().shutdown(Tcp::socket::shutdown_both, ec); + m_TcpConn->lowest_layer().close(ec); + m_TcpConn = nullptr; + } else if (m_UnixConn) { + m_UnixConn->lowest_layer().shutdown(Unix::socket::shutdown_both, ec); + m_UnixConn->lowest_layer().close(ec); + m_UnixConn = nullptr; + } + + m_Connected.store(false); + m_Connecting.store(false); + }); + } } /** @@ -108,23 +138,34 @@ void LogQuery(RedisConnection::Query& query, Log& msg) } /** - * Queue a Redis query for sending + * Queue a Redis query for sending without waiting for the response in a fire-and-forget manner. * - * @param query Redis query - * @param priority The query's priority + * If the highPriority flag is set to true, the query is treated with high priority and placed at the front of + * the write queue, ensuring it is sent before other queued queries. This is useful for time-sensitive operations + * that require to be executed promptly, which is the case for IcingaDB heartbeat queries. If there are already + * queries with high priority in the queue, the new query is inserted after all existing high priority queries but + * before any normal priority queries to maintain the order of high priority items. + * + * @note The highPriority flag should be used sparingly and only for critical queries, as it can affect the overall + * performance and responsiveness of the Redis connection by potentially delaying other queued queries. + * + * @param query The Redis query to be sent. + * @param affects Does the query affect config, state or history data. + * @param highPriority Whether the query should be treated with high priority. */ -void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects) +void RedisConnection::FireAndForgetQuery(Query query, QueryAffects affects, bool highPriority) { + ThrowIfStopped(); + if (LogDebug >= Logger::GetMinLogSeverity()) { Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:"); LogQuery(query, msg); } auto item (Shared::Make(std::move(query))); - auto ctime (Utility::GetTime()); - asio::post(m_Strand, [this, item, priority, ctime, affects]() { - m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, ctime, affects}); + asio::post(m_Strand, [this, item, highPriority, affects, ctime = Utility::GetTime()]() { + m_Queues.Push(WriteQueueItem{item, ctime, affects}, highPriority); m_QueuedWrites.Set(); IncreasePendingQueries(1); }); @@ -134,10 +175,11 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn * Queue Redis queries for sending * * @param queries Redis queries - * @param priority The queries' priority */ -void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects) +void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, QueryAffects affects) { + ThrowIfStopped(); + if (LogDebug >= Logger::GetMinLogSeverity()) { for (auto& query : queries) { Log msg(LogDebug, "IcingaDB", "Firing and forgetting query:"); @@ -146,10 +188,9 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red } auto item (Shared::Make(std::move(queries))); - auto ctime (Utility::GetTime()); - asio::post(m_Strand, [this, item, priority, ctime, affects]() { - m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, ctime, affects}); + asio::post(m_Strand, [this, item, affects, ctime = Utility::GetTime()]() { + m_Queues.Push(WriteQueueItem{item, ctime, affects}, false); m_QueuedWrites.Set(); IncreasePendingQueries(item->size()); }); @@ -159,12 +200,13 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red * Queue a Redis query for sending, wait for the response and return (or throw) it * * @param query Redis query - * @param priority The query's priority * * @return The response */ -RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects) +RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, QueryAffects affects) { + ThrowIfStopped(); + if (LogDebug >= Logger::GetMinLogSeverity()) { Log msg (LogDebug, "IcingaDB", "Executing query:"); LogQuery(query, msg); @@ -173,10 +215,9 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query std::promise promise; auto future (promise.get_future()); auto item (Shared>>::Make(std::move(query), std::move(promise))); - auto ctime (Utility::GetTime()); - asio::post(m_Strand, [this, item, priority, ctime, affects]() { - m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, ctime, affects}); + asio::post(m_Strand, [this, item, affects, ctime = Utility::GetTime()]() { + m_Queues.Push(WriteQueueItem{item, ctime, affects}, false); m_QueuedWrites.Set(); IncreasePendingQueries(1); }); @@ -190,12 +231,13 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query * Queue Redis queries for sending, wait for the responses and return (or throw) them * * @param queries Redis queries - * @param priority The queries' priority * * @return The responses */ -RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects) +RedisConnection::Replies RedisConnection::GetResultsOfQueries(Queries queries, QueryAffects affects, bool highPriority) { + ThrowIfStopped(); + if (LogDebug >= Logger::GetMinLogSeverity()) { for (auto& query : queries) { Log msg(LogDebug, "IcingaDB", "Executing query:"); @@ -206,10 +248,9 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q std::promise promise; auto future (promise.get_future()); auto item (Shared>>::Make(std::move(queries), std::move(promise))); - auto ctime (Utility::GetTime()); - asio::post(m_Strand, [this, item, priority, ctime, affects]() { - m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, ctime, affects}); + asio::post(m_Strand, [this, item, highPriority, affects, ctime = Utility::GetTime()]() { + m_Queues.Push(WriteQueueItem{item, ctime, affects}, highPriority); m_QueuedWrites.Set(); IncreasePendingQueries(item->first.size()); }); @@ -219,12 +260,12 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q return future.get(); } -void RedisConnection::EnqueueCallback(const std::function& callback, RedisConnection::QueryPriority priority) +void RedisConnection::EnqueueCallback(const std::function& callback) { - auto ctime (Utility::GetTime()); + ThrowIfStopped(); - asio::post(m_Strand, [this, callback, priority, ctime]() { - m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback, ctime, QueryAffects{}}); + asio::post(m_Strand, [this, callback, ctime = Utility::GetTime()]() { + m_Queues.Push(WriteQueueItem{callback, ctime}, false); m_QueuedWrites.Set(); }); } @@ -237,7 +278,7 @@ void RedisConnection::EnqueueCallback(const std::function>::Make()); auto future (promise->get_future()); asio::post(m_Strand, [this, promise]() { double oldest = 0; - - for (auto& queue : m_Queues.Writes) { - if (m_SuppressedQueryKinds.find(queue.first) == m_SuppressedQueryKinds.end() && !queue.second.empty()) { - auto ctime (queue.second.front().CTime); - - if (ctime < oldest || oldest == 0) { - oldest = ctime; - } + if (!m_Queues.HighWriteQ.empty()) { + oldest = m_Queues.HighWriteQ.front().CTime; + } + if (!m_Queues.NormalWriteQ.empty()) { + auto normalOldest = m_Queues.NormalWriteQ.front().CTime; + if (oldest == 0 || normalOldest < oldest) { + oldest = normalOldest; } } @@ -270,29 +310,6 @@ double RedisConnection::GetOldestPendingQueryTs() return future.get(); } -/** - * Mark kind as kind of queries not to actually send yet - * - * @param kind Query kind - */ -void RedisConnection::SuppressQueryKind(RedisConnection::QueryPriority kind) -{ - asio::post(m_Strand, [this, kind]() { m_SuppressedQueryKinds.emplace(kind); }); -} - -/** - * Unmark kind as kind of queries not to actually send yet - * - * @param kind Query kind - */ -void RedisConnection::UnsuppressQueryKind(RedisConnection::QueryPriority kind) -{ - asio::post(m_Strand, [this, kind]() { - m_SuppressedQueryKinds.erase(kind); - m_QueuedWrites.Set(); - }); -} - /** * Try to connect to Redis */ @@ -302,21 +319,21 @@ void RedisConnection::Connect(asio::yield_context& yc) boost::asio::deadline_timer timer (m_Strand.context()); - for (;;) { + while (!m_Stopped) { try { - if (m_Path.IsEmpty()) { + if (m_ConnInfo->Path.IsEmpty()) { if (m_TLSContext) { Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") - << "Trying to connect to Redis server (async, TLS) on host '" << m_Host << ":" << m_Port << "'"; + << "Trying to connect to Redis server (async, TLS) on host '" << m_ConnInfo->Host << ":" << m_ConnInfo->Port << "'"; - auto conn (Shared::Make(m_Strand.context(), *m_TLSContext, m_Host)); + auto conn (Shared::Make(m_Strand.context(), *m_TLSContext, m_ConnInfo->Host)); auto& tlsConn (conn->next_layer()); auto connectTimeout (MakeTimeout(conn)); - icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc); + icinga::Connect(conn->lowest_layer(), m_ConnInfo->Host, Convert::ToString(m_ConnInfo->Port), yc); tlsConn.async_handshake(tlsConn.client, yc); - if (!m_Insecure) { + if (!m_ConnInfo->TlsInsecureNoverify) { std::shared_ptr cert (tlsConn.GetPeerCertificate()); if (!cert) { @@ -337,24 +354,24 @@ void RedisConnection::Connect(asio::yield_context& yc) m_TlsConn = std::move(conn); } else { Log(m_Parent ? LogNotice : LogInformation, "IcingaDB") - << "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'"; + << "Trying to connect to Redis server (async) on host '" << m_ConnInfo->Host << ":" << m_ConnInfo->Port << "'"; auto conn (Shared::Make(m_Strand.context())); auto connectTimeout (MakeTimeout(conn)); - icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc); + icinga::Connect(conn->next_layer(), m_ConnInfo->Host, Convert::ToString(m_ConnInfo->Port), yc); Handshake(conn, yc); m_QueuedReads.WaitForClear(yc); m_TcpConn = std::move(conn); } } else { Log(LogInformation, "IcingaDB") - << "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'"; + << "Trying to connect to Redis server (async) on unix socket path '" << m_ConnInfo->Path << "'"; auto conn (Shared::Make(m_Strand.context())); auto connectTimeout (MakeTimeout(conn)); - conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); + conn->next_layer().async_connect(Unix::endpoint(m_ConnInfo->Path.CStr()), yc); Handshake(conn, yc); m_QueuedReads.WaitForClear(yc); m_UnixConn = std::move(conn); @@ -373,7 +390,9 @@ void RedisConnection::Connect(asio::yield_context& yc) break; } catch (const std::exception& ex) { Log(LogCritical, "IcingaDB") - << "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what(); + << "Cannot connect to Redis server ('" + << (m_ConnInfo->Path.IsEmpty() ? m_ConnInfo->Host+":"+Convert::ToString(m_ConnInfo->Port) : m_ConnInfo->Path) + << "'): " << ex.what(); } timer.expires_from_now(boost::posix_time::seconds(5)); @@ -387,7 +406,7 @@ void RedisConnection::Connect(asio::yield_context& yc) */ void RedisConnection::ReadLoop(asio::yield_context& yc) { - for (;;) { + while (!m_Stopped) { m_QueuedReads.WaitForSet(yc); while (!m_Queues.FutureResponseActions.empty()) { @@ -463,21 +482,19 @@ void RedisConnection::ReadLoop(asio::yield_context& yc) */ void RedisConnection::WriteLoop(asio::yield_context& yc) { - for (;;) { + while (!m_Stopped) { m_QueuedWrites.Wait(yc); - WriteFirstOfHighestPrio: - for (auto& queue : m_Queues.Writes) { - if (m_SuppressedQueryKinds.find(queue.first) != m_SuppressedQueryKinds.end() || queue.second.empty()) { - continue; - } - - auto next (std::move(queue.second.front())); - queue.second.pop(); - - WriteItem(yc, std::move(next)); - - goto WriteFirstOfHighestPrio; + while (m_Queues.HasWrites()) { + auto queuedWrite(m_Queues.PopFront()); + std::visit( + [this, &yc, &queuedWrite](const auto& item) { + if (WriteItem(item, yc)) { + RecordAffected(queuedWrite.Affects, Utility::GetTime()); + } + }, + std::move(queuedWrite.Item) + ); } m_QueuedWrites.Clear(); @@ -493,7 +510,7 @@ void RedisConnection::LogStats(asio::yield_context& yc) m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10)); - for (;;) { + while (!m_Stopped) { m_LogStatsTimer.async_wait(yc); m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10)); @@ -520,111 +537,138 @@ void RedisConnection::LogStats(asio::yield_context& yc) } /** - * Send next and schedule receiving the response + * Write a single Redis query in a fire-and-forget manner. * - * @param next Redis queries + * @param item Redis query + * + * @return true on success, false on failure. */ -void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next) +bool RedisConnection::WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc) { - if (next.FireAndForgetQuery) { - auto& item (*next.FireAndForgetQuery); - DecreasePendingQueries(1); - - try { - WriteOne(item, yc); - } catch (const std::exception& ex) { - Log msg (LogCritical, "IcingaDB", "Error during sending query"); - LogQuery(item, msg); - msg << " which has been fired and forgotten: " << ex.what(); + DecreasePendingQueries(1); - return; - } + try { + WriteOne(*item, yc); + } catch (const std::exception& ex) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery(*item, msg); + msg << " which has been fired and forgotten: " << ex.what(); - if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { - m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore}); - } else { - ++m_Queues.FutureResponseActions.back().Amount; - } + return false; + } - m_QueuedReads.Set(); + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore}); + } else { + ++m_Queues.FutureResponseActions.back().Amount; } - if (next.FireAndForgetQueries) { - auto& item (*next.FireAndForgetQueries); - size_t i = 0; + m_QueuedReads.Set(); + return true; +} - DecreasePendingQueries(item.size()); +/** + * Write multiple Redis queries in a fire-and-forget manner. + * + * @param item Redis queries + * + * @return true on success, false on failure. + */ +bool RedisConnection::WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc) +{ + size_t i = 0; - try { - for (auto& query : item) { - WriteOne(query, yc); - ++i; - } - } catch (const std::exception& ex) { - Log msg (LogCritical, "IcingaDB", "Error during sending query"); - LogQuery(item[i], msg); - msg << " which has been fired and forgotten: " << ex.what(); + DecreasePendingQueries(item->size()); - return; + try { + for (auto& query : *item) { + WriteOne(query, yc); + ++i; } + } catch (const std::exception& ex) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery((*item)[i], msg); + msg << " which has been fired and forgotten: " << ex.what(); - if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { - m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore}); - } else { - m_Queues.FutureResponseActions.back().Amount += item.size(); - } + return false; + } - m_QueuedReads.Set(); + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->size(), ResponseAction::Ignore}); + } else { + m_Queues.FutureResponseActions.back().Amount += item->size(); } - if (next.GetResultOfQuery) { - auto& item (*next.GetResultOfQuery); - DecreasePendingQueries(1); + m_QueuedReads.Set(); + return true; +} - try { - WriteOne(item.first, yc); - } catch (const std::exception&) { - item.second.set_exception(std::current_exception()); +/** + * Write a single Redis query and enqueue a response promise to be fulfilled once the response has been received. + * + * @param item Redis query and promise for the response + */ +bool RedisConnection::WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc) +{ + DecreasePendingQueries(1); - return; - } + try { + WriteOne(item->first, yc); + } catch (const std::exception&) { + item->second.set_exception(std::current_exception()); - m_Queues.ReplyPromises.emplace(std::move(item.second)); + return false; + } - if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) { - m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver}); - } else { - ++m_Queues.FutureResponseActions.back().Amount; - } + m_Queues.ReplyPromises.push(std::move(item->second)); - m_QueuedReads.Set(); + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver}); + } else { + ++m_Queues.FutureResponseActions.back().Amount; } - if (next.GetResultsOfQueries) { - auto& item (*next.GetResultsOfQueries); - DecreasePendingQueries(item.first.size()); + m_QueuedReads.Set(); + return true; +} - try { - for (auto& query : item.first) { - WriteOne(query, yc); - } - } catch (const std::exception&) { - item.second.set_exception(std::current_exception()); +/** + * Write multiple Redis queries and enqueue a response promise to be fulfilled once all responses have been received. + * + * @param item Redis queries and promise for the responses. + * + * @return true on success, false on failure. + */ +bool RedisConnection::WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc) +{ + DecreasePendingQueries(item->first.size()); - return; + try { + for (auto& query : item->first) { + WriteOne(query, yc); } + } catch (const std::exception&) { + item->second.set_exception(std::current_exception()); - m_Queues.RepliesPromises.emplace(std::move(item.second)); - m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk}); - - m_QueuedReads.Set(); + return false; } - if (next.Callback) { - next.Callback(yc); - } + m_Queues.RepliesPromises.emplace(std::move(item->second)); + m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->first.size(), ResponseAction::DeliverBulk}); - RecordAffected(next.Affects, Utility::GetTime()); + m_QueuedReads.Set(); + return true; +} + +/** + * Invokes the provided callback immediately. + * + * @param item Callback to execute + */ +bool RedisConnection::WriteItem(const QueryCallback& item, boost::asio::yield_context& yc) +{ + item(yc); + return true; } /** @@ -634,7 +678,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection: */ RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) { - if (m_Path.IsEmpty()) { + if (m_ConnInfo->Path.IsEmpty()) { if (m_TLSContext) { return ReadOne(m_TlsConn, yc); } else { @@ -652,7 +696,7 @@ RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) */ void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc) { - if (m_Path.IsEmpty()) { + if (m_ConnInfo->Path.IsEmpty()) { if (m_TLSContext) { WriteOne(m_TlsConn, query, yc); } else { @@ -687,8 +731,12 @@ void RedisConnection::IncreasePendingQueries(int count) asio::post(parent->m_Strand, [parent, count]() { parent->IncreasePendingQueries(count); }); - } else { - m_PendingQueries += count; + } + + // Only track the pending queries of the root connection or if explicitly + // requested to do so for child connections as well. + if (!m_Parent || m_TrackOwnPendingQueries) { + m_PendingQueries.fetch_add(count); m_InputQueries.InsertValue(Utility::GetTime(), count); } } @@ -701,8 +749,11 @@ void RedisConnection::DecreasePendingQueries(int count) asio::post(parent->m_Strand, [parent, count]() { parent->DecreasePendingQueries(count); }); - } else { - m_PendingQueries -= count; + } + + // Same as in IncreasePendingQueries(). + if (!m_Parent || m_TrackOwnPendingQueries) { + m_PendingQueries.fetch_sub(count); m_OutputQueries.InsertValue(Utility::GetTime(), count); } } @@ -729,3 +780,10 @@ void RedisConnection::RecordAffected(RedisConnection::QueryAffects affected, dou } } } + +void RedisConnection::ThrowIfStopped() const +{ + if (m_Stopped) { + throw RedisDisconnected(); + } +} diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index 308cf36d4d..895a17c5bc 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -10,9 +10,11 @@ #include "base/object.hpp" #include "base/ringbuffer.hpp" #include "base/shared.hpp" +#include "base/shared-object.hpp" #include "base/string.hpp" #include "base/tlsstream.hpp" #include "base/value.hpp" +#include "icingadb/icingadb-ti.hpp" #include #include #include @@ -36,13 +38,42 @@ #include #include #include -#include +#include #include #include #include +#include namespace icinga { + +/** + * Information required to connect to a Redis server. + * + * @ingroup icingadb + */ +struct RedisConnInfo final : SharedObject +{ + DECLARE_PTR_TYPEDEFS(RedisConnInfo); + + bool EnableTls; + bool TlsInsecureNoverify; + int Port; + int DbIndex; + double ConnectTimeout; + String Host; + String Path; + String User; + String Password; + String TlsCertPath; + String TlsKeyPath; + String TlsCaPath; + String TlsCrlPath; + String TlsProtocolMin; + String TlsCipherList; + DebugInfo DbgInfo; +}; + /** * An Async Redis connection. * @@ -58,22 +89,6 @@ namespace icinga typedef Value Reply; typedef std::vector Replies; - /** - * Redis query priorities, highest first. - * - * @ingroup icingadb - */ - enum class QueryPriority : unsigned char - { - Heartbeat, - RuntimeStateStream, // runtime state updates, doesn't affect initially synced states - Config, // includes initially synced states - RuntimeStateSync, // updates initially synced states at runtime, in parallel to config dump, therefore must be < Config - History, - CheckResult, - SyncConnection = 255 - }; - struct QueryAffects { size_t Config; @@ -84,39 +99,32 @@ namespace icinga : Config(config), State(state), History(history) { } }; - RedisConnection(const String& host, int port, const String& path, const String& username, const String& password, int db, - bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath, - const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const Ptr& parent = nullptr); - + explicit RedisConnection(const RedisConnInfo::ConstPtr& connInfo, const Ptr& parent = nullptr, bool trackOwnPendingQueries = true); void UpdateTLSContext(); void Start(); + void Disconnect(); - bool IsConnected(); + bool IsConnected() const + { + return m_Connected.load(); + } - void FireAndForgetQuery(Query query, QueryPriority priority, QueryAffects affects = {}); - void FireAndForgetQueries(Queries queries, QueryPriority priority, QueryAffects affects = {}); + void FireAndForgetQuery(Query query, QueryAffects affects = {}, bool highPriority = false); + void FireAndForgetQueries(Queries queries, QueryAffects affects = {}); - Reply GetResultOfQuery(Query query, QueryPriority priority, QueryAffects affects = {}); - Replies GetResultsOfQueries(Queries queries, QueryPriority priority, QueryAffects affects = {}); + Reply GetResultOfQuery(Query query, QueryAffects affects = {}); + Replies GetResultsOfQueries(Queries queries, QueryAffects affects = {}, bool highPriority = false); - void EnqueueCallback(const std::function& callback, QueryPriority priority); + void EnqueueCallback(const std::function& callback); void Sync(); - double GetOldestPendingQueryTs(); - - void SuppressQueryKind(QueryPriority kind); - void UnsuppressQueryKind(QueryPriority kind); + double GetOldestPendingQueryTs() const; void SetConnectedCallback(std::function callback); - inline bool GetConnected() - { - return m_Connected.load(); - } - int GetQueryCount(RingBuffer::SizeType span); - inline int GetPendingQueryCount() + inline std::size_t GetPendingQueryCount() const { return m_PendingQueries; } @@ -160,20 +168,21 @@ namespace icinga ResponseAction Action; }; + using FireAndForgetQ = Shared::Ptr; // A single query that does not expect a result. + using FireAndForgetQs = Shared::Ptr; // Multiple queries that do not expect results. + using QueryWithPromise = Shared>>::Ptr; // A single query expecting a result. + using QueriesWithPromise = Shared>>::Ptr; // Multiple queries expecting results. + using QueryCallback = std::function; // A callback to be executed. + /** - * Something to be send to Redis. + * An item in the write queue to be sent to Redis. * * @ingroup icingadb */ struct WriteQueueItem { - Shared::Ptr FireAndForgetQuery; - Shared::Ptr FireAndForgetQueries; - Shared>>::Ptr GetResultOfQuery; - Shared>>::Ptr GetResultsOfQueries; - std::function Callback; - - double CTime; + std::variant Item; + double CTime; // When was this item queued? QueryAffects Affects; }; @@ -196,15 +205,17 @@ namespace icinga static boost::regex m_ErrAuth; - RedisConnection(boost::asio::io_context& io, String host, int port, String path, String username, String password, - int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath, - String tlsProtocolmin, String cipherList, double connectTimeout, DebugInfo di, const Ptr& parent); + RedisConnection(boost::asio::io_context& io, const RedisConnInfo::ConstPtr& connInfo, const Ptr& parent, bool trackOwnPendingQueries); void Connect(boost::asio::yield_context& yc); void ReadLoop(boost::asio::yield_context& yc); void WriteLoop(boost::asio::yield_context& yc); void LogStats(boost::asio::yield_context& yc); - void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item); + bool WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc); + bool WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc); + bool WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc); + bool WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc); + bool WriteItem(const QueryCallback& item, boost::asio::yield_context& yc); Reply ReadOne(boost::asio::yield_context& yc); void WriteOne(Query& query, boost::asio::yield_context& yc); @@ -218,48 +229,58 @@ namespace icinga void DecreasePendingQueries(int count); void RecordAffected(QueryAffects affected, double when); + void ThrowIfStopped() const; + template void Handshake(StreamPtr& stream, boost::asio::yield_context& yc); template Timeout MakeTimeout(StreamPtr& stream); - String m_Path; - String m_Host; - int m_Port; - String m_Username; - String m_Password; - int m_DbIndex; - - String m_CertPath; - String m_KeyPath; - bool m_Insecure; - String m_CaPath; - String m_CrlPath; - String m_TlsProtocolmin; - String m_CipherList; - double m_ConnectTimeout; - DebugInfo m_DebugInfo; + RedisConnInfo::ConstPtr m_ConnInfo; // Redis connection info (immutable) boost::asio::io_context::strand m_Strand; Shared::Ptr m_TcpConn; Shared::Ptr m_UnixConn; Shared::Ptr m_TlsConn; - Atomic m_Connecting, m_Connected, m_Started; + Atomic m_Connecting, m_Connected, m_Stopped; struct { - // Items to be send to Redis - std::map> Writes; + std::queue HighWriteQ; // High priority writes to be sent to Redis. + std::queue NormalWriteQ; // Normal priority writes to be sent to Redis. // Requestors, each waiting for a single response std::queue> ReplyPromises; // Requestors, each waiting for multiple responses at once std::queue> RepliesPromises; // Metadata about all of the above std::queue FutureResponseActions; - } m_Queues; - // Kinds of queries not to actually send yet - std::set m_SuppressedQueryKinds; + WriteQueueItem PopFront() + { + if (!HighWriteQ.empty()) { + WriteQueueItem item(std::move(HighWriteQ.front())); + HighWriteQ.pop(); + return item; + } + WriteQueueItem item(std::move(NormalWriteQ.front())); + NormalWriteQ.pop(); + return item; + } + + void Push(WriteQueueItem&& item, bool highPriority) + { + if (highPriority) { + HighWriteQ.push(std::move(item)); + } else { + NormalWriteQ.push(std::move(item)); + } + } + + bool HasWrites() const + { + return !HighWriteQ.empty() || !NormalWriteQ.empty(); + } + } m_Queues; // Indicate that there's something to send/receive AsioEvent m_QueuedWrites; @@ -273,7 +294,9 @@ namespace icinga RingBuffer m_WrittenConfig{15 * 60}; RingBuffer m_WrittenState{15 * 60}; RingBuffer m_WrittenHistory{15 * 60}; - int m_PendingQueries{0}; + // Number of pending Redis queries, always 0 if m_Parent is set unless m_TrackOwnPendingQueries is true. + std::atomic_size_t m_PendingQueries{0}; + bool m_TrackOwnPendingQueries; // Whether to track pending queries even if m_Parent is set. boost::asio::deadline_timer m_LogStatsTimer; Ptr m_Parent; }; @@ -391,7 +414,7 @@ RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio:: try { return ReadRESP(*strm, yc); } catch (const std::exception&) { - if (m_Connecting.exchange(false)) { + if (!m_Stopped && m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; @@ -427,7 +450,7 @@ void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, WriteRESP(*strm, query, yc); strm->async_flush(yc); } catch (const std::exception&) { - if (m_Connecting.exchange(false)) { + if (!m_Stopped && m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; @@ -451,24 +474,24 @@ void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, template void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc) { - if (m_Password.IsEmpty() && !m_DbIndex) { + if (m_ConnInfo->Password.IsEmpty() && !m_ConnInfo->DbIndex) { // Trigger NOAUTH WriteRESP(*strm, {"PING"}, yc); } else { - if (!m_Username.IsEmpty()) { - WriteRESP(*strm, {"AUTH", m_Username, m_Password}, yc); - } else if (!m_Password.IsEmpty()) { - WriteRESP(*strm, {"AUTH", m_Password}, yc); + if (!m_ConnInfo->User.IsEmpty()) { + WriteRESP(*strm, {"AUTH", m_ConnInfo->User, m_ConnInfo->Password}, yc); + } else if (!m_ConnInfo->Password.IsEmpty()) { + WriteRESP(*strm, {"AUTH", m_ConnInfo->Password}, yc); } - if (m_DbIndex) { - WriteRESP(*strm, {"SELECT", Convert::ToString(m_DbIndex)}, yc); + if (m_ConnInfo->DbIndex) { + WriteRESP(*strm, {"SELECT", Convert::ToString(m_ConnInfo->DbIndex)}, yc); } } strm->async_flush(yc); - if (m_Password.IsEmpty() && !m_DbIndex) { + if (m_ConnInfo->Password.IsEmpty() && !m_ConnInfo->DbIndex) { Reply pong (ReadRESP(*strm, yc)); if (pong.IsObjectType()) { @@ -476,7 +499,7 @@ void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc) BOOST_THROW_EXCEPTION(std::runtime_error(RedisError::Ptr(pong)->GetMessage())); } } else { - if (!m_Password.IsEmpty()) { + if (!m_ConnInfo->Password.IsEmpty()) { Reply auth (ReadRESP(*strm, yc)); if (auth.IsObjectType()) { @@ -492,7 +515,7 @@ void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc) } } - if (m_DbIndex) { + if (m_ConnInfo->DbIndex) { Reply select (ReadRESP(*strm, yc)); if (select.IsObjectType()) { @@ -513,7 +536,7 @@ Timeout RedisConnection::MakeTimeout(StreamPtr& stream) { return Timeout( m_Strand, - boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)), + boost::posix_time::microseconds(intmax_t(m_ConnInfo->ConnectTimeout * 1000000)), [stream] { boost::system::error_code ec; stream->lowest_layer().cancel(ec);