From 672bc2d4de052bd8ba7ce4187bd5cce05256f422 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Thu, 29 May 2025 10:05:43 +0200 Subject: [PATCH 1/2] Merge pull request #797 from Altinity/feature/antalya-25.3/rendezvous_hashing 25.3 Antalya port of #709, #760 - Rendezvous hashing --- src/Storages/IStorageCluster.cpp | 4 +-- src/Storages/IStorageCluster.h | 5 ++- .../StorageObjectStorageCluster.cpp | 20 +++++++++-- .../StorageObjectStorageCluster.h | 4 ++- ...rageObjectStorageStableTaskDistributor.cpp | 33 +++++++++++++++++-- ...torageObjectStorageStableTaskDistributor.h | 5 ++- src/Storages/StorageDistributed.cpp | 3 +- src/Storages/StorageFileCluster.cpp | 5 ++- src/Storages/StorageFileCluster.h | 5 ++- src/Storages/StorageReplicatedMergeTree.cpp | 3 +- src/Storages/StorageURLCluster.cpp | 5 ++- src/Storages/StorageURLCluster.h | 5 ++- 12 files changed, 78 insertions(+), 19 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 88f04e1e9c64..1209f3ae7d45 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -117,7 +117,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t if (extension) return; - extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas); + extension = storage->getTaskIteratorExtension(predicate, context, cluster); } /// The code executes on initiator @@ -196,7 +196,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const if (current_settings[Setting::max_parallel_replicas] > 1) max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value); - createExtension(nullptr, max_replicas_to_use); + createExtension(nullptr); for (const auto & shard_info : cluster->getShardsInfo()) { diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 6017613c7bea..e4ff87f9b5e4 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -35,7 +35,10 @@ class IStorageCluster : public IStorage ClusterPtr getCluster(ContextPtr context) const; /// Query is needed for pruning by virtual columns (_file, _path) - virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0; + virtual RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr cluster) const = 0; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 87d32c0c0a44..c1d6e9545815 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -16,7 +16,6 @@ #include #include - namespace DB { namespace Setting @@ -177,13 +176,28 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const + const ActionsDAG::Node * predicate, + const ContextPtr & local_context, + ClusterPtr cluster) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, local_context, predicate, {}, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true); - auto task_distributor = std::make_shared(iterator, number_of_replicas); + std::vector ids_of_hosts; + for (const auto & shard : cluster->getShardsInfo()) + { + if (shard.per_replica_pools.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num); + for (const auto & replica : shard.per_replica_pools) + { + if (!replica) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num); + ids_of_hosts.push_back(replica->getAddress()); + } + } + + auto task_distributor = std::make_shared(iterator, ids_of_hosts); auto callback = std::make_shared( [task_distributor](size_t number_of_current_replica) mutable -> String { diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 4f4d541008f1..319987122e45 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -24,7 +24,9 @@ class StorageObjectStorageCluster : public IStorageCluster std::string getName() const override; RemoteQueryExecutor::Extension getTaskIteratorExtension( - const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override; + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr cluster) const override; String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 55ad2202ea94..d9ca7b344637 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -8,9 +8,10 @@ namespace DB StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - size_t number_of_replicas_) + std::vector ids_of_nodes_) : iterator(std::move(iterator_)) - , connection_to_files(number_of_replicas_) + , connection_to_files(ids_of_nodes_.size()) + , ids_of_nodes(ids_of_nodes_) , iterator_exhausted(false) { } @@ -37,13 +38,39 @@ std::optional StorageObjectStorageStableTaskDistributor::getNextTask(siz size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path) { - return ConsistentHashing(sipHash64(file_path), connection_to_files.size()); + size_t nodes_count = ids_of_nodes.size(); + + /// Trivial case + if (nodes_count < 2) + return 0; + + /// Rendezvous hashing + size_t best_id = 0; + UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path); + for (size_t id = 1; id < nodes_count; ++id) + { + UInt64 weight = sipHash64(ids_of_nodes[id] + file_path); + if (weight > best_weight) + { + best_weight = weight; + best_id = id; + } + } + return best_id; } std::optional StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica) { std::lock_guard lock(mutex); + if (connection_to_files.size() <= number_of_current_replica) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Replica number {} is out of range. Expected range: [0, {})", + number_of_current_replica, + connection_to_files.size() + ); + auto & files = connection_to_files[number_of_current_replica]; while (!files.empty()) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 46e805a59603..678ff4372f5f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -17,7 +18,7 @@ class StorageObjectStorageStableTaskDistributor public: StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - size_t number_of_replicas_); + std::vector ids_of_nodes_); std::optional getNextTask(size_t number_of_current_replica); @@ -32,6 +33,8 @@ class StorageObjectStorageStableTaskDistributor std::vector> connection_to_files; std::unordered_set unprocessed_files; + std::vector ids_of_nodes; + std::mutex mutex; bool iterator_exhausted = false; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 817289167e68..33c1087696f5 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1300,8 +1300,7 @@ std::optional StorageDistributed::distributedWriteFromClusterStor const auto cluster = getCluster(); /// Select query is needed for pruining on virtual columns - auto number_of_replicas = static_cast(cluster->getShardsInfo().size()); - auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas); + auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster); /// Here we take addresses from destination cluster and assume source table exists on these nodes size_t replica_index = 0; diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index ce5e3daf6828..d70e92fa3b45 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -77,7 +77,10 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto ); } -RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, const size_t) const +RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const { auto iterator = std::make_shared(paths, std::nullopt, predicate, getVirtualsList(), context); auto callback = std::make_shared([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); }); diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index a19790219af4..49d39a24ceba 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -27,7 +27,10 @@ class StorageFileCluster : public IStorageCluster const ConstraintsDescription & constraints_); std::string getName() const override { return "FileCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 98920e164429..b4f11441ade9 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6089,8 +6089,7 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu ContextMutablePtr query_context = Context::createCopy(local_context); query_context->increaseDistributedDepth(); - auto number_of_replicas = static_cast(src_cluster->getShardsAddresses().size()); - auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, number_of_replicas); + auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster); size_t replica_index = 0; for (const auto & replicas : src_cluster->getShardsAddresses()) diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index f9dec87247a0..7ba7f22c62e8 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -114,7 +114,10 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS ); } -RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t) const +RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const { auto iterator = std::make_shared( uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context); diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index 98dd1d3ece12..9bfbaffe30f8 100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -30,7 +30,10 @@ class StorageURLCluster : public IStorageCluster const StorageURL::Configuration & configuration_); std::string getName() const override { return "URLCluster"; } - RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override; + RemoteQueryExecutor::Extension getTaskIteratorExtension( + const ActionsDAG::Node * predicate, + const ContextPtr & context, + ClusterPtr) const override; private: void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; From 465a742297032e912806dc534e251ef879af87e9 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 17 Jul 2025 18:46:25 +0200 Subject: [PATCH 2/2] Fix tests --- src/Storages/IStorageCluster.cpp | 10 +++------- .../configs/named_collections.xml | 2 +- tests/integration/test_s3_cache_locality/test.py | 10 ++++++---- .../configs/config.d/named_collections.xml | 2 +- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 1209f3ae7d45..94feacde1998 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -93,7 +93,7 @@ class ReadFromCluster : public SourceStepWithFilter std::optional extension; - void createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas); + void createExtension(const ActionsDAG::Node * predicate); ContextPtr updateSettings(const Settings & settings); }; @@ -105,14 +105,10 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes) if (filter_actions_dag) predicate = filter_actions_dag->getOutputs().at(0); - auto max_replicas_to_use = static_cast(cluster->getShardsInfo().size()); - if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1) - max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value); - - createExtension(predicate, max_replicas_to_use); + createExtension(predicate); } -void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas) +void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) { if (extension) return; diff --git a/tests/integration/test_s3_cache_locality/configs/named_collections.xml b/tests/integration/test_s3_cache_locality/configs/named_collections.xml index 511078d6f0d9..6994aa3f5e77 100644 --- a/tests/integration/test_s3_cache_locality/configs/named_collections.xml +++ b/tests/integration/test_s3_cache_locality/configs/named_collections.xml @@ -3,7 +3,7 @@ http://minio1:9001/root/data/* minio - minio123 + ClickHouse_Minio_P@ssw0rd CSV> diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py index a2020d7e0568..c72755a90965 100644 --- a/tests/integration/test_s3_cache_locality/test.py +++ b/tests/integration/test_s3_cache_locality/test.py @@ -7,6 +7,8 @@ import pytest from helpers.cluster import ClickHouseCluster +from helpers.config_cluster import minio_secret_key + logging.getLogger().setLevel(logging.INFO) logging.getLogger().addHandler(logging.StreamHandler()) @@ -81,7 +83,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, result_first = node.query( f""" SELECT count(*) - FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64') + FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') WHERE b=42 SETTINGS enable_filesystem_cache={enable_filesystem_cache}, @@ -95,7 +97,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, result_second = node.query( f""" SELECT count(*) - FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64') + FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') WHERE b=42 SETTINGS enable_filesystem_cache={enable_filesystem_cache}, @@ -148,9 +150,9 @@ def test_cache_locality(started_cluster): node = started_cluster.instances["clickhouse0"] expected_result = node.query( - """ + f""" SELECT count(*) - FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64') + FROM s3('http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') WHERE b=42 """ ) diff --git a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml index 892665d3934d..77f9e7e4b17b 100644 --- a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml +++ b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml @@ -14,7 +14,7 @@ http://minio1:9001/root/ minio - minio123 + ClickHouse_Minio_P@ssw0rd s3