Skip to content

Backport of 81300: A few fixes for data lake cluster engines #884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: antalya-25.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ci/jobs/scripts/check_style/check-settings-style
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ ALL_DECLARATION_FILES="
$ROOT_PATH/src/Storages/MySQL/MySQLSettings.cpp
$ROOT_PATH/src/Storages/NATS/NATSSettings.cpp
$ROOT_PATH/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp
$ROOT_PATH/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp
$ROOT_PATH/src/Storages/ObjectStorage/StorageObjectStorageSettings.h
$ROOT_PATH/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h
$ROOT_PATH/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.cpp
$ROOT_PATH/src/Storages/RabbitMQ/RabbitMQSettings.cpp
$ROOT_PATH/src/Storages/RocksDB/RocksDBSettings.cpp
Expand Down
5 changes: 1 addition & 4 deletions src/Analyzer/Resolve/IdentifierResolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,7 @@ IdentifierResolveResult IdentifierResolver::tryResolveTableIdentifierFromDatabas
if (!storage)
return {};

if (storage->hasExternalDynamicMetadata())
{
storage->updateExternalDynamicMetadata(context);
}
storage->updateExternalDynamicMetadataIfExists(context);

if (!storage_lock)
storage_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]);
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6121,6 +6121,9 @@ Enable PRQL - an alternative to SQL.
)", EXPERIMENTAL) \
DECLARE(Bool, enable_adaptive_memory_spill_scheduler, false, R"(
Trigger processor to spill data into external storage adaptively. Grace join is supported at present.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_delta_kernel_rs, false, R"(
Allow experimental delta-kernel-rs implementation.
)", EXPERIMENTAL) \
DECLARE(String, object_storage_cluster, "", R"(
Cluster to make distributed requests to object storages with alternative syntax.
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{
// Altinity Antalya modifications atop of 25.3
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
{"allow_experimental_delta_kernel_rs", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.2.1.20000",
{
Expand Down
38 changes: 21 additions & 17 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ namespace Setting
extern const SettingsBool allow_experimental_database_glue_catalog;
extern const SettingsBool use_hive_partitioning;
}
namespace StorageObjectStorageSetting
namespace DataLakeStorageSetting
{
extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
extern const DataLakeStorageSettingsString iceberg_metadata_file_path;
}

namespace ErrorCodes
Expand Down Expand Up @@ -161,7 +161,9 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
return catalog_impl;
}

std::shared_ptr<StorageObjectStorage::Configuration> DatabaseDataLake::getConfiguration(DatabaseDataLakeStorageType type) const
std::shared_ptr<StorageObjectStorage::Configuration> DatabaseDataLake::getConfiguration(
DatabaseDataLakeStorageType type,
DataLakeStorageSettingsPtr storage_settings) const
{
/// TODO: add tests for azure, local storage types.

Expand All @@ -175,24 +177,24 @@ std::shared_ptr<StorageObjectStorage::Configuration> DatabaseDataLake::getConfig
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>();
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings);
}
#endif
#if USE_AZURE_BLOB_STORAGE
case DB::DatabaseDataLakeStorageType::Azure:
{
return std::make_shared<StorageAzureIcebergConfiguration>();
return std::make_shared<StorageAzureIcebergConfiguration>(storage_settings);
}
#endif
#if USE_HDFS
case DB::DatabaseDataLakeStorageType::HDFS:
{
return std::make_shared<StorageHDFSIcebergConfiguration>();
return std::make_shared<StorageHDFSIcebergConfiguration>(storage_settings);
}
#endif
case DB::DatabaseDataLakeStorageType::Local:
{
return std::make_shared<StorageLocalIcebergConfiguration>();
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
}
/// Fake storage in case when catalog store not only
/// primary-type tables (DeltaLake or Iceberg), but for
Expand All @@ -204,7 +206,7 @@ std::shared_ptr<StorageObjectStorage::Configuration> DatabaseDataLake::getConfig
/// dependencies and the most lightweight
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalIcebergConfiguration>();
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
}
#if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS
default:
Expand All @@ -221,12 +223,12 @@ std::shared_ptr<StorageObjectStorage::Configuration> DatabaseDataLake::getConfig
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3DeltaLakeConfiguration>();
return std::make_shared<StorageS3DeltaLakeConfiguration>(storage_settings);
}
#endif
case DB::DatabaseDataLakeStorageType::Local:
{
return std::make_shared<StorageLocalDeltaLakeConfiguration>();
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings);
}
/// Fake storage in case when catalog store not only
/// primary-type tables (DeltaLake or Iceberg), but for
Expand All @@ -238,7 +240,7 @@ std::shared_ptr<StorageObjectStorage::Configuration> DatabaseDataLake::getConfig
/// dependencies and the most lightweight
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalDeltaLakeConfiguration>();
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings);
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
Expand All @@ -253,12 +255,12 @@ std::shared_ptr<StorageObjectStorage::Configuration> DatabaseDataLake::getConfig
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>();
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings);
}
#endif
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalIcebergConfiguration>();
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
Expand Down Expand Up @@ -377,9 +379,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
storage_type = table_metadata.getStorageType();
}

const auto configuration = getConfiguration(storage_type);
auto storage_settings = std::make_shared<DataLakeStorageSettings>();
storage_settings->loadFromSettingsChanges(settings.allChanged());

auto storage_settings = std::make_shared<StorageObjectStorageSettings>();
if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
table_specific_properties.has_value())
{
Expand All @@ -394,9 +396,11 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
}
}

(*storage_settings)[DB::StorageObjectStorageSetting::iceberg_metadata_file_path] = metadata_location;
(*storage_settings)[DB::DataLakeStorageSetting::iceberg_metadata_file_path] = metadata_location;
}

const auto configuration = getConfiguration(storage_type, storage_settings);

/// HACK: Hacky-hack to enable lazy load
ContextMutablePtr context_copy = Context::createCopy(context_);
Settings settings_copy = context_copy->getSettingsCopy();
Expand All @@ -405,7 +409,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con

/// with_table_structure = false: because there will be
/// no table structure in table definition AST.
configuration->initialize(args, context_copy, /* with_table_structure */false, storage_settings);
configuration->initialize(args, context_copy, /* with_table_structure */false);

auto cluster_name = settings[DatabaseDataLakeSetting::object_storage_cluster].value;

Expand Down
6 changes: 5 additions & 1 deletion src/Databases/DataLake/DatabaseDataLake.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ class DatabaseDataLake final : public IDatabase, WithContext

void validateSettings();
std::shared_ptr<DataLake::ICatalog> getCatalog() const;
std::shared_ptr<StorageObjectStorage::Configuration> getConfiguration(DatabaseDataLakeStorageType type) const;

std::shared_ptr<StorageObjectStorage::Configuration> getConfiguration(
DatabaseDataLakeStorageType type,
DataLakeStorageSettingsPtr storage_settings) const;

std::string getStorageEndpointForTable(const DataLake::TableMetadata & table_metadata) const;


Expand Down
12 changes: 11 additions & 1 deletion src/Databases/DataLake/DatabaseDataLakeSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Databases/DataLake/DatabaseDataLakeSettings.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Common/Exception.h>

namespace DB
Expand All @@ -29,7 +30,8 @@ namespace ErrorCodes
DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS)
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \
LIST_OF_DATA_LAKE_STORAGE_SETTINGS(M, ALIAS) \

DECLARE_SETTINGS_TRAITS(DatabaseDataLakeSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(DatabaseDataLakeSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
Expand Down Expand Up @@ -89,4 +91,12 @@ void DatabaseDataLakeSettings::loadFromQuery(const ASTStorage & storage_def)
}
}

SettingsChanges DatabaseDataLakeSettings::allChanged() const
{
SettingsChanges changes;
for (const auto & setting : impl->allChanged())
changes.emplace_back(setting.getName(), setting.getValue());
return changes;
}

}
9 changes: 8 additions & 1 deletion src/Databases/DataLake/DatabaseDataLakeSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Core/BaseSettingsFwdMacros.h>
#include <Core/FormatFactorySettings.h>
#include <Storages/ObjectStorage/StorageObjectStorageSettings.h>
#include <Core/SettingsEnums.h>
#include <Core/SettingsFields.h>

Expand All @@ -19,7 +20,11 @@ class SettingsChanges;
M(CLASS_NAME, Bool) \
M(CLASS_NAME, DatabaseDataLakeCatalogType) \

DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseDataLakeSettings, DECLARE_SETTING_TRAIT)
#define LIST_OF_DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
STORAGE_OBJECT_STORAGE_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M)

LIST_OF_DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseDataLakeSettings, DECLARE_SETTING_TRAIT)

struct DatabaseDataLakeSettings
{
Expand All @@ -34,6 +39,8 @@ struct DatabaseDataLakeSettings

void applyChanges(const SettingsChanges & changes);

SettingsChanges allChanged() const;

private:
std::unique_ptr<DatabaseDataLakeSettingsImpl> impl;
};
Expand Down
1 change: 1 addition & 0 deletions src/Databases/enableAllExperimentalSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context)
context->setSetting("allow_not_comparable_types_in_order_by", 1);
context->setSetting("allow_experimental_database_unity_catalog", 1);
context->setSetting("allow_experimental_database_glue_catalog", 1);
context->setSetting("allow_experimental_delta_kernel_rs", 1);

/// clickhouse-private settings
context->setSetting("allow_experimental_shared_set_join", 1);
Expand Down
14 changes: 7 additions & 7 deletions src/Interpreters/InterpreterDescribeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,13 @@ void InterpreterDescribeQuery::fillColumnsFromTableFunction(const ASTTableExpres

void InterpreterDescribeQuery::fillColumnsFromTable(const ASTTableExpression & table_expression)
{
auto table_id = getContext()->resolveStorageID(table_expression.database_and_table_name);
getContext()->checkAccess(AccessType::SHOW_COLUMNS, table_id);
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (table->hasExternalDynamicMetadata())
{
table->updateExternalDynamicMetadata(getContext());
}
auto query_context = getContext();
auto table_id = query_context->resolveStorageID(table_expression.database_and_table_name);
query_context->checkAccess(AccessType::SHOW_COLUMNS, table_id);

auto table = DatabaseCatalog::instance().getTable(table_id, query_context);

table->updateExternalDynamicMetadataIfExists(query_context);

auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings[Setting::lock_acquire_timeout]);

Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,11 @@ InterpreterSelectQuery::InterpreterSelectQuery(

if (storage)
{
if (storage->hasExternalDynamicMetadata())
if (storage->updateExternalDynamicMetadataIfExists(context))
{
storage->updateExternalDynamicMetadata(context);
metadata_snapshot = storage->getInMemoryMetadataPtr();
}

table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]);
table_id = storage->getStorageID();

Expand Down
5 changes: 0 additions & 5 deletions src/Storages/IStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,6 @@ std::optional<IStorage::AlterLockHolder> IStorage::tryLockForAlter(const std::ch
return lock;
}

void IStorage::updateExternalDynamicMetadata(ContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateExternalDynamicMetadata is not supported by storage {}", getName());
}

IStorage::AlterLockHolder IStorage::lockForAlter(const std::chrono::milliseconds & acquire_timeout)
{
auto lock = tryLockForAlter(acquire_timeout);
Expand Down
6 changes: 2 additions & 4 deletions src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
/// Returns true if the storage is dictionary
virtual bool isDictionary() const { return false; }

/// Returns true if the metadata of a table can be changed normally by other processes
virtual bool hasExternalDynamicMetadata() const { return false; }

/// Returns true if the storage supports queries with the SAMPLE section.
virtual bool supportsSampling() const { return getInMemoryMetadataPtr()->hasSamplingKey(); }

Expand Down Expand Up @@ -488,7 +485,8 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
virtual void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & alter_lock_holder);

/// Updates metadata that can be changed by other processes
virtual void updateExternalDynamicMetadata(ContextPtr);
/// Return true if external metadata exists and was updated.
virtual bool updateExternalDynamicMetadataIfExists(ContextPtr /* context */) { return false; }

/** Checks that alter commands can be applied to storage. For example, columns can be modified,
* or primary key can be changes, etc.
Expand Down
Loading
Loading