Skip to content

Restart cluster tasks on connection lost #780

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: antalya-25.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Core/Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ namespace Protocol
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
SSHChallenge = 18, /// Return challenge for SSH signature signing

MAX = SSHChallenge,

ConnectionLost = 255, /// Exception that occurred on the client side.
};

/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6130,6 +6130,9 @@ Limit for hosts used for request in object storage cluster table functions - azu
Possible values:
- Positive integer.
- 0 — All hosts in cluster.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
Allow retries in cluster request, when one node goes offline
)", EXPERIMENTAL) \
\
/** Experimental tsToGrid aggregate function. */ \
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"iceberg_snapshot_id", 0, 0, "New setting."},
{"parallel_replicas_for_cluster_engines", false, true, "New setting."},
/// Release closed. Please use 25.4
{"use_object_storage_list_objects_cache", true, false, "New setting."},
{"allow_retries_in_cluster_requests", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "24.12.2.20000",
{
Expand Down
28 changes: 26 additions & 2 deletions src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ namespace Setting
extern const SettingsOverflowMode timeout_overflow_mode;
extern const SettingsBool use_hedged_requests;
extern const SettingsBool push_external_roles_in_interserver_queries;
extern const SettingsBool allow_retries_in_cluster_requests;
}

namespace ErrorCodes
Expand Down Expand Up @@ -77,6 +78,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
, extension(extension_)
, priority_func(priority_func_)
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
{
}

Expand Down Expand Up @@ -458,7 +460,8 @@ int RemoteQueryExecutor::sendQueryAsync()
read_context = std::make_unique<ReadContext>(
*this,
/*suspend_when_query_sent*/ true,
read_packet_type_separately);
read_packet_type_separately,
allow_retries_in_cluster_requests);

/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
/// because we can still be in process of sending scalars or external tables.
Expand Down Expand Up @@ -531,7 +534,8 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
read_context = std::make_unique<ReadContext>(
*this,
/*suspend_when_query_sent*/ false,
read_packet_type_separately);
read_packet_type_separately,
allow_retries_in_cluster_requests);
recreate_read_context = false;
}

Expand Down Expand Up @@ -655,7 +659,11 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
/// We can actually return it, and the first call to RemoteQueryExecutor::read
/// will return earlier. We should consider doing it.
if (packet.block && (packet.block.rows() > 0))
{
if (extension && extension->replica_info)
replica_has_processed_data.insert(extension->replica_info->number_of_current_replica);
return ReadResult(adaptBlockStructure(packet.block, header));
}
break; /// If the block is empty - we will receive other packets before EndOfStream.

case Protocol::Server::Exception:
Expand Down Expand Up @@ -717,6 +725,22 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
case Protocol::Server::TimezoneUpdate:
break;

case Protocol::Server::ConnectionLost:
if (allow_retries_in_cluster_requests)
{
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
{
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
{
finished = true;
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
return ReadResult(Block{});
}
}
}
packet.exception->rethrow();
break;

default:
got_unknown_packet_from_replica = true;
throw Exception(
Expand Down
22 changes: 20 additions & 2 deletions src/QueryPipeline/RemoteQueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,22 @@ class RemoteQueryExecutorReadContext;

class ParallelReplicasReadingCoordinator;

/// This is the same type as StorageS3Source::IteratorWrapper
using TaskIterator = std::function<String(size_t)>;
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
};

class TaskIterator
{
public:
virtual ~TaskIterator() = default;
virtual bool supportRerunTask() const { return false; }
virtual void rescheduleTasksFromReplica(size_t /* number_of_current_replica */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method rescheduleTasksFromReplica is not implemented");
}
virtual std::string operator()(size_t number_of_current_replica) const = 0;
};

/// This class allows one to launch queries on remote replicas of one shard and get results
class RemoteQueryExecutor
Expand Down Expand Up @@ -320,6 +334,10 @@ class RemoteQueryExecutor

const bool read_packet_type_separately = false;

const bool allow_retries_in_cluster_requests = false;

std::unordered_set<size_t> replica_has_processed_data;

/// Send all scalars to remote servers
void sendScalars();

Expand Down
33 changes: 24 additions & 9 deletions src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ namespace ErrorCodes
}

RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_)
RemoteQueryExecutor & executor_,
bool suspend_when_query_sent_,
bool read_packet_type_separately_,
bool allow_retries_in_cluster_requests_)
: AsyncTaskExecutor(std::make_unique<Task>(*this))
, executor(executor_)
, suspend_when_query_sent(suspend_when_query_sent_)
, read_packet_type_separately(read_packet_type_separately_)
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
{
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
Expand Down Expand Up @@ -54,18 +58,29 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
if (read_context.executor.needToSkipUnavailableShard())
return;

while (true)
try
{
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
while (true)
{
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
{
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
suspend_callback();
}
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;
suspend_callback();
}
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;
}
catch (const Exception &)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you catch only network related exceptions? Or maybe the question is: is there an non-connection-loss related exceptionthat could be thrown?

Copy link
Author

@ianton-ru ianton-ru May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only network exceptions - may be.
Others - not sure, It replica sent an exception because data corrupted (it may be unpredictable error), other replica with the same data gets the same error. May be possible to reschedule task with some specific exceptions, but need to know this specific cases. I don't know right now. If we found something, we can add this later.

{
if (!read_context.allow_retries_in_cluster_requests)
throw;
read_context.packet.type = Protocol::Server::ConnectionLost;
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
suspend_callback();
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/QueryPipeline/RemoteQueryExecutorReadContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
{
public:
explicit RemoteQueryExecutorReadContext(
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_);
RemoteQueryExecutor & executor_,
bool suspend_when_query_sent_,
bool read_packet_type_separately_,
bool allow_retries_in_cluster_requests_);

~RemoteQueryExecutorReadContext() override;

Expand Down Expand Up @@ -108,6 +111,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
bool suspend_when_query_sent = false;
bool is_query_sent = false;
const bool read_packet_type_separately = false;
const bool allow_retries_in_cluster_requests = false;
};

}
Expand Down
29 changes: 23 additions & 6 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,28 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
args.insert(args.end(), object_storage_type_arg);
}

class TaskDistributor : public TaskIterator
{
public:
TaskDistributor(std::shared_ptr<IObjectIterator> iterator,
const std::vector<std::string> & ids_of_hosts)
: task_distributor(iterator, ids_of_hosts) {}
~TaskDistributor() override = default;
bool supportRerunTask() const override { return true; }
void rescheduleTasksFromReplica(size_t number_of_current_replica) override
{
task_distributor.rescheduleTasksFromReplica(number_of_current_replica);
}

std::string operator()(size_t number_of_current_replica) const override
{
return task_distributor.getNextTask(number_of_current_replica).value_or("");
}

private:
mutable StorageObjectStorageStableTaskDistributor task_distributor;
};

RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const std::optional<ActionsDAG> & filter_actions_dag,
Expand All @@ -386,12 +408,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
}
}

auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);

auto callback = std::make_shared<TaskIterator>(
[task_distributor](size_t number_of_current_replica) mutable -> String {
return task_distributor->getNextTask(number_of_current_replica).value_or("");
});
auto callback = std::make_shared<TaskDistributor>(iterator, ids_of_hosts);

return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
};

StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
std::shared_ptr<IObjectIterator> iterator_,
std::vector<std::string> ids_of_nodes_)
Expand All @@ -14,6 +20,9 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib
, ids_of_nodes(ids_of_nodes_)
, iterator_exhausted(false)
{
size_t nodes = ids_of_nodes.size();
for (size_t i = 0; i < nodes; ++i)
replica_to_files_to_be_processed[i] = std::list<String>{};
}

std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
Expand All @@ -24,16 +33,27 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
number_of_current_replica
);

// 1. Check pre-queued files first
if (auto file = getPreQueuedFile(number_of_current_replica))
return file;
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost, can't set task for it anymore",
number_of_current_replica
);

// 1. Check pre-queued files first
std::optional<String> file = getPreQueuedFile(number_of_current_replica);
// 2. Try to find a matching file from the iterator
if (auto file = getMatchingFileFromIterator(number_of_current_replica))
return file;

if (!file.has_value())
file = getMatchingFileFromIterator(number_of_current_replica);
// 3. Process unprocessed files if iterator is exhausted
return getAnyUnprocessedFile(number_of_current_replica);
if (!file.has_value())
file = getAnyUnprocessedFile(number_of_current_replica);

if (file.has_value())
processed_file_list_ptr->second.push_back(*file);

return file;
}

size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
Expand Down Expand Up @@ -179,4 +199,28 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocess
return std::nullopt;
}

void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_t number_of_current_replica)
{
LOG_INFO(log, "Replica {} is marked as lost, tasks are returned to queue", number_of_current_replica);
std::lock_guard lock(mutex);

auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost already",
number_of_current_replica
);

if (replica_to_files_to_be_processed.size() < 2)
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA,
"All replicas were marked as lost"
);

for (const auto & file_path : processed_file_list_ptr->second)
unprocessed_files.insert(file_path);
replica_to_files_to_be_processed.erase(number_of_current_replica);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
#include <unordered_set>
#include <unordered_map>
#include <list>
#include <vector>
#include <mutex>
#include <memory>
Expand All @@ -22,6 +24,9 @@ class StorageObjectStorageStableTaskDistributor

std::optional<String> getNextTask(size_t number_of_current_replica);

/// Insert objects back to unprocessed files
void rescheduleTasksFromReplica(size_t number_of_current_replica);

private:
size_t getReplicaForFile(const String & file_path);
std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
Expand All @@ -34,6 +39,7 @@ class StorageObjectStorageStableTaskDistributor
std::unordered_set<String> unprocessed_files;

std::vector<std::string> ids_of_nodes;
std::unordered_map<size_t, std::list<String>> replica_to_files_to_be_processed;

std::mutex mutex;
bool iterator_exhausted = false;
Expand Down
Loading