diff --git a/docs/en/sql-reference/statements/system.md b/docs/en/sql-reference/statements/system.md
index adfac0d106e0..d1e80e33c0c4 100644
--- a/docs/en/sql-reference/statements/system.md
+++ b/docs/en/sql-reference/statements/system.md
@@ -205,6 +205,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name]
Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`)
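+## STOP SWARM MODE {#stop-swarm-mode}
+
+
+
+`SYSTEM STOP SWARM MODE` prepares the node for a graceful shutdown: the node unregisters itself from autodiscovered clusters and stops taking new tasks of distributed queries to object storages (`s3Cluster`, `icebergCluster`, etc.). `SYSTEM START SWARM MODE` reverts this.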
+
## KILL {#kill}
Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`)
diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index bbba16dbf61f..9f7d6a945dd3 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -2261,6 +2261,8 @@ try
}
+ global_context->startSwarmMode();
+
{
std::lock_guard lock(servers_lock);
/// We should start interserver communications before (and more important shutdown after) tables.
@@ -2689,6 +2691,8 @@ try
is_cancelled = true;
+ global_context->stopSwarmMode();
+
LOG_DEBUG(log, "Waiting for current connections to close.");
size_t current_connections = 0;
diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h
index 240406b58fc9..593df374b6cb 100644
--- a/src/Access/Common/AccessType.h
+++ b/src/Access/Common/AccessType.h
@@ -199,6 +199,7 @@ enum class AccessType : uint8_t
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
+ M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \
diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp
index cbbe7ac2dc08..104ab0b44795 100644
--- a/src/Common/CurrentMetrics.cpp
+++ b/src/Common/CurrentMetrics.cpp
@@ -384,6 +384,7 @@
M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \
\
M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \
+ M(IsSwarmModeEnabled, "Indicates if the swarm mode enabled or not: 0 = disabled, 1 = enabled") \
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)
diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp
index d537f4a3c975..85985fdab419 100644
--- a/src/Interpreters/ClusterDiscovery.cpp
+++ b/src/Interpreters/ClusterDiscovery.cpp
@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
cv.notify_one();
}
+    void wakeup()
+    {
+        std::lock_guard guard(mu); /// Raise the generic "need update" flag under the mutex, then notify one waiter on cv.
+        any_need_update = true;
+        cv.notify_one();
+    }
+
private:
std::condition_variable cv;
std::mutex mu;
@@ -390,7 +397,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info)
return true;
};
- if (!cluster_info.current_node_is_observer && !contains(node_uuids, current_node_name))
+ if (!cluster_info.current_node_is_observer
+ && context->isSwarmModeEnabled()
+ && !contains(node_uuids, current_node_name))
{
LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name);
registerInZk(zk, cluster_info);
@@ -454,12 +463,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf
return;
}
+ if (!context->isSwarmModeEnabled())
+ {
+ LOG_DEBUG(log, "STOP SWARM MODE called, skip self-registering current node {} in cluster {}", current_node_name, info.name);
+ return;
+ }
+
LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name);
zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral);
LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name);
}
+void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
+{
+    /// Removes the current node's entry from the shards list of cluster `info`; observers have no entry to remove.
+    if (info.current_node_is_observer)
+        return;
+    String node_path = getShardsListPath(info.zk_root) / current_node_name;
+    LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name);
+    /// The ephemeral node may already be gone (expired session, never registered): that must not abort the caller's loop.
+    zk->tryRemove(node_path);
+    LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name);
+}
+
void ClusterDiscovery::initialUpdate()
{
LOG_DEBUG(log, "Initializing");
@@ -505,6 +532,18 @@ void ClusterDiscovery::initialUpdate()
is_initialized = true;
}
+void ClusterDiscovery::registerAll()
+{
+    register_change_flag.store(RegisterChangeFlag::RCF_REGISTER_ALL); /// consumed (and reset) in runMainThread
+    clusters_to_update->wakeup();
+}
+
+void ClusterDiscovery::unregisterAll()
+{
+    register_change_flag.store(RegisterChangeFlag::RCF_UNREGISTER_ALL); /// consumed (and reset) in runMainThread
+    clusters_to_update->wakeup();
+}
+
void ClusterDiscovery::findDynamicClusters(
std::unordered_map & info,
std::unordered_set * unchanged_roots)
@@ -728,6 +767,27 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback)
{
up_to_date_callback();
}
+
+ RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE);
+
+ if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
+ {
+ LOG_DEBUG(log, "Register in all dynamic clusters");
+ for (auto & [_, info] : clusters_info)
+ {
+ auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
+ registerInZk(zk, info);
+ }
+ }
+ else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
+ {
+ LOG_DEBUG(log, "Unregister in all dynamic clusters");
+ for (auto & [_, info] : clusters_info)
+ {
+ auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
+ unregisterFromZk(zk, info);
+ }
+ }
}
LOG_DEBUG(log, "Worker thread stopped");
return finished;
diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h
index c0e4af3b86f3..2d3bbe489f4e 100644
--- a/src/Interpreters/ClusterDiscovery.h
+++ b/src/Interpreters/ClusterDiscovery.h
@@ -38,6 +38,9 @@ class ClusterDiscovery
~ClusterDiscovery();
+ void registerAll();
+ void unregisterAll();
+
private:
struct NodeInfo
{
@@ -125,6 +128,7 @@ class ClusterDiscovery
void initialUpdate();
void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
+ void unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
const String & zk_root,
@@ -207,6 +211,15 @@ class ClusterDiscovery
std::shared_ptr>> multicluster_discovery_paths;
MultiVersion::Version macros;
+
+    enum class RegisterChangeFlag
+    {
+        RCF_NONE,           /// no pending request
+        RCF_REGISTER_ALL,   /// requested by registerAll()
+        RCF_UNREGISTER_ALL, /// requested by unregisterAll()
+    };
+
+ std::atomic register_change_flag = RegisterChangeFlag::RCF_NONE;
};
}
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 5d8ab7d2d506..93cd4fe5dcfd 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -185,6 +185,7 @@ namespace CurrentMetrics
extern const Metric IcebergCatalogThreads;
extern const Metric IcebergCatalogThreadsActive;
extern const Metric IcebergCatalogThreadsScheduled;
+ extern const Metric IsSwarmModeEnabled;
}
@@ -566,6 +567,7 @@ struct ContextSharedPart : boost::noncopyable
std::map server_ports;
std::atomic shutdown_called = false;
+ std::atomic swarm_mode_enabled = true;
Stopwatch uptime_watch TSA_GUARDED_BY(mutex);
@@ -734,6 +736,8 @@ struct ContextSharedPart : boost::noncopyable
*/
void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
{
+ swarm_mode_enabled = false;
+ CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
bool is_shutdown_called = shutdown_called.exchange(true);
if (is_shutdown_called)
return;
@@ -4481,7 +4485,6 @@ std::shared_ptr Context::getCluster(const std::string & cluster_name) c
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name);
}
-
std::shared_ptr Context::tryGetCluster(const std::string & cluster_name) const
{
std::shared_ptr res = nullptr;
@@ -4500,6 +4503,21 @@ std::shared_ptr Context::tryGetCluster(const std::string & cluster_name
return res;
}
+void Context::unregisterInAutodiscoveryClusters()
+{
+    /// Forward the request to cluster discovery, if there is one; otherwise do nothing.
+    std::lock_guard guard(shared->clusters_mutex);
+    if (shared->cluster_discovery)
+        shared->cluster_discovery->unregisterAll();
+}
+
+void Context::registerInAutodiscoveryClusters()
+{
+    /// Forward the request to cluster discovery, if there is one; otherwise do nothing.
+    std::lock_guard guard(shared->clusters_mutex);
+    if (shared->cluster_discovery)
+        shared->cluster_discovery->registerAll();
+}
void Context::reloadClusterConfig() const
{
@@ -5350,12 +5368,35 @@ void Context::stopServers(const ServerType & server_type) const
shared->stop_servers_callback(server_type);
}
-
void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
{
shared->shutdown();
}
+bool Context::stopSwarmMode()
+{
+    /// Atomically switch enabled -> disabled; the exchange fails if swarm mode is already disabled.
+    bool was_enabled = true;
+    if (!shared->swarm_mode_enabled.compare_exchange_strong(was_enabled, false))
+        return false;
+    CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
+    return true;
+}
+
+bool Context::startSwarmMode()
+{
+    bool expected_is_enabled = false;
+    bool is_started_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, true);
+    /// The flag is enabled by default, so the call at server startup does not flip it: publish the metric unconditionally.
+    CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1);
+    /// Returns true only if this call switched swarm mode from disabled to enabled.
+    return is_started_now;
+}
+
+bool Context::isSwarmModeEnabled() const
+{
+    return shared->swarm_mode_enabled.load(); /// current value of the shared atomic flag
+}
Context::ApplicationType Context::getApplicationType() const
{
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index fb690ce948dd..af17d748a09d 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -1225,6 +1225,8 @@ class Context: public ContextData, public std::enable_shared_from_this
size_t getClustersVersion() const;
void startClusterDiscovery();
+ void registerInAutodiscoveryClusters();
+ void unregisterInAutodiscoveryClusters();
/// Sets custom cluster, but doesn't update configuration
void setCluster(const String & cluster_name, const std::shared_ptr & cluster);
@@ -1335,6 +1337,15 @@ class Context: public ContextData, public std::enable_shared_from_this
void shutdown();
+    /// Disable swarm mode so that the node can be shut down gracefully later.
+    /// Returns true if swarm mode was enabled and this call disabled it.
+ bool stopSwarmMode();
+    /// Re-enable swarm mode, e.g. if the shutdown was cancelled.
+    /// Returns true if swarm mode was disabled and this call enabled it.
+ bool startSwarmMode();
+ /// Return current swarm mode state.
+ bool isSwarmModeEnabled() const;
+
bool isInternalQuery() const { return is_internal_query; }
void setInternalQuery(bool internal) { is_internal_query = internal; }
diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp
index f53df3f65a28..0be33f916682 100644
--- a/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/src/Interpreters/InterpreterSystemQuery.cpp
@@ -693,6 +693,20 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_MOVES:
startStopAction(ActionLocks::PartsMove, true);
break;
+ case Type::STOP_SWARM_MODE:
+ {
+ getContext()->checkAccess(AccessType::SYSTEM_SWARM);
+ if (getContext()->stopSwarmMode())
+ getContext()->unregisterInAutodiscoveryClusters();
+ break;
+ }
+ case Type::START_SWARM_MODE:
+ {
+ getContext()->checkAccess(AccessType::SYSTEM_SWARM);
+ if (getContext()->startSwarmMode())
+ getContext()->registerInAutodiscoveryClusters();
+ break;
+ }
case Type::STOP_FETCHES:
startStopAction(ActionLocks::PartsFetch, false);
break;
@@ -1564,6 +1578,12 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable());
break;
}
+ case Type::STOP_SWARM_MODE:
+ case Type::START_SWARM_MODE:
+ {
+ required_access.emplace_back(AccessType::SYSTEM_SWARM);
+ break;
+ }
case Type::STOP_PULLING_REPLICATION_LOG:
case Type::START_PULLING_REPLICATION_LOG:
{
diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp
index 7dee946ef409..da9073e33019 100644
--- a/src/Parsers/ASTSystemQuery.cpp
+++ b/src/Parsers/ASTSystemQuery.cpp
@@ -485,6 +485,8 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
case Type::DROP_PAGE_CACHE:
case Type::STOP_REPLICATED_DDL_QUERIES:
case Type::START_REPLICATED_DDL_QUERIES:
+ case Type::STOP_SWARM_MODE:
+ case Type::START_SWARM_MODE:
break;
case Type::UNKNOWN:
case Type::END:
diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h
index 15ebae72e1fe..052e3db3b208 100644
--- a/src/Parsers/ASTSystemQuery.h
+++ b/src/Parsers/ASTSystemQuery.h
@@ -82,6 +82,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
START_FETCHES,
STOP_MOVES,
START_MOVES,
+ STOP_SWARM_MODE,
+ START_SWARM_MODE,
STOP_REPLICATED_SENDS,
START_REPLICATED_SENDS,
STOP_REPLICATION_QUEUES,
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index beb7c517662f..908c25c938f8 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -984,6 +984,11 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback)
profile_info_callback = std::move(callback);
}
+bool RemoteQueryExecutor::skipUnavailableShards() const
+{
+    return context->getSettingsRef()[Setting::skip_unavailable_shards]; /// setting only; needToSkipUnavailableShard() also checks for zero connections
+}
+
bool RemoteQueryExecutor::needToSkipUnavailableShard() const
{
return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size());
diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h
index f3381828e84d..7641eccf553b 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.h
+++ b/src/QueryPipeline/RemoteQueryExecutor.h
@@ -220,6 +220,8 @@ class RemoteQueryExecutor
IConnections & getConnections() { return *connections; }
+ bool skipUnavailableShards() const;
+
bool needToSkipUnavailableShard() const;
bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
index 9090d045daae..934909da40fd 100644
--- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
@@ -16,6 +16,7 @@ namespace ErrorCodes
extern const int CANNOT_READ_FROM_SOCKET;
extern const int CANNOT_OPEN_FILE;
extern const int SOCKET_TIMEOUT;
+ extern const int ATTEMPT_TO_READ_AFTER_EOF;
}
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
@@ -56,16 +57,35 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
while (true)
{
- read_context.has_read_packet_part = PacketPart::None;
-
- if (read_context.read_packet_type_separately)
+ try
+ {
+ read_context.has_read_packet_part = PacketPart::None;
+
+ if (read_context.read_packet_type_separately)
+ {
+ read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
+ read_context.has_read_packet_part = PacketPart::Type;
+ suspend_callback();
+ }
+ read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
+ read_context.has_read_packet_part = PacketPart::Body;
+ if (read_context.packet.type == Protocol::Server::Data)
+ read_context.has_data_packets = true;
+ }
+ catch (const Exception & e)
{
- read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
- read_context.has_read_packet_part = PacketPart::Type;
- suspend_callback();
+            /// If a cluster node shuts down unexpectedly (kill/segfault/power off/etc.), the socket is simply closed.
+            /// If the initiator has not processed any data packets yet, this can be ignored:
+            /// the unprocessed tasks will be executed on other nodes.
+ if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
+ && !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
+ {
+ read_context.has_read_packet_part = PacketPart::None;
+ }
+ else
+ throw;
}
- read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
- read_context.has_read_packet_part = PacketPart::Body;
+
suspend_callback();
}
}
diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h
index abde6cb93ef3..e306aa3b3e47 100644
--- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h
+++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h
@@ -85,6 +85,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
/// None -> Type -> Body -> None
/// None -> Body -> None
std::atomic has_read_packet_part = PacketPart::None;
+ std::atomic_bool has_data_packets = false;
Packet packet;
RemoteQueryExecutor & executor;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index 814b99d78032..a3c5681ab770 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -139,7 +139,10 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator(
if (distributed_processing)
{
- auto distributed_iterator = std::make_unique(local_context->getReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads]);
+ auto distributed_iterator = std::make_unique(
+ local_context->getReadTaskCallback(),
+ local_context->getSettingsRef()[Setting::max_threads],
+ local_context);
if (is_archive)
return std::make_shared(object_storage, configuration, std::move(distributed_iterator), local_context, nullptr);
@@ -958,9 +961,16 @@ StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexc
}
StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator(
- const ReadTaskCallback & callback_, size_t max_threads_count)
- : callback(callback_)
+ const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_)
+ : WithContext(context_)
+ , callback(callback_)
{
+ if (!getContext()->isSwarmModeEnabled())
+ {
+ LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM MODE called, stop getting new tasks");
+ return;
+ }
+
ThreadPool pool(
CurrentMetrics::StorageObjectStorageThreads,
CurrentMetrics::StorageObjectStorageThreadsActive,
@@ -990,6 +1000,12 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= buffer.size())
{
+ if (!getContext()->isSwarmModeEnabled())
+ {
+ LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM MODE called, stop getting new tasks");
+ return nullptr;
+ }
+
auto key = callback();
if (key.empty())
return nullptr;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h
index 146484154cf6..2cc15fd8f152 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h
@@ -146,10 +146,10 @@ class StorageObjectStorageSource : public SourceWithKeyCondition
void lazyInitialize();
};
-class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator
+class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, WithContext
{
public:
- ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count);
+ ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_);
ObjectInfoPtr next(size_t) override;
diff --git a/tests/integration/test_s3_cluster/data/graceful/part0.csv b/tests/integration/test_s3_cluster/data/graceful/part0.csv
new file mode 100644
index 000000000000..2a8ceabbea58
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part0.csv
@@ -0,0 +1 @@
+0,"Foo"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/part1.csv b/tests/integration/test_s3_cluster/data/graceful/part1.csv
new file mode 100644
index 000000000000..1950012fffd2
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part1.csv
@@ -0,0 +1 @@
+1,"Bar"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/part2.csv b/tests/integration/test_s3_cluster/data/graceful/part2.csv
new file mode 100644
index 000000000000..dc782d5adf9b
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part2.csv
@@ -0,0 +1 @@
+2,"Foo"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/part3.csv b/tests/integration/test_s3_cluster/data/graceful/part3.csv
new file mode 100644
index 000000000000..6e581549d23c
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part3.csv
@@ -0,0 +1 @@
+3,"Bar"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/part4.csv b/tests/integration/test_s3_cluster/data/graceful/part4.csv
new file mode 100644
index 000000000000..bb5a4d956c51
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part4.csv
@@ -0,0 +1 @@
+4,"Foo"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/part5.csv b/tests/integration/test_s3_cluster/data/graceful/part5.csv
new file mode 100644
index 000000000000..5cb2c6be144b
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part5.csv
@@ -0,0 +1 @@
+5,"Bar"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/part6.csv b/tests/integration/test_s3_cluster/data/graceful/part6.csv
new file mode 100644
index 000000000000..e2e2428d100d
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part6.csv
@@ -0,0 +1 @@
+6,"Foo"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/part7.csv b/tests/integration/test_s3_cluster/data/graceful/part7.csv
new file mode 100644
index 000000000000..3c819a315c20
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part7.csv
@@ -0,0 +1 @@
+7,"Bar"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/part8.csv b/tests/integration/test_s3_cluster/data/graceful/part8.csv
new file mode 100644
index 000000000000..72f39e512be3
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part8.csv
@@ -0,0 +1 @@
+8,"Foo"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/part9.csv b/tests/integration/test_s3_cluster/data/graceful/part9.csv
new file mode 100644
index 000000000000..f288cb2051dd
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/part9.csv
@@ -0,0 +1 @@
+9,"Bar"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/partA.csv b/tests/integration/test_s3_cluster/data/graceful/partA.csv
new file mode 100644
index 000000000000..da99f68ba784
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/partA.csv
@@ -0,0 +1 @@
+10,"Foo"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/partB.csv b/tests/integration/test_s3_cluster/data/graceful/partB.csv
new file mode 100644
index 000000000000..46591e0be815
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/partB.csv
@@ -0,0 +1 @@
+11,"Bar"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/partC.csv b/tests/integration/test_s3_cluster/data/graceful/partC.csv
new file mode 100644
index 000000000000..24af8010b5c6
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/partC.csv
@@ -0,0 +1 @@
+12,"Foo"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/partD.csv b/tests/integration/test_s3_cluster/data/graceful/partD.csv
new file mode 100644
index 000000000000..0365a5024871
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/partD.csv
@@ -0,0 +1 @@
+13,"Bar"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/partE.csv b/tests/integration/test_s3_cluster/data/graceful/partE.csv
new file mode 100644
index 000000000000..3143c0eed915
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/partE.csv
@@ -0,0 +1 @@
+14,"Foo"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/data/graceful/partF.csv b/tests/integration/test_s3_cluster/data/graceful/partF.csv
new file mode 100644
index 000000000000..d0306b9bb806
--- /dev/null
+++ b/tests/integration/test_s3_cluster/data/graceful/partF.csv
@@ -0,0 +1 @@
+15,"Bar"
\ No newline at end of file
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index bb9e0e4997bb..812106400e69 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -3,11 +3,13 @@
import os
import shutil
import uuid
-from email.errors import HeaderParseError
+import time
+import threading
import pytest
from helpers.cluster import ClickHouseCluster
+from helpers.client import QueryRuntimeException
from helpers.config_cluster import minio_secret_key
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import TSV
@@ -21,6 +23,22 @@
"data/clickhouse/part123.csv",
"data/database/part2.csv",
"data/database/partition675.csv",
+ "data/graceful/part0.csv",
+ "data/graceful/part1.csv",
+ "data/graceful/part2.csv",
+ "data/graceful/part3.csv",
+ "data/graceful/part4.csv",
+ "data/graceful/part5.csv",
+ "data/graceful/part6.csv",
+ "data/graceful/part7.csv",
+ "data/graceful/part8.csv",
+ "data/graceful/part9.csv",
+ "data/graceful/partA.csv",
+ "data/graceful/partB.csv",
+ "data/graceful/partC.csv",
+ "data/graceful/partD.csv",
+ "data/graceful/partE.csv",
+ "data/graceful/partF.csv",
]
@@ -76,6 +94,7 @@ def started_cluster():
macros={"replica": "replica1", "shard": "shard1"},
with_minio=True,
with_zookeeper=True,
+ stay_alive=True,
)
cluster.add_instance(
"s0_0_1",
@@ -83,6 +102,7 @@ def started_cluster():
user_configs=["configs/users.xml"],
macros={"replica": "replica2", "shard": "shard1"},
with_zookeeper=True,
+ stay_alive=True,
)
cluster.add_instance(
"s0_1_0",
@@ -90,6 +110,7 @@ def started_cluster():
user_configs=["configs/users.xml"],
macros={"replica": "replica1", "shard": "shard2"},
with_zookeeper=True,
+ stay_alive=True,
)
logging.info("Starting cluster...")
@@ -1121,3 +1142,63 @@ def test_cluster_hosts_limit(started_cluster):
"""
)
assert int(hosts_2) == 2
+
+
+def test_graceful_shutdown(started_cluster):
+ node = started_cluster.instances["s0_0_0"]
+ node_to_shutdown = started_cluster.instances["s0_1_0"]
+
+ expected = TSV("64\tBar\t8\n56\tFoo\t8\n")
+
+ num_lock = threading.Lock()
+ errors = 0
+
+ def query_cycle():
+ nonlocal errors
+ try:
+ i = 0
+ while i < 10:
+ i += 1
+ # Query time 3-4 seconds
+ # Processing single object 1-2 seconds
+ result = node.query(f"""
+ SELECT sum(value),name,sum(sleep(1)+1) as sleep FROM s3Cluster(
+ 'cluster_simple',
+ 'http://minio1:9001/root/data/graceful/*', 'minio', '{minio_secret_key}', 'CSV',
+ 'value UInt32, name String')
+ GROUP BY name
+ ORDER BY name
+ SETTINGS max_threads=2
+ """)
+ with num_lock:
+ if TSV(result) != expected:
+ errors += 1
+ if errors >= 1:
+ break
+ except QueryRuntimeException:
+ with num_lock:
+ errors += 1
+
+ threads = []
+
+ for _ in range(10):
+ thread = threading.Thread(target=query_cycle)
+ thread.start()
+ threads.append(thread)
+ time.sleep(0.2)
+
+ time.sleep(3)
+
+ node_to_shutdown.query("SYSTEM STOP SWARM MODE")
+
+    # Give the node enough time to finish processing the objects it picked up before "SYSTEM STOP SWARM MODE"
+ time.sleep(3)
+
+ node_to_shutdown.stop_clickhouse(kill=True)
+
+ for thread in threads:
+ thread.join()
+
+ node_to_shutdown.start_clickhouse()
+
+ assert errors == 0
diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference
index 691403456335..5779dccc9517 100644
--- a/tests/queries/0_stateless/01271_show_privileges.reference
+++ b/tests/queries/0_stateless/01271_show_privileges.reference
@@ -148,6 +148,7 @@ SYSTEM MERGES ['SYSTEM STOP MERGES','SYSTEM START MERGES','STOP MERGES','START M
SYSTEM TTL MERGES ['SYSTEM STOP TTL MERGES','SYSTEM START TTL MERGES','STOP TTL MERGES','START TTL MERGES'] TABLE SYSTEM
SYSTEM FETCHES ['SYSTEM STOP FETCHES','SYSTEM START FETCHES','STOP FETCHES','START FETCHES'] TABLE SYSTEM
SYSTEM MOVES ['SYSTEM STOP MOVES','SYSTEM START MOVES','STOP MOVES','START MOVES'] TABLE SYSTEM
+SYSTEM SWARM ['SYSTEM STOP SWARM MODE','SYSTEM START SWARM MODE','STOP SWARM MODE','START SWARM MODE'] GLOBAL SYSTEM
SYSTEM PULLING REPLICATION LOG ['SYSTEM STOP PULLING REPLICATION LOG','SYSTEM START PULLING REPLICATION LOG'] TABLE SYSTEM
SYSTEM CLEANUP ['SYSTEM STOP CLEANUP','SYSTEM START CLEANUP'] TABLE SYSTEM
SYSTEM VIEWS ['SYSTEM REFRESH VIEW','SYSTEM START VIEWS','SYSTEM STOP VIEWS','SYSTEM START VIEW','SYSTEM STOP VIEW','SYSTEM CANCEL VIEW','REFRESH VIEW','START VIEWS','STOP VIEWS','START VIEW','STOP VIEW','CANCEL VIEW'] VIEW SYSTEM