diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp index d2afc528505d..1d3a43cacd50 100644 --- a/src/Databases/DataLake/ICatalog.cpp +++ b/src/Databases/DataLake/ICatalog.cpp @@ -70,13 +70,19 @@ void TableMetadata::setLocation(const std::string & location_) auto pos_to_path = location_.substr(pos_to_bucket).find('/'); if (pos_to_path == std::string::npos) - throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_); - - pos_to_path = pos_to_bucket + pos_to_path; + { // empty path + location_without_path = location_; + path.clear(); + bucket = location_.substr(pos_to_bucket); + } + else + { + pos_to_path = pos_to_bucket + pos_to_path; - location_without_path = location_.substr(0, pos_to_path); - path = location_.substr(pos_to_path + 1); - bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket); + location_without_path = location_.substr(0, pos_to_path); + path = location_.substr(pos_to_path + 1); + bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket); + } LOG_TEST(getLogger("TableMetadata"), "Parsed location without path: {}, path: {}", diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 4c2be7f4673d..57eef9328683 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -379,7 +379,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const auto bucket_uri = getURIForBucket(bucket); if (!bucket_uri) { - if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value()) + if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value()) return *maybe_error; if (auto region = getRegionForBucket(bucket); !region.empty()) @@ -584,7 +584,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const if (auto uri = getURIForBucket(bucket); uri.has_value()) request.overrideURI(std::move(*uri)); - bool found_new_endpoint = false; // if we found correct endpoint after 301 responses, update the cache for future requests SCOPE_EXIT( @@ -864,12 +863,15 @@ std::optional Client::getURIFromError(const Aws::S3::S3Error & error) c } // Do a list request because head requests don't have body in response -std::optional Client::updateURIForBucketForHead(const std::string & bucket) const +// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject +std::optional Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const { - ListObjectsV2Request req; + GetObjectRequest req; req.SetBucket(bucket); - req.SetMaxKeys(1); - auto result = ListObjectsV2(req); + req.SetKey(key); + req.SetRange("bytes=0-1"); + auto result = GetObject(req); + if (result.IsSuccess()) return std::nullopt; return result.GetError(); diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h index 33ae0435e90c..db5f2c87217c 100644 --- a/src/IO/S3/Client.h +++ b/src/IO/S3/Client.h @@ -269,7 +269,7 @@ class Client : private Aws::S3::S3Client void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const; std::optional getURIFromError(const Aws::S3::S3Error & error) const; - std::optional updateURIForBucketForHead(const std::string & bucket) const; + std::optional updateURIForBucketForHead(const std::string & bucket, const std::string & key) const; std::optional getURIForBucket(const std::string & bucket) const; diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp index cfcdd21a5b90..10ca3d647e06 100644 --- a/src/IO/S3/URI.cpp +++ b/src/IO/S3/URI.cpp @@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax) validateKey(key, uri); } +bool URI::isAWSRegion(std::string_view region) +{ + /// List from https://docs.aws.amazon.com/general/latest/gr/s3.html + static const std::unordered_set regions = { + "us-east-2", + "us-east-1", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-2", + "ap-southeast-3", + "ap-southeast-5", + "ap-southeast-4", + "ap-south-1", + "ap-northeast-3", + "ap-northeast-2", + "ap-southeast-1", + "ap-southeast-2", + "ap-east-2", + "ap-southeast-7", + "ap-northeast-1", + "ca-central-1", + "ca-west-1", + "eu-central-1", + "eu-west-1", + "eu-west-2", + "eu-south-1", + "eu-west-3", + "eu-south-2", + "eu-north-1", + "eu-central-2", + "il-central-1", + "mx-central-1", + "me-south-1", + "me-central-1", + "sa-east-1", + "us-gov-east-1", + "us-gov-west-1" + }; + + /// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2' + /// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility + if (region.substr(0, 3) == "s3-") + region = region.substr(3); + + return regions.contains(region); +} + void URI::addRegionToURI(const std::string ®ion) { if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos) + { + if (pos > 0) + { /// Check if region is already in endpoint to avoid add it second time + auto prev_pos = endpoint.find_last_of("/.", pos - 1); + if (prev_pos == std::string::npos) + prev_pos = 0; + else + ++prev_pos; + std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos); + if (isAWSRegion(endpoint_region)) + return; + } endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos); + } } void URI::validateBucket(const String & bucket, const Poco::URI & uri) diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index d455cd1908f0..d3b7e04b265a 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -42,6 +42,10 @@ struct URI static void validateBucket(const std::string & bucket, const Poco::URI & uri); static void validateKey(const std::string & key, const Poco::URI & uri); + /// Returns true if 'region' string is an AWS S3 region + /// https://docs.aws.amazon.com/general/latest/gr/s3.html + static bool isAWSRegion(std::string_view region); + private: std::pair> getURIAndArchivePattern(const std::string & source); }; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 7e318aed3906..6b1145779df0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -515,7 +515,7 @@ void IcebergMetadata::updateSnapshot(Poco::JSON::Object::Ptr metadata_object) relevant_snapshot = IcebergSnapshot{ getManifestList(getProperFilePathFromMetadataInfo( - snapshot->getValue(f_manifest_list), configuration_ptr->getPath(), table_location)), + snapshot->getValue(f_manifest_list), configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace())), relevant_snapshot_id, total_rows, total_bytes}; if (!snapshot->has(f_schema_id)) @@ -678,7 +678,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(const String & filename) for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i) { const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet(); - const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location); + const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace()); Int64 added_sequence_number = 0; if (format_version > 1) added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, f_sequence_number, TypeIndex::Int64).safeGet(); @@ -805,6 +805,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64 schema_processor, inherited_sequence_number, table_location, + configuration_ptr->getNamespace(), getContext()); }; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp index a295071f2600..6a307a47c9b8 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp @@ -127,6 +127,7 @@ ManifestFileContent::ManifestFileContent( const IcebergSchemaProcessor & schema_processor, Int64 inherited_sequence_number, const String & table_location, + const String & common_namespace, DB::ContextPtr context) { this->schema_id = schema_id_; @@ -191,7 +192,11 @@ ManifestFileContent::ManifestFileContent( } const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, f_status, TypeIndex::Int32).safeGet()); - const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(), common_path, table_location); + const auto file_path = getProperFilePathFromMetadataInfo( + manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(), + common_path, + table_location, + common_namespace); /// NOTE: This is weird, because in manifest file partition looks like this: /// { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h index 0fc613c65946..9d49c9ef548d 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h @@ -94,6 +94,7 @@ class ManifestFileContent const DB::IcebergSchemaProcessor & schema_processor, Int64 inherited_sequence_number, const std::string & table_location, + const std::string & common_namespace, DB::ContextPtr context); const std::vector & getFiles() const; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 56d7b9565fb1..df2a8007e74c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -28,7 +28,11 @@ using namespace DB; // This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files. // For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro // Common path should end with "" or "/". -std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location) +std::string getProperFilePathFromMetadataInfo( + std::string_view data_path, + std::string_view common_path, + std::string_view table_location, + std::string_view common_namespace) { auto trim_backward_slash = [](std::string_view str) -> std::string_view { @@ -84,7 +88,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s } else { - throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path); + /// Data files can have different path + pos = data_path.find("://"); + if (pos == std::string::npos) + throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); + pos = data_path.find("/", pos + 3); + if (pos == std::string::npos) + throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); + if (data_path.substr(pos + 1).starts_with(common_namespace)) + { + auto new_pos = data_path.find("/", pos + 1); + if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path + pos = new_pos; + } + return std::string(data_path.substr(pos)); } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index 432751be8832..300df4492aa6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -10,7 +10,11 @@ namespace Iceberg { -std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location); +std::string getProperFilePathFromMetadataInfo( + std::string_view data_path, + std::string_view common_path, + std::string_view table_location, + std::string_view common_namespace); }