Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "source/common/upstream/load_balancer_context_base.h"
#include "source/common/upstream/priority_conn_pool_map_impl.h"

#include "absl/hash/hash.h"
#include "absl/status/status.h"

#ifdef ENVOY_ENABLE_QUIC
Expand Down Expand Up @@ -1551,18 +1552,28 @@ ClusterManagerImpl::allocateOdCdsApi(OdCdsCreationFunction creation_function,
const envoy::config::core::v3::ConfigSource& odcds_config,
OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
ProtobufMessage::ValidationVisitor& validation_visitor) {
// TODO(krnowak): Instead of creating a new handle every time, store the handles internally and
// return an already existing one if the config or locator matches. Note that this may need a
// way to clean up the unused handles, so we can close the unnecessary connections.
// Generate a unique key for this subscription based on config and locator.
// This enables reuse of subscriptions with the same configuration.
uint64_t config_hash = MessageUtil::hash(odcds_config);
if (odcds_resources_locator.has_value()) {
config_hash = absl::HashOf(config_hash, MessageUtil::hash(*odcds_resources_locator));
}

auto it = odcds_subscriptions_.find(config_hash);
if (it != odcds_subscriptions_.end()) {
return OdCdsApiHandleImpl::create(*this, config_hash);
}

auto odcds_or_error =
creation_function(odcds_config, odcds_resources_locator, xds_manager_, *this, *this,
*stats_.rootScope(), validation_visitor, context_);
RETURN_IF_NOT_OK_REF(odcds_or_error.status());
return OdCdsApiHandleImpl::create(*this, std::move(*odcds_or_error));
odcds_subscriptions_.emplace(config_hash, std::move(*odcds_or_error));
return OdCdsApiHandleImpl::create(*this, config_hash);
}

ClusterDiscoveryCallbackHandlePtr
ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std::string name,
ClusterManagerImpl::requestOnDemandClusterDiscovery(uint64_t subscription_key, std::string name,
ClusterDiscoveryCallbackPtr callback,
std::chrono::milliseconds timeout) {
ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
Expand Down Expand Up @@ -1591,9 +1602,20 @@ ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std
name);
// This seems to be the first request for discovery of this cluster in this worker thread. Rest
// of the process may only happen in the main thread.
dispatcher_.post([this, odcds = std::move(odcds), timeout, name = std::move(name),
dispatcher_.post([this, subscription_key, timeout, name = std::move(name),
invoker = std::move(invoker),
&thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] {
auto it = odcds_subscriptions_.find(subscription_key);
if (it == odcds_subscriptions_.end()) {
ENVOY_LOG(warn, "cm odcds: subscription with key {} not found, discovery request ignored",
subscription_key);
thread_local_dispatcher.post([invoker = std::move(invoker)] {
invoker.invokeCallback(ClusterDiscoveryStatus::Missing);
});
return;
}
OdCdsApiSharedPtr odcds = it->second;

// Check for the cluster here too. It might have been added between the time when this closure
// was posted and when it is being executed.
if (getThreadLocalCluster(name) != nullptr) {
Expand All @@ -1607,7 +1629,8 @@ ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std
return;
}

if (auto it = pending_cluster_creations_.find(name); it != pending_cluster_creations_.end()) {
if (auto pending_it = pending_cluster_creations_.find(name);
pending_it != pending_cluster_creations_.end()) {
ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress", name);
// We already began the discovery process for this cluster, nothing to do. If we got here,
// it means that it was other worker thread that requested the discovery.
Expand Down
22 changes: 12 additions & 10 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/cleanup.h"
#include "source/common/common/thread.h"
#include "source/common/http/async_client_impl.h"
#include "source/common/http/http_server_properties_cache_impl.h"
#include "source/common/http/http_server_properties_cache_manager_impl.h"
Expand Down Expand Up @@ -472,25 +473,23 @@ class ClusterManagerImpl : public ClusterManager,
*/
class OdCdsApiHandleImpl : public OdCdsApiHandle {
public:
static OdCdsApiHandlePtr create(ClusterManagerImpl& parent, OdCdsApiSharedPtr odcds) {
return std::make_unique<OdCdsApiHandleImpl>(parent, std::move(odcds));
static OdCdsApiHandlePtr create(ClusterManagerImpl& parent, uint64_t subscription_key) {
return std::make_unique<OdCdsApiHandleImpl>(parent, subscription_key);
}

OdCdsApiHandleImpl(ClusterManagerImpl& parent, OdCdsApiSharedPtr odcds)
: parent_(parent), odcds_(std::move(odcds)) {
ASSERT(odcds_ != nullptr);
}
OdCdsApiHandleImpl(ClusterManagerImpl& parent, uint64_t subscription_key)
: parent_(parent), subscription_key_(subscription_key) {}

ClusterDiscoveryCallbackHandlePtr
requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback,
std::chrono::milliseconds timeout) override {
return parent_.requestOnDemandClusterDiscovery(odcds_, std::string(name), std::move(callback),
timeout);
return parent_.requestOnDemandClusterDiscovery(subscription_key_, std::string(name),
std::move(callback), timeout);
}

private:
ClusterManagerImpl& parent_;
OdCdsApiSharedPtr odcds_;
uint64_t subscription_key_;
};

virtual void postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
Expand Down Expand Up @@ -896,14 +895,17 @@ class ClusterManagerImpl : public ClusterManager,
std::function<ConnectionPool::Instance*()> preconnect_pool);

ClusterDiscoveryCallbackHandlePtr
requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std::string name,
requestOnDemandClusterDiscovery(uint64_t subscription_key, std::string name,
ClusterDiscoveryCallbackPtr callback,
std::chrono::milliseconds timeout);

void notifyClusterDiscoveryStatus(absl::string_view name, ClusterDiscoveryStatus status);

protected:
ClusterInitializationMap cluster_initialization_map_;
// Stores OdCDS API subscriptions keyed by config hash. Subscriptions are owned by
// ClusterManagerImpl (main thread) and handles only store the key to look them up.
absl::flat_hash_map<uint64_t, OdCdsApiSharedPtr> odcds_subscriptions_;

private:
/**
Expand Down
32 changes: 32 additions & 0 deletions test/common/upstream/odcd_test.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
#include <chrono>

#include "envoy/api/api.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/thread.h"
#include "source/common/config/xds_resource.h"

#include "test/common/upstream/cluster_manager_impl_test_common.h"
#include "test/mocks/upstream/od_cds_api.h"
#include "test/test_common/utility.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::Return;

namespace Envoy {
namespace Upstream {
namespace {
Expand Down Expand Up @@ -275,6 +280,33 @@ TEST_F(ODCDTest, TestMainThreadDiscoveryInProgressDetection) {
odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb2), timeout_);
}

// Verifies that an OdCdsApiHandle can be released on a thread other than the main thread
// without aborting the process (SIGABRT). This mirrors a filter being removed during a VHDS
// update, where the handle ends up being destroyed on a worker thread. Handles do not own
// subscriptions, so dropping one is expected to be safe from any thread.
// NOTE(review): Thread::SkipAsserts presumably relaxes thread-affinity assertions for its
// scope - confirm it does not hide the very assertion this test is meant to guard against.
TEST_F(ODCDTest, TestDestroyHandleFromWorkerThread) {
  auto handle = cluster_manager_->createOdCdsApiHandle(odcds_);

  bool released_on_worker = false;
  Api::ApiPtr test_api = Api::createApiForTest();
  Event::DispatcherPtr dispatcher(test_api->allocateDispatcher("test_worker_thread"));

  // Everything captured by reference stays alive for the whole thread lifetime because the
  // thread is joined before this test body returns.
  const auto worker_body = [&handle, &released_on_worker, &dispatcher]() {
    Thread::SkipAsserts skip;

    EXPECT_FALSE(Thread::MainThread::isMainThread());

    // Operation under test: drop the handle while running off the main thread.
    handle.reset();
    released_on_worker = true;

    dispatcher->run(Event::Dispatcher::RunType::NonBlock);
  };
  Thread::ThreadPtr worker = Thread::threadFactoryForTest().createThread(worker_body);

  worker->join();
  EXPECT_TRUE(released_on_worker);
}

} // namespace
} // namespace Upstream
} // namespace Envoy
5 changes: 4 additions & 1 deletion test/common/upstream/test_cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@ class TestClusterManagerImpl : public ClusterManagerImpl {
}

OdCdsApiHandlePtr createOdCdsApiHandle(OdCdsApiSharedPtr odcds) {
return ClusterManagerImpl::OdCdsApiHandleImpl::create(*this, std::move(odcds));
// For tests, generate a unique key based on the pointer address.
uint64_t subscription_key = reinterpret_cast<uint64_t>(odcds.get());
odcds_subscriptions_.emplace(subscription_key, odcds);
return ClusterManagerImpl::OdCdsApiHandleImpl::create(*this, subscription_key);
}

void notifyExpiredDiscovery(absl::string_view name) {
Expand Down
Loading