From d685bbf6f34338f585912bcb1b39e17c0f857057 Mon Sep 17 00:00:00 2001 From: Arthur Passos <arthur.ti@outlook.com> Date: Tue, 20 May 2025 15:20:18 -0300 Subject: [PATCH 1/4] snowflakeid --- src/Core/Settings.cpp | 3 ++ src/Functions/generateSnowflakeID.cpp | 7 +++ src/Functions/generateSnowflakeID.h | 10 ++++ .../DataLakes/DataLakeConfiguration.h | 8 ++++ .../DataLakes/DeltaLakeMetadata.h | 13 +++++ .../DataLakes/DeltaLakeMetadataDeltaKernel.h | 7 +++ .../ObjectStorage/DataLakes/HudiMetadata.h | 9 ++++ .../DataLakes/IDataLakeMetadata.h | 2 + .../DataLakes/Iceberg/IcebergMetadata.h | 22 +++++++++ .../DataLakes/Iceberg/SchemaProcessor.h | 27 +++++++++++ .../ObjectStorage/StorageObjectStorage.cpp | 48 ++++++++++++++++--- .../ObjectStorage/StorageObjectStorage.h | 2 + .../StorageObjectStorageSink.cpp | 40 +++++++++------- .../ObjectStorage/StorageObjectStorageSink.h | 1 - src/Storages/ObjectStorage/Utils.cpp | 35 ++++++++++++++ src/Storages/ObjectStorage/Utils.h | 4 ++ ...ct_storage_snowflake_id_wildcard.reference | 44 +++++++++++++++++ ...8_object_storage_snowflake_id_wildcard.sql | 44 +++++++++++++++++ 18 files changed, 302 insertions(+), 24 deletions(-) create mode 100644 src/Functions/generateSnowflakeID.h create mode 100644 tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.reference create mode 100644 tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.sql diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 792dacc93ba7..7f6adaece07a 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5951,6 +5951,9 @@ This only affects operations performed on the client side, in particular parsing Normally this setting should be set in user profile (users.xml or queries like `ALTER USER`), not through the client (client command line arguments, `SET` query, or `SETTINGS` section of `SELECT` query). Through the client it can be changed to false, but can't be changed to true (because the server won't send the settings if user profile has `apply_settings_from_server = false`). Note that initially (24.12) there was a server setting (`send_settings_to_client`), but latter it got replaced with this client setting, for better usability. +)", 0) \ + DECLARE(Bool, object_storage_treat_key_wildcard_as_star, false, R"( +Upon reading from object storage (e.g, s3, azure and etc), treat {_snowflake_id} and {_partition_id} as *. This will allow symmetrical reads and writes using a single table. )", 0) \ \ /* ####################################################### */ \ diff --git a/src/Functions/generateSnowflakeID.cpp b/src/Functions/generateSnowflakeID.cpp index 33fc7be1b841..c9180314af80 100644 --- a/src/Functions/generateSnowflakeID.cpp +++ b/src/Functions/generateSnowflakeID.cpp @@ -147,6 +147,13 @@ struct Data } +uint64_t generateSnowflakeID() +{ + Data data; + SnowflakeId snowflake_id = data.reserveRange(getMachineId(), 1); + return fromSnowflakeId(snowflake_id); +} + class FunctionGenerateSnowflakeID : public IFunction { public: diff --git a/src/Functions/generateSnowflakeID.h b/src/Functions/generateSnowflakeID.h new file mode 100644 index 000000000000..954a919c2dee --- /dev/null +++ b/src/Functions/generateSnowflakeID.h @@ -0,0 +1,10 @@ +#pragma once + +#include <cstdint> + +namespace DB +{ + +uint64_t generateSnowflakeID(); + +} diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 5acb42d2a7dc..3ffc242a2367 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -50,10 +50,18 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl public: using Configuration = StorageObjectStorage::Configuration; + DataLakeConfiguration() = default; + + DataLakeConfiguration(const DataLakeConfiguration & other) + : BaseStorageConfiguration(other) + , current_metadata(other.current_metadata->clone()) {} + bool isDataLakeConfiguration() const override { return true; } std::string getEngineName() const override { return DataLakeMetadata::name + BaseStorageConfiguration::getEngineName(); } + StorageObjectStorage::ConfigurationPtr clone() override { return std::make_shared<DataLakeConfiguration>(*this); } + void update(ObjectStoragePtr object_storage, ContextPtr local_context) override { BaseStorageConfiguration::update(object_storage, local_context); diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index 95c527c560ce..9df8631b7b2d 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -39,6 +39,19 @@ class DeltaLakeMetadata final : public IDataLakeMetadata DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); + DeltaLakeMetadata(const DeltaLakeMetadata & other) + { + object_storage = other.object_storage; + data_files = other.data_files; + schema = other.schema; + partition_columns = other.partition_columns; + } + + std::unique_ptr<IDataLakeMetadata> clone() override + { + return std::make_unique<DeltaLakeMetadata>(*this); + } + Strings getDataFiles() const override { return data_files; } NamesAndTypesList getTableSchema() const override { return schema; } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h index 5147e358c1e1..04632af9908e 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h @@ -37,10 +37,17 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata ContextPtr context_, bool read_schema_same_as_table_schema_); + DeltaLakeMetadataDeltaKernel(const DeltaLakeMetadataDeltaKernel & other) : log(other.log), table_snapshot(other.table_snapshot) {} + bool supportsUpdate() const override { return true; } bool update(const ContextPtr & context) override; + std::unique_ptr<IDataLakeMetadata> clone() override + { + return std::make_unique<DeltaLakeMetadataDeltaKernel>(*this); + } + Strings getDataFiles() const override; NamesAndTypesList getTableSchema() const override; diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h index 5b515dc1f37e..d9f0f104ef4b 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h @@ -19,6 +19,10 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); + HudiMetadata(const HudiMetadata & other) + : WithContext(other.getContext()), object_storage(other.object_storage), configuration(other.configuration), data_files(other.data_files) + {} + Strings getDataFiles() const override; NamesAndTypesList getTableSchema() const override { return {}; } @@ -39,6 +43,11 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext return std::make_unique<HudiMetadata>(object_storage, configuration, local_context); } + std::unique_ptr<IDataLakeMetadata> clone() override + { + return std::make_unique<HudiMetadata>(*this); + } + protected: ObjectIterator iterate( const ActionsDAG * filter_dag, diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 1984750351bc..d64653a84d61 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -52,6 +52,8 @@ class IDataLakeMetadata : boost::noncopyable virtual std::optional<size_t> totalRows() const { return {}; } virtual std::optional<size_t> totalBytes() const { return {}; } + virtual std::unique_ptr<IDataLakeMetadata> clone() = 0; + protected: ObjectIterator createKeysIterator( Strings && data_files_, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 23b0cd04b897..f98055094cc2 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -42,6 +42,28 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext const Poco::JSON::Object::Ptr & metadata_object, IcebergMetadataFilesCachePtr cache_ptr); + IcebergMetadata(const IcebergMetadata & other) + : WithContext(other.getContext()) + , object_storage(other.object_storage) + , configuration(other.configuration) + , schema_processor(other.schema_processor.clone()) + , log(other.log) + , manifest_cache(other.manifest_cache) + , manifest_file_by_data_file(other.manifest_file_by_data_file) + , last_metadata_version(other.last_metadata_version) + , last_metadata_object(other.last_metadata_object) + , format_version(other.format_version) + , relevant_snapshot_schema_id(other.relevant_snapshot_schema_id) + , relevant_snapshot(other.relevant_snapshot) + , relevant_snapshot_id(other.relevant_snapshot_id) + , table_location(other.table_location) + {} + + std::unique_ptr<IDataLakeMetadata> clone() override + { + return std::make_unique<IcebergMetadata>(*this); + } + /// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files. /// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed /// without changing metadata file). Drops on every snapshot update. diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h index 311d723c83b2..736d996cb897 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h @@ -76,6 +76,24 @@ class IcebergSchemaProcessor using Node = ActionsDAG::Node; public: + IcebergSchemaProcessor() = default; + + IcebergSchemaProcessor clone() + { + std::lock_guard lock(mutex); + + IcebergSchemaProcessor ret; + + ret.iceberg_table_schemas_by_ids = iceberg_table_schemas_by_ids; + ret.clickhouse_table_schemas_by_ids = clickhouse_table_schemas_by_ids; + ret.transform_dags_by_ids = transform_dags_by_ids; + ret.clickhouse_types_by_source_ids = clickhouse_types_by_source_ids; + ret.clickhouse_ids_by_source_names = clickhouse_ids_by_source_names; + ret.current_schema_id = current_schema_id; + + return ret; + } + void addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr); std::shared_ptr<NamesAndTypesList> getClickhouseTableSchemaById(Int32 id); std::shared_ptr<const ActionsDAG> getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id); @@ -87,6 +105,15 @@ class IcebergSchemaProcessor bool hasClickhouseTableSchemaById(Int32 id) const; private: + IcebergSchemaProcessor(const IcebergSchemaProcessor & other) + : iceberg_table_schemas_by_ids(other.iceberg_table_schemas_by_ids), + clickhouse_table_schemas_by_ids(other.clickhouse_table_schemas_by_ids), + transform_dags_by_ids(other.transform_dags_by_ids), + clickhouse_types_by_source_ids(other.clickhouse_types_by_source_ids), + clickhouse_ids_by_source_names(other.clickhouse_ids_by_source_names), + current_schema_id(other.current_schema_id) + {} + std::unordered_map<Int32, Poco::JSON::Object::Ptr> iceberg_table_schemas_by_ids; std::unordered_map<Int32, std::shared_ptr<NamesAndTypesList>> clickhouse_table_schemas_by_ids; std::map<std::pair<Int32, Int32>, std::shared_ptr<ActionsDAG>> transform_dags_by_ids; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index eea47cbf9044..edeb8da4d01b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -36,6 +36,7 @@ namespace Setting extern const SettingsMaxThreads max_threads; extern const SettingsBool optimize_count_from_files; extern const SettingsBool use_hive_partitioning; + extern const SettingsBool object_storage_treat_key_wildcard_as_star; } namespace ErrorCodes @@ -373,21 +374,38 @@ void StorageObjectStorage::read( if (update_configuration_on_read) configuration->update(object_storage, local_context); - if (partition_by && configuration->withPartitionWildcard()) + auto config_clone = configuration->clone(); + + if (config_clone->withPartitionWildcard() || config_clone->withSnowflakeIdWildcard()) { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "Reading from a partitioned {} storage is not implemented yet", - getName()); + /* + * Replace `_partition_id` and `_snowflake_id` wildcards with `*` so that any files that match this pattern can be retrieved. + */ + if (local_context->getSettingsRef()[Setting::object_storage_treat_key_wildcard_as_star]) + { + const auto path_without_partition_id_wildcard = PartitionedSink::replaceWildcards(config_clone->getPath(), "*"); + + const auto no_key_related_wildcard_path = replaceSnowflakeIdWildcard(path_without_partition_id_wildcard, "*"); + + config_clone->setPath(no_key_related_wildcard_path); + } + + if (config_clone->withPartitionWildcard() || config_clone->withSnowflakeIdWildcard()) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "Reading from a globbed path {} is not implemented yet, except for `_snowflake_id` and `_partition_id on storage {}`", + config_clone->getPath(), getName()); + } } - const auto read_from_format_info = configuration->prepareReadingFromFormat( + const auto read_from_format_info = config_clone->prepareReadingFromFormat( object_storage, column_names, storage_snapshot, supportsSubsetOfColumns(local_context), local_context); const bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) && local_context->getSettingsRef()[Setting::optimize_count_from_files]; auto read_step = std::make_unique<ReadFromObjectStorageStep>( object_storage, - configuration, + config_clone, fmt::format("{}({})", getName(), getStorageID().getFullTableName()), column_names, getVirtualsList(), @@ -421,7 +439,7 @@ SinkToStoragePtr StorageObjectStorage::write( configuration->getPath()); } - if (configuration->withGlobsIgnorePartitionWildcard()) + if (configuration->withGlobsIgnorePartitionWildcardAndSnowflakeIdWildcard()) { throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Path '{}' contains globs, so the table is in readonly mode", @@ -659,6 +677,13 @@ bool StorageObjectStorage::Configuration::withPartitionWildcard() const || getNamespace().find(PARTITION_ID_WILDCARD) != String::npos; } +bool StorageObjectStorage::Configuration::withSnowflakeIdWildcard() const +{ + static const String PARTITION_ID_WILDCARD = "{_snowflake_id}"; + return getPath().find(PARTITION_ID_WILDCARD) != String::npos + || getNamespace().find(PARTITION_ID_WILDCARD) != String::npos; +} + bool StorageObjectStorage::Configuration::withGlobsIgnorePartitionWildcard() const { if (!withPartitionWildcard()) @@ -666,6 +691,15 @@ bool StorageObjectStorage::Configuration::withGlobsIgnorePartitionWildcard() con return PartitionedSink::replaceWildcards(getPath(), "").find_first_of("*?{") != std::string::npos; } +bool StorageObjectStorage::Configuration::withGlobsIgnorePartitionWildcardAndSnowflakeIdWildcard() const +{ + const auto path_without_partition_id_wildcard = PartitionedSink::replaceWildcards(getPath(), ""); + + const auto path_without_snowflake_id_wildcard = replaceSnowflakeIdWildcard(path_without_partition_id_wildcard, ""); + + return path_without_snowflake_id_wildcard.find_first_of("*?{") != std::string::npos; +} + bool StorageObjectStorage::Configuration::isPathWithGlobs() const { return getPath().find_first_of("*?{") != std::string::npos; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 2aef633a64d4..1b41e49c87e1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -206,8 +206,10 @@ class StorageObjectStorage::Configuration ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) = 0; virtual bool withPartitionWildcard() const; + virtual bool withSnowflakeIdWildcard() const; bool withGlobs() const { return isPathWithGlobs() || isNamespaceWithGlobs(); } virtual bool withGlobsIgnorePartitionWildcard() const; + virtual bool withGlobsIgnorePartitionWildcardAndSnowflakeIdWildcard() const; virtual bool isPathWithGlobs() const; virtual bool isNamespaceWithGlobs() const; virtual std::string getPathWithoutGlobs() const; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp index 263188301877..5f22e564766a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp @@ -5,6 +5,7 @@ #include <Core/Settings.h> #include <Storages/ObjectStorage/Utils.h> #include <base/defines.h> +#include <Storages/ObjectStorage/Utils.h> namespace DB { @@ -20,6 +21,22 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace +{ +void validateKey(const String & str) +{ + /// See: + /// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html + /// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject + + if (str.empty() || str.size() > 1024) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (not empty, max 1023 characters), got: {}", str.size()); + + if (!UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size())) + throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key"); +} +} + StorageObjectStorageSink::StorageObjectStorageSink( ObjectStoragePtr object_storage, ConfigurationPtr configuration, @@ -31,7 +48,11 @@ StorageObjectStorageSink::StorageObjectStorageSink( , sample_block(sample_block_) { const auto & settings = context->getSettingsRef(); - const auto path = blob_path.empty() ? configuration->getPaths().back() : blob_path; + auto path = blob_path.empty() ? configuration->getPaths().back() : blob_path; + path = fillSnowflakeIdWildcard(path); + + validateKey(path); + const auto chosen_compression_method = chooseCompressionMethod(path, configuration->getCompressionMethod()); auto buffer = object_storage->writeObject( @@ -121,6 +142,8 @@ StorageObjectStorageSink::~StorageObjectStorageSink() SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String & partition_id) { + validatePartitionKey(partition_id, true); + auto partition_bucket = replaceWildcards(configuration->getNamespace(), partition_id); validateNamespace(partition_bucket); @@ -143,21 +166,6 @@ SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String ); } -void PartitionedStorageObjectStorageSink::validateKey(const String & str) -{ - /// See: - /// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html - /// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject - - if (str.empty() || str.size() > 1024) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (not empty, max 1023 characters), got: {}", str.size()); - - if (!UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size())) - throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key"); - - validatePartitionKey(str, true); -} - void PartitionedStorageObjectStorageSink::validateNamespace(const String & str) { configuration->validateNamespace(str); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.h b/src/Storages/ObjectStorage/StorageObjectStorageSink.h index 97fd3d9b4179..50bc2c4299fe 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.h @@ -52,7 +52,6 @@ class PartitionedStorageObjectStorageSink : public PartitionedSink SinkPtr createSinkForPartition(const String & partition_id) override; private: - void validateKey(const String & str); void validateNamespace(const String & str); ObjectStoragePtr object_storage; diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index d4f152bfd582..b76c7e4101ea 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -1,10 +1,19 @@ #include <Storages/ObjectStorage/Utils.h> #include <Disks/ObjectStorages/IObjectStorage.h> #include <Storages/ObjectStorage/StorageObjectStorage.h> +#include <Core/Settings.h> +#include <boost/algorithm/string/replace.hpp> +#include <Functions/generateSnowflakeID.h> +#include <Storages/ObjectStorage/StorageObjectStorageSink.h> namespace DB { +namespace Setting +{ + extern const SettingsBool object_storage_treat_key_wildcard_as_star; +} + namespace ErrorCodes { extern const int BAD_ARGUMENTS; @@ -51,6 +60,19 @@ void resolveSchemaAndFormat( std::string & sample_path, const ContextPtr & context) { + /* + * Replace `_partition_id` and `_snowflake_id` wildcards with `*` so that any files that match this pattern can be retrieved. + */ + auto old_path = configuration->getPath(); + if (context->getSettingsRef()[Setting::object_storage_treat_key_wildcard_as_star]) + { + const auto path_without_partition_id_wildcard = PartitionedSink::replaceWildcards(configuration->getPath(), "*"); + + const auto no_key_related_wildcard_path = replaceSnowflakeIdWildcard(path_without_partition_id_wildcard, "*"); + + configuration->setPath(no_key_related_wildcard_path); + } + if (columns.empty()) { if (configuration->getFormat() == "auto") @@ -68,6 +90,9 @@ void resolveSchemaAndFormat( configuration->setFormat(StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context)); } + // restored globbed path + configuration->setPath(old_path); + if (!columns.hasOnlyOrdinary()) { /// We don't allow special columns. @@ -77,4 +102,14 @@ void resolveSchemaAndFormat( } } +std::string replaceSnowflakeIdWildcard(const std::string & input, const std::string & snowflake_id) +{ + return boost::replace_all_copy(input, "{_snowflake_id}", snowflake_id); +} + +std::string fillSnowflakeIdWildcard(const std::string & input) +{ + return replaceSnowflakeIdWildcard(input, std::to_string(generateSnowflakeID())); +} + } diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index 17e30babb709..268d1a1c9f5f 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -21,4 +21,8 @@ void resolveSchemaAndFormat( std::string & sample_path, const ContextPtr & context); +std::string replaceSnowflakeIdWildcard(const std::string & input, const std::string & snowflake_id); + +std::string fillSnowflakeIdWildcard(const std::string & input); + } diff --git a/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.reference b/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.reference new file mode 100644 index 000000000000..415f8dcd8f10 --- /dev/null +++ b/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.reference @@ -0,0 +1,44 @@ +test/t_03378_parquet/(2022,\'USA\')/<snowflakeid>.parquet 1 +test/t_03378_parquet/(2022,\'Canada\')/<snowflakeid>.parquet 2 +test/t_03378_parquet/(2023,\'USA\')/<snowflakeid>.parquet 3 +test/t_03378_parquet/(2023,\'Mexico\')/<snowflakeid>.parquet 4 +test/t_03378_parquet/(2024,\'France\')/<snowflakeid>.parquet 5 +test/t_03378_parquet/(2024,\'Germany\')/<snowflakeid>.parquet 6 +test/t_03378_parquet/(2024,\'Germany\')/<snowflakeid>.parquet 7 +test/t_03378_parquet/(1999,\'Brazil\')/<snowflakeid>.parquet 8 +test/t_03378_parquet/(2100,\'Japan\')/<snowflakeid>.parquet 9 +test/t_03378_parquet/(2024,\'CN\')/<snowflakeid>.parquet 10 +test/t_03378_parquet/(2025,\'\')/<snowflakeid>.parquet 11 +test/t_03378_csv/(2022,\'USA\')/<snowflakeid>.csv 1 +test/t_03378_csv/(2022,\'Canada\')/<snowflakeid>.csv 2 +test/t_03378_csv/(2023,\'USA\')/<snowflakeid>.csv 3 +test/t_03378_csv/(2023,\'Mexico\')/<snowflakeid>.csv 4 +test/t_03378_csv/(2024,\'France\')/<snowflakeid>.csv 5 +test/t_03378_csv/(2024,\'Germany\')/<snowflakeid>.csv 6 +test/t_03378_csv/(2024,\'Germany\')/<snowflakeid>.csv 7 +test/t_03378_csv/(1999,\'Brazil\')/<snowflakeid>.csv 8 +test/t_03378_csv/(2100,\'Japan\')/<snowflakeid>.csv 9 +test/t_03378_csv/(2024,\'CN\')/<snowflakeid>.csv 10 +test/t_03378_csv/(2025,\'\')/<snowflakeid>.csv 11 +test/t_03378_function/(2022,\'USA\')/<snowflakeid>.parquet 1 +test/t_03378_function/(2022,\'Canada\')/<snowflakeid>.parquet 2 +test/t_03378_function/(2023,\'USA\')/<snowflakeid>.parquet 3 +test/t_03378_function/(2023,\'Mexico\')/<snowflakeid>.parquet 4 +test/t_03378_function/(2024,\'France\')/<snowflakeid>.parquet 5 +test/t_03378_function/(2024,\'Germany\')/<snowflakeid>.parquet 6 +test/t_03378_function/(2024,\'Germany\')/<snowflakeid>.parquet 7 +test/t_03378_function/(1999,\'Brazil\')/<snowflakeid>.parquet 8 +test/t_03378_function/(2100,\'Japan\')/<snowflakeid>.parquet 9 +test/t_03378_function/(2024,\'CN\')/<snowflakeid>.parquet 10 +test/t_03378_function/(2025,\'\')/<snowflakeid>.parquet 11 +test/t_03378_function/(2022,\'USA\')/<snowflakeid>.parquet 1 +test/t_03378_function/(2022,\'Canada\')/<snowflakeid>.parquet 2 +test/t_03378_function/(2023,\'USA\')/<snowflakeid>.parquet 3 +test/t_03378_function/(2023,\'Mexico\')/<snowflakeid>.parquet 4 +test/t_03378_function/(2024,\'France\')/<snowflakeid>.parquet 5 +test/t_03378_function/(2024,\'Germany\')/<snowflakeid>.parquet 6 +test/t_03378_function/(2024,\'Germany\')/<snowflakeid>.parquet 7 +test/t_03378_function/(1999,\'Brazil\')/<snowflakeid>.parquet 8 +test/t_03378_function/(2100,\'Japan\')/<snowflakeid>.parquet 9 +test/t_03378_function/(2024,\'CN\')/<snowflakeid>.parquet 10 +test/t_03378_function/(2025,\'\')/<snowflakeid>.parquet 11 diff --git a/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.sql b/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.sql new file mode 100644 index 000000000000..214b62403e4c --- /dev/null +++ b/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.sql @@ -0,0 +1,44 @@ +DROP TABLE IF EXISTS t_03378_parquet, t_03378_csv; + +CREATE TABLE t_03378_parquet (year UInt16, country String, counter UInt8) +ENGINE = S3(s3_conn, filename = 't_03378_parquet/{_partition_id}/{_snowflake_id}.parquet', format = Parquet) +PARTITION BY (year, country); + +INSERT INTO t_03378_parquet VALUES + (2022, 'USA', 1), + (2022, 'Canada', 2), + (2023, 'USA', 3), + (2023, 'Mexico', 4), + (2024, 'France', 5), + (2024, 'Germany', 6), + (2024, 'Germany', 7), + (1999, 'Brazil', 8), + (2100, 'Japan', 9), + (2024, 'CN', 10), + (2025, '', 11); + +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/<snowflakeid>.parquet') AS _path, counter from t_03378_parquet order by counter SETTINGS object_storage_treat_key_wildcard_as_star=1; + +CREATE TABLE t_03378_csv (year UInt16, country String, counter UInt8) +ENGINE = S3(s3_conn, filename = 't_03378_csv/{_partition_id}/{_snowflake_id}.csv', format = CSV) +PARTITION BY (year, country); + +INSERT INTO t_03378_csv VALUES + (2022, 'USA', 1), + (2022, 'Canada', 2), + (2023, 'USA', 3), + (2023, 'Mexico', 4), + (2024, 'France', 5), + (2024, 'Germany', 6), + (2024, 'Germany', 7), + (1999, 'Brazil', 8), + (2100, 'Japan', 9), + (2024, 'CN', 10), + (2025, '', 11); + +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.csv', '/<snowflakeid>.csv') AS _path, counter from t_03378_csv order by counter SETTINGS object_storage_treat_key_wildcard_as_star=1; + +-- s3 table function +INSERT INTO FUNCTION s3(s3_conn, filename='t_03378_function/{_partition_id}/{_snowflake_id}.parquet', format=Parquet) PARTITION BY (year, country) SELECT country, year, counter FROM t_03378_parquet SETTINGS object_storage_treat_key_wildcard_as_star=1;; +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/<snowflakeid>.parquet') AS _path, counter from s3(s3_conn, filename='t_03378_function/**.parquet') order by counter SETTINGS object_storage_treat_key_wildcard_as_star=1; +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/<snowflakeid>.parquet') AS _path, counter from s3(s3_conn, filename='t_03378_function/{_partition_id}/{_snowflake_id}.parquet') order by counter SETTINGS object_storage_treat_key_wildcard_as_star=1; From 19a14b1cc7823b665117c541f953165be02476c0 Mon Sep 17 00:00:00 2001 From: Arthur Passos <arthur.ti@outlook.com> Date: Wed, 21 May 2025 12:32:14 -0300 Subject: [PATCH 2/4] rename setting and fix exception message --- src/Core/Settings.cpp | 2 +- .../ObjectStorage/StorageObjectStorage.cpp | 13 +++++-------- src/Storages/ObjectStorage/Utils.cpp | 17 +++++++++-------- src/Storages/ObjectStorage/Utils.h | 2 ++ ...378_object_storage_snowflake_id_wildcard.sql | 10 +++++----- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 7f6adaece07a..0402b0ec579e 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5952,7 +5952,7 @@ Normally this setting should be set in user profile (users.xml or queries like ` Note that initially (24.12) there was a server setting (`send_settings_to_client`), but latter it got replaced with this client setting, for better usability. )", 0) \ - DECLARE(Bool, object_storage_treat_key_wildcard_as_star, false, R"( + DECLARE(Bool, object_storage_treat_key_related_wildcards_as_star, false, R"( Upon reading from object storage (e.g, s3, azure and etc), treat {_snowflake_id} and {_partition_id} as *. This will allow symmetrical reads and writes using a single table. )", 0) \ \ diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index edeb8da4d01b..0c8d0091f81a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -36,7 +36,7 @@ namespace Setting extern const SettingsMaxThreads max_threads; extern const SettingsBool optimize_count_from_files; extern const SettingsBool use_hive_partitioning; - extern const SettingsBool object_storage_treat_key_wildcard_as_star; + extern const SettingsBool object_storage_treat_key_related_wildcards_as_star; } namespace ErrorCodes @@ -381,19 +381,16 @@ void StorageObjectStorage::read( /* * Replace `_partition_id` and `_snowflake_id` wildcards with `*` so that any files that match this pattern can be retrieved. */ - if (local_context->getSettingsRef()[Setting::object_storage_treat_key_wildcard_as_star]) + if (local_context->getSettingsRef()[Setting::object_storage_treat_key_related_wildcards_as_star]) { - const auto path_without_partition_id_wildcard = PartitionedSink::replaceWildcards(config_clone->getPath(), "*"); - - const auto no_key_related_wildcard_path = replaceSnowflakeIdWildcard(path_without_partition_id_wildcard, "*"); - - config_clone->setPath(no_key_related_wildcard_path); + config_clone->setPath(getPathWithKeyRelatedWildcardsReplacedWithStar(config_clone->getPath())); } if (config_clone->withPartitionWildcard() || config_clone->withSnowflakeIdWildcard()) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "Reading from a globbed path {} is not implemented yet, except for `_snowflake_id` and `_partition_id on storage {}`", + "Reading from a globbed path {} on storage {} is not implemented yet," + "except when the only globs are `_snowflake_id` and/or `_partition_id` with `object_storage_treat_key_related_wildcards_as_star=1`", config_clone->getPath(), getName()); } } diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index b76c7e4101ea..50f19d069c6c 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -11,7 +11,7 @@ namespace DB namespace Setting { - extern const SettingsBool object_storage_treat_key_wildcard_as_star; + extern const SettingsBool object_storage_treat_key_related_wildcards_as_star; } namespace ErrorCodes @@ -63,14 +63,10 @@ void resolveSchemaAndFormat( /* * Replace `_partition_id` and `_snowflake_id` wildcards with `*` so that any files that match this pattern can be retrieved. */ - auto old_path = configuration->getPath(); - if (context->getSettingsRef()[Setting::object_storage_treat_key_wildcard_as_star]) + const auto old_path = configuration->getPath(); + if (context->getSettingsRef()[Setting::object_storage_treat_key_related_wildcards_as_star]) { - const auto path_without_partition_id_wildcard = PartitionedSink::replaceWildcards(configuration->getPath(), "*"); - - const auto no_key_related_wildcard_path = replaceSnowflakeIdWildcard(path_without_partition_id_wildcard, "*"); - - configuration->setPath(no_key_related_wildcard_path); + configuration->setPath(getPathWithKeyRelatedWildcardsReplacedWithStar(configuration->getPath())); } if (columns.empty()) @@ -112,4 +108,9 @@ std::string fillSnowflakeIdWildcard(const std::string & input) return replaceSnowflakeIdWildcard(input, std::to_string(generateSnowflakeID())); } +std::string getPathWithKeyRelatedWildcardsReplacedWithStar(const std::string & path) +{ + return replaceSnowflakeIdWildcard(PartitionedSink::replaceWildcards(path, "*"), "*"); +} + } diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index 268d1a1c9f5f..12ec0c52cf91 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -25,4 +25,6 @@ std::string replaceSnowflakeIdWildcard(const std::string & input, const std::str std::string fillSnowflakeIdWildcard(const std::string & input); +std::string getPathWithKeyRelatedWildcardsReplacedWithStar(const std::string & path); + } diff --git a/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.sql b/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.sql index 214b62403e4c..ca51e3be2984 100644 --- a/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.sql +++ b/tests/queries/0_stateless/03378_object_storage_snowflake_id_wildcard.sql @@ -17,7 +17,7 @@ INSERT INTO t_03378_parquet VALUES (2024, 'CN', 10), (2025, '', 11); -select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/<snowflakeid>.parquet') AS _path, counter from t_03378_parquet order by counter SETTINGS object_storage_treat_key_wildcard_as_star=1; +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/<snowflakeid>.parquet') AS _path, counter from t_03378_parquet order by counter SETTINGS object_storage_treat_key_related_wildcards_as_star=1; CREATE TABLE t_03378_csv (year UInt16, country String, counter UInt8) ENGINE = S3(s3_conn, filename = 't_03378_csv/{_partition_id}/{_snowflake_id}.csv', format = CSV) @@ -36,9 +36,9 @@ INSERT INTO t_03378_csv VALUES (2024, 'CN', 10), (2025, '', 11); -select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.csv', '/<snowflakeid>.csv') AS _path, counter from t_03378_csv order by counter SETTINGS object_storage_treat_key_wildcard_as_star=1; +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.csv', '/<snowflakeid>.csv') AS _path, counter from t_03378_csv order by counter SETTINGS object_storage_treat_key_related_wildcards_as_star=1; -- s3 table function -INSERT INTO FUNCTION s3(s3_conn, filename='t_03378_function/{_partition_id}/{_snowflake_id}.parquet', format=Parquet) PARTITION BY (year, country) SELECT country, year, counter FROM t_03378_parquet SETTINGS object_storage_treat_key_wildcard_as_star=1;; -select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/<snowflakeid>.parquet') AS _path, counter from s3(s3_conn, filename='t_03378_function/**.parquet') order by counter SETTINGS object_storage_treat_key_wildcard_as_star=1; -select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/<snowflakeid>.parquet') AS _path, counter from s3(s3_conn, filename='t_03378_function/{_partition_id}/{_snowflake_id}.parquet') order by counter SETTINGS object_storage_treat_key_wildcard_as_star=1; +INSERT INTO FUNCTION s3(s3_conn, filename='t_03378_function/{_partition_id}/{_snowflake_id}.parquet', format=Parquet) PARTITION BY (year, country) SELECT country, year, counter FROM t_03378_parquet SETTINGS object_storage_treat_key_related_wildcards_as_star=1;; +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/<snowflakeid>.parquet') AS _path, counter from s3(s3_conn, filename='t_03378_function/**.parquet') order by counter SETTINGS object_storage_treat_key_related_wildcards_as_star=1; +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/<snowflakeid>.parquet') AS _path, counter from s3(s3_conn, filename='t_03378_function/{_partition_id}/{_snowflake_id}.parquet') order by counter SETTINGS object_storage_treat_key_related_wildcards_as_star=1; From 20506af689b64fbfe81ff82e063be559b16f3f75 Mon Sep 17 00:00:00 2001 From: Arthur Passos <arthur.ti@outlook.com> Date: Wed, 21 May 2025 16:14:06 -0300 Subject: [PATCH 3/4] fix build issues --- programs/library-bridge/CMakeLists.txt | 2 +- programs/odbc-bridge/CMakeLists.txt | 2 +- src/Core/SettingsChangesHistory.cpp | 6 ++++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/programs/library-bridge/CMakeLists.txt b/programs/library-bridge/CMakeLists.txt index 36b84ca5cd8f..86410d712ec7 100644 --- a/programs/library-bridge/CMakeLists.txt +++ b/programs/library-bridge/CMakeLists.txt @@ -19,7 +19,7 @@ target_link_libraries(clickhouse-library-bridge PRIVATE daemon dbms bridge - clickhouse_functions_extractkeyvaluepairs + clickhouse_functions ) set_target_properties(clickhouse-library-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..) diff --git a/programs/odbc-bridge/CMakeLists.txt b/programs/odbc-bridge/CMakeLists.txt index 0d105dafc92d..97ec9cbab438 100644 --- a/programs/odbc-bridge/CMakeLists.txt +++ b/programs/odbc-bridge/CMakeLists.txt @@ -22,7 +22,7 @@ target_link_libraries(clickhouse-odbc-bridge PRIVATE dbms bridge clickhouse_parsers - clickhouse_functions_extractkeyvaluepairs + clickhouse_functions ch_contrib::nanodbc ch_contrib::unixodbc ) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index e26edbd4d424..003b22be4700 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -64,6 +64,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// controls new feature and it's 'true' by default, use 'false' as previous_value). /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972) /// Note: please check if the key already exists to prevent duplicate entries. + + addSettingsChanges(settings_changes_history, "25.3", + { + // Altinity Antalya modifications atop of 25.3 + {"object_storage_treat_key_related_wildcards_as_star", false, false, "New setting."}, + }); addSettingsChanges(settings_changes_history, "25.2.1.20000", { // Altinity Antalya modifications atop of 25.2 From e51be4bf9caadc3d0dcdcd3a0b6ce7e32b83eff8 Mon Sep 17 00:00:00 2001 From: Arthur Passos <arthur.ti@outlook.com> Date: Thu, 22 May 2025 12:22:34 -0300 Subject: [PATCH 4/4] tmp fix --- src/Storages/ObjectStorage/StorageObjectStorage.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 0c8d0091f81a..b55c39e22ee2 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -665,6 +665,7 @@ StorageObjectStorage::Configuration::Configuration(const Configuration & other) format = other.format; compression_method = other.compression_method; structure = other.structure; + storage_settings = other.storage_settings; } bool StorageObjectStorage::Configuration::withPartitionWildcard() const