Skip to content

Commit ca8331a

Browse files
committed
implement event-based retry strategy
1 parent fd0562a commit ca8331a

File tree

9 files changed

+276
-26
lines changed

9 files changed

+276
-26
lines changed

orchagent/cbf/cbfnhgorch.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ void CbfNhgOrch::doTask(Consumer& consumer)
177177
if (success)
178178
{
179179
m_syncdNextHopGroups.erase(cbf_nhg_it);
180+
notifyRetry(gRouteOrch, APP_ROUTE_TABLE_NAME, make_constraint(RETRY_CST_ECMP));
180181
}
181182
}
182183
}

orchagent/nhgbase.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,13 @@ class NhgOrchCommon : public Orch
440440
--nhg_entry.ref_count;
441441
}
442442

443+
unsigned getNhgRefCount(const string& index)
444+
{
445+
SWSS_LOG_ENTER();
446+
auto& nhg_entry = m_syncdNextHopGroups.at(index);
447+
return nhg_entry.ref_count;
448+
}
449+
443450
/* Getters / Setters. */
444451
static inline unsigned getSyncedNhgCount() { return NhgBase::getSyncedCount(); }
445452

orchagent/nhgorch.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ extern sai_next_hop_api_t* sai_next_hop_api;
2323
NhgOrch::NhgOrch(DBConnector *db, string tableName) : NhgOrchCommon(db, tableName)
2424
{
2525
SWSS_LOG_ENTER();
26+
createRetryCache(tableName);
2627
}
2728

2829
/*
@@ -266,6 +267,7 @@ void NhgOrch::doTask(Consumer& consumer)
266267
if (nhg->sync())
267268
{
268269
m_syncdNextHopGroups.emplace(index, NhgEntry<NextHopGroup>(std::move(nhg)));
270+
notifyRetry(gRouteOrch, APP_ROUTE_TABLE_NAME, make_constraint(RETRY_CST_NHG, index));
269271
}
270272
else
271273
{
@@ -300,6 +302,7 @@ void NhgOrch::doTask(Consumer& consumer)
300302
success = false;
301303
}
302304
m_syncdNextHopGroups.emplace(index, NhgEntry<NextHopGroup>(std::move(nhg)));
305+
notifyRetry(gRouteOrch, APP_ROUTE_TABLE_NAME, make_constraint(RETRY_CST_NHG, index));
303306
}
304307
}
305308
}
@@ -413,7 +416,9 @@ void NhgOrch::doTask(Consumer& consumer)
413416
/* If the group does exist, but it's still referenced, skip. */
414417
else if (nhg_it->second.ref_count > 0)
415418
{
416-
SWSS_LOG_INFO("Unable to remove group %s which is referenced", index.c_str());
419+
SWSS_LOG_INFO("Unable to remove group %s which is referenced, move task entry to RetryCache", index.c_str());
420+
consumer.addToRetry(std::move(it->second), make_constraint(RETRY_CST_NHG_REF, index));
421+
success = true;
417422
}
418423
/* Else, if the group is no more referenced, remove it. */
419424
else
@@ -425,6 +430,7 @@ void NhgOrch::doTask(Consumer& consumer)
425430
if (success)
426431
{
427432
m_syncdNextHopGroups.erase(nhg_it);
433+
notifyRetry(gRouteOrch, APP_ROUTE_TABLE_NAME, make_constraint(RETRY_CST_ECMP));
428434
}
429435
}
430436
}

orchagent/orch.cpp

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,45 @@ vector<Selectable *> Orch::getSelectables()
147147
return selectables;
148148
}
149149

150-
void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
150+
void Orch::createRetryCache(const std::string &executorName) {
151+
if (m_retryCaches.find(executorName) == m_retryCaches.end())
152+
m_retryCaches[executorName] = std::make_shared<RetryCache>();
153+
}
154+
155+
RetryCache *Orch::getRetryCache(const std::string &executorName)
156+
{
157+
if (m_retryCaches.find(executorName) == m_retryCaches.end())
158+
return nullptr;
159+
else
160+
return m_retryCaches[executorName].get();
161+
}
162+
163+
ConsumerBase* Orch::getConsumerBase(const std::string &executorName)
164+
{
165+
if (m_consumerMap.find(executorName) == m_consumerMap.end())
166+
return nullptr;
167+
return dynamic_cast<ConsumerBase*>(m_consumerMap[executorName].get());
168+
}
169+
170+
void ConsumerBase::addToRetry(Task &&task, Constraint &&cst) {
171+
getOrch()->getRetryCache(getName())->insert_failed_task(std::move(task), std::move(cst));
172+
}
173+
174+
void Orch::addToRetry(std::string &&executorName, Task &&task, Constraint &&cst) {
175+
getRetryCache(executorName)->insert_failed_task(std::move(task), std::move(cst));
176+
}
177+
178+
void Orch::retryToSync(std::string &&executorName, Constraint &&cst)
179+
{
180+
auto tasks = getRetryCache(executorName)->resolve(cst);
181+
getConsumerBase(executorName)->addToSync(tasks, true);
182+
}
183+
184+
size_t ConsumerBase::addToSync(std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries, bool onRetry) {
185+
return addToSync(*entries, onRetry);
186+
}
187+
188+
void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry, bool onRetry)
151189
{
152190
SWSS_LOG_ENTER();
153191

@@ -157,12 +195,24 @@ void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
157195
/* Record incoming tasks */
158196
Recorder::Instance().swss.record(dumpTuple(entry));
159197

198+
auto retryCache = getOrch()->getRetryCache(getName());
199+
200+
/* If a new task has the same key with a retry task */
201+
if (retryCache && retryCache->contains(key))
202+
{
203+
assert(!onRetry); // a retry task is already cleared from the cache before trying to add to m_toSync
204+
assert(m_toSync.find(key) == m_toSync.end());
205+
auto cache = retryCache->erase_failed_task(key);
206+
m_toSync.emplace(key, std::move(*cache));
207+
}
208+
160209
/*
161210
* m_toSync is a multimap which will allow one key with multiple values,
162211
* Also, the order of the key-value pairs whose keys compare equivalent
163212
* is the order of insertion and does not change. (since C++11)
164213
*/
165214

215+
166216
/* If a new task comes we directly put it into getConsumerTable().m_toSync map */
167217
if (m_toSync.find(key) == m_toSync.end())
168218
{
@@ -230,22 +280,18 @@ void ConsumerBase::addToSync(const KeyOpFieldsValuesTuple &entry)
230280

231281
}
232282

233-
size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries)
283+
size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries, bool onRetry)
234284
{
235285
SWSS_LOG_ENTER();
236286

237287
for (auto& entry: entries)
238288
{
239-
addToSync(entry);
289+
addToSync(entry, onRetry);
240290
}
241291

242292
return entries.size();
243293
}
244294

245-
size_t ConsumerBase::addToSync(std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries) {
246-
return addToSync(*entries);
247-
}
248-
249295
// TODO: Table should be const
250296
size_t ConsumerBase::refillToSync(Table* table)
251297
{

orchagent/orch.h

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ extern "C" {
2626
#include "response_publisher.h"
2727
#include "recorder.h"
2828
#include "schema.h"
29+
#include "retrycache.h"
2930

3031
const char delimiter = ':';
3132
const char list_item_delimiter = ',';
@@ -174,11 +175,18 @@ class ConsumerBase : public Executor {
174175
/* record the tuple */
175176
void recordTuple(const swss::KeyOpFieldsValuesTuple &tuple);
176177

177-
void addToSync(const swss::KeyOpFieldsValuesTuple &entry);
178+
void addToSync(const swss::KeyOpFieldsValuesTuple &entry, bool onRetry=false);
178179

179180
// Returns: the number of entries added to m_toSync
180-
size_t addToSync(const std::deque<swss::KeyOpFieldsValuesTuple> &entries);
181-
size_t addToSync(std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries);
181+
size_t addToSync(const std::deque<swss::KeyOpFieldsValuesTuple> &entries, bool onRetry=false);
182+
size_t addToSync(std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries, bool onRetry=false);
183+
184+
/**
185+
* Add a single task into m_toRetry for future retry
186+
* @param task a task tuple
187+
* @param cst the constraint for the task
188+
*/
189+
void addToRetry(Task &&task, Constraint &&cst);
182190

183191
size_t refillToSync();
184192
size_t refillToSync(swss::Table* table);
@@ -264,6 +272,7 @@ typedef enum
264272
typedef std::pair<swss::DBConnector *, std::string> TableConnector;
265273
typedef std::pair<swss::DBConnector *, std::vector<std::string>> TablesConnector;
266274

275+
267276
class Orch
268277
{
269278
public:
@@ -296,13 +305,36 @@ class Orch
296305
virtual void doTask(swss::SelectableTimer &timer) { }
297306

298307
void dumpPendingTasks(std::vector<std::string> &ts);
308+
309+
void createRetryCache(const std::string &executorName);
310+
RetryCache* getRetryCache(const std::string &executorName);
311+
ConsumerBase* getConsumerBase(const std::string &executorName);
312+
313+
/* Add a task and its constraint to the retry cache */
314+
void addToRetry(std::string &&executorName, Task &&task, Constraint &&cst);
315+
316+
/** Delete tasks whose constraints are resolved in this executor's retry cache , then add them back to its m_toSync.
317+
* @param executorName name of the executor (actually a ConsumerBase instance)
318+
* @param cst task constraint **/
319+
virtual void retryToSync(std::string &&executorName, Constraint &&cst);
320+
321+
/** Notify the constrained orch to retry tasks
322+
* @param retryOrch the orch to be notified
323+
* @param executorName name of the executor to be notified
324+
* @param cst the constraint that can be resolved
325+
* **/
326+
virtual void notifyRetry(Orch *retryOrch, std::string &&executorName, Constraint &&cst)
327+
{
328+
retryOrch->retryToSync(std::move(executorName), std::move(cst));
329+
}
299330

300331
/**
301332
* @brief Flush pending responses
302333
*/
303334
void flushResponses();
304335
protected:
305336
ConsumerMap m_consumerMap;
337+
RetryCacheMap m_retryCaches;
306338

307339
Orch();
308340
ref_resolve_status resolveFieldRefValue(type_map&, const std::string&, const std::string&, swss::KeyOpFieldsValuesTuple&, sai_object_id_t&, std::string&);

orchagent/retrycache.h

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
#ifndef SWSS_RETRY_CACHE_H
2+
#define SWSS_RETRY_CACHE_H
3+
4+
#include <unordered_set>
5+
#include <unordered_map>
6+
7+
using namespace swss;
8+
9+
enum ConstraintType
10+
{
11+
RETRY_CST_DUMMY,
12+
RETRY_CST_NHG, // nhg doesn't exist
13+
RETRY_CST_NHG_REF, // nhg refcnt nonzero
14+
RETRY_CST_PIC, // context doesn't exist
15+
RETRY_CST_PIC_REF, // context refcnt nonzero
16+
RETRY_CST_ECMP // ecmp resources exhausted
17+
};
18+
19+
using ConstraintData = std::string;
20+
using Constraint = std::pair<ConstraintType, ConstraintData>;
21+
22+
const Constraint DUMMY_CONSTRAINT{RETRY_CST_DUMMY, ""};
23+
24+
template<typename T = ConstraintData>
25+
inline Constraint make_constraint(ConstraintType type, T&& data = T()) {
26+
return {type, std::forward<T>(data)};
27+
}
28+
29+
typedef swss::KeyOpFieldsValuesTuple Task;
30+
typedef std::pair<Constraint, Task> FailedTask;
31+
typedef std::unordered_map<std::string, FailedTask> RetryMap;
32+
33+
namespace std {
34+
template<>
35+
struct hash<::Constraint> {
36+
std::size_t operator()(const ::Constraint& c) const {
37+
return hash<::ConstraintType>{}(c.first) ^
38+
(hash<::ConstraintData>{}(c.second) << 2);
39+
}
40+
};
41+
}
42+
43+
using RetryKeysMap = std::unordered_map<Constraint, std::unordered_set<std::string>>;
44+
45+
class RetryCache
46+
{
47+
public:
48+
49+
// cache the data about the failed tasks for a ConsumerBase instance
50+
RetryMap m_toRetry;
51+
RetryKeysMap m_retryKeys; // maps a constraint to a set of failedKeys
52+
53+
bool contains(const std::string &key) {
54+
return m_toRetry.find(key) != m_toRetry.end();
55+
}
56+
57+
/** Insert a failed task with its constraint to m_toRetry and m_retryKeys
58+
* @param task the task that has failed
59+
* @param cst constraint needs to be resolved for the task to succeed
60+
*/
61+
void insert_failed_task(Task &&task, Constraint &&cst) {
62+
const auto& key = kfvKey(task);
63+
m_retryKeys[cst].insert(key);
64+
m_toRetry.emplace(
65+
std::piecewise_construct,
66+
std::forward_as_tuple(key),
67+
std::forward_as_tuple(std::move(cst), std::move(task))
68+
);
69+
}
70+
71+
/** Erase a task from m_toRetry and m_retryKeys
72+
* @param key key of swss::KeyOpFieldsValuesTuple task
73+
* @return the task that has failed but is ready for retry
74+
*/
75+
std::shared_ptr<Task> erase_failed_task(const std::string &key) {
76+
77+
auto it = m_toRetry.find(key);
78+
if (it == m_toRetry.end())
79+
return std::make_shared<Task>();
80+
81+
Constraint& cst = it->second.first;
82+
auto task = std::make_shared<Task>(std::move(it->second.second));
83+
84+
m_retryKeys[cst].erase(key);
85+
m_toRetry.erase(key);
86+
87+
if (m_retryKeys[cst].empty())
88+
m_retryKeys.erase(cst);
89+
90+
return task;
91+
}
92+
93+
/** Find cached failed tasks that can be resolved by the constraint, remove them from the retry cache.
94+
* @param cst the retry constraint
95+
* @return the resolved failed tasks
96+
*/
97+
std::shared_ptr<std::deque<KeyOpFieldsValuesTuple>> resolve(const Constraint &cst) {
98+
99+
auto tasks = std::make_shared<std::deque<KeyOpFieldsValuesTuple>>();
100+
101+
if (m_retryKeys.find(cst) == m_retryKeys.end())
102+
return tasks;
103+
104+
// get a set of keys that correspond to tasks constrained by the cst
105+
std::unordered_set<std::string>& keys = m_retryKeys[cst];
106+
107+
for (auto it = keys.begin(); it != keys.end();)
108+
{
109+
auto failed_task_it = m_toRetry.find(*it);
110+
if (failed_task_it != m_toRetry.end())
111+
{
112+
tasks->push_back(std::move(failed_task_it->second.second));
113+
m_toRetry.erase(failed_task_it);
114+
}
115+
it = keys.erase(it);
116+
}
117+
118+
m_retryKeys.erase(cst);
119+
120+
return tasks;
121+
}
122+
};
123+
124+
typedef std::unordered_map<std::string, std::shared_ptr<RetryCache>> RetryCacheMap;
125+
126+
#endif /* SWSS_RETRY_CACHE_H */

0 commit comments

Comments
 (0)