diff --git a/be/src/common/config.h b/be/src/common/config.h index 9239a6319e83a0..2d568e2701aaa5 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -58,6 +58,9 @@ CONF_String(ssl_private_key_path, ""); // The max number of single connections maintained by the brpc client and each server. // These connections are created during the first few access and will be used thereafter CONF_Int32(brpc_max_connections_per_server, "1"); +// BRPC stub cache expire configurations +// The expire time of BRPC stub cache, default 60 minutes. +CONF_mInt32(brpc_stub_expire_s, "3600"); // 60 minutes // Declare a selection strategy for those servers have many ips. // Note that there should at most one ip match this list. diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index d4c769b871aad0..9ad2fb9ca78ad9 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -498,7 +498,7 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { #endif _load_channel_mgr = new LoadChannelMgr(); _load_stream_mgr = new LoadStreamMgr(); - _brpc_stub_cache = new BrpcStubCache(); + _brpc_stub_cache = new BrpcStubCache(this); _stream_load_executor = new StreamLoadExecutor(this); _stream_context_mgr = new StreamContextMgr(); _transaction_mgr = new TransactionMgr(this); diff --git a/be/src/util/brpc_stub_cache.cpp b/be/src/util/brpc_stub_cache.cpp index d368fae1640fb3..f6dddf8c7edcd3 100644 --- a/be/src/util/brpc_stub_cache.cpp +++ b/be/src/util/brpc_stub_cache.cpp @@ -17,12 +17,13 @@ #include "common/config.h" #include "gen_cpp/internal_service.pb.h" #include "gen_cpp/lake_service.pb.h" +#include "runtime/exec_env.h" #include "util/failpoint/fail_point.h" #include "util/starrocks_metrics.h" namespace starrocks { -BrpcStubCache::BrpcStubCache() { +BrpcStubCache::BrpcStubCache(ExecEnv* exec_env) : _pipeline_timer(exec_env->pipeline_timer()) { _stub_map.init(239); REGISTER_GAUGE_STARROCKS_METRIC(brpc_endpoint_stub_count, [this]() { std::lock_guard l(_lock); @@ -31,19 +32,41 @@ BrpcStubCache::BrpcStubCache() { } BrpcStubCache::~BrpcStubCache() { - for (auto& stub : _stub_map) { - delete stub.second; + std::vector> pools_to_cleanup; + { + std::lock_guard l(_lock); + + for (auto& stub : _stub_map) { + pools_to_cleanup.push_back(stub.second); + } + } + + for (auto& pool : pools_to_cleanup) { + pool->_cleanup_task->unschedule(_pipeline_timer); } + + _stub_map.clear(); } std::shared_ptr BrpcStubCache::get_stub(const butil::EndPoint& endpoint) { std::lock_guard l(_lock); + auto stub_pool = _stub_map.seek(endpoint); if (stub_pool == nullptr) { - StubPool* pool = new StubPool(); - _stub_map.insert(endpoint, pool); - return pool->get_or_create(endpoint); + auto new_pool = std::make_shared(); + new_pool->_cleanup_task = new EndpointCleanupTask(this, endpoint); + _stub_map.insert(endpoint, new_pool); + stub_pool = _stub_map.seek(endpoint); } + + if (_pipeline_timer->unschedule((*stub_pool)->_cleanup_task) != TIMER_TASK_RUNNING) { + timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s); + auto status = _pipeline_timer->schedule((*stub_pool)->_cleanup_task, tm); + if (!status.ok()) { + LOG(WARNING) << "Failed to schedule brpc cleanup task: " << endpoint; + } + } + return (*stub_pool)->get_or_create(endpoint); } @@ -71,10 +94,22 @@ std::shared_ptr BrpcStubCache::get_stub(const return get_stub(endpoint); } +void BrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) { + std::lock_guard l(_lock); + + LOG(INFO) << "cleanup brpc stub, endpoint:" << endpoint; + _stub_map.erase(endpoint); +} + BrpcStubCache::StubPool::StubPool() : _idx(-1) { _stubs.reserve(config::brpc_max_connections_per_server); } +BrpcStubCache::StubPool::~StubPool() { + _stubs.clear(); + SAFE_DELETE(_cleanup_task); +} + std::shared_ptr BrpcStubCache::StubPool::get_or_create( const butil::EndPoint& endpoint) { if (UNLIKELY(_stubs.size() < config::brpc_max_connections_per_server)) { @@ -98,6 +133,24 @@ HttpBrpcStubCache* HttpBrpcStubCache::getInstance() { HttpBrpcStubCache::HttpBrpcStubCache() { _stub_map.init(500); + _pipeline_timer = ExecEnv::GetInstance()->pipeline_timer(); +} + +HttpBrpcStubCache::~HttpBrpcStubCache() { + std::vector>> task_to_cleanup; + + { + std::lock_guard l(_lock); + for (auto& stub : _stub_map) { + task_to_cleanup.push_back(stub.second.second); + } + } + + for (auto& task : task_to_cleanup) { + task->unschedule(_pipeline_timer); + } + + _stub_map.clear(); } StatusOr> HttpBrpcStubCache::get_http_stub( @@ -119,18 +172,37 @@ StatusOr> HttpBrpcStubCache::g } // get is exist std::lock_guard l(_lock); - auto stub_ptr = _stub_map.seek(endpoint); - if (stub_ptr != nullptr) { - return *stub_ptr; + + auto stub_pair_ptr = _stub_map.seek(endpoint); + if (stub_pair_ptr == nullptr) { + // create + auto new_task = std::make_shared>(this, endpoint); + auto stub = std::make_shared(endpoint, "http"); + if (!stub->reset_channel().ok()) { + return Status::RuntimeError("init http brpc channel error on " + taddr.hostname + ":" + + std::to_string(taddr.port)); + } + _stub_map.insert(endpoint, std::make_pair(stub, new_task)); + stub_pair_ptr = _stub_map.seek(endpoint); } - // create - auto stub = std::make_shared(endpoint, "http"); - if (!stub->reset_channel().ok()) { - return Status::RuntimeError("init brpc http channel error on " + taddr.hostname + ":" + - std::to_string(taddr.port)); + + // schedule clean up task + if (_pipeline_timer->unschedule((*stub_pair_ptr).second.get()) != TIMER_TASK_RUNNING) { + timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s); + auto status = _pipeline_timer->schedule((*stub_pair_ptr).second.get(), tm); + if (!status.ok()) { + LOG(WARNING) << "Failed to schedule http brpc cleanup task: " << endpoint; + } } - _stub_map.insert(endpoint, stub); - return stub; + + return (*stub_pair_ptr).first; +} + +void HttpBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) { + std::lock_guard l(_lock); + + LOG(INFO) << "cleanup http brpc stub, endpoint:" << endpoint; + _stub_map.erase(endpoint); } LakeServiceBrpcStubCache* LakeServiceBrpcStubCache::getInstance() { @@ -140,6 +212,24 @@ LakeServiceBrpcStubCache* LakeServiceBrpcStubCache::getInstance() { LakeServiceBrpcStubCache::LakeServiceBrpcStubCache() { _stub_map.init(500); + _pipeline_timer = ExecEnv::GetInstance()->pipeline_timer(); +} + +LakeServiceBrpcStubCache::~LakeServiceBrpcStubCache() { + std::vector>> task_to_cleanup; + + { + std::lock_guard l(_lock); + for (auto& stub : _stub_map) { + task_to_cleanup.push_back(stub.second.second); + } + } + + for (auto& task : task_to_cleanup) { + task->unschedule(_pipeline_timer); + } + + _stub_map.clear(); } DEFINE_FAIL_POINT(get_stub_return_nullptr); @@ -158,18 +248,37 @@ StatusOr> LakeServiceBrp } // get if exist std::lock_guard l(_lock); - auto stub_ptr = _stub_map.seek(endpoint); - FAIL_POINT_TRIGGER_EXECUTE(get_stub_return_nullptr, { stub_ptr = nullptr; }); - if (stub_ptr != nullptr) { - return *stub_ptr; - } - // create - auto stub = std::make_shared(endpoint, ""); - if (!stub->reset_channel().ok()) { - return Status::RuntimeError("init brpc http channel error on " + host + ":" + std::to_string(port)); - } - _stub_map.insert(endpoint, stub); - return stub; + + auto stub_pair_ptr = _stub_map.seek(endpoint); + FAIL_POINT_TRIGGER_EXECUTE(get_stub_return_nullptr, { stub_pair_ptr = nullptr; }); + if (stub_pair_ptr == nullptr) { + // create + auto stub = std::make_shared(endpoint, ""); + auto new_task = std::make_shared>(this, endpoint); + if (!stub->reset_channel().ok()) { + return Status::RuntimeError("init lakeService brpc channel error on " + host + ":" + std::to_string(port)); + } + _stub_map.insert(endpoint, std::make_pair(stub, new_task)); + stub_pair_ptr = _stub_map.seek(endpoint); + } + + // schedule clean up task + if (_pipeline_timer->unschedule((*stub_pair_ptr).second.get()) != TIMER_TASK_RUNNING) { + timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s); + auto status = _pipeline_timer->schedule((*stub_pair_ptr).second.get(), tm); + if (!status.ok()) { + LOG(WARNING) << "Failed to schedule lake brpc cleanup task: " << endpoint; + } + } + + return (*stub_pair_ptr).first; +} + +void LakeServiceBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) { + std::lock_guard l(_lock); + + LOG(INFO) << "cleanup lake service brpc stub, endpoint:" << endpoint; + _stub_map.erase(endpoint); } } // namespace starrocks \ No newline at end of file diff --git a/be/src/util/brpc_stub_cache.h b/be/src/util/brpc_stub_cache.h index 664e95578e54f7..4659671b9855c1 100644 --- a/be/src/util/brpc_stub_cache.h +++ b/be/src/util/brpc_stub_cache.h @@ -39,6 +39,7 @@ #include #include "common/statusor.h" +#include "exec/pipeline/schedule/pipeline_timer.h" #include "gen_cpp/Types_types.h" // TNetworkAddress #include "service/brpc.h" #include "util/internal_service_recoverable_stub.h" @@ -48,54 +49,83 @@ namespace starrocks { +constexpr int TIMER_TASK_RUNNING = 1; + +class ExecEnv; + +template +class EndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask { +public: + EndpointCleanupTask(StubCacheT* cache, const butil::EndPoint& endpoint) : _cache(cache), _endpoint(endpoint){}; + void Run() override { _cache->cleanup_expired(_endpoint); } + +private: + StubCacheT* _cache; + butil::EndPoint _endpoint; +}; + class BrpcStubCache { public: - BrpcStubCache(); + BrpcStubCache(ExecEnv* exec_env); ~BrpcStubCache(); std::shared_ptr get_stub(const butil::EndPoint& endpoint); std::shared_ptr get_stub(const TNetworkAddress& taddr); std::shared_ptr get_stub(const std::string& host, int port); + void cleanup_expired(const butil::EndPoint& endpoint); private: struct StubPool { StubPool(); + ~StubPool(); std::shared_ptr get_or_create(const butil::EndPoint& endpoint); std::vector> _stubs; int64_t _idx; + EndpointCleanupTask* _cleanup_task = nullptr; }; SpinLock _lock; - butil::FlatMap _stub_map; + butil::FlatMap> _stub_map; + pipeline::PipelineTimer* _pipeline_timer; }; class HttpBrpcStubCache { public: static HttpBrpcStubCache* getInstance(); StatusOr> get_http_stub(const TNetworkAddress& taddr); + void cleanup_expired(const butil::EndPoint& endpoint); private: HttpBrpcStubCache(); HttpBrpcStubCache(const HttpBrpcStubCache&) = delete; HttpBrpcStubCache& operator=(const HttpBrpcStubCache&) = delete; + ~HttpBrpcStubCache(); SpinLock _lock; - butil::FlatMap> _stub_map; + butil::FlatMap, + std::shared_ptr>>> + _stub_map; + pipeline::PipelineTimer* _pipeline_timer; }; class LakeServiceBrpcStubCache { public: static LakeServiceBrpcStubCache* getInstance(); StatusOr> get_stub(const std::string& host, int port); + void cleanup_expired(const butil::EndPoint& endpoint); private: LakeServiceBrpcStubCache(); LakeServiceBrpcStubCache(const LakeServiceBrpcStubCache&) = delete; LakeServiceBrpcStubCache& operator=(const LakeServiceBrpcStubCache&) = delete; + ~LakeServiceBrpcStubCache(); SpinLock _lock; - butil::FlatMap> _stub_map; + butil::FlatMap, + std::shared_ptr>>> + _stub_map; + pipeline::PipelineTimer* _pipeline_timer; }; } // namespace starrocks diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index ee3a2a6d6ecb8f..757b5f3324fbcd 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -91,7 +91,7 @@ class StreamLoadActionTest : public testing::Test { config::streaming_load_max_mb = 1; _env._load_stream_mgr = new LoadStreamMgr(); - _env._brpc_stub_cache = new BrpcStubCache(); + _env._brpc_stub_cache = new BrpcStubCache(&_env); _env._stream_load_executor = new StreamLoadExecutor(&_env); _evhttp_req = evhttp_request_new(nullptr, nullptr); diff --git a/be/test/http/transaction_stream_load_test.cpp b/be/test/http/transaction_stream_load_test.cpp index 8a8473aca6c087..ec105d3a623742 100644 --- a/be/test/http/transaction_stream_load_test.cpp +++ b/be/test/http/transaction_stream_load_test.cpp @@ -68,7 +68,7 @@ class TransactionStreamLoadActionTest : public testing::Test { config::streaming_load_max_mb = 1; _env._load_stream_mgr = new LoadStreamMgr(); - _env._brpc_stub_cache = new BrpcStubCache(); + _env._brpc_stub_cache = new BrpcStubCache(&_env); _env._stream_load_executor = new StreamLoadExecutor(&_env); _env._stream_context_mgr = new StreamContextMgr(); _env._transaction_mgr = new TransactionMgr(&_env); diff --git a/be/test/util/brpc_stub_cache_test.cpp b/be/test/util/brpc_stub_cache_test.cpp index 4acb61a65d3ebd..bf456c3bbbfb8b 100644 --- a/be/test/util/brpc_stub_cache_test.cpp +++ b/be/test/util/brpc_stub_cache_test.cpp @@ -18,7 +18,9 @@ #include "util/brpc_stub_cache.h" #include +#include +#include "runtime/exec_env.h" #include "util/failpoint/fail_point.h" namespace starrocks { @@ -27,10 +29,22 @@ class BrpcStubCacheTest : public testing::Test { public: BrpcStubCacheTest() = default; ~BrpcStubCacheTest() override = default; + void SetUp() override { + _env._pipeline_timer = new pipeline::PipelineTimer(); + ASSERT_OK(_env._pipeline_timer->start()); + } + void TearDown() override { + delete _env._pipeline_timer; + _env._pipeline_timer = nullptr; + config::brpc_stub_expire_s = 3600; + } + +private: + ExecEnv _env; }; TEST_F(BrpcStubCacheTest, normal) { - BrpcStubCache cache; + BrpcStubCache cache(&_env); TNetworkAddress address; address.hostname = "127.0.0.1"; address.port = 123; @@ -46,7 +60,7 @@ TEST_F(BrpcStubCacheTest, normal) { } TEST_F(BrpcStubCacheTest, invalid) { - BrpcStubCache cache; + BrpcStubCache cache(&_env); TNetworkAddress address; address.hostname = "invalid.cm.invalid"; address.port = 123; @@ -55,7 +69,7 @@ TEST_F(BrpcStubCacheTest, invalid) { } TEST_F(BrpcStubCacheTest, reset) { - BrpcStubCache cache; + BrpcStubCache cache(&_env); TNetworkAddress address; address.hostname = "127.0.0.1"; address.port = 123; @@ -108,4 +122,53 @@ TEST_F(BrpcStubCacheTest, test_http_stub) { ASSERT_EQ(nullptr, *stub4); } +TEST_F(BrpcStubCacheTest, test_cleanup) { + config::brpc_stub_expire_s = 1; + BrpcStubCache cache(&_env); + TNetworkAddress address; + address.hostname = "127.0.0.1"; + address.port = 123; + auto stub1 = cache.get_stub(address); + ASSERT_NE(nullptr, stub1); + auto stub2 = cache.get_stub(address); + ASSERT_EQ(stub2, stub1); + + sleep(2); + auto stub3 = cache.get_stub(address); + ASSERT_NE(stub3, stub1); +} + +TEST_F(BrpcStubCacheTest, test_lake_cleanup) { + config::brpc_stub_expire_s = 1; + LakeServiceBrpcStubCache cache; + std::string hostname = "127.0.0.1"; + int32_t port = 123; + auto stub1 = cache.get_stub(hostname, port); + ASSERT_TRUE(stub1.ok()); + ASSERT_NE(nullptr, *stub1); + auto stub2 = cache.get_stub(hostname, port); + ASSERT_TRUE(stub1.ok()); + ASSERT_EQ(*stub2, *stub1); + + sleep(2); + auto stub3 = cache.get_stub(hostname, port); + ASSERT_NE(*stub3, *stub1); +} + +TEST_F(BrpcStubCacheTest, test_http_cleanup) { + config::brpc_stub_expire_s = 1; + HttpBrpcStubCache cache; + TNetworkAddress address; + address.hostname = "127.0.0.1"; + address.port = 123; + auto stub1 = cache.get_http_stub(address); + ASSERT_NE(nullptr, *stub1); + auto stub2 = cache.get_http_stub(address); + ASSERT_EQ(*stub2, *stub1); + + sleep(2); + auto stub3 = cache.get_http_stub(address); + ASSERT_NE(*stub3, *stub1); +} + } // namespace starrocks diff --git a/docs/en/administration/management/BE_configuration.md b/docs/en/administration/management/BE_configuration.md index 63ddfd2810b1a9..e0c40ca9c70a8f 100644 --- a/docs/en/administration/management/BE_configuration.md +++ b/docs/en/administration/management/BE_configuration.md @@ -256,6 +256,15 @@ curl http://:/varz - Description: The maximum body size of a bRPC. - Introduced in: - +##### brpc_stub_expire_s + +- Default: 3600 +- Type: Int +- Unit: Seconds +- Is mutable: Yes +- Description: The expire time of BRPC stub cache, default 60 minutes. +- Introduced in: - +