Skip to content

Refactor the output component #2086

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: mauro/ROX-28981-rework-collector-iservice-protobuf
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions collector/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
file(GLOB COLLECTOR_LIB_SRC_FILES
${CMAKE_CURRENT_SOURCE_DIR}/*.cpp
${CMAKE_CURRENT_SOURCE_DIR}/system-inspector/*.cpp
${CMAKE_CURRENT_SOURCE_DIR}/output/*.cpp
${CMAKE_CURRENT_SOURCE_DIR}/output/*/*.cpp
)

add_library(collector_lib ${DRIVER_HEADERS} ${COLLECTOR_LIB_SRC_FILES})
Expand Down
2 changes: 0 additions & 2 deletions collector/lib/CollectorConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,6 @@ class CollectorConfig {
}

protected:
FRIEND_TEST(SensorClientFormatterTest, NoProcessArguments);

int scrape_interval_;
CollectionMethod collection_method_;
bool turn_off_scrape_;
Expand Down
4 changes: 2 additions & 2 deletions collector/lib/CollectorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

#include "CivetWrapper.h"
#include "CollectorConfig.h"
#include "CollectorOutput.h"
#include "CollectorStatsExporter.h"
#include "ConfigLoader.h"
#include "Control.h"
#include "NetworkStatusInspector.h"
#include "NetworkStatusNotifier.h"
#include "output/Output.h"
#include "system-inspector/Service.h"

namespace collector {
Expand All @@ -33,7 +33,7 @@ class CollectorService {
bool WaitForGRPCServer();

CollectorConfig& config_;
CollectorOutput output_;
output::Output output_;
system_inspector::Service system_inspector_;

std::atomic<ControlValue>* control_;
Expand Down
8 changes: 4 additions & 4 deletions collector/lib/ProcessSignalHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
#include "CollectorConfig.h"
#include "ProcessSignalFormatter.h"
#include "RateLimit.h"
#include "SensorClientFormatter.h"
#include "SignalHandler.h"
#include "output/Formatter.h"
#include "system-inspector/Service.h"

// forward declarations
Expand All @@ -20,7 +20,7 @@ class ProcessSignalHandler : public SignalHandler {
public:
ProcessSignalHandler(
sinsp* inspector,
CollectorOutput* client,
output::Output* client,
system_inspector::Stats* stats,
const CollectorConfig& config)
: client_(client),
Expand Down Expand Up @@ -48,9 +48,9 @@ class ProcessSignalHandler : public SignalHandler {
Result HandleSensorSignal(sinsp_evt* evt);
Result HandleExistingProcessSensor(sinsp_threadinfo* tinfo);

CollectorOutput* client_;
output::Output* client_;
ProcessSignalFormatter signal_formatter_;
SensorClientFormatter sensor_formatter_;
output::Formatter sensor_formatter_;
system_inspector::Stats* stats_;
RateLimitCache rate_limiter_;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "SensorClientFormatter.h"
#include "Formatter.h"

#include <uuid/uuid.h>

Expand All @@ -13,12 +13,12 @@
#include "Utility.h"
#include "system-inspector/EventExtractor.h"

namespace collector {
namespace collector::output {

using SignalStreamMessage = sensor::SignalStreamMessage;
using Signal = SensorClientFormatter::Signal;
using ProcessSignal = SensorClientFormatter::ProcessSignal;
using LineageInfo = SensorClientFormatter::LineageInfo;
using Signal = Formatter::Signal;
using ProcessSignal = Formatter::ProcessSignal;
using LineageInfo = Formatter::LineageInfo;

using Timestamp = google::protobuf::Timestamp;
using TimeUtil = google::protobuf::util::TimeUtil;
Expand Down Expand Up @@ -57,17 +57,17 @@ std::string extract_proc_args(sinsp_threadinfo* tinfo) {

} // namespace

SensorClientFormatter::SensorClientFormatter(sinsp* inspector, const CollectorConfig& config)
Formatter::Formatter(sinsp* inspector, const CollectorConfig& config)
: event_names_(EventNames::GetInstance()),
event_extractor_(std::make_unique<system_inspector::EventExtractor>()),
container_metadata_(inspector),
config_(config) {
event_extractor_->Init(inspector);
}

SensorClientFormatter::~SensorClientFormatter() = default;
Formatter::~Formatter() = default;

const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_evt* event) {
const sensor::ProcessSignal* Formatter::ToProtoMessage(sinsp_evt* event) {
if (process_signals[event->get_type()] == ProcessSignalType::UNKNOWN_PROCESS_TYPE) {
return nullptr;
}
Expand All @@ -82,7 +82,7 @@ const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_evt* ev
return CreateProcessSignal(event);
}

const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
const sensor::ProcessSignal* Formatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
Reset();
if (!ValidateProcessDetails(tinfo)) {
CLOG(INFO) << "Dropping process event: " << tinfo;
Expand All @@ -92,7 +92,7 @@ const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_threadi
return CreateProcessSignal(tinfo);
}

ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
ProcessSignal* Formatter::CreateProcessSignal(sinsp_evt* event) {
auto signal = AllocateRoot();

// set id
Expand Down Expand Up @@ -174,7 +174,7 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
return signal;
}

ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinfo) {
ProcessSignal* Formatter::CreateProcessSignal(sinsp_threadinfo* tinfo) {
auto signal = AllocateRoot();

// set id
Expand Down Expand Up @@ -237,7 +237,7 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinf
return signal;
}

std::string SensorClientFormatter::ToString(sinsp_evt* event) {
std::string Formatter::ToString(sinsp_evt* event) {
std::stringstream ss;
const std::string* path = event_extractor_->get_exepath(event);
const std::string* name = event_extractor_->get_comm(event);
Expand All @@ -254,7 +254,7 @@ std::string SensorClientFormatter::ToString(sinsp_evt* event) {
return ss.str();
}

bool SensorClientFormatter::ValidateProcessDetails(const sinsp_threadinfo* tinfo) {
bool Formatter::ValidateProcessDetails(const sinsp_threadinfo* tinfo) {
if (tinfo == nullptr) {
return false;
}
Expand All @@ -266,11 +266,11 @@ bool SensorClientFormatter::ValidateProcessDetails(const sinsp_threadinfo* tinfo
return true;
}

bool SensorClientFormatter::ValidateProcessDetails(sinsp_evt* event) {
bool Formatter::ValidateProcessDetails(sinsp_evt* event) {
return ValidateProcessDetails(event->get_thread_info());
}

void SensorClientFormatter::UpdateLineageStats(const std::vector<LineageInfo>& lineage) {
void Formatter::UpdateLineageStats(const std::vector<LineageInfo>& lineage) {
int string_total = std::accumulate(lineage.cbegin(), lineage.cend(), 0, [](int acc, const auto& l) {
return acc + l.parent_exec_file_path().size();
});
Expand All @@ -281,7 +281,7 @@ void SensorClientFormatter::UpdateLineageStats(const std::vector<LineageInfo>& l
COUNTER_ADD(CollectorStats::process_lineage_string_total, string_total);
}

std::vector<LineageInfo> SensorClientFormatter::GetProcessLineage(sinsp_threadinfo* tinfo) {
std::vector<LineageInfo> Formatter::GetProcessLineage(sinsp_threadinfo* tinfo) {
std::vector<LineageInfo> lineage;
if (tinfo == nullptr) {
return lineage;
Expand Down Expand Up @@ -331,4 +331,4 @@ std::vector<LineageInfo> SensorClientFormatter::GetProcessLineage(sinsp_threadin
return lineage;
}

} // namespace collector
} // namespace collector::output
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ namespace collector::system_inspector {
class EventExtractor;
}

namespace collector {
namespace collector::output {

class SensorClientFormatter : public ProtoSignalFormatter<sensor::ProcessSignal> {
class Formatter : public ProtoSignalFormatter<sensor::ProcessSignal> {
public:
SensorClientFormatter(const SensorClientFormatter&) = delete;
SensorClientFormatter(SensorClientFormatter&&) = delete;
SensorClientFormatter& operator=(const SensorClientFormatter&) = delete;
SensorClientFormatter& operator=(SensorClientFormatter&&) = delete;
virtual ~SensorClientFormatter();
Formatter(const Formatter&) = delete;
Formatter(Formatter&&) = delete;
Formatter& operator=(const Formatter&) = delete;
Formatter& operator=(Formatter&&) = delete;
virtual ~Formatter();

SensorClientFormatter(sinsp* inspector, const CollectorConfig& config);
Formatter(sinsp* inspector, const CollectorConfig& config);

using Signal = v1::Signal;
using ProcessSignal = sensor::ProcessSignal;
Expand Down Expand Up @@ -108,4 +108,4 @@ class SensorClientFormatter : public ProtoSignalFormatter<sensor::ProcessSignal>
const CollectorConfig& config_;
};

} // namespace collector
} // namespace collector::output
42 changes: 42 additions & 0 deletions collector/lib/output/IClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#ifndef OUTPUT_ICLIENT_H
#define OUTPUT_ICLIENT_H

#include "internalapi/sensor/collector_iservice.grpc.pb.h"

#include "SignalHandler.h"

namespace collector::output {

class IClient {
public:
using Service = sensor::CollectorService;

IClient() = default;
IClient(const IClient&) = default;
IClient(IClient&&) = delete;
IClient& operator=(const IClient&) = default;
IClient& operator=(IClient&&) = delete;
virtual ~IClient() = default;

/**
* Recreate the internal state of the object to allow communication.
*
* Mostly useful for handling gRPC reconnections.
*
* @returns true if the refresh was succesful, false otherwise.
*/
virtual bool Recreate() = 0;

/**
* Send a message to sensor through the iservice.
*
* @param msg The message to be sent to sensor.
* @returns A SignalHandler::Result with the outcome of the send
* operation.
*/
virtual SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) = 0;
};

} // namespace collector::output

#endif // OUTPUT_ICLIENT_H
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
#include "CollectorOutput.h"
#include "Output.h"

#include "GRPCUtil.h"
#include "HostInfo.h"
#include "output/grpc/Client.h"
#include "output/log/Client.h"

namespace collector {
namespace collector::output {

CollectorOutput::CollectorOutput(const CollectorConfig& config)
Output::Output(const CollectorConfig& config)
: use_sensor_client_(!config.UseLegacyServices()) {
if (config.grpc_channel != nullptr) {
channel_ = config.grpc_channel;

if (use_sensor_client_) {
auto sensor_client = std::make_unique<SensorClient>(channel_);
auto sensor_client = std::make_unique<grpc::Client>(channel_);
sensor_clients_.emplace_back(std::move(sensor_client));
} else {
auto signal_client = std::make_unique<SignalServiceClient>(channel_);
Expand All @@ -21,7 +22,7 @@ CollectorOutput::CollectorOutput(const CollectorConfig& config)

if (config.grpc_channel == nullptr || config.UseStdout()) {
if (use_sensor_client_) {
auto sensor_client = std::make_unique<SensorClientStdout>();
auto sensor_client = std::make_unique<log::Client>();
sensor_clients_.emplace_back(std::move(sensor_client));
} else {
auto signal_client = std::make_unique<StdoutSignalServiceClient>();
Expand All @@ -32,13 +33,13 @@ CollectorOutput::CollectorOutput(const CollectorConfig& config)
thread_.Start([this] { EstablishGrpcStream(); });
}

void CollectorOutput::HandleOutputError() {
void Output::HandleOutputError() {
CLOG(ERROR) << "GRPC stream interrupted";
stream_active_.store(false, std::memory_order_release);
stream_interrupted_.notify_one();
}

SignalHandler::Result CollectorOutput::SensorOutput(const sensor::ProcessSignal& msg) {
SignalHandler::Result Output::SensorOutput(const sensor::ProcessSignal& msg) {
for (auto& client : sensor_clients_) {
auto res = client->SendMsg(msg);
switch (res) {
Expand All @@ -58,7 +59,7 @@ SignalHandler::Result CollectorOutput::SensorOutput(const sensor::ProcessSignal&
return SignalHandler::PROCESSED;
}

SignalHandler::Result CollectorOutput::SignalOutput(const sensor::SignalStreamMessage& msg) {
SignalHandler::Result Output::SignalOutput(const sensor::SignalStreamMessage& msg) {
for (auto& client : signal_clients_) {
auto res = client->PushSignals(msg);
switch (res) {
Expand All @@ -78,7 +79,7 @@ SignalHandler::Result CollectorOutput::SignalOutput(const sensor::SignalStreamMe
return SignalHandler::PROCESSED;
}

SignalHandler::Result CollectorOutput::SendMsg(const MessageType& msg) {
SignalHandler::Result Output::SendMsg(const MessageType& msg) {
auto visitor = [this](auto&& m) {
using T = std::decay_t<decltype(m)>;
if constexpr (std::is_same_v<T, sensor::ProcessSignal>) {
Expand All @@ -94,13 +95,13 @@ SignalHandler::Result CollectorOutput::SendMsg(const MessageType& msg) {
return std::visit(visitor, msg);
}

void CollectorOutput::EstablishGrpcStream() {
void Output::EstablishGrpcStream() {
while (EstablishGrpcStreamSingle()) {
}
CLOG(INFO) << "Service client terminating.";
}

bool CollectorOutput::EstablishGrpcStreamSingle() {
bool Output::EstablishGrpcStreamSingle() {
std::mutex mtx;
std::unique_lock<std::mutex> lock(mtx);
stream_interrupted_.wait(lock, [this]() { return !stream_active_.load(std::memory_order_acquire) || thread_.should_stop(); });
Expand Down Expand Up @@ -135,4 +136,4 @@ bool CollectorOutput::EstablishGrpcStreamSingle() {
}
return true;
}
} // namespace collector
} // namespace collector::output
Loading
Loading