Skip to content

Commit 2099e59

Browse files
committed
RedisConnection: enhance WriteQueueItem & related usages
1 parent 59c5c3a commit 2099e59

File tree

2 files changed

+129
-89
lines changed

2 files changed

+129
-89
lines changed

lib/icingadb/redisconnection.cpp

Lines changed: 115 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ void RedisConnection::FireAndForgetQuery(Query query, QueryAffects affects, bool
170170
auto item (Shared<Query>::Make(std::move(query)));
171171

172172
asio::post(m_Strand, [this, item, highPriority, affects, ctime = std::chrono::steady_clock::now()]() {
173-
auto qitem = WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, ctime, affects, highPriority};
173+
auto qitem = WriteQueueItem{item, ctime, affects, highPriority};
174174
if (highPriority) {
175175
m_Queues.PushFront(std::move(qitem));
176176
} else {
@@ -200,7 +200,7 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Que
200200
auto item (Shared<Queries>::Make(std::move(queries)));
201201

202202
asio::post(m_Strand, [this, item, affects, ctime = std::chrono::steady_clock::now()]() {
203-
m_Queues.Writes.push_back(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, ctime, affects});
203+
m_Queues.Writes.push_back(WriteQueueItem{item, ctime, affects});
204204
m_QueuedWrites.Set();
205205
IncreasePendingQueries(item->size());
206206
});
@@ -227,7 +227,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
227227
auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise)));
228228

229229
asio::post(m_Strand, [this, item, affects, ctime = std::chrono::steady_clock::now()]() {
230-
m_Queues.Writes.push_back(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, ctime, affects});
230+
m_Queues.Writes.push_back(WriteQueueItem{item, ctime, affects});
231231
m_QueuedWrites.Set();
232232
IncreasePendingQueries(1);
233233
});
@@ -260,7 +260,7 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(Queries queries, Q
260260
auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise)));
261261

262262
asio::post(m_Strand, [this, item, highPriority, affects, ctime = std::chrono::steady_clock::now()]() {
263-
auto qitem = WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, ctime, affects, highPriority};
263+
auto qitem = WriteQueueItem{item, ctime, affects, highPriority};
264264
if (highPriority) {
265265
m_Queues.PushFront(std::move(qitem));
266266
} else {
@@ -280,7 +280,7 @@ void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yiel
280280
AssertNotStopped();
281281

282282
asio::post(m_Strand, [this, callback, ctime = std::chrono::steady_clock::now()]() {
283-
m_Queues.Writes.push_back(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback, ctime});
283+
m_Queues.Writes.push_back(WriteQueueItem{callback, ctime});
284284
m_QueuedWrites.Set();
285285
});
286286
}
@@ -515,7 +515,14 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
515515
auto queuedWrite(std::move(m_Queues.Writes.front()));
516516
m_Queues.Writes.pop_front();
517517

518-
WriteItem(yc, std::move(queuedWrite));
518+
std::visit(
519+
[this, &yc, &queuedWrite](const auto& item) {
520+
if (WriteItem(item, yc)) {
521+
RecordAffected(queuedWrite.Affects, Utility::GetTime());
522+
}
523+
},
524+
queuedWrite.Item
525+
);
519526
}
520527

521528
m_QueuedWrites.Clear();
@@ -558,111 +565,138 @@ void RedisConnection::LogStats(asio::yield_context& yc)
558565
}
559566

560567
/**
561-
* Send next and schedule receiving the response
568+
* Write a single Redis query in a fire-and-forget manner.
569+
*
570+
* @param item Redis query
562571
*
563-
* @param next Redis queries
572+
* @return true on success, false on failure.
564573
*/
565-
void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
574+
bool RedisConnection::WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc)
566575
{
567-
if (next.FireAndForgetQuery) {
568-
auto& item (*next.FireAndForgetQuery);
569-
DecreasePendingQueries(1);
576+
DecreasePendingQueries(1);
570577

571-
try {
572-
WriteOne(item, yc);
573-
} catch (const std::exception& ex) {
574-
Log msg (LogCritical, "IcingaDB", "Error during sending query");
575-
LogQuery(item, msg);
576-
msg << " which has been fired and forgotten: " << ex.what();
578+
try {
579+
WriteOne(*item, yc);
580+
} catch (const std::exception& ex) {
581+
Log msg (LogCritical, "IcingaDB", "Error during sending query");
582+
LogQuery(*item, msg);
583+
msg << " which has been fired and forgotten: " << ex.what();
577584

578-
return;
579-
}
580-
581-
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
582-
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
583-
} else {
584-
++m_Queues.FutureResponseActions.back().Amount;
585-
}
585+
return false;
586+
}
586587

587-
m_QueuedReads.Set();
588+
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
589+
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
590+
} else {
591+
++m_Queues.FutureResponseActions.back().Amount;
588592
}
589593

590-
if (next.FireAndForgetQueries) {
591-
auto& item (*next.FireAndForgetQueries);
592-
size_t i = 0;
594+
m_QueuedReads.Set();
595+
return true;
596+
}
593597

594-
DecreasePendingQueries(item.size());
598+
/**
599+
* Write multiple Redis queries in a fire-and-forget manner.
600+
*
601+
* @param item Redis queries
602+
*
603+
* @return true on success, false on failure.
604+
*/
605+
bool RedisConnection::WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc)
606+
{
607+
size_t i = 0;
595608

596-
try {
597-
for (auto& query : item) {
598-
WriteOne(query, yc);
599-
++i;
600-
}
601-
} catch (const std::exception& ex) {
602-
Log msg (LogCritical, "IcingaDB", "Error during sending query");
603-
LogQuery(item[i], msg);
604-
msg << " which has been fired and forgotten: " << ex.what();
609+
DecreasePendingQueries(item->size());
605610

606-
return;
611+
try {
612+
for (auto& query : *item) {
613+
WriteOne(query, yc);
614+
++i;
607615
}
616+
} catch (const std::exception& ex) {
617+
Log msg (LogCritical, "IcingaDB", "Error during sending query");
618+
LogQuery((*item)[i], msg);
619+
msg << " which has been fired and forgotten: " << ex.what();
608620

609-
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
610-
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
611-
} else {
612-
m_Queues.FutureResponseActions.back().Amount += item.size();
613-
}
621+
return false;
622+
}
614623

615-
m_QueuedReads.Set();
624+
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
625+
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->size(), ResponseAction::Ignore});
626+
} else {
627+
m_Queues.FutureResponseActions.back().Amount += item->size();
616628
}
617629

618-
if (next.GetResultOfQuery) {
619-
auto& item (*next.GetResultOfQuery);
620-
DecreasePendingQueries(1);
630+
m_QueuedReads.Set();
631+
return true;
632+
}
621633

622-
try {
623-
WriteOne(item.first, yc);
624-
} catch (const std::exception&) {
625-
item.second.set_exception(std::current_exception());
634+
/**
635+
* Write a single Redis query and enqueue a response promise to be fulfilled once the response has been received.
636+
*
637+
* @param item Redis query and promise for the response
638+
*/
639+
bool RedisConnection::WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc)
640+
{
641+
DecreasePendingQueries(1);
626642

627-
return;
628-
}
643+
try {
644+
WriteOne(item->first, yc);
645+
} catch (const std::exception&) {
646+
item->second.set_exception(std::current_exception());
629647

630-
m_Queues.ReplyPromises.emplace(std::move(item.second));
648+
return false;
649+
}
631650

632-
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
633-
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
634-
} else {
635-
++m_Queues.FutureResponseActions.back().Amount;
636-
}
651+
m_Queues.ReplyPromises.push(std::move(item->second));
637652

638-
m_QueuedReads.Set();
653+
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
654+
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
655+
} else {
656+
++m_Queues.FutureResponseActions.back().Amount;
639657
}
640658

641-
if (next.GetResultsOfQueries) {
642-
auto& item (*next.GetResultsOfQueries);
643-
DecreasePendingQueries(item.first.size());
659+
m_QueuedReads.Set();
660+
return true;
661+
}
644662

645-
try {
646-
for (auto& query : item.first) {
647-
WriteOne(query, yc);
648-
}
649-
} catch (const std::exception&) {
650-
item.second.set_exception(std::current_exception());
663+
/**
664+
* Write multiple Redis queries and enqueue a response promise to be fulfilled once all responses have been received.
665+
*
666+
* @param item Redis queries and promise for the responses.
667+
*
668+
* @return true on success, false on failure.
669+
*/
670+
bool RedisConnection::WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc)
671+
{
672+
DecreasePendingQueries(item->first.size());
651673

652-
return;
674+
try {
675+
for (auto& query : item->first) {
676+
WriteOne(query, yc);
653677
}
678+
} catch (const std::exception&) {
679+
item->second.set_exception(std::current_exception());
654680

655-
m_Queues.RepliesPromises.emplace(std::move(item.second));
656-
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
657-
658-
m_QueuedReads.Set();
681+
return false;
659682
}
660683

661-
if (next.Callback) {
662-
next.Callback(yc);
663-
}
684+
m_Queues.RepliesPromises.emplace(std::move(item->second));
685+
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item->first.size(), ResponseAction::DeliverBulk});
664686

665-
RecordAffected(next.Affects, Utility::GetTime());
687+
m_QueuedReads.Set();
688+
return true;
689+
}
690+
691+
/**
692+
* Invokes the provided callback immediately.
693+
*
694+
* @param item Callback to execute
695+
*/
696+
bool RedisConnection::WriteItem(const QueryCallback& item, boost::asio::yield_context& yc)
697+
{
698+
item(yc);
699+
return true;
666700
}
667701

668702
/**

lib/icingadb/redisconnection.hpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <stdexcept>
4242
#include <utility>
4343
#include <vector>
44+
#include <variant>
4445

4546
namespace icinga
4647
{
@@ -135,19 +136,20 @@ namespace icinga
135136
ResponseAction Action;
136137
};
137138

139+
using FireAndForgetQ = Shared<Query>::Ptr; // A single query that does not expect a result.
140+
using FireAndForgetQs = Shared<Queries>::Ptr; // Multiple queries that do not expect results.
141+
using QueryWithPromise = Shared<std::pair<Query, std::promise<Reply>>>::Ptr; // A single query expecting a result.
142+
using QueriesWithPromise = Shared<std::pair<Queries, std::promise<Replies>>>::Ptr; // Multiple queries expecting results.
143+
using QueryCallback = std::function<void(boost::asio::yield_context&)>; // A callback to be executed.
144+
138145
/**
139-
* Something to be send to Redis.
146+
* An item in the write queue to be sent to Redis.
140147
*
141148
* @ingroup icingadb
142149
*/
143150
struct WriteQueueItem
144151
{
145-
Shared<Query>::Ptr FireAndForgetQuery;
146-
Shared<Queries>::Ptr FireAndForgetQueries;
147-
Shared<std::pair<Query, std::promise<Reply>>>::Ptr GetResultOfQuery;
148-
Shared<std::pair<Queries, std::promise<Replies>>>::Ptr GetResultsOfQueries;
149-
std::function<void(boost::asio::yield_context&)> Callback;
150-
152+
std::variant<FireAndForgetQ, FireAndForgetQs, QueryWithPromise, QueriesWithPromise, QueryCallback> Item;
151153
std::chrono::steady_clock::time_point CTime; // When was this item queued?
152154
QueryAffects Affects;
153155
bool HighPriority; // Does this item have high priority?
@@ -178,7 +180,11 @@ namespace icinga
178180
void ReadLoop(boost::asio::yield_context& yc);
179181
void WriteLoop(boost::asio::yield_context& yc);
180182
void LogStats(boost::asio::yield_context& yc);
181-
void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item);
183+
bool WriteItem(const FireAndForgetQ& item, boost::asio::yield_context& yc);
184+
bool WriteItem(const FireAndForgetQs& item, boost::asio::yield_context& yc);
185+
bool WriteItem(const QueryWithPromise& item, boost::asio::yield_context& yc);
186+
bool WriteItem(const QueriesWithPromise& item, boost::asio::yield_context& yc);
187+
bool WriteItem(const QueryCallback& item, boost::asio::yield_context& yc);
182188
Reply ReadOne(boost::asio::yield_context& yc);
183189
void WriteOne(Query& query, boost::asio::yield_context& yc);
184190

0 commit comments

Comments
 (0)