diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
index bcf3aef8b26c..e046c342c6b6 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include <IO/CompressionMethod.h>
 #include
 #include
 
@@ -106,10 +107,16 @@ std::string normalizeUuid(const std::string & uuid)
 }
 
 Poco::JSON::Object::Ptr
-readJSON(const String & metadata_file_path, ObjectStoragePtr object_storage, const ContextPtr & local_context, LoggerPtr log)
+readJSON(const String & metadata_file_path, ObjectStoragePtr object_storage, const ContextPtr & local_context, LoggerPtr log, CompressionMethod compression_method)
 {
     ObjectInfo object_info(metadata_file_path);
-    auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
+    auto source_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
+
+    std::unique_ptr<ReadBuffer> buf;
+    if (compression_method != CompressionMethod::None)
+        buf = wrapReadBufferWithCompressionMethod(std::move(source_buf), compression_method);
+    else
+        buf = std::move(source_buf);
 
     String json_str;
     readJSONObjectPossiblyInvalid(json_str, *buf);
@@ -263,7 +270,30 @@ Int32 IcebergMetadata::parseTableSchema(
     }
 }
 
-static std::pair<Int32, String> getMetadataFileAndVersion(const std::string & path)
+struct MetadataFileWithInfo
+{
+    Int32 version;
+    String path;
+    CompressionMethod compression_method;
+};
+
+static CompressionMethod getCompressionMethodFromMetadataFile(const String & path)
+{
+    constexpr std::string_view metadata_suffix = ".metadata.json";
+
+    auto compression_method = chooseCompressionMethod(path, "auto");
+
+    /// NOTE: Surprisingly, some metadata files store the compression extension not at the end of
+    /// the file name, but in the middle, before the ".metadata.json" suffix, e.g.
+    /// 00000-85befd5a-69c7-46d4-bca6-cfbd67f0f7e6.gz.metadata.json. The first check then sees
+    /// no extension, so we retry with the suffix stripped. Iceberg history is full of surprises.
+    if (compression_method == CompressionMethod::None && path.ends_with(metadata_suffix))
+        compression_method = chooseCompressionMethod(path.substr(0, path.size() - metadata_suffix.size()), "auto");
+
+    return compression_method;
+}
+
+static MetadataFileWithInfo getMetadataFileAndVersion(const std::string & path)
 {
     String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
     String version_str;
@@ -278,7 +308,10 @@
         throw Exception(
             ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
 
-    return std::make_pair(std::stoi(version_str), path);
+    return MetadataFileWithInfo{
+        .version = std::stoi(version_str),
+        .path = path,
+        .compression_method = getCompressionMethodFromMetadataFile(path)};
 }
 
 enum class MostRecentMetadataFileSelectionWay
@@ -289,7 +322,7 @@
 struct ShortMetadataFileInfo
 {
-    UInt32 version;
+    Int32 version;
     UInt64 last_updated_ms;
     String path;
 };
 
@@ -301,7 +334,7 @@ struct ShortMetadataFileInfo
  * 1) v<V>.metadata.json, where V - metadata version.
 * 2) <V>-<random-uuid>.metadata.json, where V - metadata version
 */
-static std::pair<Int32, String> getLatestMetadataFileAndVersion(
+static MetadataFileWithInfo getLatestMetadataFileAndVersion(
     const ObjectStoragePtr & object_storage,
     const StorageObjectStorage::Configuration & configuration,
     const ContextPtr & local_context,
@@ -324,10 +357,10 @@
     metadata_files_with_versions.reserve(metadata_files.size());
     for (const auto & path : metadata_files)
     {
-        auto [version, metadata_file_path] = getMetadataFileAndVersion(path);
+        auto [version, metadata_file_path, compression_method] = getMetadataFileAndVersion(path);
         if (need_all_metadata_files_parsing)
         {
-            auto metadata_file_object = readJSON(metadata_file_path, object_storage, local_context, log);
+            auto metadata_file_object = readJSON(metadata_file_path, object_storage, local_context, log, compression_method);
             if (table_uuid.has_value())
             {
                 if (metadata_file_object->has("table-uuid"))
@@ -377,10 +410,11 @@
             [](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.version < b.version; });
         }
     }();
-    return {latest_metadata_file_info.version, latest_metadata_file_info.path};
+
+    return {latest_metadata_file_info.version, latest_metadata_file_info.path, getCompressionMethodFromMetadataFile(latest_metadata_file_info.path)};
 }
 
-static std::pair<Int32, String> getLatestOrExplicitMetadataFileAndVersion(
+static MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion(
     const ObjectStoragePtr & object_storage,
     const StorageObjectStorage::Configuration & configuration,
     const ContextPtr & local_context,
@@ -425,14 +459,14 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
 {
     auto configuration_ptr = configuration.lock();
 
-    const auto [metadata_version, metadata_file_path]
+    const auto [metadata_version, metadata_file_path, compression_method]
         = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
 
     bool metadata_file_changed = false;
     if (last_metadata_version != metadata_version)
     {
         last_metadata_version = metadata_version;
-        last_metadata_object = ::DB::readJSON(metadata_file_path, object_storage, local_context, log);
+        last_metadata_object = ::DB::readJSON(metadata_file_path, object_storage, local_context, log, compression_method);
         metadata_file_changed = true;
     }
@@ -594,12 +628,18 @@ DataLakeMetadataPtr IcebergMetadata::create(
     else
         LOG_TRACE(log, "Not using in-memory cache for iceberg metadata files, because the setting use_iceberg_metadata_files_cache is false.");
 
-    const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
+    const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
 
     auto create_fn = [&]()
     {
         ObjectInfo object_info(metadata_file_path); // NOLINT
-        auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
+        auto source_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
+
+        std::unique_ptr<ReadBuffer> buf;
+        if (compression_method != CompressionMethod::None)
+            buf = wrapReadBufferWithCompressionMethod(std::move(source_buf), compression_method);
+        else
+            buf = std::move(source_buf);
 
         String json_str;
         readJSONObjectPossiblyInvalid(json_str, *buf);
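
For illustration, the two-pass extension check introduced above can be reduced to the following self-contained sketch. This is a simplified re-implementation, not the ClickHouse API: CompressionMethod, chooseByExtension and detectMetadataCompression are hypothetical stand-ins, only gzip is modeled, and C++20 is assumed for ends_with.

    #include <iostream>
    #include <string>
    #include <string_view>

    /// Stand-in for CompressionMethod / chooseCompressionMethod: gzip only.
    enum class CompressionMethod { None, Gzip };

    CompressionMethod chooseByExtension(const std::string & name)
    {
        return name.ends_with(".gz") ? CompressionMethod::Gzip : CompressionMethod::None;
    }

    /// Two-pass detection: try the full name first; if nothing is found and the
    /// name ends with ".metadata.json", retry with that suffix stripped, so an
    /// extension hidden in the middle of the name is still recognized.
    CompressionMethod detectMetadataCompression(const std::string & path)
    {
        constexpr std::string_view metadata_suffix = ".metadata.json";
        auto method = chooseByExtension(path);
        if (method == CompressionMethod::None && path.ends_with(metadata_suffix))
            method = chooseByExtension(path.substr(0, path.size() - metadata_suffix.size()));
        return method;
    }

    int main()
    {
        // Plain name: no compression on either pass.
        std::cout << (detectMetadataCompression("v1.metadata.json") == CompressionMethod::None) << '\n';
        // Extension at the end: found on the first pass.
        std::cout << (detectMetadataCompression("v1.metadata.json.gz") == CompressionMethod::Gzip) << '\n';
        // Extension before the suffix: found on the second pass.
        std::cout << (detectMetadataCompression("00000-85befd5a.gz.metadata.json") == CompressionMethod::Gzip) << '\n';
    }

Built with g++ -std=c++20, each line prints 1.
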
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 2b4e71291dfe..556538ec0ce1 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -1,5 +1,6 @@
 import logging
 import os
+import subprocess
 import uuid
 import time
 from datetime import datetime, timezone
@@ -3247,3 +3248,43 @@ def check_validity_and_get_prunned_files(select_expression):
 
     for query in queries:
         assert check_validity_and_get_prunned_files(query) > 0
+
+
+@pytest.mark.parametrize("storage_type", ["local", "s3"])
+def test_compressed_metadata(started_cluster, storage_type):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    TABLE_NAME = "test_compressed_metadata_" + storage_type + "_" + get_uuid_str()
+
+    table_properties = {
+        "write.metadata.compression": "gzip"
+    }
+
+    df = spark.createDataFrame([
+        (1, "Alice"),
+        (2, "Bob")
+    ], ["id", "name"])
+
+    # For some reason write.metadata.compression does not take effect here,
+    # so we compress the metadata file manually below.
+    df.writeTo(TABLE_NAME) \
+        .tableProperty("write.metadata.compression", "gzip") \
+        .using("iceberg") \
+        .create()
+
+    # Manual compression of the metadata file before upload still exercises the read path.
+    subprocess.check_output(f"gzip /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json", shell=True)
+
+    # Weird, but the compression extension really goes in the middle of the file name, not at the end...
+    subprocess.check_output(f"mv /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json.gz /iceberg_data/default/{TABLE_NAME}/metadata/v1.gz.metadata.json", shell=True)
+
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        f"/iceberg_data/default/{TABLE_NAME}/",
+    )
+
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="")
+
+    assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"
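
A note on the test's rename step: moving the extension from v1.metadata.json.gz to v1.gz.metadata.json is deliberate. With the extension at the very end, the first chooseCompressionMethod() pass would already detect gzip, so only the mid-name form exercises the fallback branch of getCompressionMethodFromMetadataFile() that strips the ".metadata.json" suffix before retrying.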