diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 240cd82b7d5c..344bd2a8e93c 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -482,6 +482,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS } break; } + case ASTAlterCommand::EXPORT_PART: + { + required_access.emplace_back(AccessType::ALTER_MOVE_PARTITION, database, table); + required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table); + break; + } case ASTAlterCommand::REPLACE_PARTITION: { required_access.emplace_back(AccessType::SELECT, command.from_database, command.from_table); diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index 4d6d06e436d3..81c82ac780d8 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -352,6 +352,29 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett ostr << quoteString(move_destination_name); } } + else if (type == ASTAlterCommand::EXPORT_PART) + { + ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << (part ? "PART " : "PARTITION ") + << (settings.hilite ? hilite_none : ""); + partition->format(ostr, settings, state, frame); + ostr << " TO "; + switch (move_destination_type) + { + case DataDestinationType::TABLE: + ostr << "TABLE "; + if (!to_database.empty()) + { + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database) + << (settings.hilite ? hilite_none : "") << "."; + } + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table) + << (settings.hilite ? hilite_none : ""); + return; + default: + break; + } + + } else if (type == ASTAlterCommand::REPLACE_PARTITION) { ostr << (settings.hilite ? hilite_keyword : "") << (replace ? "REPLACE" : "ATTACH") << " PARTITION " diff --git a/src/Parsers/ASTAlterQuery.h b/src/Parsers/ASTAlterQuery.h index 66816c69ca82..820d659c037d 100644 --- a/src/Parsers/ASTAlterQuery.h +++ b/src/Parsers/ASTAlterQuery.h @@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST FREEZE_ALL, UNFREEZE_PARTITION, UNFREEZE_ALL, + EXPORT_PART, DELETE, UPDATE, diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index 62b5ca135cd8..b0a5153baa1c 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -328,6 +328,7 @@ namespace DB MR_MACROS(MONTHS, "MONTHS") \ MR_MACROS(MOVE_PART, "MOVE PART") \ MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ + MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \ MR_MACROS(MOVE, "MOVE") \ MR_MACROS(MS, "MS") \ MR_MACROS(MUTATION, "MUTATION") \ diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index 2139e3857c6e..d29903ebd50b 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION); ParserKeyword s_move_partition(Keyword::MOVE_PARTITION); ParserKeyword s_move_part(Keyword::MOVE_PART); + ParserKeyword s_export_part(Keyword::EXPORT_PARTITION); ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION); ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART); ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION); @@ -554,6 +555,23 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected command->move_destination_name = ast_space_name->as().value.safeGet(); } } + else if (s_export_part.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command_partition, expected)) + return false; + + command->type = ASTAlterCommand::EXPORT_PART; +// command->part = true; + + if (!s_to_table.ignore(pos, expected)) + { + return false; + } + + if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table)) + return false; + command->move_destination_type = DataDestinationType::TABLE; + } else if (s_add_constraint.ignore(pos, expected)) { if (s_if_not_exists.ignore(pos, expected)) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 59ab490d32f7..44f3ccd42545 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -29,6 +29,8 @@ #include #include #include +#include +#include #include #include #include @@ -48,8 +50,6 @@ #include #include #include -#include -#include #include #include #include @@ -57,37 +57,58 @@ #include #include #include +#include +#include #include +#include #include -#include -#include #include +#include +#include #include #include #include #include #include #include -#include #include #include #include #include -#include #include +#include #include #include #include +#include #include #include #include #include +#include +#include +#include #include #include -#include +#include +#include +#include +#include #include #include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include @@ -5684,6 +5705,12 @@ Pipe MergeTreeData::alterPartition( } break; + case PartitionCommand::EXPORT_PART: + { + exportPartitionToTable(command, query_context); + break; + } + case PartitionCommand::DROP_DETACHED_PARTITION: dropDetached(command.partition, command.part, query_context); break; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index be5132e6f7bb..4d1715eb867e 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -869,6 +869,8 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Moves partition to specified Table void movePartitionToTable(const PartitionCommand & command, ContextPtr query_context); + virtual void exportPartitionToTable(const PartitionCommand &, ContextPtr) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "export not implemented");} + /// Checks that Partition could be dropped right now /// Otherwise - throws an exception with detailed information. /// We do not use mutex because it is not very important that the size could change during the operation. diff --git a/src/Storages/MergeTree/exportMTPartToStorage.cpp b/src/Storages/MergeTree/exportMTPartToStorage.cpp new file mode 100644 index 000000000000..1162c13ab4da --- /dev/null +++ b/src/Storages/MergeTree/exportMTPartToStorage.cpp @@ -0,0 +1,73 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +void exportMTPartToStorage(const MergeTreeData & source_data, const MergeTreeData::DataPartPtr & data_part, SinkToStoragePtr dst_storage_sink, ContextPtr context) +{ + auto metadata_snapshot = source_data.getInMemoryMetadataPtr(); + Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); + StorageSnapshotPtr storage_snapshot = source_data.getStorageSnapshot(metadata_snapshot, context); + + MergeTreeData::IMutationsSnapshot::Params params + { + .metadata_version = metadata_snapshot->getMetadataVersion(), + .min_part_metadata_version = data_part->getMetadataVersion(), + }; + + auto mutations_snapshot = source_data.getMutationsSnapshot(params); + + auto alter_conversions = MergeTreeData::getAlterConversionsForPart( + data_part, + mutations_snapshot, + metadata_snapshot, + context); + + QueryPlan plan; + + // todoa arthur + MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Merge; + + bool apply_deleted_mask = true; + bool read_with_direct_io = false; + bool prefetch = false; + + createReadFromPartStep( + read_type, + plan, + source_data, + storage_snapshot, + data_part, + alter_conversions, + columns_to_read, + nullptr, + apply_deleted_mask, + std::nullopt, + read_with_direct_io, + prefetch, + context, + getLogger("abcde")); + + auto pipeline_settings = BuildQueryPipelineSettings(context); + auto optimization_settings = QueryPlanOptimizationSettings(context); + auto builder = plan.buildQueryPipeline(optimization_settings, pipeline_settings); + + QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + + pipeline.complete(std::move(dst_storage_sink)); + + CompletedPipelineExecutor executor(pipeline); + executor.execute(); +} + +} diff --git a/src/Storages/MergeTree/exportMTPartToStorage.h b/src/Storages/MergeTree/exportMTPartToStorage.h new file mode 100644 index 000000000000..28f08540ab03 --- /dev/null +++ b/src/Storages/MergeTree/exportMTPartToStorage.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +namespace DB +{ + +void exportMTPartToStorage(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, SinkToStoragePtr dst_storage_sink, ContextPtr context); + +} diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index bafc5a8d95db..69e8e8028c95 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -124,7 +124,7 @@ StorageObjectStorage::QuerySettings StorageS3Configuration::getQuerySettings(con const auto & settings = context->getSettingsRef(); return StorageObjectStorage::QuerySettings{ .truncate_on_insert = settings[Setting::s3_truncate_on_insert], - .create_new_file_on_insert = settings[Setting::s3_create_new_file_on_insert], + . create_new_file_on_insert = settings[Setting::s3_create_new_file_on_insert], .schema_inference_use_cache = settings[Setting::schema_inference_use_cache_for_s3], .schema_inference_mode = settings[Setting::schema_inference_mode], .skip_empty_files = settings[Setting::s3_skip_empty_files], diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index eea47cbf9044..87f571f4a78a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -421,6 +421,7 @@ SinkToStoragePtr StorageObjectStorage::write( configuration->getPath()); } + // todo arthur continue from here if (configuration->withGlobsIgnorePartitionWildcard()) { throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, diff --git a/src/Storages/PartitionCommands.cpp b/src/Storages/PartitionCommands.cpp index 41cc03552458..0d86bdc5802f 100644 --- a/src/Storages/PartitionCommands.cpp +++ b/src/Storages/PartitionCommands.cpp @@ -52,6 +52,17 @@ std::optional PartitionCommand::parse(const ASTAlterCommand * res.part = command_ast->part; return res; } + if (command_ast->type == ASTAlterCommand::EXPORT_PART) + { + PartitionCommand res; + res.type = EXPORT_PART; + res.partition = command_ast->partition->clone(); + res.part = command_ast->part; + res.move_destination_type = PartitionCommand::MoveDestinationType::TABLE; + res.to_database = command_ast->to_database; + res.to_table = command_ast->to_table; + return res; + } if (command_ast->type == ASTAlterCommand::MOVE_PARTITION) { PartitionCommand res; diff --git a/src/Storages/PartitionCommands.h b/src/Storages/PartitionCommands.h index 917e510f24b4..15d2a7fb869f 100644 --- a/src/Storages/PartitionCommands.h +++ b/src/Storages/PartitionCommands.h @@ -33,6 +33,7 @@ struct PartitionCommand UNFREEZE_ALL_PARTITIONS, UNFREEZE_PARTITION, REPLACE_PARTITION, + EXPORT_PART, }; Type type = UNKNOWN; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 6a2e320fbf08..64c3885a8917 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -47,6 +47,8 @@ #include #include "Core/Names.h" #include +#include +#include namespace DB { @@ -2468,6 +2470,42 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const } } +/* + * For now, this function is meant to be used when exporting to different formats (i.e, the case where data needs to be re-encoded / serialized) + * For the cases where this is not necessary, there are way more optimal ways of doing that, such as hard links implemented by `movePartitionToTable` + * */ +void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context) +{ + String dest_database = query_context->resolveDatabase(command.to_database); + auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); + + /// The target table and the source table are the same. + if (dest_storage->getStorageID() == this->getStorageID()) + return; + + bool async_insert = areAsynchronousInsertsEnabled(); + + auto query = std::make_shared(); + + String partition_id = getPartitionIDFromQuery(command.partition, getContext()); + auto src_parts = getVisibleDataPartsVectorInPartition(getContext(), partition_id); + + if (src_parts.empty()) + { + return; + } + + auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef()[Setting::lock_acquire_timeout]); + auto lock2 = dest_storage->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef()[Setting::lock_acquire_timeout]); + auto merges_blocker = stopMergesAndWait(); + + for (const auto & data_part : src_parts) + { + auto sink = dest_storage->write(query, getInMemoryMetadataPtr(), getContext(), async_insert); + exportMTPartToStorage(*this, data_part, sink, query_context); + } +} + ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type) { if (action_type == ActionLocks::PartsMerge) diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index e0f512b54f9d..9eadab8920b5 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -249,6 +249,7 @@ class StorageMergeTree final : public MergeTreeData void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr context) override; void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) override; + void exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context) override; bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; /// Update mutation entries after part mutation execution. May reset old /// errors if mutation was successful. Otherwise update last_failed* fields