Skip to content

Commit 2b23111

Browse files
authored
[release-7.3] Log all incoming connections (#11713)
* Log all incoming connections * Address review comments * Update FlowTransport.actor.cpp * Update FlowTransport.actor.cpp * Refactor * Format * initialize for simulation
1 parent 12c39e7 commit 2b23111

File tree

3 files changed

+131
-3
lines changed

3 files changed

+131
-3
lines changed

fdbrpc/FlowTransport.actor.cpp

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@
2020

2121
#include "fdbrpc/FlowTransport.h"
2222
#include "flow/Arena.h"
23+
#include "flow/IThreadPool.h"
24+
#include "flow/Knobs.h"
25+
#include "flow/NetworkAddress.h"
2326
#include "flow/network.h"
2427

2528
#include <cstdint>
29+
#include <fstream>
30+
#include <string>
2631
#include <unordered_map>
2732
#if VALGRIND
2833
#include <memcheck.h>
@@ -352,8 +357,112 @@ class TransportData {
352357
Future<Void> publicKeyFileWatch;
353358

354359
std::unordered_map<Standalone<StringRef>, PublicKey> publicKeys;
360+
361+
struct ConnectionHistoryEntry {
362+
int64_t time;
363+
NetworkAddress addr;
364+
bool failed;
365+
};
366+
std::deque<ConnectionHistoryEntry> connectionHistory;
367+
Future<Void> connectionHistoryLoggerF;
368+
Reference<IThreadPool> connectionLogWriterThread;
369+
};
370+
371+
struct ConnectionLogWriter : IThreadPoolReceiver {
372+
const std::string baseDir;
373+
std::string fileName;
374+
std::fstream file;
375+
376+
ConnectionLogWriter(const std::string baseDir) : baseDir(baseDir) {}
377+
378+
virtual ~ConnectionLogWriter() {
379+
if (file.is_open())
380+
file.close();
381+
}
382+
383+
struct AppendAction : TypedAction<ConnectionLogWriter, AppendAction> {
384+
std::string localAddr;
385+
std::deque<TransportData::ConnectionHistoryEntry> entries;
386+
AppendAction(std::string localAddr, std::deque<TransportData::ConnectionHistoryEntry>&& entries)
387+
: localAddr(localAddr), entries(std::move(entries)) {}
388+
389+
double getTimeEstimate() const { return 2; }
390+
};
391+
392+
std::string newFileName() const { return baseDir + "fdb-connection-log-" + time_str() + ".csv"; }
393+
394+
void init() { fileName = newFileName(); }
395+
396+
std::string time_str() const { return std::to_string(now()); }
397+
398+
void openOrRoll() {
399+
if (fileName.empty()) {
400+
fileName = newFileName();
401+
}
402+
403+
if (!file.is_open()) {
404+
TraceEvent("OpenConnectionLog").detail("FileName", fileName);
405+
file = std::fstream(fileName, std::ios::in | std::ios::out | std::ios::app);
406+
}
407+
408+
if (!file.is_open()) {
409+
TraceEvent(SevError, "ErrorOpenConnectionLog").detail("FileName", fileName);
410+
throw io_error();
411+
}
412+
413+
if (file.tellg() > 100 * 1024 * 1024 /* 100 MB */) {
414+
file.close();
415+
fileName = newFileName();
416+
TraceEvent("RollConnectionLog").detail("FileName", fileName);
417+
openOrRoll();
418+
}
419+
}
420+
421+
void action(AppendAction& a) {
422+
openOrRoll();
423+
424+
std::string output;
425+
for (const auto& entry : a.entries) {
426+
output += std::to_string(entry.time) + ",";
427+
output += a.localAddr + ",";
428+
output += entry.failed ? "failed," : "success,";
429+
output += entry.addr.toString() + "\n";
430+
}
431+
file << output;
432+
file.flush();
433+
}
355434
};
356435

436+
// Periodically drains self->connectionHistory and posts the drained batch to a
// dedicated writer thread, which appends it to the connection log file.
//
// Returns immediately when FLOW_KNOBS->LOG_CONNECTION_ATTEMPTS_ENABLED is false;
// otherwise it loops forever, waking up every
// FLOW_KNOBS->LOG_CONNECTION_INTERVAL_SECS seconds.
ACTOR Future<Void> connectionHistoryLogger(TransportData* self) {
	if (!FLOW_KNOBS->LOG_CONNECTION_ATTEMPTS_ENABLED) {
		return Void();
	}

	state Future<Void> next = Void();

	// One thread ensures async serialized execution on the log file.
	if (g_network->isSimulated()) {
		self->connectionLogWriterThread = Reference<IThreadPool>(new DummyThreadPool());
	} else {
		self->connectionLogWriterThread = createGenericThreadPool();
	}

	self->connectionLogWriterThread->addThread(new ConnectionLogWriter(FLOW_KNOBS->CONNECTION_LOG_DIRECTORY));
	loop {
		wait(next);
		next = delay(FLOW_KNOBS->LOG_CONNECTION_INTERVAL_SECS);
		if (self->connectionHistory.empty()) {
			continue;
		}
		std::string localAddr = FlowTransport::getGlobalLocalAddress().toString();

		// Swap the pending entries into a local batch. Unlike relying on the
		// moved-from state of the member, this guarantees the history is empty
		// and ready to collect new entries.
		std::deque<TransportData::ConnectionHistoryEntry> batch;
		batch.swap(self->connectionHistory);
		self->connectionLogWriterThread->post(new ConnectionLogWriter::AppendAction(localAddr, std::move(batch)));

		// Keeps at least one second between two consecutive batches. New
		// connections may be recorded while this actor is suspended, so the
		// history must not be assumed to be empty once the wait returns.
		wait(delay(1));
	}
}
465+
357466
ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
358467
state NetworkAddress lastAddress = NetworkAddress();
359468
loop {
@@ -422,6 +531,8 @@ TransportData::TransportData(uint64_t transportId, int maxWellKnownEndpoints, IP
422531
allowList(allowList == nullptr ? IPAllowList() : *allowList) {
423532
degraded = makeReference<AsyncVar<bool>>(false);
424533
pingLogger = pingLatencyLogger(this);
534+
535+
connectionHistoryLoggerF = connectionHistoryLogger(this);
425536
}
426537

427538
#define CONNECT_PACKET_V0 0x0FDB00A444020001LL
@@ -1492,10 +1603,17 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
14921603
}
14931604

14941605
ACTOR static Future<Void> connectionIncoming(TransportData* self, Reference<IConnection> conn) {
1606+
state TransportData::ConnectionHistoryEntry entry;
1607+
entry.time = now();
1608+
entry.addr = conn->getPeerAddress();
14951609
try {
14961610
wait(conn->acceptHandshake());
14971611
state Promise<Reference<Peer>> onConnected;
14981612
state Future<Void> reader = connectionReader(self, conn, Reference<Peer>(), onConnected);
1613+
if (FLOW_KNOBS->LOG_CONNECTION_ATTEMPTS_ENABLED) {
1614+
entry.failed = false;
1615+
self->connectionHistory.push_back(entry);
1616+
}
14991617
choose {
15001618
when(wait(reader)) {
15011619
ASSERT(false);
@@ -1509,17 +1627,21 @@ ACTOR static Future<Void> connectionIncoming(TransportData* self, Reference<ICon
15091627
throw timed_out();
15101628
}
15111629
}
1512-
return Void();
15131630
} catch (Error& e) {
15141631
if (e.code() != error_code_actor_cancelled) {
15151632
TraceEvent("IncomingConnectionError", conn->getDebugID())
15161633
.errorUnsuppressed(e)
15171634
.suppressFor(1.0)
15181635
.detail("FromAddress", conn->getPeerAddress());
1636+
if (FLOW_KNOBS->LOG_CONNECTION_ATTEMPTS_ENABLED) {
1637+
entry.failed = true;
1638+
self->connectionHistory.push_back(entry);
1639+
}
15191640
}
15201641
conn->close();
1521-
return Void();
15221642
}
1643+
1644+
return Void();
15231645
}
15241646

15251647
ACTOR static Future<Void> listen(TransportData* self, NetworkAddress listenAddr) {

flow/Knobs.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
121121
init( INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING, 5.0 );
122122
init( PING_LOGGING_INTERVAL, 3.0 );
123123
init( PING_SKETCH_ACCURACY, 0.1 );
124+
init( LOG_CONNECTION_ATTEMPTS_ENABLED, false );
125+
init( CONNECTION_LOG_DIRECTORY, "" );
126+
init( LOG_CONNECTION_INTERVAL_SECS, 3 );
124127

125128
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );
126129
init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 );
@@ -202,7 +205,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
202205
//IAsyncFile
203206
init( INCREMENTAL_DELETE_TRUNCATE_AMOUNT, 5e8 ); //500MB
204207
init( INCREMENTAL_DELETE_INTERVAL, 1.0 ); //every 1 second
205-
208+
206209
//Net2 and FlowTransport
207210
init( MIN_COALESCE_DELAY, 10e-6 ); if( randomize && BUGGIFY ) MIN_COALESCE_DELAY = 0;
208211
init( MAX_COALESCE_DELAY, 20e-6 ); if( randomize && BUGGIFY ) MAX_COALESCE_DELAY = 0;

flow/include/flow/Knobs.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ class FlowKnobs : public KnobsImpl<FlowKnobs> {
187187
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
188188
double PING_LOGGING_INTERVAL;
189189
double PING_SKETCH_ACCURACY;
190+
bool LOG_CONNECTION_ATTEMPTS_ENABLED;
191+
int LOG_CONNECTION_INTERVAL_SECS;
192+
std::string CONNECTION_LOG_DIRECTORY;
190193

191194
int TLS_CERT_REFRESH_DELAY_SECONDS;
192195
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;

0 commit comments

Comments
 (0)