Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ CONF_String(ssl_private_key_path, "");
// The maximum number of single connections maintained between the brpc client and each server.
// These connections are created during the first few accesses and are reused thereafter.
CONF_Int32(brpc_max_connections_per_server, "1");
// BRPC stub cache expiration settings.
// Expiration time of a BRPC stub cache entry, in seconds (default: 3600, i.e. 60 minutes).
CONF_mInt32(brpc_stub_expire_s, "3600"); // 60 minutes

// Declare a selection strategy for servers that have multiple IPs.
// Note that at most one IP should match this list.
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
#endif
_load_channel_mgr = new LoadChannelMgr();
_load_stream_mgr = new LoadStreamMgr();
_brpc_stub_cache = new BrpcStubCache();
_brpc_stub_cache = new BrpcStubCache(this);
_stream_load_executor = new StreamLoadExecutor(this);
_stream_context_mgr = new StreamContextMgr();
_transaction_mgr = new TransactionMgr(this);
Expand Down
165 changes: 137 additions & 28 deletions be/src/util/brpc_stub_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
#include "common/config.h"
#include "gen_cpp/internal_service.pb.h"
#include "gen_cpp/lake_service.pb.h"
#include "runtime/exec_env.h"
#include "util/failpoint/fail_point.h"
#include "util/starrocks_metrics.h"

namespace starrocks {

BrpcStubCache::BrpcStubCache() {
BrpcStubCache::BrpcStubCache(ExecEnv* exec_env) : _pipeline_timer(exec_env->pipeline_timer()) {
_stub_map.init(239);
REGISTER_GAUGE_STARROCKS_METRIC(brpc_endpoint_stub_count, [this]() {
std::lock_guard<SpinLock> l(_lock);
Expand All @@ -31,19 +32,41 @@ BrpcStubCache::BrpcStubCache() {
}

BrpcStubCache::~BrpcStubCache() {
for (auto& stub : _stub_map) {
delete stub.second;
std::vector<std::shared_ptr<StubPool>> pools_to_cleanup;
{
std::lock_guard<SpinLock> l(_lock);

for (auto& stub : _stub_map) {
pools_to_cleanup.push_back(stub.second);
}
}

for (auto& pool : pools_to_cleanup) {
pool->_cleanup_task->unschedule(_pipeline_timer);
}

_stub_map.clear();
}

std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const butil::EndPoint& endpoint) {
std::lock_guard<SpinLock> l(_lock);

auto stub_pool = _stub_map.seek(endpoint);
if (stub_pool == nullptr) {
StubPool* pool = new StubPool();
_stub_map.insert(endpoint, pool);
return pool->get_or_create(endpoint);
auto new_pool = std::make_shared<StubPool>();
new_pool->_cleanup_task = new EndpointCleanupTask<BrpcStubCache>(this, endpoint);
_stub_map.insert(endpoint, new_pool);
stub_pool = _stub_map.seek(endpoint);
}

if (_pipeline_timer->unschedule((*stub_pool)->_cleanup_task) != TIMER_TASK_RUNNING) {
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
auto status = _pipeline_timer->schedule((*stub_pool)->_cleanup_task, tm);
if (!status.ok()) {
LOG(WARNING) << "Failed to schedule brpc cleanup task: " << endpoint;
}
}

return (*stub_pool)->get_or_create(endpoint);
}

Expand Down Expand Up @@ -71,10 +94,22 @@ std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const
return get_stub(endpoint);
}

// Removes the cached stub pool for `endpoint` from the map.
// Called by EndpointCleanupTask<BrpcStubCache>::Run() when the entry's cleanup task fires.
//
// NOTE(review): the erased StubPool deletes its own cleanup task in its destructor. If the
// map held the last reference to the pool, this call destroys the task whose Run() is
// still on the stack -- confirm that the timer does not touch the task after Run() returns.
void BrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
    // Log before taking the lock: _lock is a SpinLock, so any slow work done while holding
    // it makes concurrent get_stub() callers spin. The log must also precede the erase,
    // because `endpoint` may refer to a member of the cleanup task owned by the erased entry.
    LOG(INFO) << "cleanup brpc stub, endpoint:" << endpoint;

    std::lock_guard<SpinLock> l(_lock);
    _stub_map.erase(endpoint);
}

BrpcStubCache::StubPool::StubPool() : _idx(-1) {
_stubs.reserve(config::brpc_max_connections_per_server);
}

// Drops the pooled stubs and frees the cleanup task held by this pool.
BrpcStubCache::StubPool::~StubPool() {
_stubs.clear();
// The task is allocated with `new` in BrpcStubCache::get_stub() and stored as a raw
// pointer, so the pool is responsible for freeing it here.
// NOTE(review): this destructor does not unschedule the task before deleting it --
// confirm every path that destroys a pool has already cancelled or finished the task.
SAFE_DELETE(_cleanup_task);
}

std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::StubPool::get_or_create(
const butil::EndPoint& endpoint) {
if (UNLIKELY(_stubs.size() < config::brpc_max_connections_per_server)) {
Expand All @@ -98,6 +133,24 @@ HttpBrpcStubCache* HttpBrpcStubCache::getInstance() {

// Captures the process-wide pipeline timer from ExecEnv and initialises the endpoint map
// with 500 buckets.
HttpBrpcStubCache::HttpBrpcStubCache() : _pipeline_timer(ExecEnv::GetInstance()->pipeline_timer()) {
    _stub_map.init(500);
}

// Cancels every pending cleanup task, then drops all cached HTTP stubs.
HttpBrpcStubCache::~HttpBrpcStubCache() {
    using CleanupTaskPtr = std::shared_ptr<EndpointCleanupTask<HttpBrpcStubCache>>;

    // Take a snapshot of the tasks while holding the lock; each map value is a
    // (stub, cleanup task) pair.
    std::vector<CleanupTaskPtr> pending_tasks;
    {
        std::lock_guard<SpinLock> guard(_lock);
        for (const auto& entry : _stub_map) {
            pending_tasks.push_back(entry.second.second);
        }
    }

    // The tasks are unscheduled only after _lock is released: a task's Run() calls
    // cleanup_expired(), which acquires the same lock.
    for (const CleanupTaskPtr& pending : pending_tasks) {
        pending->unschedule(_pipeline_timer);
    }

    _stub_map.clear();
}

StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> HttpBrpcStubCache::get_http_stub(
Expand All @@ -119,18 +172,37 @@ StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> HttpBrpcStubCache::g
}
// look up the cached stub, if it exists
std::lock_guard<SpinLock> l(_lock);
auto stub_ptr = _stub_map.seek(endpoint);
if (stub_ptr != nullptr) {
return *stub_ptr;

auto stub_pair_ptr = _stub_map.seek(endpoint);
if (stub_pair_ptr == nullptr) {
// create
auto new_task = std::make_shared<EndpointCleanupTask<HttpBrpcStubCache>>(this, endpoint);
auto stub = std::make_shared<PInternalService_RecoverableStub>(endpoint, "http");
if (!stub->reset_channel().ok()) {
return Status::RuntimeError("init http brpc channel error on " + taddr.hostname + ":" +
std::to_string(taddr.port));
}
_stub_map.insert(endpoint, std::make_pair(stub, new_task));
stub_pair_ptr = _stub_map.seek(endpoint);
}
// create
auto stub = std::make_shared<PInternalService_RecoverableStub>(endpoint, "http");
if (!stub->reset_channel().ok()) {
return Status::RuntimeError("init brpc http channel error on " + taddr.hostname + ":" +
std::to_string(taddr.port));

// schedule clean up task
if (_pipeline_timer->unschedule((*stub_pair_ptr).second.get()) != TIMER_TASK_RUNNING) {
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
auto status = _pipeline_timer->schedule((*stub_pair_ptr).second.get(), tm);
if (!status.ok()) {
LOG(WARNING) << "Failed to schedule http brpc cleanup task: " << endpoint;
}
}
_stub_map.insert(endpoint, stub);
return stub;

return (*stub_pair_ptr).first;
}

// Removes the cached HTTP stub (and its cleanup task) for `endpoint` from the map.
// Called by EndpointCleanupTask<HttpBrpcStubCache>::Run() when the entry's cleanup task fires.
//
// NOTE(review): the map entry holds a shared_ptr to the cleanup task. If that is the last
// reference, this call destroys the task whose Run() is still on the stack -- confirm
// that the timer does not touch the task after Run() returns.
void HttpBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
    // Log before taking the lock: _lock is a SpinLock, so any slow work done while holding
    // it makes concurrent get_http_stub() callers spin. The log must also precede the erase,
    // because `endpoint` may refer to a member of the cleanup task owned by the erased entry.
    LOG(INFO) << "cleanup http brpc stub, endpoint:" << endpoint;

    std::lock_guard<SpinLock> l(_lock);
    _stub_map.erase(endpoint);
}

LakeServiceBrpcStubCache* LakeServiceBrpcStubCache::getInstance() {
Expand All @@ -140,6 +212,24 @@ LakeServiceBrpcStubCache* LakeServiceBrpcStubCache::getInstance() {

// Captures the process-wide pipeline timer from ExecEnv and initialises the endpoint map
// with 500 buckets.
LakeServiceBrpcStubCache::LakeServiceBrpcStubCache()
        : _pipeline_timer(ExecEnv::GetInstance()->pipeline_timer()) {
    _stub_map.init(500);
}

// Cancels every pending cleanup task, then drops all cached lake-service stubs.
LakeServiceBrpcStubCache::~LakeServiceBrpcStubCache() {
    using CleanupTaskPtr = std::shared_ptr<EndpointCleanupTask<LakeServiceBrpcStubCache>>;

    // Take a snapshot of the tasks while holding the lock; each map value is a
    // (stub, cleanup task) pair.
    std::vector<CleanupTaskPtr> pending_tasks;
    {
        std::lock_guard<SpinLock> guard(_lock);
        for (const auto& entry : _stub_map) {
            pending_tasks.push_back(entry.second.second);
        }
    }

    // The tasks are unscheduled only after _lock is released: a task's Run() calls
    // cleanup_expired(), which acquires the same lock.
    for (const CleanupTaskPtr& pending : pending_tasks) {
        pending->unschedule(_pipeline_timer);
    }

    _stub_map.clear();
}

DEFINE_FAIL_POINT(get_stub_return_nullptr);
Expand All @@ -158,18 +248,37 @@ StatusOr<std::shared_ptr<starrocks::LakeService_RecoverableStub>> LakeServiceBrp
}
// get if exist
std::lock_guard<SpinLock> l(_lock);
auto stub_ptr = _stub_map.seek(endpoint);
FAIL_POINT_TRIGGER_EXECUTE(get_stub_return_nullptr, { stub_ptr = nullptr; });
if (stub_ptr != nullptr) {
return *stub_ptr;
}
// create
auto stub = std::make_shared<starrocks::LakeService_RecoverableStub>(endpoint, "");
if (!stub->reset_channel().ok()) {
return Status::RuntimeError("init brpc http channel error on " + host + ":" + std::to_string(port));
}
_stub_map.insert(endpoint, stub);
return stub;

auto stub_pair_ptr = _stub_map.seek(endpoint);
FAIL_POINT_TRIGGER_EXECUTE(get_stub_return_nullptr, { stub_pair_ptr = nullptr; });
if (stub_pair_ptr == nullptr) {
// create
auto stub = std::make_shared<starrocks::LakeService_RecoverableStub>(endpoint, "");
auto new_task = std::make_shared<EndpointCleanupTask<LakeServiceBrpcStubCache>>(this, endpoint);
if (!stub->reset_channel().ok()) {
return Status::RuntimeError("init lakeService brpc channel error on " + host + ":" + std::to_string(port));
}
_stub_map.insert(endpoint, std::make_pair(stub, new_task));
stub_pair_ptr = _stub_map.seek(endpoint);
}

// schedule clean up task
if (_pipeline_timer->unschedule((*stub_pair_ptr).second.get()) != TIMER_TASK_RUNNING) {
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
auto status = _pipeline_timer->schedule((*stub_pair_ptr).second.get(), tm);
if (!status.ok()) {
LOG(WARNING) << "Failed to schedule lake brpc cleanup task: " << endpoint;
}
}

return (*stub_pair_ptr).first;
}

// Removes the cached lake-service stub (and its cleanup task) for `endpoint` from the map.
// Called by EndpointCleanupTask<LakeServiceBrpcStubCache>::Run() when the entry's cleanup
// task fires.
//
// NOTE(review): the map entry holds a shared_ptr to the cleanup task. If that is the last
// reference, this call destroys the task whose Run() is still on the stack -- confirm
// that the timer does not touch the task after Run() returns.
void LakeServiceBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
    // Log before taking the lock: _lock is a SpinLock, so any slow work done while holding
    // it makes concurrent get_stub() callers spin. The log must also precede the erase,
    // because `endpoint` may refer to a member of the cleanup task owned by the erased entry.
    LOG(INFO) << "cleanup lake service brpc stub, endpoint:" << endpoint;

    std::lock_guard<SpinLock> l(_lock);
    _stub_map.erase(endpoint);
}

} // namespace starrocks
38 changes: 34 additions & 4 deletions be/src/util/brpc_stub_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <vector>

#include "common/statusor.h"
#include "exec/pipeline/schedule/pipeline_timer.h"
#include "gen_cpp/Types_types.h" // TNetworkAddress
#include "service/brpc.h"
#include "util/internal_service_recoverable_stub.h"
Expand All @@ -48,54 +49,83 @@

namespace starrocks {

// Value the stub caches compare against the result of PipelineTimer::unschedule(); when
// the result equals it, they do not reschedule the cleanup task.
// Presumably it signals that the task is currently executing -- confirm against the
// PipelineTimer / bthread timer documentation.
constexpr int TIMER_TASK_RUNNING = 1;

class ExecEnv;

// Timer task that asks a stub cache to drop the entry of one endpoint.
//
// StubCacheT must expose a `cleanup_expired` member callable with a
// `const butil::EndPoint&`; BrpcStubCache, HttpBrpcStubCache and
// LakeServiceBrpcStubCache all do.
template <typename StubCacheT>
class EndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask {
public:
    // `cache` is stored as a raw pointer and is never deleted by this class;
    // `endpoint` is copied into the task.
    EndpointCleanupTask(StubCacheT* cache, const butil::EndPoint& endpoint)
            : _cache(cache), _endpoint(endpoint) {}

    // Timer callback: forwards the stored endpoint to the owning cache.
    void Run() override {
        _cache->cleanup_expired(_endpoint);
    }

private:
    StubCacheT* _cache;
    butil::EndPoint _endpoint;
};

class BrpcStubCache {
public:
BrpcStubCache();
BrpcStubCache(ExecEnv* exec_env);
~BrpcStubCache();

std::shared_ptr<PInternalService_RecoverableStub> get_stub(const butil::EndPoint& endpoint);
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const TNetworkAddress& taddr);
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const std::string& host, int port);
void cleanup_expired(const butil::EndPoint& endpoint);

private:
struct StubPool {
StubPool();
~StubPool();
std::shared_ptr<PInternalService_RecoverableStub> get_or_create(const butil::EndPoint& endpoint);

std::vector<std::shared_ptr<PInternalService_RecoverableStub>> _stubs;
int64_t _idx;
EndpointCleanupTask<BrpcStubCache>* _cleanup_task = nullptr;
};

SpinLock _lock;
butil::FlatMap<butil::EndPoint, StubPool*> _stub_map;
butil::FlatMap<butil::EndPoint, std::shared_ptr<StubPool>> _stub_map;
pipeline::PipelineTimer* _pipeline_timer;
};

class HttpBrpcStubCache {
public:
static HttpBrpcStubCache* getInstance();
StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> get_http_stub(const TNetworkAddress& taddr);
void cleanup_expired(const butil::EndPoint& endpoint);

private:
HttpBrpcStubCache();
HttpBrpcStubCache(const HttpBrpcStubCache&) = delete;
HttpBrpcStubCache& operator=(const HttpBrpcStubCache&) = delete;
~HttpBrpcStubCache();

SpinLock _lock;
butil::FlatMap<butil::EndPoint, std::shared_ptr<PInternalService_RecoverableStub>> _stub_map;
butil::FlatMap<butil::EndPoint, std::pair<std::shared_ptr<PInternalService_RecoverableStub>,
std::shared_ptr<EndpointCleanupTask<HttpBrpcStubCache>>>>
_stub_map;
pipeline::PipelineTimer* _pipeline_timer;
};

class LakeServiceBrpcStubCache {
public:
static LakeServiceBrpcStubCache* getInstance();
StatusOr<std::shared_ptr<starrocks::LakeService_RecoverableStub>> get_stub(const std::string& host, int port);
void cleanup_expired(const butil::EndPoint& endpoint);

private:
LakeServiceBrpcStubCache();
LakeServiceBrpcStubCache(const LakeServiceBrpcStubCache&) = delete;
LakeServiceBrpcStubCache& operator=(const LakeServiceBrpcStubCache&) = delete;
~LakeServiceBrpcStubCache();

SpinLock _lock;
butil::FlatMap<butil::EndPoint, std::shared_ptr<LakeService_RecoverableStub>> _stub_map;
butil::FlatMap<butil::EndPoint, std::pair<std::shared_ptr<LakeService_RecoverableStub>,
std::shared_ptr<EndpointCleanupTask<LakeServiceBrpcStubCache>>>>
_stub_map;
pipeline::PipelineTimer* _pipeline_timer;
};

} // namespace starrocks
2 changes: 1 addition & 1 deletion be/test/http/stream_load_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class StreamLoadActionTest : public testing::Test {
config::streaming_load_max_mb = 1;

_env._load_stream_mgr = new LoadStreamMgr();
_env._brpc_stub_cache = new BrpcStubCache();
_env._brpc_stub_cache = new BrpcStubCache(&_env);
_env._stream_load_executor = new StreamLoadExecutor(&_env);

_evhttp_req = evhttp_request_new(nullptr, nullptr);
Expand Down
2 changes: 1 addition & 1 deletion be/test/http/transaction_stream_load_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TransactionStreamLoadActionTest : public testing::Test {
config::streaming_load_max_mb = 1;

_env._load_stream_mgr = new LoadStreamMgr();
_env._brpc_stub_cache = new BrpcStubCache();
_env._brpc_stub_cache = new BrpcStubCache(&_env);
_env._stream_load_executor = new StreamLoadExecutor(&_env);
_env._stream_context_mgr = new StreamContextMgr();
_env._transaction_mgr = new TransactionMgr(&_env);
Expand Down
Loading
Loading