diff --git a/ci/jobs/scripts/check_style/check-settings-style b/ci/jobs/scripts/check_style/check-settings-style
index 42c4df16d103..cb24b45cef8a 100755
--- a/ci/jobs/scripts/check_style/check-settings-style
+++ b/ci/jobs/scripts/check_style/check-settings-style
@@ -37,7 +37,8 @@ ALL_DECLARATION_FILES="
     $ROOT_PATH/src/Storages/MySQL/MySQLSettings.cpp
     $ROOT_PATH/src/Storages/NATS/NATSSettings.cpp
     $ROOT_PATH/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp
-    $ROOT_PATH/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp
+    $ROOT_PATH/src/Storages/ObjectStorage/StorageObjectStorageSettings.h
+    $ROOT_PATH/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h
     $ROOT_PATH/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.cpp
     $ROOT_PATH/src/Storages/RabbitMQ/RabbitMQSettings.cpp
     $ROOT_PATH/src/Storages/RocksDB/RocksDBSettings.cpp
diff --git a/src/Analyzer/Resolve/IdentifierResolver.cpp b/src/Analyzer/Resolve/IdentifierResolver.cpp
index a756a51a6313..1bed3402fe66 100644
--- a/src/Analyzer/Resolve/IdentifierResolver.cpp
+++ b/src/Analyzer/Resolve/IdentifierResolver.cpp
@@ -186,10 +186,7 @@ IdentifierResolveResult IdentifierResolver::tryResolveTableIdentifierFromDatabas
     if (!storage)
         return {};

-    if (storage->hasExternalDynamicMetadata())
-    {
-        storage->updateExternalDynamicMetadata(context);
-    }
+    storage->updateExternalDynamicMetadataIfExists(context);

     if (!storage_lock)
         storage_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]);
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 1aff9006c305..3ee1a7d7a8bf 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6121,6 +6121,9 @@ Enable PRQL - an alternative to SQL.
 )", EXPERIMENTAL) \
     DECLARE(Bool, enable_adaptive_memory_spill_scheduler, false, R"(
 Trigger processor to spill data into external storage adpatively. grace join is supported at present.
+)", EXPERIMENTAL) \
+    DECLARE(Bool, allow_experimental_delta_kernel_rs, false, R"(
+Allow experimental delta-kernel-rs implementation.
 )", EXPERIMENTAL) \
     DECLARE(String, object_storage_cluster, "", R"(
 Cluster to make distributed requests to object storages with alternative syntax.
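Context for the Settings.cpp hunk above: allow_experimental_delta_kernel_rs follows the usual pattern for EXPERIMENTAL-tier settings, where the feature code path stays unreachable until the user opts in. A minimal standalone C++ model of that gate (QuerySettings and createDeltaKernelReader are invented names for illustration, not the real ClickHouse settings API):

#include <map>
#include <stdexcept>
#include <string>

// Toy stand-in for query-level settings.
struct QuerySettings
{
    std::map<std::string, bool> flags{{"allow_experimental_delta_kernel_rs", false}};

    bool get(const std::string & name) const { return flags.at(name); }
    void set(const std::string & name, bool value) { flags.at(name) = value; }
};

void createDeltaKernelReader(const QuerySettings & settings)
{
    // Experimental code is only reachable after an explicit opt-in.
    if (!settings.get("allow_experimental_delta_kernel_rs"))
        throw std::runtime_error("delta-kernel-rs support is experimental; "
                                 "enable allow_experimental_delta_kernel_rs first");
    // ... construct the delta-kernel-rs based reader here ...
}

int main()
{
    QuerySettings settings;
    settings.set("allow_experimental_delta_kernel_rs", true);  // the opt-in
    createDeltaKernelReader(settings);
}

In the real patch the corresponding check happens in DeltaLakeMetadata::create(), shown further down.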
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 9fb07c7ef5b5..ab7e037056be 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -70,6 +70,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
     {
         // Altinity Antalya modifications atop of 25.3
         {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
+        {"allow_experimental_delta_kernel_rs", false, false, "New setting."},
     });
     addSettingsChanges(settings_changes_history, "25.2.1.20000",
     {
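A note on the SettingsChangesHistory entry above: the history maps each release to the settings it introduced or re-defaulted, so that the compatibility setting can replay older defaults; registering a new setting with identical old and new values merely documents when it appeared. A toy standalone model of that bookkeeping (types simplified, not the real VersionToSettingsChangesMap API):

#include <iostream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

// (setting name, previous default, new default)
using SettingChange = std::tuple<std::string, bool, bool>;

int main()
{
    std::vector<std::pair<std::string, std::vector<SettingChange>>> history = {
        {"25.3", {{"allow_experimental_delta_kernel_rs", false, false}}},
    };

    // A client pinned to an older compatibility version would walk this list
    // and restore the "previous default" for every newer entry.
    for (const auto & [version, changes] : history)
        for (const auto & [name, previous_value, new_value] : changes)
            std::cout << version << ": " << name << " "
                      << previous_value << " -> " << new_value << '\n';
}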
diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index 4eb3aeaa2d1d..580dbdcb3068 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -55,9 +55,9 @@ namespace Setting
     extern const SettingsBool allow_experimental_database_glue_catalog;
     extern const SettingsBool use_hive_partitioning;
 }
-namespace StorageObjectStorageSetting
+namespace DataLakeStorageSetting
 {
-    extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
+    extern const DataLakeStorageSettingsString iceberg_metadata_file_path;
 }

 namespace ErrorCodes
@@ -161,7 +161,9 @@ std::shared_ptr DatabaseDataLake::getCatalog() const
     return catalog_impl;
 }

-std::shared_ptr DatabaseDataLake::getConfiguration(DatabaseDataLakeStorageType type) const
+std::shared_ptr DatabaseDataLake::getConfiguration(
+    DatabaseDataLakeStorageType type,
+    DataLakeStorageSettingsPtr storage_settings) const
 {
     /// TODO: add tests for azure, local storage types.
@@ -175,24 +177,24 @@ std::shared_ptr DatabaseDataLake::getConfig
 #if USE_AWS_S3
         case DB::DatabaseDataLakeStorageType::S3:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
 #endif
 #if USE_AZURE_BLOB_STORAGE
         case DB::DatabaseDataLakeStorageType::Azure:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
 #endif
 #if USE_HDFS
         case DB::DatabaseDataLakeStorageType::HDFS:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
 #endif
         case DB::DatabaseDataLakeStorageType::Local:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
         /// Fake storage in case when catalog store not only
         /// primary-type tables (DeltaLake or Iceberg), but for
         /// dependencies and the most lightweight
         case DB::DatabaseDataLakeStorageType::Other:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
 #if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS
         default:
             throw Exception(ErrorCodes::BAD_ARGUMENTS,
@@ -221,12 +223,12 @@ std::shared_ptr DatabaseDataLake::getConfig
 #if USE_AWS_S3
         case DB::DatabaseDataLakeStorageType::S3:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
 #endif
         case DB::DatabaseDataLakeStorageType::Local:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
         /// Fake storage in case when catalog store not only
         /// primary-type tables (DeltaLake or Iceberg), but for
         /// dependencies and the most lightweight
         case DB::DatabaseDataLakeStorageType::Other:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
         default:
             throw Exception(ErrorCodes::BAD_ARGUMENTS,
@@ -253,12 +255,12 @@ std::shared_ptr DatabaseDataLake::getConfig
 #if USE_AWS_S3
         case DB::DatabaseDataLakeStorageType::S3:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
 #endif
         case DB::DatabaseDataLakeStorageType::Other:
         {
-            return std::make_shared();
+            return std::make_shared(storage_settings);
         }
         default:
             throw Exception(ErrorCodes::BAD_ARGUMENTS,
@@ -377,9 +379,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
         storage_type = table_metadata.getStorageType();
     }

-    const auto configuration = getConfiguration(storage_type);
+    auto storage_settings = std::make_shared();
+    storage_settings->loadFromSettingsChanges(settings.allChanged());

-    auto storage_settings = std::make_shared();
     if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
         table_specific_properties.has_value())
     {
@@ -394,9 +396,11 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
         }
     }

-        (*storage_settings)[DB::StorageObjectStorageSetting::iceberg_metadata_file_path] = metadata_location;
+        (*storage_settings)[DB::DataLakeStorageSetting::iceberg_metadata_file_path] = metadata_location;
     }

+    const auto configuration = getConfiguration(storage_type, storage_settings);
+
     /// HACK: Hacky-hack to enable lazy load
     ContextMutablePtr context_copy = Context::createCopy(context_);
     Settings settings_copy = context_copy->getSettingsCopy();
@@ -405,7 +409,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con

     /// with_table_structure = false: because there will be
     /// no table structure in table definition AST.
-    configuration->initialize(args, context_copy, /* with_table_structure */false, storage_settings);
+    configuration->initialize(args, context_copy, /* with_table_structure */false);

     auto cluster_name = settings[DatabaseDataLakeSetting::object_storage_cluster].value;
diff --git a/src/Databases/DataLake/DatabaseDataLake.h b/src/Databases/DataLake/DatabaseDataLake.h
index 3b3123dfb6f9..ee6ed0903168 100644
--- a/src/Databases/DataLake/DatabaseDataLake.h
+++ b/src/Databases/DataLake/DatabaseDataLake.h
@@ -68,7 +68,11 @@ class DatabaseDataLake final : public IDatabase, WithContext
     void validateSettings();
     std::shared_ptr getCatalog() const;
-    std::shared_ptr getConfiguration(DatabaseDataLakeStorageType type) const;
+
+    std::shared_ptr getConfiguration(
+        DatabaseDataLakeStorageType type,
+        DataLakeStorageSettingsPtr storage_settings) const;
+
     std::string getStorageEndpointForTable(const DataLake::TableMetadata & table_metadata) const;
diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp
index 412e96f865cf..345e8d575a1e 100644
--- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp
+++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include
 #include

 namespace DB
@@ -29,7 +30,8 @@ namespace ErrorCodes
     DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \

 #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
-    DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS)
+    DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \
+    LIST_OF_DATA_LAKE_STORAGE_SETTINGS(M, ALIAS) \

 DECLARE_SETTINGS_TRAITS(DatabaseDataLakeSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
 IMPLEMENT_SETTINGS_TRAITS(DatabaseDataLakeSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
@@ -89,4 +91,12 @@ void DatabaseDataLakeSettings::loadFromQuery(const ASTStorage & storage_def)
     }
 }

+SettingsChanges DatabaseDataLakeSettings::allChanged() const
+{
+    SettingsChanges changes;
+    for (const auto & setting : impl->allChanged())
+        changes.emplace_back(setting.getName(), setting.getValue());
+    return changes;
+}
+
 }
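The allChanged() method added above is the bridge between database-level and storage-level settings: the database exports only its explicitly changed values, and DataLakeStorageSettings::loadFromSettingsChanges (new file further down) absorbs just the names it recognizes. A standalone sketch of that flow, with simplified stand-in types instead of the real SettingsChanges machinery:

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

using SettingsChangesModel = std::vector<std::pair<std::string, std::string>>;

struct DatabaseSettingsModel
{
    std::map<std::string, std::string> changed;  // only explicitly changed settings
    SettingsChangesModel allChanged() const { return {changed.begin(), changed.end()}; }
};

struct DataLakeStorageSettingsModel
{
    std::map<std::string, std::string> values{{"iceberg_metadata_file_path", ""}};

    void loadFromSettingsChanges(const SettingsChangesModel & changes)
    {
        for (const auto & [name, value] : changes)
            if (auto it = values.find(name); it != values.end())
                it->second = value;  // unknown names are silently skipped, as in the patch
    }
};

int main()
{
    DatabaseSettingsModel db;
    db.changed["iceberg_metadata_file_path"] = "metadata/v3.metadata.json";
    db.changed["catalog_type"] = "rest";  // database-only setting, ignored below

    DataLakeStorageSettingsModel storage;
    storage.loadFromSettingsChanges(db.allChanged());
    std::cout << storage.values["iceberg_metadata_file_path"] << '\n';
}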
diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.h b/src/Databases/DataLake/DatabaseDataLakeSettings.h
index bd4d83f0cf0a..e0adbf442269 100644
--- a/src/Databases/DataLake/DatabaseDataLakeSettings.h
+++ b/src/Databases/DataLake/DatabaseDataLakeSettings.h
@@ -2,6 +2,7 @@

 #include
 #include
+#include
 #include
 #include

@@ -19,7 +20,11 @@ class SettingsChanges;
     M(CLASS_NAME, Bool) \
     M(CLASS_NAME, DatabaseDataLakeCatalogType) \

-DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseDataLakeSettings, DECLARE_SETTING_TRAIT)
+#define LIST_OF_DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
+    DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
+    STORAGE_OBJECT_STORAGE_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M)
+
+LIST_OF_DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseDataLakeSettings, DECLARE_SETTING_TRAIT)

 struct DatabaseDataLakeSettings
 {
@@ -34,6 +39,8 @@ struct DatabaseDataLakeSettings

     void applyChanges(const SettingsChanges & changes);

+    SettingsChanges allChanged() const;
+
 private:
     std::unique_ptr impl;
 };
diff --git a/src/Databases/enableAllExperimentalSettings.cpp b/src/Databases/enableAllExperimentalSettings.cpp
index c6883a83c097..f2ce80ae1549 100644
--- a/src/Databases/enableAllExperimentalSettings.cpp
+++ b/src/Databases/enableAllExperimentalSettings.cpp
@@ -53,6 +53,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context)
     context->setSetting("allow_not_comparable_types_in_order_by", 1);
     context->setSetting("allow_experimental_database_unity_catalog", 1);
     context->setSetting("allow_experimental_database_glue_catalog", 1);
+    context->setSetting("allow_experimental_delta_kernel_rs", 1);

     /// clickhouse-private settings
     context->setSetting("allow_experimental_shared_set_join", 1);
diff --git a/src/Interpreters/InterpreterDescribeQuery.cpp b/src/Interpreters/InterpreterDescribeQuery.cpp
index f1e299a42493..53ef032ea01c 100644
--- a/src/Interpreters/InterpreterDescribeQuery.cpp
+++ b/src/Interpreters/InterpreterDescribeQuery.cpp
@@ -177,13 +177,13 @@ void InterpreterDescribeQuery::fillColumnsFromTableFunction(const ASTTableExpres

 void InterpreterDescribeQuery::fillColumnsFromTable(const ASTTableExpression & table_expression)
 {
-    auto table_id = getContext()->resolveStorageID(table_expression.database_and_table_name);
-    getContext()->checkAccess(AccessType::SHOW_COLUMNS, table_id);
-    auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
-    if (table->hasExternalDynamicMetadata())
-    {
-        table->updateExternalDynamicMetadata(getContext());
-    }
+    auto query_context = getContext();
+    auto table_id = query_context->resolveStorageID(table_expression.database_and_table_name);
+    query_context->checkAccess(AccessType::SHOW_COLUMNS, table_id);
+
+    auto table = DatabaseCatalog::instance().getTable(table_id, query_context);
+
+    table->updateExternalDynamicMetadataIfExists(query_context);

     auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings[Setting::lock_acquire_timeout]);
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 10011fa10cec..59ade6302a2f 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -568,11 +568,11 @@ InterpreterSelectQuery::InterpreterSelectQuery(

     if (storage)
     {
-        if (storage->hasExternalDynamicMetadata())
+        if (storage->updateExternalDynamicMetadataIfExists(context))
         {
-            storage->updateExternalDynamicMetadata(context);
             metadata_snapshot = storage->getInMemoryMetadataPtr();
         }
+
         table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]);

         table_id = storage->getStorageID();
diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp
index a8dd6549b27a..54113f0464bc 100644
--- a/src/Storages/IStorage.cpp
+++ b/src/Storages/IStorage.cpp
@@ -102,11 +102,6 @@ std::optional IStorage::tryLockForAlter(const std::ch
     return lock;
 }

-void IStorage::updateExternalDynamicMetadata(ContextPtr)
-{
-    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateExternalDynamicMetadata is not supported by storage {}", getName());
-}
-
 IStorage::AlterLockHolder IStorage::lockForAlter(const std::chrono::milliseconds & acquire_timeout)
 {
     auto lock = tryLockForAlter(acquire_timeout);
diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h
index 276281592800..ab49c34ae54d 100644
--- a/src/Storages/IStorage.h
+++ b/src/Storages/IStorage.h
@@ -104,9 +104,6 @@ class IStorage : public std::enable_shared_from_this, public TypePromo
     /// Returns true if the storage is dictionary
     virtual bool isDictionary() const { return false; }

-    /// Returns true if the metadata of a table can be changed normally by other processes
-    virtual bool hasExternalDynamicMetadata() const { return false; }
-
     /// Returns true if the storage supports queries with the SAMPLE section.
     virtual bool supportsSampling() const { return getInMemoryMetadataPtr()->hasSamplingKey(); }

@@ -488,7 +485,8 @@ class IStorage : public std::enable_shared_from_this, public TypePromo
     virtual void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & alter_lock_holder);

     /// Updates metadata that can be changed by other processes
-    virtual void updateExternalDynamicMetadata(ContextPtr);
+    /// Returns true if external metadata exists and was updated.
+    virtual bool updateExternalDynamicMetadataIfExists(ContextPtr /* context */) { return false; }

     /** Checks that alter commands can be applied to storage. For example, columns can be modified,
       * or primary key can be changes, etc.
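The IStorage hunks above fold the old hasExternalDynamicMetadata()/updateExternalDynamicMetadata() pair into a single boolean-returning virtual with a non-throwing default, so call sites lose the two-step dance. A standalone model of the new protocol (class names invented for the sketch):

#include <iostream>

struct IStorageModel
{
    virtual ~IStorageModel() = default;
    // Base default: no external metadata, nothing to update.
    virtual bool updateExternalDynamicMetadataIfExists() { return false; }
};

struct LakeStorageModel : IStorageModel
{
    bool updateExternalDynamicMetadataIfExists() override
    {
        std::cout << "refreshing schema from lake metadata\n";
        return true;  // metadata exists and was updated
    }
};

void runQuery(IStorageModel & storage)
{
    // Callers refresh their snapshot only when an update actually happened,
    // mirroring the InterpreterSelectQuery hunk above.
    if (storage.updateExternalDynamicMetadataIfExists())
        std::cout << "re-reading in-memory metadata snapshot\n";
    std::cout << "taking table lock and executing\n";
}

int main()
{
    LakeStorageModel lake;
    IStorageModel plain;
    runQuery(lake);   // refresh + snapshot reload
    runQuery(plain);  // no-op path, no exception
}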
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
index 0dedb5d388f5..eed624100e0c 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -7,11 +7,11 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -33,13 +33,13 @@ namespace DB

 namespace ErrorCodes
 {
-extern const int FORMAT_VERSION_TOO_OLD;
-extern const int LOGICAL_ERROR;
+    extern const int FORMAT_VERSION_TOO_OLD;
+    extern const int LOGICAL_ERROR;
 }

-namespace StorageObjectStorageSetting
+namespace DataLakeStorageSetting
 {
-extern const StorageObjectStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
+    extern DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
 }

@@ -52,36 +52,47 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
 public:
     using Configuration = StorageObjectStorage::Configuration;

+    explicit DataLakeConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {}
+
     bool isDataLakeConfiguration() const override { return true; }

+    const DataLakeStorageSettings & getDataLakeSettings() const override { return *settings; }
+
     std::string getEngineName() const override { return DataLakeMetadata::name + BaseStorageConfiguration::getEngineName(); }

-    void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
+    /// Returns true if metadata is of the latest version, false if unknown.
+    bool update(
+        ObjectStoragePtr object_storage,
+        ContextPtr local_context,
+        bool if_not_updated_before,
+        bool check_consistent_with_previous_metadata) override
     {
-        BaseStorageConfiguration::update(object_storage, local_context);
+        const bool updated_before = current_metadata != nullptr;
+        if (updated_before && if_not_updated_before)
+            return false;
+
+        BaseStorageConfiguration::update(
+            object_storage, local_context, if_not_updated_before, check_consistent_with_previous_metadata);

-        bool existed = current_metadata != nullptr;
+        const bool changed = updateMetadataIfChanged(object_storage, local_context);
+        if (!changed)
+            return true;

-        if (updateMetadataObjectIfNeeded(object_storage, local_context))
+        if (check_consistent_with_previous_metadata && hasExternalDynamicMetadata() && updated_before)
         {
-            if (hasExternalDynamicMetadata() && existed)
-            {
-                throw Exception(
-                    ErrorCodes::FORMAT_VERSION_TOO_OLD,
-                    "Metadata is not consinsent with the one which was used to infer table schema. Please, retry the query.");
-            }
+            throw Exception(
+                ErrorCodes::FORMAT_VERSION_TOO_OLD,
+                "Metadata is not consistent with the one which was used to infer table schema. "
+                "Please, retry the query.");
         }
+        return true;
     }

     std::optional tryGetTableStructureFromMetadata() const override
     {
-        if (!current_metadata)
-            return std::nullopt;
-        auto schema_from_metadata = current_metadata->getTableSchema();
-        if (!schema_from_metadata.empty())
-        {
-            return ColumnsDescription(std::move(schema_from_metadata));
-        }
+        _assertInitialized();
+        if (auto schema = current_metadata->getTableSchema(); !schema.empty())
+            return ColumnsDescription(std::move(schema));
         return std::nullopt;
     }
@@ -97,40 +108,27 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
     std::optional totalRows() override
     {
-        if (!current_metadata)
-            return {};
-
+        _assertInitialized();
         return current_metadata->totalRows();
     }

     std::shared_ptr getInitialSchemaByPath(const String & data_path) const override
     {
-        if (!current_metadata)
-            return {};
+        _assertInitialized();
         return current_metadata->getInitialSchemaByPath(data_path);
     }

     std::shared_ptr getSchemaTransformer(const String & data_path) const override
     {
-        if (!current_metadata)
-            return {};
+        _assertInitialized();
         return current_metadata->getSchemaTransformer(data_path);
     }

     bool hasExternalDynamicMetadata() override
     {
-        return BaseStorageConfiguration::getSettingsRef()[StorageObjectStorageSetting::allow_dynamic_metadata_for_data_lakes]
-            && current_metadata
-            && current_metadata->supportsExternalMetadataChange();
-    }
-
-    ColumnsDescription updateAndGetCurrentSchema(
-        ObjectStoragePtr object_storage,
-        ContextPtr context) override
-    {
-        BaseStorageConfiguration::update(object_storage, context);
-        updateMetadataObjectIfNeeded(object_storage, context);
-        return ColumnsDescription{current_metadata->getTableSchema()};
+        _assertInitialized();
+        return (*settings)[DataLakeStorageSetting::allow_dynamic_metadata_for_data_lakes]
+            && current_metadata->supportsSchemaEvolution();
     }

     bool supportsFileIterator() const override { return true; }
@@ -140,7 +138,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
         IDataLakeMetadata::FileProgressCallback callback,
         size_t list_batch_size) override
     {
-        chassert(current_metadata);
+        _assertInitialized();
         return current_metadata->iterate(filter_dag, callback, list_batch_size);
     }

@@ -152,6 +150,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
 #if USE_PARQUET && USE_AWS_S3
     DeltaLakePartitionColumns getDeltaLakePartitionColumns() const
     {
+        _assertInitialized();
         const auto * delta_lake_metadata = dynamic_cast(current_metadata.get());
         if (delta_lake_metadata)
             return delta_lake_metadata->getPartitionColumns();
@@ -162,6 +161,13 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
 private:
     DataLakeMetadataPtr current_metadata;
     LoggerPtr log = getLogger("DataLakeConfiguration");
+    const DataLakeStorageSettingsPtr settings;
+
+    void _assertInitialized() const
+    {
+        if (!current_metadata)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata is not initialized");
+    }

     ReadFromFormatInfo prepareReadingFromFormat(
         ObjectStoragePtr object_storage,
@@ -170,7 +176,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
         bool supports_subset_of_columns,
         ContextPtr local_context) override
     {
-        auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
         if (!current_metadata)
         {
             current_metadata = DataLakeMetadata::create(
@@ -178,55 +183,10 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
                 weak_from_this(),
                 local_context);
         }
-        auto read_schema = current_metadata->getReadSchema();
-        if (!read_schema.empty())
-        {
-            /// There is a difference between "table schema" and "read schema".
-            /// "table schema" is a schema from data lake table metadata,
-            /// while "read schema" is a schema from data files.
-            /// In most cases they would be the same.
-            /// TODO: Try to hide this logic inside IDataLakeMetadata.
-
-            const auto read_schema_names = read_schema.getNames();
-            const auto table_schema_names = current_metadata->getTableSchema().getNames();
-            chassert(read_schema_names.size() == table_schema_names.size());
-
-            if (read_schema_names != table_schema_names)
-            {
-                LOG_TEST(log, "Read schema: {}, table schema: {}, requested columns: {}",
-                         fmt::join(read_schema_names, ", "),
-                         fmt::join(table_schema_names, ", "),
-                         fmt::join(info.requested_columns.getNames(), ", "));
-
-                auto column_name_mapping = [&]()
-                {
-                    std::map result;
-                    for (size_t i = 0; i < read_schema_names.size(); ++i)
-                        result[table_schema_names[i]] = read_schema_names[i];
-                    return result;
-                }();
-
-                /// Go through requested columns and change column name
-                /// from table schema to column name from read schema.
-
-                std::vector read_columns;
-                for (const auto & column_name : info.requested_columns)
-                {
-                    const auto pos = info.format_header.getPositionByName(column_name.name);
-                    auto column = info.format_header.getByPosition(pos);
-                    column.name = column_name_mapping.at(column_name.name);
-                    info.format_header.setColumn(pos, column);
-
-                    read_columns.emplace_back(column.name, column.type);
-                }
-                info.requested_columns = NamesAndTypesList(read_columns.begin(), read_columns.end());
-            }
-        }
-
-        return info;
+        return current_metadata->prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
     }

-    bool updateMetadataObjectIfNeeded(
+    bool updateMetadataIfChanged(
         ObjectStoragePtr object_storage,
         ContextPtr context)
     {
@@ -249,15 +209,11 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
             weak_from_this(),
             context);

-        if (*current_metadata != *new_metadata)
-        {
-            current_metadata = std::move(new_metadata);
-            return true;
-        }
-        else
-        {
+        if (*current_metadata == *new_metadata)
             return false;
-        }
+
+        current_metadata = std::move(new_metadata);
+        return true;
     }
 };

@@ -284,6 +240,9 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
     friend class StorageObjectStorage::Configuration;

 public:
+    StorageIcebergConfiguration() = default;
+    explicit StorageIcebergConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {}
+
     ObjectStorageType getType() const override { return getImpl().getType(); }

     std::string getTypeName() const override { return getImpl().getTypeName(); }
@@ -365,19 +324,30 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
         return getImpl().iterate(filter_dag, callback, list_batch_size);
     }

-    void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
-    { return getImpl().update(object_storage, local_context); }
-    void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context) override
-    { return getImpl().updateIfRequired(object_storage, local_context); }
+    bool update(
+        ObjectStoragePtr object_storage,
+        ContextPtr local_context,
+        bool if_not_updated_before,
+        bool check_consistent_with_previous_metadata) override
+    {
+        return getImpl().update(object_storage, local_context, if_not_updated_before, check_consistent_with_previous_metadata);
+    }
+
+    bool updateIfRequired(
+        ObjectStoragePtr object_storage,
+        ContextPtr local_context,
+        bool if_not_updated_before,
+        bool check_consistent_with_previous_metadata) override
+    {
+        return getImpl().updateIfRequired(object_storage, local_context, if_not_updated_before, check_consistent_with_previous_metadata);
+    }

     void initialize(
         ASTs & engine_args,
         ContextPtr local_context,
-        bool with_table_structure,
-        std::shared_ptr settings) override
+        bool with_table_structure) override
     {
         createDynamicConfiguration(engine_args, local_context);
-        getImpl().initialize(engine_args, local_context, with_table_structure, settings);
+        getImpl().initialize(engine_args, local_context, with_table_structure);
     }

     ASTPtr createArgsWithAccessData() const override
@@ -480,7 +450,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
         return getImpl().tryGetSamplePathFromMetadata();
     }

-    virtual void assertInitialized() const override { return getImpl().assertInitialized(); }
+    void assertInitialized() const override { return getImpl().assertInitialized(); }

 private:
@@ -506,21 +476,21 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
         {
 # if USE_AWS_S3
             case ObjectStorageType::S3:
-                impl = std::make_unique();
+                impl = std::make_unique(settings);
                 break;
 # endif
 # if USE_AZURE_BLOB_STORAGE
             case ObjectStorageType::Azure:
-                impl = std::make_unique();
+                impl = std::make_unique(settings);
                 break;
 # endif
 # if USE_HDFS
             case ObjectStorageType::HDFS:
-                impl = std::make_unique();
+                impl = std::make_unique(settings);
                 break;
 # endif
             case ObjectStorageType::Local:
-                impl = std::make_unique();
+                impl = std::make_unique(settings);
                 break;
             default:
                 throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsuported DataLake storage {}", type);
@@ -528,6 +498,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
     }

     std::shared_ptr impl;
+    DataLakeStorageSettingsPtr settings;
 };

 #endif
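The new update() contract in DataLakeConfiguration is easiest to see in isolation. The sketch below models the two flags and the consistency error; it omits the hasExternalDynamicMetadata() condition that the real code also checks before throwing, and all names are illustrative rather than the real interface:

#include <iostream>
#include <stdexcept>

struct ConfigurationModel
{
    int metadata_version = -1;   // -1 means "never loaded"
    int latest_version = 1;      // what the lake currently holds

    /// Returns true if metadata is now known to be the latest version,
    /// false if the refresh was skipped and freshness is unknown.
    bool update(bool if_not_updated_before, bool check_consistent_with_previous_metadata)
    {
        const bool updated_before = metadata_version != -1;
        if (updated_before && if_not_updated_before)
            return false;  // first caller already refreshed; skip

        const bool changed = metadata_version != latest_version;
        if (!changed)
            return true;

        // A refresh that changes metadata after the schema was already
        // inferred invalidates that schema.
        if (check_consistent_with_previous_metadata && updated_before)
            throw std::runtime_error("metadata changed after schema inference; retry the query");

        metadata_version = latest_version;
        return true;
    }
};

int main()
{
    ConfigurationModel config;
    std::cout << config.update(false, false) << '\n';  // first load: refreshed, prints 1
    std::cout << config.update(true, true) << '\n';    // already loaded: skipped, prints 0

    config.latest_version = 2;                          // a writer commits a new version
    try { config.update(false, true); }
    catch (const std::exception & e) { std::cout << e.what() << '\n'; }
}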
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.cpp b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.cpp
new file mode 100644
index 000000000000..07824c4ab9d4
--- /dev/null
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.cpp
@@ -0,0 +1,74 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+DECLARE_SETTINGS_TRAITS(DataLakeStorageSettingsTraits, LIST_OF_DATA_LAKE_STORAGE_SETTINGS)
+IMPLEMENT_SETTINGS_TRAITS(DataLakeStorageSettingsTraits, LIST_OF_DATA_LAKE_STORAGE_SETTINGS)
+
+struct DataLakeStorageSettingsImpl : public BaseSettings
+{
+};
+
+#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS, ...) \
+    DataLakeStorageSettings##TYPE NAME = &DataLakeStorageSettingsImpl ::NAME;
+
+namespace DataLakeStorageSetting
+{
+LIST_OF_DATA_LAKE_STORAGE_SETTINGS(INITIALIZE_SETTING_EXTERN, INITIALIZE_SETTING_EXTERN)
+}
+
+#undef INITIALIZE_SETTING_EXTERN
+
+DataLakeStorageSettings::DataLakeStorageSettings() : impl(std::make_unique())
+{
+}
+
+DataLakeStorageSettings::DataLakeStorageSettings(const DataLakeStorageSettings & settings)
+    : impl(std::make_unique(*settings.impl))
+{
+}
+
+DataLakeStorageSettings::DataLakeStorageSettings(DataLakeStorageSettings && settings) noexcept
+    : impl(std::make_unique(std::move(*settings.impl)))
+{
+}
+
+
+DataLakeStorageSettings::~DataLakeStorageSettings() = default;
+
+STORAGE_DATA_LAKE_STORAGE_SETTINGS_SUPPORTED_TYPES(DataLakeStorageSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
+
+
+void DataLakeStorageSettings::loadFromQuery(ASTSetQuery & settings_ast)
+{
+    impl->applyChanges(settings_ast.changes);
+}
+
+Field DataLakeStorageSettings::get(const std::string & name)
+{
+    return impl->get(name);
+}
+
+bool DataLakeStorageSettings::hasBuiltin(std::string_view name)
+{
+    return DataLakeStorageSettingsImpl::hasBuiltin(name);
+}
+
+void DataLakeStorageSettings::loadFromSettingsChanges(const SettingsChanges & changes)
+{
+    for (const auto & [name, value] : changes)
+    {
+        if (impl->has(name))
+            impl->set(name, value);
+    }
+}
+
+}
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h
new file mode 100644
index 000000000000..a87a0a43c935
--- /dev/null
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+class ASTSetQuery;
+struct DataLakeStorageSettingsImpl;
+struct MutableColumnsAndConstraints;
+class StorageObjectStorage;
+class SettingsChanges;
+
+/// List of available types supported in the DataLakeStorageSettings object
+#define STORAGE_DATA_LAKE_STORAGE_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
+    M(CLASS_NAME, ArrowCompression) \
+    M(CLASS_NAME, Bool) \
+    M(CLASS_NAME, CapnProtoEnumComparingMode) \
+    M(CLASS_NAME, Char) \
+    M(CLASS_NAME, DateTimeInputFormat) \
+    M(CLASS_NAME, DateTimeOutputFormat) \
+    M(CLASS_NAME, DateTimeOverflowBehavior) \
+    M(CLASS_NAME, Double) \
+    M(CLASS_NAME, EscapingRule) \
+    M(CLASS_NAME, Float) \
+    M(CLASS_NAME, IdentifierQuotingRule) \
+    M(CLASS_NAME, IdentifierQuotingStyle) \
+    M(CLASS_NAME, Int64) \
+    M(CLASS_NAME, IntervalOutputFormat) \
+    M(CLASS_NAME, MsgPackUUIDRepresentation) \
+    M(CLASS_NAME, ORCCompression) \
+    M(CLASS_NAME, ParquetCompression) \
+    M(CLASS_NAME, ParquetVersion) \
+    M(CLASS_NAME, SchemaInferenceMode) \
+    M(CLASS_NAME, String) \
+    M(CLASS_NAME, UInt32) \
+    M(CLASS_NAME, UInt64) \
+    M(CLASS_NAME, NonZeroUInt64) \
+    M(CLASS_NAME, UInt64Auto) \
+    M(CLASS_NAME, URI)
+
+// clang-format off
+
+#define DATA_LAKE_STORAGE_RELATED_SETTINGS(DECLARE, ALIAS) \
+    DECLARE(Bool, allow_dynamic_metadata_for_data_lakes, false, R"(
+If enabled, metadata is taken from the Iceberg specification that is pulled from the cloud before each query.
+)", 0) \
+    DECLARE(String, iceberg_metadata_file_path, "", R"(
+Explicit path to the desired Iceberg metadata file, relative to the path in object storage. Makes sense for the table function use case only.
+)", 0) \
+    DECLARE(String, iceberg_metadata_table_uuid, "", R"(
+Explicit table UUID to read metadata for. Ignored if iceberg_metadata_file_path is set.
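The DataLakeStorageSettings.cpp file above uses the pImpl idiom: the public struct holds a unique_ptr to the BaseSettings-backed impl, which is why the copy and move constructors must be spelled out (a defaulted copy would not compile with unique_ptr). A self-contained model of that pattern (Impl and SettingsFacade are stand-ins, not the real classes):

#include <iostream>
#include <memory>
#include <string>

struct Impl
{
    std::string iceberg_metadata_file_path;
};

struct SettingsFacade
{
    SettingsFacade() : impl(std::make_unique<Impl>()) {}
    // Deep copy through the pImpl, as in the patch.
    SettingsFacade(const SettingsFacade & other) : impl(std::make_unique<Impl>(*other.impl)) {}
    // Move the pointee, leaving the source impl valid, as in the patch.
    SettingsFacade(SettingsFacade && other) noexcept : impl(std::make_unique<Impl>(std::move(*other.impl))) {}
    ~SettingsFacade() = default;

    Impl & operator*() { return *impl; }

private:
    std::unique_ptr<Impl> impl;
};

int main()
{
    SettingsFacade a;
    (*a).iceberg_metadata_file_path = "metadata/v2.metadata.json";

    SettingsFacade b = a;  // independent copy
    (*b).iceberg_metadata_file_path = "metadata/v3.metadata.json";

    std::cout << (*a).iceberg_metadata_file_path << '\n';  // still v2
    std::cout << (*b).iceberg_metadata_file_path << '\n';  // v3
}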
+)", 0) \ + DECLARE(Bool, iceberg_recent_metadata_file_by_last_updated_ms_field, false, R"( +If enabled, the engine would use the metadata file with the most recent last_updated_ms json field. Does not make sense to use with iceberg_metadata_file_path. +)", 0) \ + DECLARE(Bool, iceberg_use_version_hint, false, R"( +Get latest metadata path from version-hint.text file. +)", 0) \ + +#define OBSOLETE_SETTINGS(M, ALIAS) \ + MAKE_OBSOLETE(M, Bool, allow_experimental_delta_kernel_rs, true) \ + MAKE_OBSOLETE(M, Bool, delta_lake_read_schema_same_as_table_schema, false) + +// clang-format on + +STORAGE_DATA_LAKE_STORAGE_SETTINGS_SUPPORTED_TYPES(DataLakeStorageSettings, DECLARE_SETTING_TRAIT) + +struct DataLakeStorageSettings +{ + DataLakeStorageSettings(); + DataLakeStorageSettings(const DataLakeStorageSettings & settings); + DataLakeStorageSettings(DataLakeStorageSettings && settings) noexcept; + ~DataLakeStorageSettings(); + + STORAGE_DATA_LAKE_STORAGE_SETTINGS_SUPPORTED_TYPES(DataLakeStorageSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR) + + void loadFromQuery(ASTSetQuery & settings_ast); + + void loadFromSettingsChanges(const SettingsChanges & changes); + + Field get(const std::string & name); + + static bool hasBuiltin(std::string_view name); + +private: + std::unique_ptr impl; +}; + +using DataLakeStorageSettingsPtr = std::shared_ptr; + +#define LIST_OF_DATA_LAKE_STORAGE_SETTINGS(M, ALIAS) \ + DATA_LAKE_STORAGE_RELATED_SETTINGS(M, ALIAS) \ + OBSOLETE_SETTINGS(M, ALIAS) \ + LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) + +} diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp index fb791b83dded..2b8699d86605 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp @@ -64,6 +64,7 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator Iterator( const KernelExternEngine & engine_, const KernelSnapshot & snapshot_, + KernelScan & scan_, const std::string & data_prefix_, const DB::NamesAndTypesList & schema_, const DB::Names & partition_columns_, @@ -74,6 +75,7 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator LoggerPtr log_) : engine(engine_) , snapshot(snapshot_) + , scan(scan_) , data_prefix(data_prefix_) , schema(schema_) , partition_columns(partition_columns_) @@ -268,10 +270,9 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator using KernelScan = KernelPointerWrapper; using KernelScanDataIterator = KernelPointerWrapper; - const KernelExternEngine & engine; const KernelSnapshot & snapshot; - KernelScan scan; + KernelScan & scan; KernelScanDataIterator scan_data_iterator; std::optional pruner; @@ -307,11 +308,9 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator TableSnapshot::TableSnapshot( KernelHelperPtr helper_, DB::ObjectStoragePtr object_storage_, - bool read_schema_same_as_table_schema_, LoggerPtr log_) : helper(helper_) , object_storage(object_storage_) - , read_schema_same_as_table_schema(read_schema_same_as_table_schema_) , log(log_) { } @@ -350,15 +349,20 @@ void TableSnapshot::initSnapshotImpl() const snapshot = KernelUtils::unwrapResult( ffi::snapshot(KernelUtils::toDeltaString(helper->getTableLocation()), engine.get()), "snapshot"); snapshot_version = ffi::version(snapshot.get()); - LOG_TRACE(log, "Snapshot version: {}", snapshot_version); -} -ffi::SharedSnapshot * TableSnapshot::getSnapshot() const -{ - if (!snapshot.get()) - initSnapshot(); - 
return snapshot.get(); + scan = KernelUtils::unwrapResult(ffi::scan(snapshot.get(), engine.get(), /* predicate */{}), "scan"); + scan_state = ffi::get_global_scan_state(scan.get()); + LOG_TRACE(log, "Initialized scan state"); + + std::tie(table_schema, physical_names_map) = getTableSchemaFromSnapshot(snapshot.get()); + LOG_TRACE(log, "Table schema: {}", fmt::join(table_schema.getNames(), ", ")); + + read_schema = getReadSchemaFromSnapshot(scan_state.get()); + LOG_TRACE(log, "Read schema: {}", fmt::join(read_schema.getNames(), ", ")); + + partition_columns = getPartitionColumnsFromSnapshot(scan_state.get()); + LOG_TRACE(log, "Partition columns: {}", fmt::join(partition_columns, ", ")); } DB::ObjectIterator TableSnapshot::iterate( @@ -370,6 +374,7 @@ DB::ObjectIterator TableSnapshot::iterate( return std::make_shared( engine, snapshot, + scan, helper->getDataPath(), getTableSchema(), getPartitionColumns(), @@ -382,52 +387,26 @@ DB::ObjectIterator TableSnapshot::iterate( const DB::NamesAndTypesList & TableSnapshot::getTableSchema() const { - if (!table_schema.has_value()) - { - table_schema = getTableSchemaFromSnapshot(getSnapshot()); - LOG_TRACE(log, "Fetched table schema"); - LOG_TEST(log, "Table schema: {}", table_schema->toString()); - } - return table_schema.value(); + initSnapshot(); + return table_schema; } const DB::NamesAndTypesList & TableSnapshot::getReadSchema() const { - if (read_schema_same_as_table_schema) - return getTableSchema(); - if (!read_schema.has_value()) - loadReadSchemaAndPartitionColumns(); - return read_schema.value(); + initSnapshot(); + return read_schema; } const DB::Names & TableSnapshot::getPartitionColumns() const { - if (!partition_columns.has_value()) - loadReadSchemaAndPartitionColumns(); - return partition_columns.value(); + initSnapshot(); + return partition_columns; } -void TableSnapshot::loadReadSchemaAndPartitionColumns() const +const DB::NameToNameMap & TableSnapshot::getPhysicalNamesMap() const { - auto * current_snapshot = getSnapshot(); - chassert(engine.get()); - if (read_schema_same_as_table_schema) - { - partition_columns = getPartitionColumnsFromSnapshot(current_snapshot, engine.get()); - LOG_TRACE( - log, "Fetched partition columns: {}", - fmt::join(partition_columns.value(), ", ")); - } - else - { - std::tie(read_schema, partition_columns) = getReadSchemaAndPartitionColumnsFromSnapshot(current_snapshot, engine.get()); - LOG_TRACE( - log, "Fetched read schema and partition columns: {}", - fmt::join(partition_columns.value(), ", ")); - - LOG_TEST(log, "Read schema: {}", read_schema->toString()); - } - + initSnapshot(); + return physical_names_map; } } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h index 7b83c993be5d..6945719f3071 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h @@ -30,7 +30,6 @@ class TableSnapshot explicit TableSnapshot( KernelHelperPtr helper_, DB::ObjectStoragePtr object_storage_, - bool read_schema_same_as_table_schema_, LoggerPtr log_); /// Get snapshot version. @@ -55,33 +54,32 @@ class TableSnapshot /// Therefore "table schema" would contain partition columns, /// but "read schema" would not. 
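The TableSnapshot change above replaces three lazily-populated std::optional members with a single eager initialization that derives everything from one scan state, so the getters can never observe a half-built snapshot. A standalone model of that memoized init (the schema values are fake placeholders):

#include <iostream>
#include <mutex>
#include <string>
#include <vector>

class SnapshotModel
{
public:
    const std::vector<std::string> & getTableSchema() const { init(); return table_schema; }
    const std::vector<std::string> & getReadSchema() const { init(); return read_schema; }
    const std::vector<std::string> & getPartitionColumns() const { init(); return partition_columns; }

private:
    void init() const
    {
        std::call_once(once, [this]
        {
            // One pass over the snapshot yields all derived state coherently.
            table_schema = {"event_date", "user_id", "value"};
            partition_columns = {"event_date"};
            read_schema = {"user_id", "value"};  // partition columns are absent from data files
            std::cout << "initialized snapshot state once\n";
        });
    }

    mutable std::once_flag once;
    mutable std::vector<std::string> table_schema;
    mutable std::vector<std::string> read_schema;
    mutable std::vector<std::string> partition_columns;
};

int main()
{
    SnapshotModel snapshot;
    snapshot.getTableSchema();
    snapshot.getReadSchema();        // no second init
    snapshot.getPartitionColumns();  // no third init
}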
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.cpp
index 53745f4b0a54..5c86ad5b94d4 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.cpp
@@ -5,6 +5,7 @@
 #include "KernelUtils.h"
 #include "KernelPointerWrapper.h"

+#include
 #include
 #include

@@ -77,26 +78,25 @@ class SchemaVisitorData
     friend class SchemaVisitor;

 public:
-    DB::NamesAndTypesList getSchemaResult();
-    const DB::Names & getPartitionColumns() const { return partition_columns; }
-
-    void initScanState(
-        ffi::SharedSnapshot * snapshot,
-        ffi::SharedExternEngine * engine)
+    struct SchemaResult
     {
-        if (!scan.get())
-            scan = KernelUtils::unwrapResult(ffi::scan(snapshot, engine, /* predicate */{}), "scan");
-        if (!scan_state.get())
-            scan_state = ffi::get_global_scan_state(scan.get());
-    }
+        DB::NamesAndTypesList names_and_types;
+        DB::NameToNameMap physical_names_map;
+    };
+    SchemaResult getSchemaResult();
+    const DB::Names & getPartitionColumns() const { return partition_columns; }

 private:
     DB::DataTypes getDataTypesFromTypeList(size_t list_idx);

     struct Field
     {
-        Field(const std::string & name_, const DB::TypeIndex & type_, bool nullable_)
-            : name(name_), type(type_), nullable(nullable_) {}
+        Field(
+            const std::string & name_,
+            const DB::TypeIndex & type_,
+            bool nullable_,
+            const std::string & physical_name_)
+            : name(name_), type(type_), nullable(nullable_), physical_name(physical_name_) {}

         /// Column name.
         const std::string name;
@@ -104,6 +104,10 @@ class SchemaVisitorData
         const DB::TypeIndex type;
         /// Column nullability.
         const bool nullable;
+        /// In case of columnMapping.mode = 'name',
+        /// physical name of the column in parquet metadata
+        /// will be different from table schema column name.
+        const std::string physical_name;

         /// If type is complex (array, map, struct), whether it can contain nullable values.
         bool value_contains_null;
@@ -134,8 +138,6 @@ class SchemaVisitorData

     using KernelScan = KernelPointerWrapper;
     using KernelGlobalScanState = KernelPointerWrapper;
-    KernelScan scan;
-    KernelGlobalScanState scan_state;
 };

 /**
@@ -157,25 +159,20 @@ class SchemaVisitor
     }

     static void visitReadSchema(
-        ffi::SharedSnapshot * snapshot,
-        ffi::SharedExternEngine * engine,
+        ffi::SharedGlobalScanState * scan_state,
         SchemaVisitorData & data)
     {
-        data.initScanState(snapshot, engine);
-        KernelSharedSchema schema = ffi::get_global_read_schema(data.scan_state.get());
-
+        KernelSharedSchema schema = ffi::get_global_read_schema(scan_state);
         auto visitor = createVisitor(data);
         size_t result = ffi::visit_schema(schema.get(), &visitor);
         chassert(result == 0, "Unexpected result: " + DB::toString(result));
     }

     static void visitPartitionColumns(
-        ffi::SharedSnapshot * snapshot,
-        ffi::SharedExternEngine * engine,
+        ffi::SharedGlobalScanState * scan_state,
         SchemaVisitorData & data)
     {
-        data.initScanState(snapshot, engine);
-        KernelStringSliceIterator partition_columns_iter = ffi::get_partition_columns(data.scan_state.get());
+        KernelStringSliceIterator partition_columns_iter = ffi::get_partition_columns(scan_state);
         while (ffi::string_slice_next(partition_columns_iter.get(), &data, &visitPartitionColumn)) {}
     }
@@ -226,13 +223,22 @@ class SchemaVisitor
         return id;
     }

+    static std::unique_ptr extractPhysicalName(const ffi::CStringMap * metadata)
+    {
+        std::string * physical_name = static_cast(ffi::get_from_string_map(
+            metadata,
+            KernelUtils::toDeltaString("delta.columnMapping.physicalName"),
+            KernelUtils::allocateString));
+        return physical_name ? std::unique_ptr(physical_name) : nullptr;
+    }
+
     template
     static void simpleTypeVisitor(
         void * data,
         uintptr_t sibling_list_id,
         ffi::KernelStringSlice name,
         bool nullable,
-        const ffi::CStringMap * /* metadata */)
+        const ffi::CStringMap * metadata)
     {
         SchemaVisitorData * state = static_cast(data);
         auto it = state->type_lists.find(sibling_list_id);
@@ -244,13 +250,15 @@ class SchemaVisitor
         }

         const std::string column_name(name.ptr, name.len);
+        const auto physical_name_ptr = extractPhysicalName(metadata);
+        const std::string physical_name = physical_name_ptr ? *physical_name_ptr : "";

         LOG_TEST(
             state->log,
-            "List id: {}, column name: {}, type: {}, nullable: {}",
-            sibling_list_id, column_name, type, nullable);
+            "List id: {}, column name: {} (physical name: {}), type: {}, nullable: {}",
+            sibling_list_id, column_name, physical_name, type, nullable);

-        SchemaVisitorData::Field field(column_name, std::move(type), nullable);
+        SchemaVisitorData::Field field(column_name, std::move(type), nullable, physical_name);
         field.is_bool = is_bool;
         it->second->push_back(std::move(field));
     }
@@ -260,7 +268,7 @@ class SchemaVisitor
         uintptr_t sibling_list_id,
         ffi::KernelStringSlice name,
         bool nullable,
-        const ffi::CStringMap * /* metadata */,
+        const ffi::CStringMap * metadata,
         uint8_t precision,
         uint8_t scale)
     {
@@ -275,13 +283,15 @@ class SchemaVisitor
         }

         const std::string column_name(name.ptr, name.len);
+        const auto physical_name_ptr = extractPhysicalName(metadata);
+        const std::string physical_name = physical_name_ptr ? *physical_name_ptr : "";

         LOG_TEST(
             state->log,
-            "List id: {}, column name: {}, type: {}, nullable: {}",
-            sibling_list_id, column_name, type, nullable);
+            "List id: {}, column name: {} (physical name: {}), type: {}, nullable: {}",
+            sibling_list_id, column_name, physical_name, type, nullable);

-        SchemaVisitorData::Field field(column_name, type, nullable);
+        SchemaVisitorData::Field field(column_name, type, nullable, physical_name);
         field.precision = precision;
         field.scale = scale;
         it->second->push_back(std::move(field));
@@ -326,7 +336,7 @@ class SchemaVisitor
         uintptr_t sibling_list_id,
         ffi::KernelStringSlice name,
         bool nullable,
-        const ffi::CStringMap * /* metadata */,
+        const ffi::CStringMap * metadata,
         uintptr_t child_list_id)
     {
         SchemaVisitorData * state = static_cast(data);
@@ -339,31 +349,40 @@ class SchemaVisitor
         }

         const std::string column_name(name.ptr, name.len);
+        const auto physical_name_ptr = extractPhysicalName(metadata);
+        const std::string physical_name = physical_name_ptr ? *physical_name_ptr : "";

         LOG_TEST(
             state->log,
-            "List id: {}, column name: {}, type: {}, "
+            "List id: {}, column name: {} (physical name: {}), type: {}, "
             "nullable: {}, child list id: {}",
-            sibling_list_id, column_name, type, nullable, child_list_id);
+            sibling_list_id, column_name, physical_name, type, nullable, child_list_id);

-        SchemaVisitorData::Field field(column_name, std::move(type), nullable);
+        SchemaVisitorData::Field field(column_name, std::move(type), nullable, physical_name);
         field.child_list_id = child_list_id;
         it->second->push_back(field);
     }
 };

-DB::NamesAndTypesList SchemaVisitorData::getSchemaResult()
+SchemaVisitorData::SchemaResult SchemaVisitorData::getSchemaResult()
 {
     const auto types = getDataTypesFromTypeList(0);
     chassert(types.size() == type_lists[0]->size());

-    std::list result;
+    std::list names_and_types;
+    SchemaResult result;
     for (size_t i = 0; i < types.size(); ++i)
     {
         const auto & field = (*type_lists[0])[i];
-        result.emplace_back(field.name, types[i]);
+        names_and_types.emplace_back(field.name, types[i]);
+        if (!field.physical_name.empty())
+        {
+            [[maybe_unused]] bool inserted = result.physical_names_map.emplace(field.name, field.physical_name).second;
+            chassert(inserted);
+        }
     }
-    return DB::NamesAndTypesList(result.begin(), result.end());
+    result.names_and_types = DB::NamesAndTypesList(names_and_types.begin(), names_and_types.end());
+    return result;
 }

 DB::DataTypes SchemaVisitorData::getDataTypesFromTypeList(size_t list_idx)
@@ -446,26 +465,25 @@ DB::DataTypes SchemaVisitorData::getDataTypesFromTypeList(size_t list_idx)
     return types;
 }

-DB::NamesAndTypesList getTableSchemaFromSnapshot(ffi::SharedSnapshot * snapshot)
+std::pair getTableSchemaFromSnapshot(ffi::SharedSnapshot * snapshot)
 {
     SchemaVisitorData data;
     SchemaVisitor::visitTableSchema(snapshot, data);
-    return data.getSchemaResult();
+    auto result = data.getSchemaResult();
+    return {result.names_and_types, result.physical_names_map};
 }

-std::pair
-getReadSchemaAndPartitionColumnsFromSnapshot(ffi::SharedSnapshot * snapshot, ffi::SharedExternEngine * engine)
+DB::NamesAndTypesList getReadSchemaFromSnapshot(ffi::SharedGlobalScanState * scan_state)
 {
     SchemaVisitorData data;
-    SchemaVisitor::visitReadSchema(snapshot, engine, data);
-    SchemaVisitor::visitPartitionColumns(snapshot, engine, data);
-    return {data.getSchemaResult(), data.getPartitionColumns()};
+    SchemaVisitor::visitReadSchema(scan_state, data);
+    return data.getSchemaResult().names_and_types;
 }

-DB::Names getPartitionColumnsFromSnapshot(ffi::SharedSnapshot * snapshot, ffi::SharedExternEngine * engine)
+DB::Names getPartitionColumnsFromSnapshot(ffi::SharedGlobalScanState * scan_state)
 {
     SchemaVisitorData data;
-    SchemaVisitor::visitPartitionColumns(snapshot, engine, data);
+    SchemaVisitor::visitPartitionColumns(scan_state, data);
     return data.getPartitionColumns();
 }
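For the columnMapping support added above: with Delta's columnMapping.mode = 'name', the table schema keeps the logical name while each field's metadata may carry delta.columnMapping.physicalName, which is what the Parquet files actually use. A standalone sketch of how the visitor accumulates the logical-to-physical map (FieldModel and the sample values are invented):

#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <vector>

struct FieldModel
{
    std::string name;                          // logical (table schema) name
    std::optional<std::string> physical_name;  // from column-mapping metadata, if any
};

int main()
{
    std::vector<FieldModel> fields = {
        {"user_id", "col-7a3f"},
        {"value", std::nullopt},  // no mapping: physical name equals logical name
    };

    // Only fields that actually carry a physical name enter the map,
    // mirroring the !field.physical_name.empty() check in the patch.
    std::map<std::string, std::string> physical_names_map;
    for (const auto & field : fields)
        if (field.physical_name)
            physical_names_map.emplace(field.name, *field.physical_name);

    for (const auto & [logical, physical] : physical_names_map)
        std::cout << logical << " -> " << physical << '\n';
}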
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.h b/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.h
index c7d511baa951..cf98e214106c 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.h
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.h
@@ -9,25 +9,25 @@
 namespace ffi
 {
 struct SharedSnapshot;
-struct SharedExternEngine;
+struct SharedGlobalScanState;
 }

 namespace DeltaLake
 {

-/// Get table schema.
+/// Get table schema and physical column map (logical name to physical name mapping).
 /// Represents table schema from DeltaLake metadata.
 /// Contains partition columns.
-DB::NamesAndTypesList getTableSchemaFromSnapshot(ffi::SharedSnapshot * snapshot);
+std::pair getTableSchemaFromSnapshot(ffi::SharedSnapshot * snapshot);

-/// Get read schema and partition columns.
+/// Get read schema.
 /// Represents read schema based on data files.
+DB::NamesAndTypesList getReadSchemaFromSnapshot(ffi::SharedGlobalScanState * scan_state);
+
+/// Get list of partition columns.
 /// Read schema does not contain partition columns,
 /// therefore partition columns are passed separately.
-std::pair
-getReadSchemaAndPartitionColumnsFromSnapshot(ffi::SharedSnapshot * snapshot, ffi::SharedExternEngine * engine);
-
-DB::Names getPartitionColumnsFromSnapshot(ffi::SharedSnapshot * snapshot, ffi::SharedExternEngine * engine);
+DB::Names getPartitionColumnsFromSnapshot(ffi::SharedGlobalScanState * scan_state);

 }
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp
index 7e98a26ae96c..98d42fbf02e7 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp
@@ -15,6 +15,7 @@
 #include
 #include
 #include
+#include
 #include
 #include

@@ -55,6 +56,12 @@ namespace ErrorCodes
     extern const int NOT_IMPLEMENTED;
 }

+namespace Setting
+{
+    extern const SettingsBool allow_experimental_delta_kernel_rs;
+}
+
+
 namespace
 {
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
index 580a9645361d..6f4bc8881fc6 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h
@@ -12,13 +12,13 @@
 #include
 #include
 #include
+#include

 namespace DB
 {

-namespace StorageObjectStorageSetting
+namespace Setting
 {
-extern const StorageObjectStorageSettingsBool allow_experimental_delta_kernel_rs;
-extern const StorageObjectStorageSettingsBool delta_lake_read_schema_same_as_table_schema;
+extern const SettingsBool allow_experimental_delta_kernel_rs;
 }

 struct DeltaLakePartitionColumn
@@ -62,12 +62,15 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
     {
 #if USE_DELTA_KERNEL_RS
         auto configuration_ptr = configuration.lock();
-        const auto & settings_ref = configuration_ptr->getSettingsRef();
-        if (settings_ref[StorageObjectStorageSetting::allow_experimental_delta_kernel_rs])
+        const auto & query_settings_ref = local_context->getSettingsRef();
+
+        bool enable_delta_kernel = query_settings_ref[Setting::allow_experimental_delta_kernel_rs];
+        if (enable_delta_kernel)
+        {
             return std::make_unique(
                 object_storage,
-                configuration,
-                settings_ref[StorageObjectStorageSetting::delta_lake_read_schema_same_as_table_schema]);
+                configuration);
+        }
         else
             return std::make_unique(object_storage, configuration, local_context);
 #else
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp
index 1fa3f6636729..fa9f1b3d0cee 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp
@@ -3,20 +3,23 @@
 #if USE_PARQUET && USE_DELTA_KERNEL_RS
 #include
 #include
+#include

 namespace DB
 {
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}

 DeltaLakeMetadataDeltaKernel::DeltaLakeMetadataDeltaKernel(
     ObjectStoragePtr object_storage,
-    ConfigurationObserverPtr configuration_,
-    bool read_schema_same_as_table_schema_)
+    ConfigurationObserverPtr configuration_)
     : log(getLogger("DeltaLakeMetadata"))
     , table_snapshot(
         std::make_shared(
             getKernelHelper(configuration_.lock(), object_storage),
             object_storage,
-            read_schema_same_as_table_schema_,
             log))
 {
 }
@@ -50,25 +53,65 @@ NamesAndTypesList DeltaLakeMetadataDeltaKernel::getTableSchema() const
     return table_snapshot->getTableSchema();
 }

-NamesAndTypesList DeltaLakeMetadataDeltaKernel::getReadSchema() const
+DB::ReadFromFormatInfo DeltaLakeMetadataDeltaKernel::prepareReadingFromFormat(
+    const Strings & requested_columns,
+    const DB::StorageSnapshotPtr & storage_snapshot,
+    const ContextPtr & context,
+    bool supports_subset_of_columns)
 {
-    auto schema = table_snapshot->getReadSchema();
+    auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns);
+    info.format_header.clear();
+
+    /// Read schema is different from table schema in case:
+    /// 1. we have partition columns (they are not stored in the actual data)
+    /// 2. columnMapping.mode = 'name' or 'id'.
+    /// So we add partition columns to read schema and put it together into format_header.
+    /// Partition values will be added to result data right after data is read.

-    /// Read schema does not contain partition columns
-    /// because they are not present in the actual data.
-    /// We have to add them here.
-    auto partition_columns = table_snapshot->getPartitionColumns();
-    if (!partition_columns.empty())
+    for (const auto & [column_name, column_type] : table_snapshot->getReadSchema())
+        info.format_header.insert({column_type->createColumn(), column_type, column_name});
+
+    const auto & physical_names_map = table_snapshot->getPhysicalNamesMap();
+    auto get_physical_name = [&](const std::string & column_name)
     {
-        auto table_schema = getTableSchema();
-        for (const auto & column : partition_columns)
+        if (physical_names_map.empty())
+            return column_name;
+        auto it = physical_names_map.find(column_name);
+        if (it == physical_names_map.end())
         {
-            auto name_and_type = table_schema.tryGetByName(column);
-            if (name_and_type.has_value())
-                schema.insert(schema.end(), name_and_type.value());
+            Names keys;
+            keys.reserve(physical_names_map.size());
+            for (const auto & [key, _] : physical_names_map)
+                keys.push_back(key);
+
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "Not found column {} in physical names map. There are only columns: {}",
+                column_name, fmt::join(keys, ", "));
         }
+        return it->second;
+    };
+
+    const auto & table_schema = table_snapshot->getTableSchema();
+    for (const auto & column_name : table_snapshot->getPartitionColumns())
+    {
+        auto name_and_type = table_schema.tryGetByName(column_name);
+        if (!name_and_type)
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "Not found partition column {} in table schema", column_name);
+
+        info.format_header.insert({name_and_type->type->createColumn(), name_and_type->type, get_physical_name(column_name)});
+    }
+
+    /// Update requested columns to reference actual physical column names.
+    if (!physical_names_map.empty())
+    {
+        for (auto & [column_name, _] : info.requested_columns)
+            column_name = get_physical_name(column_name);
     }
-    return schema;
+
+    return info;
 }

 }
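The rewritten prepareReadingFromFormat() above is the heart of the delta-kernel path: the header handed to the Parquet reader is rebuilt from the read schema plus partition columns, and requested column names are rewritten to their physical counterparts. A standalone model of that derivation (all data is invented; note the real code throws LOGICAL_ERROR when a name is missing from a non-empty map, whereas this sketch falls back to the logical name for brevity):

#include <iostream>
#include <map>
#include <string>
#include <vector>

int main()
{
    const std::vector<std::string> read_schema = {"col-7a3f", "col-91bc"};  // physical names
    const std::vector<std::string> partition_columns = {"event_date"};
    const std::map<std::string, std::string> physical_names_map = {
        {"user_id", "col-7a3f"}, {"value", "col-91bc"}};

    auto get_physical_name = [&](const std::string & name)
    {
        auto it = physical_names_map.find(name);
        return it == physical_names_map.end() ? name : it->second;
    };

    // format_header: what the file reader is asked to produce
    // (read schema first, then partition columns under physical names).
    std::vector<std::string> format_header = read_schema;
    for (const auto & partition_column : partition_columns)
        format_header.push_back(get_physical_name(partition_column));

    // Requested columns from the query, rewritten to physical names.
    std::vector<std::string> requested = {"user_id", "event_date"};
    for (auto & name : requested)
        name = get_physical_name(name);

    for (const auto & name : format_header) std::cout << name << ' ';
    std::cout << "| ";
    for (const auto & name : requested) std::cout << name << ' ';
    std::cout << '\n';
}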
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h
index 88a733b75434..eb94c44fa3d0 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h
@@ -7,7 +7,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -19,11 +18,6 @@ class TableSnapshot;

 namespace DB
 {
-namespace StorageObjectStorageSetting
-{
-extern const StorageObjectStorageSettingsBool allow_experimental_delta_kernel_rs;
-extern const StorageObjectStorageSettingsBool delta_lake_read_schema_same_as_table_schema;
-}

 class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata
 {
@@ -33,8 +27,7 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata

     DeltaLakeMetadataDeltaKernel(
         ObjectStoragePtr object_storage_,
-        ConfigurationObserverPtr configuration_,
-        bool read_schema_same_as_table_schema_);
+        ConfigurationObserverPtr configuration_);

     bool supportsUpdate() const override { return true; }

@@ -44,21 +37,21 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata

     NamesAndTypesList getTableSchema() const override;

-    NamesAndTypesList getReadSchema() const override;
+    DB::ReadFromFormatInfo prepareReadingFromFormat(
+        const Strings & requested_columns,
+        const DB::StorageSnapshotPtr & storage_snapshot,
+        const ContextPtr & context,
+        bool supports_subset_of_columns) override;

     bool operator ==(const IDataLakeMetadata &) const override;

     static DataLakeMetadataPtr create(
         ObjectStoragePtr object_storage,
         ConfigurationObserverPtr configuration,
-        ContextPtr, bool)
+        ContextPtr /* context */)
     {
         auto configuration_ptr = configuration.lock();
-        const auto & settings_ref = configuration_ptr->getSettingsRef();
-        return std::make_unique(
-            object_storage,
-            configuration,
-            settings_ref[StorageObjectStorageSetting::delta_lake_read_schema_same_as_table_schema]);
+        return std::make_unique(object_storage, configuration);
     }

     ObjectIterator iterate(
diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
index 6bbf81c74965..df4f5ed3a45b 100644
--- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
@@ -64,4 +64,13 @@ ObjectIterator IDataLakeMetadata::createKeysIterator(
     return std::make_shared(std::move(data_files_), object_storage_, callback_);
 }

+DB::ReadFromFormatInfo IDataLakeMetadata::prepareReadingFromFormat(
+    const Strings & requested_columns,
+    const DB::StorageSnapshotPtr & storage_snapshot,
+    const ContextPtr & context,
+    bool supports_subset_of_columns)
+{
+    return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns);
+}
+
 }
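With the IDataLakeMetadata.cpp hunk above, header preparation becomes a virtual with a generic default, and only metadata implementations that need schema mapping (the delta-kernel one) override it. A minimal model of that split (class names invented):

#include <iostream>
#include <string>

struct IMetadataModel
{
    virtual ~IMetadataModel() = default;
    // Default: plain header derived from the requested columns.
    virtual std::string prepareHeader() { return "generic header"; }
};

struct DeltaKernelModel : IMetadataModel
{
    std::string prepareHeader() override
    {
        return "header with physical names + partition columns";
    }
};

int main()
{
    DeltaKernelModel delta;
    IMetadataModel iceberg_like;
    std::cout << iceberg_like.prepareHeader() << '\n';
    std::cout << delta.prepareHeader() << '\n';
}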
supports_subset_of_columns) +{ + return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns); +} + } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 1984750351bc..5315d266e880 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -4,6 +4,7 @@ #include #include "Interpreters/ActionsDAG.h" #include +#include namespace DB { @@ -35,10 +36,14 @@ class IDataLakeMetadata : boost::noncopyable /// Read schema is the schema of actual data files, /// which can differ from table schema from data lake metadata. /// Return nothing if read schema is the same as table schema. - virtual NamesAndTypesList getReadSchema() const { return {}; } + virtual DB::ReadFromFormatInfo prepareReadingFromFormat( + const Strings & requested_columns, + const DB::StorageSnapshotPtr & storage_snapshot, + const ContextPtr & context, + bool supports_subset_of_columns); - virtual std::shared_ptr getInitialSchemaByPath(const String &) const { return {}; } - virtual std::shared_ptr getSchemaTransformer(const String &) const { return {}; } + virtual std::shared_ptr getInitialSchemaByPath(const String & /* path */) const { return {}; } + virtual std::shared_ptr getSchemaTransformer(const String & /* path */) const { return {}; } /// Whether metadata is updateable (instead of recreation from scratch) /// to the latest version of table state in data lake. @@ -46,8 +51,7 @@ class IDataLakeMetadata : boost::noncopyable /// Update metadata to the latest version. virtual bool update(const ContextPtr &) { return false; } - /// Whether schema evolution is supported. - virtual bool supportsExternalMetadataChange() const { return false; } + virtual bool supportsSchemaEvolution() const { return false; } virtual std::optional totalRows() const { return {}; } virtual std::optional totalBytes() const { return {}; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index bcf3aef8b26c..ecd76413303f 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -11,7 +11,7 @@ #include #include -#include +#include #include #include @@ -32,11 +32,11 @@ namespace ProfileEvents namespace DB { -namespace StorageObjectStorageSetting +namespace DataLakeStorageSetting { - extern const StorageObjectStorageSettingsString iceberg_metadata_file_path; - extern const StorageObjectStorageSettingsString iceberg_metadata_table_uuid; - extern const StorageObjectStorageSettingsBool iceberg_recent_metadata_file_by_last_updated_ms_field; + extern const DataLakeStorageSettingsString iceberg_metadata_file_path; + extern const DataLakeStorageSettingsString iceberg_metadata_table_uuid; + extern const DataLakeStorageSettingsBool iceberg_recent_metadata_file_by_last_updated_ms_field; } namespace ErrorCodes @@ -309,7 +309,7 @@ static std::pair getLatestMetadataFileAndVersion( { auto log = getLogger("IcebergMetadataFileResolver"); MostRecentMetadataFileSelectionWay selection_way - = configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_recent_metadata_file_by_last_updated_ms_field].value + = configuration.getDataLakeSettings()[DataLakeStorageSetting::iceberg_recent_metadata_file_by_last_updated_ms_field].value ? 
MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD : MostRecentMetadataFileSelectionWay::BY_METADATA_FILE_VERSION; bool need_all_metadata_files_parsing @@ -386,9 +386,10 @@ static std::pair getLatestOrExplicitMetadataFileAndVersion( const ContextPtr & local_context, Poco::Logger * log) { - if (configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_file_path].changed) + const auto & data_lake_settings = configuration.getDataLakeSettings(); + if (data_lake_settings[DataLakeStorageSetting::iceberg_metadata_file_path].changed) { - auto explicit_metadata_path = configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_file_path].value; + auto explicit_metadata_path = data_lake_settings[DataLakeStorageSetting::iceberg_metadata_file_path].value; try { LOG_TEST(log, "Explicit metadata file path is specified {}, will read from this metadata file", explicit_metadata_path); @@ -409,9 +410,9 @@ static std::pair getLatestOrExplicitMetadataFileAndVersion( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid path {} specified for iceberg_metadata_file_path: '{}'", explicit_metadata_path, ex.what()); } } - else if (configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_table_uuid].changed) + else if (data_lake_settings[DataLakeStorageSetting::iceberg_metadata_table_uuid].changed) { - std::optional table_uuid = configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_table_uuid].value; + std::optional table_uuid = data_lake_settings[DataLakeStorageSetting::iceberg_metadata_table_uuid].value; return getLatestMetadataFileAndVersion(object_storage, configuration, local_context, table_uuid); } else diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index c2cf03afa198..57dc04044d86 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -78,7 +78,7 @@ class IcebergMetadata : public IDataLakeMetadata : nullptr; } - bool supportsExternalMetadataChange() const override { return true; } + bool supportsSchemaEvolution() const override { return true; } static Int32 parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 7639ff35228e..0638fa0cd480 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -90,7 +90,7 @@ StorageObjectStorage::StorageObjectStorage( LoadingStrictnessLevel mode, bool distributed_processing_, ASTPtr partition_by_, - bool is_table_function_, + bool is_table_function, bool lazy_init, std::optional sample_path_) : IStorage(table_id_) @@ -101,40 +101,50 @@ StorageObjectStorage::StorageObjectStorage( , distributed_processing(distributed_processing_) , log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName()))) { - bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->getFormat().empty(); - update_configuration_on_read = !is_table_function_ || do_lazy_init; - bool failed_init = false; - auto do_init = [&]() + const bool need_resolve_columns_or_format = columns_.empty() || (configuration->getFormat() == "auto"); + const bool need_resolve_sample_path = 
context->getSettingsRef()[Setting::use_hive_partitioning] + && !configuration->withPartitionWildcard() + && !configuration->isDataLakeConfiguration(); + const bool do_lazy_init = lazy_init && !need_resolve_columns_or_format && !need_resolve_sample_path; + + bool updated_configuration = false; + try { - try + if (!do_lazy_init) { - if (configuration->hasExternalDynamicMetadata()) - configuration->updateAndGetCurrentSchema(object_storage, context); - else - configuration->update(object_storage, context); + configuration->update( + object_storage, + context, + /* if_not_updated_before */is_table_function, + /* check_consistent_with_previous_metadata */true); + + updated_configuration = true; } - catch (...) + } + catch (...) + { + // If we don't have format or schema yet, we can't ignore failed configuration update, + // because the relevant configuration is crucial for format and schema inference + if (mode <= LoadingStrictnessLevel::CREATE || need_resolve_columns_or_format) { - // If we don't have format or schema yet, we can't ignore failed configuration update, - // because relevant configuration is crucial for format and schema inference - if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->getFormat() == "auto")) - { - throw; - } - else - { - tryLogCurrentException(log); - failed_init = true; - } + throw; } - }; + tryLogCurrentException(log); } - if (!do_lazy_init) - do_init(); + /// We always update the configuration on read for a table engine, + /// but this is not needed for a table function, + /// which exists only for the duration of a single query + /// (i.e. a read always follows the constructor immediately). + update_configuration_on_read_write = !is_table_function || !updated_configuration; std::string sample_path = sample_path_.value_or(""); ColumnsDescription columns{columns_}; - resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context); + if (need_resolve_columns_or_format) + resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context); + else + validateSupportedColumns(columns, *configuration); + configuration->check(context); StorageInMemoryMetadata metadata; @@ -144,13 +154,8 @@ StorageObjectStorage::StorageObjectStorage( /// FIXME: We need to call getPathSample() lazily on select /// in case it failed to be initialized in constructor. - if (!failed_init - && sample_path.empty() - && context->getSettingsRef()[Setting::use_hive_partitioning] - && !configuration->withPartitionWildcard()) + if (updated_configuration && sample_path.empty() && need_resolve_sample_path) { - if (do_lazy_init) - do_init(); try { sample_path = getPathSample(context); } catch (...) { LOG_WARNING( - log, "Failed to list object storage, cannot use hive partitioning. " - "Error: {}", getCurrentExceptionMessage(true)); + log, + "Failed to list object storage, cannot use hive partitioning. 
" + "Error: {}", + getCurrentExceptionMessage(true)); } } @@ -187,40 +194,79 @@ bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) c return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context, format_settings); } -void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage_ptr, ContextPtr context) +bool StorageObjectStorage::Configuration::update( ///NOLINT + ObjectStoragePtr object_storage_ptr, + ContextPtr context, + bool /* if_not_updated_before */, + bool /* check_consistent_with_previous_metadata */) { IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()}; object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options); updated = true; + return true; } -void StorageObjectStorage::Configuration::updateIfRequired(ObjectStoragePtr object_storage_ptr, ContextPtr local_context) +bool StorageObjectStorage::Configuration::updateIfRequired( + ObjectStoragePtr object_storage_ptr, + ContextPtr local_context, + bool if_not_updated_before, + bool check_consistent_with_previous_metadata) { if (!updated) - update(object_storage_ptr, local_context); + update(object_storage_ptr, local_context, if_not_updated_before, check_consistent_with_previous_metadata); + return true; } -bool StorageObjectStorage::hasExternalDynamicMetadata() const +bool StorageObjectStorage::updateExternalDynamicMetadataIfExists(ContextPtr query_context) { - return configuration->hasExternalDynamicMetadata(); -} + bool updated = configuration->update( + object_storage, + query_context, + /* if_not_updated_before */true, + /* check_consistent_with_previous_metadata */false); + + if (!configuration->hasExternalDynamicMetadata()) + return false; + + if (!updated) + { + /// Force the update. + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */false); + } + + auto columns = configuration->tryGetTableStructureFromMetadata(); + if (!columns.has_value()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No schema in table metadata"); -void StorageObjectStorage::updateExternalDynamicMetadata(ContextPtr context_ptr) -{ StorageInMemoryMetadata metadata; - metadata.setColumns(configuration->updateAndGetCurrentSchema(object_storage, context_ptr)); + metadata.setColumns(std::move(columns.value())); setInMemoryMetadata(metadata); + return true; } std::optional StorageObjectStorage::totalRows(ContextPtr query_context) const { - configuration->update(object_storage, query_context); + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + return configuration->totalRows(); } std::optional StorageObjectStorage::totalBytes(ContextPtr query_context) const { - configuration->update(object_storage, query_context); + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + return configuration->totalBytes(); } @@ -371,8 +417,14 @@ void StorageObjectStorage::read( { /// We did configuration->update() in constructor, /// so in case of table function there is no need to do the same here again. 
- if (update_configuration_on_read) - configuration->update(object_storage, local_context); + if (update_configuration_on_read_write) + { + configuration->update( + object_storage, + local_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + } if (partition_by && configuration->withPartitionWildcard()) { @@ -411,7 +463,15 @@ SinkToStoragePtr StorageObjectStorage::write( ContextPtr local_context, bool /* async_insert */) { - configuration->update(object_storage, local_context); + if (update_configuration_on_read_write) + { + configuration->update( + object_storage, + local_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + } + const auto sample_block = metadata_snapshot->getSampleBlock(); const auto & settings = configuration->getQuerySettings(local_context); @@ -520,18 +580,6 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData( std::string & sample_path, const ContextPtr & context) { - if (configuration->isDataLakeConfiguration()) - { - if (configuration->hasExternalDynamicMetadata()) - configuration->updateAndGetCurrentSchema(object_storage, context); - else - configuration->update(object_storage, context); - - auto table_structure = configuration->tryGetTableStructureFromMetadata(); - if (table_structure) - return table_structure.value(); - } - ObjectInfos read_keys; auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context); auto schema = readSchemaFromFormat(configuration->getFormat(), format_settings, *iterator, context); @@ -607,8 +655,7 @@ SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, c void StorageObjectStorage::Configuration::initialize( ASTs & engine_args, ContextPtr local_context, - bool with_table_structure, - StorageObjectStorageSettingsPtr settings) + bool with_table_structure) { if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context)) fromNamedCollection(*named_collection, local_context); @@ -632,15 +679,9 @@ void StorageObjectStorage::Configuration::initialize( else FormatFactory::instance().checkFormatName(format); - storage_settings = settings; initialized = true; } -const StorageObjectStorageSettings & StorageObjectStorage::Configuration::getSettingsRef() const -{ - return *storage_settings; -} - void StorageObjectStorage::Configuration::check(ContextPtr) const { FormatFactory::instance().checkFormatName(format); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 5b4e10b9a872..072dec4129b6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -9,8 +9,10 @@ #include #include #include +#include #include + namespace DB { @@ -137,15 +139,16 @@ class StorageObjectStorage : public IStorage void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override; - bool hasExternalDynamicMetadata() const override; - - void updateExternalDynamicMetadata(ContextPtr) override; + bool updateExternalDynamicMetadataIfExists(ContextPtr query_context) override; std::optional totalRows(ContextPtr query_context) const override; std::optional totalBytes(ContextPtr query_context) const override; + protected: + /// Get a sample path for the hive partitioning implementation. String getPathSample(ContextPtr context); + /// Creates a ReadBufferIterator for schema inference. 
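+ /// Usage (see resolveSchemaFromData() in the .cpp file):
+ ///     auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
+ ///     auto schema = readSchemaFromFormat(configuration->getFormat(), format_settings, *iterator, context);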
static std::unique_ptr createReadBufferIterator( const ObjectStoragePtr & object_storage, const ConfigurationPtr & configuration, @@ -153,12 +156,21 @@ class StorageObjectStorage : public IStorage ObjectInfos & read_keys, const ContextPtr & context); + /// Storage configuration (S3, Azure, HDFS, Local, DataLake). + /// Contains information about table engine configuration + /// and underlying storage access. ConfigurationPtr configuration; + /// Allows direct access to the underlying data storage. const ObjectStoragePtr object_storage; const std::optional format_settings; + /// PARTITION BY expression from the CREATE query. const ASTPtr partition_by; + /// Whether this engine is part of the corresponding Cluster engine implementation + /// (one of the reading replicas, not the initiator). const bool distributed_processing; - bool update_configuration_on_read; + /// Whether we need to call `configuration->update()` + /// (i.e. refresh the configuration) on each read() or write() call. + bool update_configuration_on_read_write = true; LoggerPtr log; }; @@ -173,11 +185,11 @@ class StorageObjectStorage::Configuration using Path = std::string; using Paths = std::vector; + /// Initialize configuration from either AST or NamedCollection. virtual void initialize( ASTs & engine_args, ContextPtr local_context, - bool with_table_structure, - StorageObjectStorageSettingsPtr settings); + bool with_table_structure); /// Storage type: s3, hdfs, azure, local. virtual ObjectStorageType getType() const = 0; @@ -238,6 +250,8 @@ class StorageObjectStorage::Configuration throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateAndGetCurrentSchema is not supported by storage {}", getEngineName()); } + virtual void modifyFormatSettings(FormatSettings &) const {} + virtual ReadFromFormatInfo prepareReadingFromFormat( ObjectStoragePtr object_storage, const Strings & requested_columns, @@ -257,10 +271,22 @@ class StorageObjectStorage::Configuration throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method iterate() is not implemented for configuration type {}", getTypeName()); } - virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); - virtual void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); + /// Returns true if metadata is of the latest version, false if unknown. 
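+ /// Judging by the call sites in this patch: `if_not_updated_before` lets the call become
+ /// a no-op when a previous update() already succeeded (constructor,
+ /// updateExternalDynamicMetadataIfExists()), while false forces an unconditional refresh
+ /// (read(), write(), totalRows(), totalBytes()); `check_consistent_with_previous_metadata`
+ /// is passed as false only where the schema is re-read from the refreshed metadata right afterwards.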
+ virtual bool update( + ObjectStoragePtr object_storage, + ContextPtr local_context, + bool if_not_updated_before, + bool check_consistent_with_previous_metadata); + virtual bool updateIfRequired( + ObjectStoragePtr object_storage, + ContextPtr local_context, + bool if_not_updated_before, + bool check_consistent_with_previous_metadata); - const StorageObjectStorageSettings & getSettingsRef() const; + virtual const DataLakeStorageSettings & getDataLakeSettings() const + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getDataLakeSettings() is not implemented for configuration type {}", getTypeName()); + } /// Create arguments for table function with path and access parameters virtual ASTPtr createArgsWithAccessData() const @@ -291,8 +317,6 @@ class StorageObjectStorage::Configuration bool initialized = false; std::atomic updated = false; - - StorageObjectStorageSettingsPtr storage_settings; }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index cdfad7f646cb..7621e4c34b30 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -86,6 +86,15 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( , object_storage(object_storage_) , cluster_name_in_settings(false) { + /// We allow exceptions to be thrown on update(), + /// because the Cluster engine can only be used as a table function, + /// so no lazy initialization is allowed. + configuration->update( + object_storage, + context_, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + ColumnsDescription columns{columns_}; std::string sample_path; resolveSchemaAndFormat(columns, object_storage, configuration, {}, sample_path, context_); @@ -130,6 +139,30 @@ std::string StorageObjectStorageCluster::getName() const return configuration->getEngineName(); } +std::optional StorageObjectStorageCluster::totalRows(ContextPtr query_context) const +{ + if (pure_storage) + return pure_storage->totalRows(query_context); + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + return configuration->totalRows(); +} + +std::optional StorageObjectStorageCluster::totalBytes(ContextPtr query_context) const +{ + if (pure_storage) + return pure_storage->totalBytes(query_context); + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + return configuration->totalBytes(); +} + void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context) { // Change table engine on table function for distributed request diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 7b7031a7c728..8c1694de15ff 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -41,24 +41,6 @@ class StorageObjectStorageCluster : public IStorageCluster String getClusterName(ContextPtr context) const override; - bool hasExternalDynamicMetadata() const override - { - return (pure_storage && 
pure_storage->hasExternalDynamicMetadata()) - pure_storage->updateExternalDynamicMetadata(context_ptr); - if (configuration->hasExternalDynamicMetadata()) - { - StorageInMemoryMetadata metadata; - metadata.setColumns(configuration->updateAndGetCurrentSchema(object_storage, context_ptr)); - IStorageCluster::setInMemoryMetadata(metadata); - } - } - QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; void truncate( @@ -69,21 +51,8 @@ class StorageObjectStorageCluster : public IStorageCluster void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override; - std::optional totalRows(ContextPtr query_context) const override - { - if (pure_storage) - return pure_storage->totalRows(query_context); - configuration->update(object_storage, query_context); - return configuration->totalRows(); - } - - std::optional totalBytes(ContextPtr query_context) const override - { - if (pure_storage) - return pure_storage->totalBytes(query_context); - configuration->update(object_storage, query_context); - return configuration->totalBytes(); - } + std::optional totalRows(ContextPtr query_context) const override; + std::optional totalBytes(ContextPtr query_context) const override; private: void updateQueryToSendIfNeeded( diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp index 469bbaa4e456..512a8d53552b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp @@ -11,37 +11,6 @@ namespace DB { -// clang-format off - -#define STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(DECLARE, ALIAS) \ - DECLARE(Bool, allow_dynamic_metadata_for_data_lakes, false, R"( -If enabled, indicates that metadata is taken from iceberg specification that is pulled from cloud before each query. -)", 0) \ - DECLARE(Bool, allow_experimental_delta_kernel_rs, false, R"( -If enabled, the engine would use delta-kernel-rs for DeltaLake metadata parsing -)", 0) \ - DECLARE(Bool, delta_lake_read_schema_same_as_table_schema, false, R"( -Whether delta-lake read schema is the same as table schema. -)", 0) \ - DECLARE(String, iceberg_metadata_file_path, "", R"( -Explicit path to desired Iceberg metadata file, should be relative to path in object storage. Make sense for table function use case only. -)", 0) \ - DECLARE(String, object_storage_cluster, "", R"( -Cluster for distributed requests -)", 0) \ - DECLARE(String, iceberg_metadata_table_uuid, "", R"( -Explicit table UUID to read metadata for. Ignored if iceberg_metadata_file_path is set. -)", 0) \ - DECLARE(Bool, iceberg_recent_metadata_file_by_last_updated_ms_field, false, R"( -If enabled, the engine would use the metadata file with the most recent last_updated_ms json field. Does not make sense to use with iceberg_metadata_file_path. 
-)", 0) - -// clang-format on - -#define LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(M, ALIAS) \ - STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(M, ALIAS) \ - LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) - DECLARE_SETTINGS_TRAITS(StorageObjectStorageSettingsTraits, LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS) IMPLEMENT_SETTINGS_TRAITS(StorageObjectStorageSettingsTraits, LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS) @@ -93,4 +62,14 @@ bool StorageObjectStorageSettings::hasBuiltin(std::string_view name) { return StorageObjectStorageSettingsImpl::hasBuiltin(name); } + +void StorageObjectStorageSettings::loadFromSettingsChanges(const SettingsChanges & changes) +{ + for (const auto & [name, value] : changes) + { + if (impl->has(name)) + impl->set(name, value); + } +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.h b/src/Storages/ObjectStorage/StorageObjectStorageSettings.h index d3f72aa2f8a3..76c4cb7dcc06 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.h @@ -43,6 +43,15 @@ class SettingsChanges; M(CLASS_NAME, UInt64Auto) \ M(CLASS_NAME, URI) +// clang-format off + +#define STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(DECLARE, ALIAS) \ + DECLARE(String, object_storage_cluster, "", R"( +Cluster for distributed requests +)", 0) + +// clang-format on + STORAGE_OBJECT_STORAGE_SETTINGS_SUPPORTED_TYPES(StorageObjectStorageSettings, DECLARE_SETTING_TRAIT) struct StorageObjectStorageSettings @@ -56,6 +65,8 @@ struct StorageObjectStorageSettings void loadFromQuery(ASTSetQuery & settings_ast); + void loadFromSettingsChanges(const SettingsChanges & changes); + Field get(const std::string & name); static bool hasBuiltin(std::string_view name); @@ -66,4 +77,9 @@ struct StorageObjectStorageSettings using StorageObjectStorageSettingsPtr = std::shared_ptr; +#define LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(M, ALIAS) \ + STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(M, ALIAS) \ + OBSOLETE_SETTINGS(M, ALIAS) \ + LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 814b99d78032..f1d3277799ed 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -151,7 +151,7 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression can not have wildcards inside {} name", configuration->getNamespaceType()); - configuration->updateIfRequired(object_storage, local_context); + configuration->updateIfRequired(object_storage, local_context, true, true); std::unique_ptr iterator; if (configuration->isPathWithGlobs()) diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index d4f152bfd582..9c71ea7431e8 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -51,29 +51,60 @@ void resolveSchemaAndFormat( std::string & sample_path, const ContextPtr & context) { + if (configuration->getFormat() == "auto") + { + if (configuration->isDataLakeConfiguration()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Format must be already specified for {} storage.", + configuration->getTypeName()); + } + } + if (columns.empty()) { - if (configuration->getFormat() == "auto") + if (configuration->isDataLakeConfiguration()) + { + auto table_structure = configuration->tryGetTableStructureFromMetadata(); + if (table_structure) + columns = 
table_structure.value(); + } + + if (columns.empty()) { - std::string format; - std::tie(columns, format) = - StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, sample_path, context); - configuration->setFormat(format); + if (configuration->getFormat() == "auto") + { + std::string format; + std::tie(columns, format) = StorageObjectStorage::resolveSchemaAndFormatFromData( + object_storage, configuration, format_settings, sample_path, context); + configuration->setFormat(format); + } + else + { + chassert(!configuration->getFormat().empty()); + columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, sample_path, context); + } } - else - columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, sample_path, context); } else if (configuration->getFormat() == "auto") { configuration->setFormat(StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context)); } + validateSupportedColumns(columns, *configuration); +} + +void validateSupportedColumns( + ColumnsDescription & columns, + const StorageObjectStorage::Configuration & configuration) +{ if (!columns.hasOnlyOrdinary()) { /// We don't allow special columns. throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Special columns are not supported for {} storage" - "like MATERIALIZED, ALIAS or EPHEMERAL", configuration->getTypeName()); + "Special columns like MATERIALIZED, ALIAS or EPHEMERAL are not supported for {} storage.", + configuration.getTypeName()); } } diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index 17e30babb709..7631d92173db 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -21,4 +21,8 @@ void resolveSchemaAndFormat( std::string & sample_path, const ContextPtr & context); +void validateSupportedColumns( + ColumnsDescription & columns, + const StorageObjectStorage::Configuration & configuration); + } diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index 16b8697b1d21..30ab6b751241 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -47,7 +47,7 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject auto cluster_name = (*storage_settings)[StorageObjectStorageSetting::object_storage_cluster].value; - configuration->initialize(args.engine_args, context, false, storage_settings); + configuration->initialize(args.engine_args, context, false); // Use format settings from global server context + settings from // the SETTINGS clause of the create query. Settings from current @@ -178,20 +178,29 @@ void registerStorageObjectStorage(StorageFactory & factory) #if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format. 
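The getDataLakeStorageSettings() helper introduced below makes the new flow explicit: the SETTINGS clause of the CREATE query is parsed into a DataLakeStorageSettings object first, and only then handed to the configuration. A minimal sketch of the filtering done by loadFromSettingsChanges() above; MockSettings and the plain std::map are stand-ins invented for illustration:

    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    // Stand-in for DataLakeStorageSettings: applies only the changes it recognizes,
    // mirroring loadFromSettingsChanges() from this patch.
    struct MockSettings
    {
        std::map<std::string, std::string> values{{"iceberg_metadata_file_path", ""}};

        void loadFromSettingsChanges(const std::vector<std::pair<std::string, std::string>> & changes)
        {
            for (const auto & [name, value] : changes)
            {
                auto it = values.find(name);
                if (it != values.end()) // equivalent of impl->has(name)
                    it->second = value; // equivalent of impl->set(name, value)
            }
        }
    };

    int main()
    {
        MockSettings settings;
        settings.loadFromSettingsChanges({{"iceberg_metadata_file_path", "metadata/v2.metadata.json"},
                                          {"unrelated_setting", "ignored"}});
        std::cout << settings.values["iceberg_metadata_file_path"] << '\n';
    }
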
+static DataLakeStorageSettingsPtr getDataLakeStorageSettings(const ASTStorage & storage_def) +{ + auto storage_settings = std::make_shared(); + if (storage_def.settings) + storage_settings->loadFromQuery(*storage_def.settings); + return storage_settings; +} + void registerStorageIceberg(StorageFactory & factory) { factory.registerStorage( "Iceberg", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::NONE, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # if USE_AWS_S3 @@ -199,14 +208,15 @@ void registerStorageIceberg(StorageFactory & factory) "IcebergS3", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::S3, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif # if USE_AZURE_BLOB_STORAGE @@ -214,14 +224,15 @@ void registerStorageIceberg(StorageFactory & factory) "IcebergAzure", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::AZURE, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif # if USE_HDFS @@ -229,28 +240,30 @@ void registerStorageIceberg(StorageFactory & factory) "IcebergHDFS", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::HDFS, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif factory.registerStorage( "IcebergLocal", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::FILE, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); } @@ -265,14 +278,15 @@ void registerStorageDeltaLake(StorageFactory & factory) "DeltaLake", [&](const StorageFactory::Arguments & args) { - auto configuration = 
std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::S3, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif UNUSED(factory); @@ -286,14 +300,15 @@ void registerStorageHudi(StorageFactory & factory) "Hudi", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = false, .supports_schema_inference = true, .source_access_type = AccessType::S3, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); #endif UNUSED(factory); diff --git a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp index 63d7f7a286a1..63666faf1a98 100644 --- a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp +++ b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp @@ -33,7 +33,7 @@ StoragePtr createQueueStorage(const StorageFactory::Arguments & args) throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); auto configuration = std::make_shared(); - configuration->initialize(args.engine_args, args.getContext(), false, nullptr); + configuration->initialize(args.engine_args, args.getContext(), false); // Use format settings from global server context + settings from // the SETTINGS clause of the create query. 
Settings from current diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 57b6cda5039d..7f908e5c9af8 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace DB @@ -42,24 +43,29 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } -template -ObjectStoragePtr TableFunctionObjectStorage::getObjectStorage(const ContextPtr & context, bool create_readonly) const +template +ObjectStoragePtr TableFunctionObjectStorage::getObjectStorage(const ContextPtr & context, bool create_readonly) const { if (!object_storage) object_storage = configuration->createObjectStorage(context, create_readonly); return object_storage; } -template -StorageObjectStorage::ConfigurationPtr TableFunctionObjectStorage::getConfiguration() const +template +StorageObjectStorage::ConfigurationPtr TableFunctionObjectStorage::getConfiguration() const { if (!configuration) - configuration = std::make_shared(); + { + if constexpr (is_data_lake) + configuration = std::make_shared(settings); + else + configuration = std::make_shared(); + } return configuration; } -template -std::vector TableFunctionObjectStorage::skipAnalysisForArguments( +template +std::vector TableFunctionObjectStorage::skipAnalysisForArguments( const QueryTreeNodePtr & query_node_table_function, ContextPtr) const { auto & table_function_node = query_node_table_function->as(); @@ -76,8 +82,18 @@ std::vector TableFunctionObjectStorage::skipA return result; } -template -void TableFunctionObjectStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context) +template +std::shared_ptr::Settings> +TableFunctionObjectStorage::createEmptySettings() +{ + if constexpr (is_data_lake) + return std::make_shared(); + else + return std::make_shared(); +} + +template +void TableFunctionObjectStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context) { /// Clone ast function, because we can modify its arguments like removing headers. 
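createEmptySettings() above is where the new is_data_lake template parameter picks the settings type at compile time. A minimal standalone sketch of the same pattern; StorageSettings and DataLakeSettings are stand-ins invented for illustration:

    #include <iostream>
    #include <memory>
    #include <type_traits>

    struct StorageSettings { const char * name = "StorageObjectStorageSettings"; };
    struct DataLakeSettings { const char * name = "DataLakeStorageSettings"; };

    // Mirrors the patch: the settings type is a compile-time function of is_data_lake.
    template <bool is_data_lake>
    using SettingsFor = std::conditional_t<is_data_lake, DataLakeSettings, StorageSettings>;

    template <bool is_data_lake>
    std::shared_ptr<SettingsFor<is_data_lake>> createEmptySettings()
    {
        return std::make_shared<SettingsFor<is_data_lake>>();
    }

    int main()
    {
        std::cout << createEmptySettings<true>()->name << '\n';  // data lake table functions
        std::cout << createEmptySettings<false>()->name << '\n'; // plain object storage
    }
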
auto ast_copy = ast_function->clone(); @@ -85,7 +101,7 @@ void TableFunctionObjectStorage::parseArguments(const if (args_func.size() != 1) throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName()); - settings = std::make_shared(); + settings = createEmptySettings(); auto & args = args_func.at(0)->children; /// Support storage settings in table function, @@ -105,24 +121,38 @@ void TableFunctionObjectStorage::parseArguments(const parseArgumentsImpl(args, context); } -template +template ColumnsDescription TableFunctionObjectStorage< - Definition, Configuration>::getActualTableStructure(ContextPtr context, bool is_insert_query) const + Definition, Configuration, is_data_lake>::getActualTableStructure(ContextPtr context, bool is_insert_query) const { if (configuration->getStructure() == "auto") { context->checkAccess(getSourceAccessType()); - ColumnsDescription columns; + auto storage = getObjectStorage(context, !is_insert_query); + configuration->update( + object_storage, + context, + /* if_not_updated_before */true, + /* check_consistent_with_previous_metadata */true); + std::string sample_path; - resolveSchemaAndFormat(columns, storage, configuration, std::nullopt, sample_path, context); + ColumnsDescription columns; + resolveSchemaAndFormat( + columns, + std::move(storage), + configuration, + /* format_settings */std::nullopt, + sample_path, + context); + return columns; } return parseColumnsListFromString(configuration->getStructure(), context); } -template -StoragePtr TableFunctionObjectStorage::executeImpl( +template +StoragePtr TableFunctionObjectStorage::executeImpl( const ASTPtr & /* ast_function */, ContextPtr context, const std::string & table_name, @@ -249,23 +279,23 @@ template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AVRO && USE_AZURE_BLOB_STORAGE -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AVRO && USE_HDFS -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_PARQUET && USE_AWS_S3 && USE_DELTA_KERNEL_RS -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AWS_S3 -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AVRO diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 9493baf18571..22ef293948c1 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -107,11 +108,15 @@ struct HudiDefinition static constexpr auto storage_type_name = "S3"; }; -template +template class TableFunctionObjectStorage : public ITableFunction { public: static constexpr auto name = Definition::name; + using Settings = typename std::conditional_t< + is_data_lake, + DataLakeStorageSettings, + StorageObjectStorageSettings>; String getName() const override { return name; } @@ -134,7 +139,7 @@ class TableFunctionObjectStorage : public ITableFunction virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context) { - getConfiguration()->initialize(args, context, true, settings); + getConfiguration()->initialize(args, context, true); } static void updateStructureAndFormatArgumentsIfNeeded( @@ -143,7 +148,16 @@ class TableFunctionObjectStorage : public ITableFunction const String & format, const 
ContextPtr & context) { - Configuration().addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true); + if constexpr (is_data_lake) + { + Configuration configuration(createEmptySettings()); + if (configuration.getFormat() == "auto") + configuration.setFormat("Parquet"); /// Default format of data lakes. + + configuration.addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true); + } + else + Configuration().addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true); } protected: @@ -164,10 +178,12 @@ class TableFunctionObjectStorage : public ITableFunction ObjectStoragePtr getObjectStorage(const ContextPtr & context, bool create_readonly) const; ConfigurationPtr getConfiguration() const; + static std::shared_ptr createEmptySettings(); + mutable ConfigurationPtr configuration; mutable ObjectStoragePtr object_storage; ColumnsDescription structure_hint; - std::shared_ptr settings; + std::shared_ptr settings; std::vector skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; }; @@ -191,20 +207,20 @@ using TableFunctionLocal = TableFunctionObjectStorage; # if USE_AWS_S3 -using TableFunctionIcebergS3 = TableFunctionObjectStorage; +using TableFunctionIcebergS3 = TableFunctionObjectStorage; # endif # if USE_AZURE_BLOB_STORAGE -using TableFunctionIcebergAzure = TableFunctionObjectStorage; +using TableFunctionIcebergAzure = TableFunctionObjectStorage; # endif # if USE_HDFS -using TableFunctionIcebergHDFS = TableFunctionObjectStorage; +using TableFunctionIcebergHDFS = TableFunctionObjectStorage; # endif -using TableFunctionIcebergLocal = TableFunctionObjectStorage; +using TableFunctionIcebergLocal = TableFunctionObjectStorage; #endif #if USE_AWS_S3 # if USE_PARQUET && USE_DELTA_KERNEL_RS -using TableFunctionDeltaLake = TableFunctionObjectStorage; +using TableFunctionDeltaLake = TableFunctionObjectStorage; # endif -using TableFunctionHudi = TableFunctionObjectStorage; +using TableFunctionHudi = TableFunctionObjectStorage; #endif } diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index c30ce59e330c..a6bdb32028a3 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -14,8 +14,8 @@ namespace DB { -template -StoragePtr TableFunctionObjectStorageCluster::executeImpl( +template +StoragePtr TableFunctionObjectStorageCluster::executeImpl( const ASTPtr & /*function*/, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const { @@ -46,7 +46,8 @@ StoragePtr TableFunctionObjectStorageCluster::execute /* format_settings */ std::nullopt, /// No format_settings /* mode */ LoadingStrictnessLevel::CREATE, /* distributed_processing */ true, - /* partition_by */ nullptr); + /* partition_by_ */nullptr, + /* is_table_function */true); } else { diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h index d03f7198d359..57686d7a1866 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -75,8 +75,8 @@ struct HudiClusterDefinition * On worker node it asks initiator about next task to process, processes it. * This is repeated until the tasks are finished. 
*/ -template -class TableFunctionObjectStorageCluster : public ITableFunctionCluster> +template +class TableFunctionObjectStorageCluster : public ITableFunctionCluster> { public: static constexpr auto name = Definition::name; @@ -84,7 +84,7 @@ class TableFunctionObjectStorageCluster : public ITableFunctionCluster; + using Base = TableFunctionObjectStorage; StoragePtr executeImpl( const ASTPtr & ast_function, @@ -117,23 +117,23 @@ using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster; #if USE_AVRO && USE_AWS_S3 -using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster; +using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster; #endif #if USE_AVRO && USE_AZURE_BLOB_STORAGE -using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster; +using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster; #endif #if USE_AVRO && USE_HDFS -using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster; +using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster; #endif #if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS -using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster; +using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster; #endif #if USE_AWS_S3 -using TableFunctionHudiCluster = TableFunctionObjectStorageCluster; +using TableFunctionHudiCluster = TableFunctionObjectStorageCluster; #endif } diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index c0c1a451d861..55e687cb9eea 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -593,7 +593,7 @@ def test_partition_columns(started_cluster, use_delta_kernel): bucket = started_cluster.minio_bucket TABLE_NAME = randomize_table_name("test_partition_columns") result_file = f"{TABLE_NAME}" - partition_columns = ["b", "c", "d", "e"] + partition_columns = ["b", "c", "d"] delta_table = ( DeltaTable.create(spark) @@ -642,22 +642,15 @@ def test_partition_columns(started_cluster, use_delta_kernel): assert len(files) > 0 print(f"Uploaded files: {files}") - result = instance.query( - f"describe table deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', '{minio_secret_key}', SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel})" - ).strip() + table_function = f"deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', '{minio_secret_key}', SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel})" + result = instance.query(f"describe table {table_function}").strip() assert ( result == "a\tNullable(Int32)\t\t\t\t\t\nb\tNullable(String)\t\t\t\t\t\nc\tNullable(Date32)\t\t\t\t\t\nd\tNullable(Int32)\t\t\t\t\t\ne\tNullable(Bool)" ) - result = int( - instance.query( - f"""SELECT count() - FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', '{minio_secret_key}', SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel}) - """ - ) - ) + result = int(instance.query(f"SELECT count() FROM {table_function}")) assert result == num_rows query_id = f"query_with_filter_{TABLE_NAME}" @@ -1155,10 +1148,15 @@ def test_partition_columns_2(started_cluster, use_delta_kernel): 'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , '{minio_access_key}', '{minio_secret_key}', - SETTINGS 
allow_experimental_delta_kernel_rs={use_delta_kernel}) + SETTINGS allow_experimental_delta_kernel_rs=0) """ - num_files = int(node.query(f"SELECT uniqExact(_path) FROM {delta_function}")) + num_files = int( + node.query( + f"SELECT uniqExact(_path) FROM {delta_function}", + settings={"allow_experimental_delta_kernel_rs": 1}, + ) + ) assert num_files == 5 new_data = [ @@ -1179,10 +1177,19 @@ def test_partition_columns_2(started_cluster, use_delta_kernel): "b\tNullable(Int32)\t\t\t\t\t\n" "c\tNullable(Int32)\t\t\t\t\t\n" "d\tNullable(String)\t\t\t\t\t\n" - "e\tNullable(String)" == node.query(f"DESCRIBE TABLE {delta_function}").strip() + "e\tNullable(String)" + == node.query( + f"DESCRIBE TABLE {delta_function}", + settings={"allow_experimental_delta_kernel_rs": 1}, + ).strip() ) - num_files = int(node.query(f"SELECT uniqExact(_path) FROM {delta_function}")) + num_files = int( + node.query( + f"SELECT uniqExact(_path) FROM {delta_function}", + settings={"allow_experimental_delta_kernel_rs": 1}, + ) + ) assert num_files == 6 query_id = f"{table_name}-{uuid.uuid4()}" @@ -1191,6 +1198,7 @@ def test_partition_columns_2(started_cluster, use_delta_kernel): in node.query( f" SELECT a FROM {delta_function} WHERE c = 7 and d = 'aa'", query_id=query_id, + settings={"allow_experimental_delta_kernel_rs": 1}, ).strip() ) @@ -1213,7 +1221,47 @@ def check_pruned(count, query_id): in node.query( f"SELECT a FROM {delta_function} WHERE c = 7 and d = 'bb'", query_id=query_id, + settings={"allow_experimental_delta_kernel_rs": 1}, ).strip() ) check_pruned(num_files - 1, query_id) + + +@pytest.mark.parametrize("new_analyzer", ["1", "0"]) +def test_cluster_function(started_cluster, new_analyzer): + instance = started_cluster.instances["node1"] + table_name = randomize_table_name("test_cluster_function") + + schema = pa.schema([("a", pa.int32()), ("b", pa.string())]) + data = [ + pa.array([1, 2, 3, 4, 5], type=pa.int32()), + pa.array(["aa", "bb", "cc", "aa", "bb"], type=pa.string()), + ] + + storage_options = { + "AWS_ENDPOINT_URL": f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}", + "AWS_ACCESS_KEY_ID": minio_access_key, + "AWS_SECRET_ACCESS_KEY": minio_secret_key, + "AWS_ALLOW_HTTP": "true", + "AWS_S3_ALLOW_UNSAFE_RENAME": "true", + } + path = f"s3://root/{table_name}" + table = pa.Table.from_arrays(data, schema=schema) + write_deltalake(path, table, storage_options=storage_options) + + table_function = f""" +deltaLakeCluster(cluster, + 'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , + '{minio_access_key}', + '{minio_secret_key}', + SETTINGS allow_experimental_delta_kernel_rs=1) + """ + instance.query( + f"SELECT * FROM {table_function} SETTINGS allow_experimental_analyzer={new_analyzer}" + ) + assert 5 == int( + instance.query( + f"SELECT count() FROM {table_function} SETTINGS allow_experimental_analyzer={new_analyzer}" + ) + )
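The tests above steer the implementation through the query-level allow_experimental_delta_kernel_rs setting (in SQL: SETTINGS allow_experimental_delta_kernel_rs = 1), which is what the factory change at the top of this section enables. A minimal standalone sketch of that per-query dispatch; the IMetadata hierarchy here is invented, standing in for the delta-kernel and legacy DeltaLake metadata parsers:

    #include <iostream>
    #include <memory>

    // Stand-ins invented for illustration; the real types are the two DeltaLake
    // metadata implementations selected at the top of this patch.
    struct IMetadata { virtual ~IMetadata() = default; virtual const char * name() const = 0; };
    struct KernelMetadata : IMetadata { const char * name() const override { return "delta-kernel-rs"; } };
    struct LegacyMetadata : IMetadata { const char * name() const override { return "legacy parser"; } };

    std::unique_ptr<IMetadata> createMetadata(bool allow_experimental_delta_kernel_rs)
    {
        // Mirrors the per-query dispatch: the flag now comes from the query context's
        // Settings, not from engine-level storage settings.
        if (allow_experimental_delta_kernel_rs)
            return std::make_unique<KernelMetadata>();
        return std::make_unique<LegacyMetadata>();
    }

    int main()
    {
        std::cout << createMetadata(true)->name() << '\n';
        std::cout << createMetadata(false)->name() << '\n';
    }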