Skip to content

Commit 44e0df7

Browse files
committed
Added support for tracking roles for simulation workers.
Fixed the exclusion and inclusion address simulation API and its integration within workloads. Added more information to trace events for simulation.
1 parent 581bd6c commit 44e0df7

File tree

7 files changed

+342
-169
lines changed

7 files changed

+342
-169
lines changed

fdbrpc/sim2.actor.cpp

Lines changed: 54 additions & 28 deletions
Large diffs are not rendered by default.

fdbrpc/simulator.h

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,81 @@ class ISimulator : public INetwork {
151151
virtual bool isAvailable() const = 0;
152152
virtual void displayWorkers() const;
153153

154+
// Records one more instance of `role` running at `address`.
// roleAddresses maps address -> (role name -> count); operator[] creates the
// address and role entries on first use, so the count starts from zero.
// Emits a "RoleAdd" trace event with the number of distinct roles at the
// address ("Roles") and the updated count for this role ("Value").
virtual void addRole(NetworkAddress const& address, std::string const& role) {
155+
roleAddresses[address][role] ++;
156+
TraceEvent("RoleAdd").detail("Address", address).detail("Role", role).detail("Roles", roleAddresses[address].size()).detail("Value", roleAddresses[address][role]);
157+
}
158+
159+
// Removes one instance of `role` from `address`.
// - Count above 1: the count is decremented.
// - Otherwise: the role entry is erased, and if that leaves the address with
//   no roles, the address entry is erased from roleAddresses as well.
// - Unknown address or role: nothing is modified and a SevWarn "RoleRemove"
//   event is traced.
// Every path emits a "RoleRemove" trace event whose "Result" detail names the
// branch that was taken.
virtual void removeRole(NetworkAddress const& address, std::string const& role) {
160+
auto addressIt = roleAddresses.find(address);
161+
if (addressIt != roleAddresses.end()) {
162+
auto rolesIt = addressIt->second.find(role);
163+
if (rolesIt != addressIt->second.end()) {
164+
// Other instances of this role remain: only decrement the count.
if (rolesIt->second > 1) {
165+
rolesIt->second --;
166+
TraceEvent("RoleRemove").detail("Address", address).detail("Role", role).detail("Roles", addressIt->second.size()).detail("Value", rolesIt->second).detail("Result", "Decremented Role");
167+
}
168+
else {
169+
// Last instance of the role: drop the role entry. rolesIt is invalid after this erase.
addressIt->second.erase(rolesIt);
170+
if (addressIt->second.size()) {
171+
TraceEvent("RoleRemove").detail("Address", address).detail("Role", role).detail("Roles", addressIt->second.size()).detail("Value", 0).detail("Result", "Removed Role");
172+
}
173+
else {
174+
// No roles left at this address: drop the address entry too. addressIt is invalid after this erase.
roleAddresses.erase(addressIt);
175+
TraceEvent("RoleRemove").detail("Address", address).detail("Role", role).detail("Roles", 0).detail("Value", 0).detail("Result", "Removed Address");
176+
}
177+
}
178+
}
179+
else {
180+
TraceEvent(SevWarn,"RoleRemove").detail("Address", address).detail("Role", role).detail("Result", "Role Missing");
181+
}
182+
}
183+
else {
184+
TraceEvent(SevWarn,"RoleRemove").detail("Address", address).detail("Role", role).detail("Result", "Address Missing");
185+
}
186+
}
187+
188+
// Builds a textual list of the roles recorded for `address`.
// Each role is written as "<name> ", or "<name>-<count> " when its count is
// above 1, so a non-empty result always ends with a trailing space.
// Roles appear in the std::map's key order (sorted by role name).
// When skipWorkers is true (the default) the role named "Worker" is omitted.
// Returns "[unset]" when the address is unknown or no role was appended.
virtual std::string getRoles(NetworkAddress const& address, bool skipWorkers = true) const {
189+
auto addressIt = roleAddresses.find(address);
190+
std::string roleText;
191+
if (addressIt != roleAddresses.end()) {
192+
for (auto& roleIt : addressIt->second) {
193+
if ((!skipWorkers) || (roleIt.first != "Worker"))
194+
roleText += roleIt.first + ((roleIt.second > 1) ? format("-%d ", roleIt.second) : " ");
195+
}
196+
}
197+
if (roleText.empty())
198+
roleText = "[unset]";
199+
return roleText;
200+
}
201+
154202
// Marks `address` as excluded.
// Diff view: the line under the "155-" marker is the removed set-based
// implementation; the lines under "+" markers replace it. The new code bumps
// a per-address exclusion count (operator[] creates the entry on first use)
// and traces the updated count as "Value" in an "ExcludeAddress" event.
virtual void excludeAddress(NetworkAddress const& address) {
155-
excludedAddresses.insert(address);
203+
excludedAddresses[address]++;
204+
TraceEvent("ExcludeAddress").detail("Address", address).detail("Value", excludedAddresses[address]);
156205
}
206+
157207
// Undoes one exclusion of `address`.
// Diff view: the line under the "158-" marker is the removed implementation,
// which erased the address unconditionally; the lines under "+" markers
// replace it.
// New behaviour: a count above 1 is decremented; otherwise the entry is
// erased. If the address is not currently excluded, nothing is modified and
// a SevWarn "IncludeAddress" event with Result "Missing" is traced.
virtual void includeAddress(NetworkAddress const& address) {
158-
excludedAddresses.erase(address);
208+
auto addressIt = excludedAddresses.find(address);
209+
if (addressIt != excludedAddresses.end()) {
210+
if (addressIt->second > 1) {
211+
addressIt->second --;
212+
TraceEvent("IncludeAddress").detail("Address", address).detail("Value", addressIt->second).detail("Result", "Decremented");
213+
}
214+
else {
215+
excludedAddresses.erase(addressIt);
216+
TraceEvent("IncludeAddress").detail("Address", address).detail("Value", 0).detail("Result", "Removed");
217+
}
218+
}
219+
else {
220+
TraceEvent(SevWarn,"IncludeAddress").detail("Address", address).detail("Result", "Missing");
221+
}
159222
}
160223
// Clears every exclusion, regardless of each address's count.
// The added trace event records how many addresses were excluded just before
// the clear ("AddressTotal").
virtual void includeAllAddresses() {
224+
TraceEvent("IncludeAddressAll").detail("AddressTotal", excludedAddresses.size());
161225
excludedAddresses.clear();
162226
}
163227
// Returns true when `address` has an entry in excludedAddresses.
// Diff view: the removed line (under "164-") returned true when the address
// was NOT in the container (count == 0), which inverted the result; the
// added line (under "228+") returns true only when an entry is found.
virtual bool isExcluded(NetworkAddress const& address) const {
164-
return excludedAddresses.count(address) == 0;
228+
return excludedAddresses.find(address) != excludedAddresses.end();
165229
}
166230

167231
virtual void disableSwapToMachine(Optional<Standalone<StringRef>> zoneId ) {
@@ -230,7 +294,8 @@ class ISimulator : public INetwork {
230294

231295
private:
232296
std::set<Optional<Standalone<StringRef>>> swapsDisabled;
233-
std::set<NetworkAddress> excludedAddresses;
297+
std::map<NetworkAddress, int> excludedAddresses;
298+
std::map<NetworkAddress, std::map<std::string, int>> roleAddresses;
234299
bool allSwapsDisabled;
235300
};
236301

fdbserver/ClusterController.actor.cpp

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
218218
std::vector<LocalityData> unavailableLocals;
219219
LocalitySetRef logServerSet;
220220
LocalityMap<std::pair<WorkerInterface, ProcessClass>>* logServerMap;
221+
UID functionId = g_nondeterministic_random->randomUniqueID();
221222
bool bCompleted = false;
222223

223224
logServerSet = Reference<LocalitySet>(new LocalityMap<std::pair<WorkerInterface, ProcessClass>>());
@@ -230,7 +231,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
230231
}
231232
else {
232233
if (it.second.interf.locality.dataHallId().present())
233-
TraceEvent(SevWarn,"GWFTADNotAvailable", id)
234+
TraceEvent(SevWarn,"GWFTADNotAvailable", functionId)
234235
.detail("Fitness", fitness)
235236
.detailext("Zone", it.second.interf.locality.zoneId())
236237
.detailext("DataHall", it.second.interf.locality.dataHallId())
@@ -243,7 +244,8 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
243244
.detail("Locality", it.second.interf.locality.toString())
244245
.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
245246
.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
246-
.detail("DesiredLogs", conf.getDesiredLogs());
247+
.detail("DesiredLogs", conf.getDesiredLogs())
248+
.detail("InterfaceId", id);
247249
unavailableLocals.push_back(it.second.interf.locality);
248250
}
249251
}
@@ -258,12 +260,13 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
258260
logServerMap->add(worker.first.locality, &worker);
259261
}
260262
if (logServerSet->size() < conf.tLogReplicationFactor) {
261-
TraceEvent(SevWarn,"GWFTADTooFew", id)
263+
TraceEvent(SevWarn,"GWFTADTooFew", functionId)
262264
.detail("Fitness", fitness)
263265
.detail("Processes", logServerSet->size())
264266
.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
265267
.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
266-
.detail("DesiredLogs", conf.getDesiredLogs());
268+
.detail("DesiredLogs", conf.getDesiredLogs())
269+
.detail("InterfaceId", id);
267270
}
268271
else if (logServerSet->size() <= conf.getDesiredLogs()) {
269272
ASSERT(conf.tLogPolicy);
@@ -275,12 +278,13 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
275278
break;
276279
}
277280
else {
278-
TraceEvent(SevWarn,"GWFTADNotAcceptable", id)
281+
TraceEvent(SevWarn,"GWFTADNotAcceptable", functionId)
279282
.detail("Fitness", fitness)
280283
.detail("Processes", logServerSet->size())
281284
.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
282285
.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
283-
.detail("DesiredLogs", conf.getDesiredLogs());
286+
.detail("DesiredLogs", conf.getDesiredLogs())
287+
.detail("InterfaceId", id);
284288
}
285289
}
286290
// Try to select the desired size, if larger
@@ -300,25 +304,27 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
300304
results.push_back(*object);
301305
tLocalities.push_back(object->first.locality);
302306
}
303-
TraceEvent("GWFTADBestResults", id)
307+
TraceEvent("GWFTADBestResults", functionId)
304308
.detail("Fitness", fitness)
305309
.detail("Processes", logServerSet->size())
306310
.detail("BestCount", bestSet.size())
307311
.detail("BestZones", ::describeZones(tLocalities))
308312
.detail("BestDataHalls", ::describeDataHalls(tLocalities))
309313
.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
310314
.detail("TotalResults", results.size())
311-
.detail("DesiredLogs", conf.getDesiredLogs());
315+
.detail("DesiredLogs", conf.getDesiredLogs())
316+
.detail("InterfaceId", id);
312317
bCompleted = true;
313318
break;
314319
}
315320
else {
316-
TraceEvent(SevWarn,"GWFTADNoBest", id)
321+
TraceEvent(SevWarn,"GWFTADNoBest", functionId)
317322
.detail("Fitness", fitness)
318323
.detail("Processes", logServerSet->size())
319324
.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
320325
.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
321-
.detail("DesiredLogs", conf.getDesiredLogs());
326+
.detail("DesiredLogs", conf.getDesiredLogs())
327+
.detail("InterfaceId", id);
322328
}
323329
}
324330
}
@@ -331,7 +337,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
331337
tLocalities.push_back(object->first.locality);
332338
}
333339

334-
TraceEvent(SevWarn, "GetTLogTeamFailed")
340+
TraceEvent(SevWarn, "GetTLogTeamFailed", functionId)
335341
.detail("Policy", conf.tLogPolicy->info())
336342
.detail("Processes", logServerSet->size())
337343
.detail("Workers", id_worker.size())
@@ -344,7 +350,8 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
344350
.detail("DesiredLogs", conf.getDesiredLogs())
345351
.detail("RatingTests",SERVER_KNOBS->POLICY_RATING_TESTS)
346352
.detail("checkStable", checkStable)
347-
.detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS).backtrace();
353+
.detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS)
354+
.detail("InterfaceId", id).backtrace();
348355

349356
// Free the set
350357
logServerSet->clear();
@@ -356,14 +363,25 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
356363
id_used[result.first.locality.processId()]++;
357364
}
358365

359-
TraceEvent("GetTLogTeamDone")
366+
TraceEvent("GetTLogTeamDone", functionId)
360367
.detail("Completed", bCompleted).detail("Policy", conf.tLogPolicy->info())
361368
.detail("Results", results.size()).detail("Processes", logServerSet->size())
362369
.detail("Workers", id_worker.size())
363370
.detail("Replication", conf.tLogReplicationFactor)
364371
.detail("Desired", conf.getDesiredLogs())
365372
.detail("RatingTests",SERVER_KNOBS->POLICY_RATING_TESTS)
366-
.detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS);
373+
.detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS)
374+
.detail("InterfaceId", id);
375+
376+
for (auto& result : results) {
377+
TraceEvent("GetTLogTeamWorker", functionId)
378+
.detail("Class", result.second.toString())
379+
.detail("Address", result.first.address())
380+
.detailext("Zone", result.first.locality.zoneId())
381+
.detailext("DataHall", result.first.locality.dataHallId())
382+
.detail("isExcludedServer", conf.isExcludedServer(result.first.address()))
383+
.detail("isAvailable", IFailureMonitor::failureMonitor().getState(result.first.storage.getEndpoint()).isAvailable());
384+
}
367385

368386
// Free the set
369387
logServerSet->clear();

0 commit comments

Comments
 (0)