Skip to content

Antalya: Any location in delta lake catalog #857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: antalya-25.3
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,34 @@ void TableMetadata::setLocation(const std::string & location_)
if (!with_location)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");

location = location_;

if (!endpoint.empty())
parseLocation();
}

void TableMetadata::parseLocation() const
{
/// Location has format:
/// s3://<bucket>/path/to/table/data.
/// We want to split s3://<bucket> and path/to/table/data.

auto pos = location_.find("://");
auto pos = location.find("://");
if (pos == std::string::npos)
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Location format incompatible with endpoint override: {}", location);

auto pos_to_bucket = pos + std::strlen("://");
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
auto pos_to_path = location.substr(pos_to_bucket).find('/');

if (pos_to_path == std::string::npos)
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Location format incompatible with endpoint override: {}", location);

pos_to_path = pos_to_bucket + pos_to_path;

location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
path = location.substr(pos_to_path + 1);
bucket = location.substr(pos_to_bucket, pos_to_path - pos_to_bucket);

LOG_TEST(getLogger("TableMetadata"),
"Parsed location without path: {}, path: {}",
location_without_path, path);
LOG_TEST(getLogger("TableMetadata"), "Parsed location, path: {}", path);
}

std::string TableMetadata::getLocation() const
Expand All @@ -88,7 +93,7 @@ std::string TableMetadata::getLocation() const
if (!endpoint.empty())
return constructLocation(endpoint);

return std::filesystem::path(location_without_path) / path;
return location;
}

std::string TableMetadata::getLocationWithEndpoint(const std::string & endpoint_) const
Expand All @@ -99,24 +104,30 @@ std::string TableMetadata::getLocationWithEndpoint(const std::string & endpoint_
if (endpoint_.empty())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Passed endpoint is empty");

if (path.empty())
parseLocation();

return constructLocation(endpoint_);
}

std::string TableMetadata::constructLocation(const std::string & endpoint_) const
{
std::string location = endpoint_;
if (location.ends_with('/'))
location.pop_back();
std::string location_ = endpoint_;
if (location_.ends_with('/'))
location_.pop_back();

if (location.ends_with(bucket))
return std::filesystem::path(location) / path / "";
if (location_.ends_with(bucket))
return std::filesystem::path(location_) / path / "";
else
return std::filesystem::path(location) / bucket / path / "";
return std::filesystem::path(location_) / bucket / path / "";
}

void TableMetadata::setEndpoint(const std::string & endpoint_)
{
endpoint = endpoint_;

if (!endpoint.empty() && path.empty())
parseLocation();
}

void TableMetadata::setSchema(const DB::NamesAndTypesList & schema_)
Expand Down Expand Up @@ -163,12 +174,12 @@ std::optional<DataLakeSpecificProperties> TableMetadata::getDataLakeSpecificProp

StorageType TableMetadata::getStorageType() const
{
return parseStorageTypeFromLocation(location_without_path);
return parseStorageTypeFromLocation(location);
}

bool TableMetadata::hasLocation() const
{
return !location_without_path.empty();
return !location.empty();
}
bool TableMetadata::hasSchema() const
{
Expand Down
10 changes: 7 additions & 3 deletions src/Databases/DataLake/ICatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ class TableMetadata
std::string getReasonWhyTableIsUnreadable() const { return reason_why_table_is_not_readable; }

private:
std::string location;

/// Starts with s3://, file://, etc.
/// For example, `s3://bucket/`
std::string location_without_path;
/// Path to table's data inside the bucket, without the leading slash: `path/to/table/data`
std::string path;
mutable std::string path;
DB::NamesAndTypesList schema;

std::string bucket;
mutable std::string bucket;
/// Endpoint is set and used in case we have non-AWS storage implementation, for example, Minio.
/// Also not all catalogs support non-AWS storages.
std::string endpoint;
Expand All @@ -102,6 +103,9 @@ class TableMetadata
bool with_datalake_specific_metadata = false;

std::string constructLocation(const std::string & endpoint_) const;

/// Parse location, get bucket and path
void parseLocation() const;
};


Expand Down
Loading