From 48b2e7863804db9da8980e5044b64f648b6e6823 Mon Sep 17 00:00:00 2001 From: Vladislav Bogomolov Date: Mon, 24 Nov 2025 01:03:09 +0000 Subject: [PATCH 1/5] HMS commit implementation Partial implementation of HMS traditional commit stabilising fix issues with hms lock flow refactoring for optimistic locks address format --- Cargo.lock | 2 + crates/catalog/hms/Cargo.toml | 1 + crates/catalog/hms/src/catalog.rs | 196 ++++++++++++++++- crates/catalog/hms/src/utils.rs | 69 ++++++ crates/catalog/hms/tests/hms_catalog_test.rs | 215 ++++++++++++++++++- 5 files changed, 476 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a2f0a8206..7576675dad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3845,6 +3845,7 @@ dependencies = [ "tracing", "volo", "volo-thrift", + "whoami 1.6.1", ] [[package]] @@ -8770,6 +8771,7 @@ checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" dependencies = [ "libredox", "wasite 0.1.0", + "web-sys", ] [[package]] diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index cab9433a2f..4920ddf76f 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -53,6 +53,7 @@ linkedbytes = { workspace = true } metainfo = { workspace = true } motore-macros = { workspace = true } volo = { workspace = true } +whoami = "1.6.1" [dev-dependencies] iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 0f15890c77..ed30634b3c 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -51,6 +51,13 @@ pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered"; /// HMS Catalog warehouse location pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; +/// HMS Hive locks disabled +pub const HMS_HIVE_LOCKS_DISABLED: &str = "hive_locks_disabled"; + +/// HMS Environment Context +const HMS_EXPECTED_PARAMETER_KEY: &str = "expected_parameter_key"; +const 
HMS_EXPECTED_PARAMETER_VALUE: &str = "expected_parameter_value"; + /// Builder for [`HmsCatalog`]. #[derive(Debug)] pub struct HmsCatalogBuilder { @@ -192,6 +199,43 @@ impl Debug for HmsCatalog { } } +/// RAII guard for HMS table locks. Automatically releases the lock when dropped. +struct HmsLockGuard { + client: ThriftHiveMetastoreClient, + lockid: i64, +} + +impl HmsLockGuard { + async fn acquire( + client: &ThriftHiveMetastoreClient, + db_name: &str, + tbl_name: &str, + ) -> Result { + let lock = client + .lock(create_lock_request(db_name, tbl_name)) + .await + .map(from_thrift_exception) + .map_err(from_thrift_error)??; + + Ok(Self { + client: client.clone(), + lockid: lock.lockid, + }) + } +} + +impl Drop for HmsLockGuard { + fn drop(&mut self) { + let client = self.client.clone(); + let lockid = self.lockid; + tokio::spawn(async move { + let _ = client + .unlock(hive_metastore::UnlockRequest { lockid }) + .await; + }); + } +} + impl HmsCatalog { /// Create a new hms catalog. fn new( @@ -244,6 +288,64 @@ impl HmsCatalog { pub fn file_io(&self) -> FileIO { self.file_io.clone() } + + /// Applies a commit to a table and prepares the update for HMS. 
+ /// # Returns + /// A tuple of (staged_table, new_hive_table) ready for HMS alter_table operation + async fn apply_and_prepare_update( + &self, + commit: TableCommit, + db_name: &str, + tbl_name: &str, + hive_table: &hive_metastore::Table, + ) -> Result<(Table, hive_metastore::Table)> { + let metadata_location = get_metadata_location(&hive_table.parameters)?; + + let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; + + let cur_table = Table::builder() + .file_io(self.file_io()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(TableIdent::new( + NamespaceIdent::new(db_name.to_string()), + tbl_name.to_string(), + )) + .build()?; + + let staged_table = commit.apply(cur_table)?; + staged_table + .metadata() + .write_to( + staged_table.file_io(), + staged_table.metadata_location_result()?, + ) + .await?; + + let new_hive_table = update_hive_table_from_table(hive_table, &staged_table)?; + + Ok((staged_table, new_hive_table)) + } + + /// Builds an EnvironmentContext for optimistic locking with HMS. + /// + /// The context includes the expected metadata_location, which HMS will use + /// to validate that the table hasn't been modified concurrently. 
+ fn build_environment_context(metadata_location: &str) -> hive_metastore::EnvironmentContext { + let mut env_context_properties = pilota::AHashMap::new(); + env_context_properties.insert( + HMS_EXPECTED_PARAMETER_KEY.into(), + "metadata_location".into(), + ); + env_context_properties.insert( + HMS_EXPECTED_PARAMETER_VALUE.into(), + pilota::FastStr::from_string(metadata_location.to_string()), + ); + + hive_metastore::EnvironmentContext { + properties: Some(env_context_properties), + } + } } #[async_trait] @@ -706,10 +808,94 @@ impl Catalog for HmsCatalog { )) } - async fn update_table(&self, _commit: TableCommit) -> Result { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Updating a table is not supported yet", - )) + /// Updates an existing table by applying a commit operation. + /// + /// This method supports two update strategies depending on the catalog configuration: + /// + /// **Optimistic Locking** (when `hive_locks_disabled` is set): + /// - Retrieves the current table state from HMS without acquiring locks + /// - Constructs an `EnvironmentContext` with the expected metadata location + /// - Uses `alter_table_with_environment_context` to perform an atomic + /// compare-and-swap operation. + /// - HMS will reject the update if the metadata location has changed, + /// indicating a concurrent modification + /// + /// **Traditional Locking** (default): + /// - Acquires an exclusive HMS lock on the table before making changes + /// - Retrieves the current table state + /// - Applies the commit and writes new metadata + /// - Updates the table in HMS using `alter_table` + /// - Releases the lock after the operation completes + /// + /// # Returns + /// A `Result` wrapping the updated `Table` object with new metadata. 
+ /// + /// # Errors + /// This function may return an error in several scenarios: + /// - Failure to validate the namespace or table identifier + /// - Inability to acquire a lock (traditional locking mode) + /// - Failure to retrieve the table from HMS + /// - Errors reading or writing table metadata + /// - HMS rejects the update due to concurrent modification (optimistic locking) + /// - Errors from the underlying Thrift communication with HMS + async fn update_table(&self, commit: TableCommit) -> Result
{ + let ident = commit.identifier().clone(); + let db_name = validate_namespace(ident.namespace())?; + let tbl_name = ident.name.clone(); + + if self.config.props.contains_key(HMS_HIVE_LOCKS_DISABLED) { + // Optimistic locking path: read first, then validate with EnvironmentContext + let hive_table = self + .client + .0 + .get_table(db_name.clone().into(), tbl_name.clone().into()) + .await + .map(from_thrift_exception) + .map_err(from_thrift_error)??; + + let metadata_location = get_metadata_location(&hive_table.parameters)?; + let env_context = Self::build_environment_context(&metadata_location); + + let (staged_table, new_hive_table) = self + .apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table) + .await?; + + self.client + .0 + .alter_table_with_environment_context( + db_name.into(), + tbl_name.into(), + new_hive_table, + env_context, + ) + .await + .map_err(from_thrift_error)?; + + Ok(staged_table) + } else { + // Traditional locking path: acquire lock first, then read + let _guard = HmsLockGuard::acquire(&self.client.0, &db_name, &tbl_name).await?; + + let hive_table = self + .client + .0 + .get_table(db_name.clone().into(), tbl_name.clone().into()) + .await + .map(from_thrift_exception) + .map_err(from_thrift_error)??; + + let (staged_table, new_hive_table) = self + .apply_and_prepare_update(commit, &db_name, &tbl_name, &hive_table) + .await?; + + self.client + .0 + .alter_table(db_name.into(), tbl_name.into(), new_hive_table) + .await + .map_err(from_thrift_error)?; + + Ok(staged_table) + // Lock automatically released here via Drop + } } } diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index cd9b557397..0e7c694b18 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use chrono::Utc; use hive_metastore::{Database, PrincipalType, SerDeInfo, StorageDescriptor}; use iceberg::spec::Schema; +use iceberg::table::Table; use iceberg::{Error, 
ErrorKind, Namespace, NamespaceIdent, Result}; use pilota::{AHashMap, FastStr}; @@ -155,6 +156,54 @@ pub(crate) fn convert_to_database( Ok(db) } +pub(crate) fn update_hive_table_from_table( + hive_tbl: &hive_metastore::Table, + tbl: &Table, +) -> Result { + let mut new_tbl = hive_tbl.clone(); + let metadata = tbl.metadata(); + let schema = metadata.current_schema(); + + let hive_schema = HiveSchemaBuilder::from_iceberg(schema)?.build(); + + match new_tbl.sd.as_mut() { + Some(sd) => { + sd.cols = Some(hive_schema); + } + None => { + // Highly unlikely for a real HMS table, but be defensive + new_tbl.sd = Some(StorageDescriptor { + cols: Some(hive_schema), + ..Default::default() + }); + } + } + + let metadata_location = tbl.metadata_location_result()?.to_string(); + + let mut params: AHashMap = new_tbl.parameters.take().unwrap_or_default(); + for (k, v) in metadata.properties().iter() { + if k == METADATA_LOCATION || k == TABLE_TYPE || k == EXTERNAL { + continue; + } + params.insert( + FastStr::from_string(k.to_string()), + FastStr::from_string(v.to_string()), + ); + } + + params.insert(FastStr::from(EXTERNAL), FastStr::from("TRUE")); + params.insert(FastStr::from(TABLE_TYPE), FastStr::from("ICEBERG")); + params.insert( + FastStr::from(METADATA_LOCATION), + FastStr::from(metadata_location), + ); + + new_tbl.parameters = Some(params); + + Ok(new_tbl) +} + pub(crate) fn convert_to_hive_table( db_name: String, schema: &Schema, @@ -309,6 +358,26 @@ fn get_current_time() -> Result { }) } +pub(crate) fn create_lock_request(db_name: &str, tbl_name: &str) -> hive_metastore::LockRequest { + let component = hive_metastore::LockComponent { + r#type: hive_metastore::LockType::EXCLUSIVE, + level: hive_metastore::LockLevel::TABLE, + dbname: FastStr::from_string(db_name.to_string()), + tablename: Some(FastStr::from_string(tbl_name.to_string())), + partitionname: None, + operation_type: None, + is_acid: Some(true), + is_dynamic_partition_write: None, + }; + 
hive_metastore::LockRequest { + component: vec![component], + txnid: None, + user: FastStr::from(whoami::username()), + hostname: FastStr::from(whoami::fallible::hostname().unwrap()), + agent_info: None, + } +} + #[cfg(test)] mod tests { use iceberg::spec::{NestedField, PrimitiveType, TableMetadataBuilder, Type}; diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index d0e6486ad8..6dd6775543 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -27,7 +27,9 @@ use iceberg::io::{ FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY, }; -use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent}; +use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; +use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; use iceberg_catalog_hms::{ HMS_CATALOG_PROP_THRIFT_TRANSPORT, HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE, HmsCatalog, HmsCatalogBuilder, THRIFT_TRANSPORT_BUFFERED, @@ -40,12 +42,23 @@ use tracing::info; type Result = std::result::Result; async fn get_catalog() -> HmsCatalog { + get_catalog_with_props(HashMap::new()).await +} + +async fn get_catalog_with_optimistic_locking() -> HmsCatalog { + use iceberg_catalog_hms::HMS_HIVE_LOCKS_DISABLED; + let mut extra_props = HashMap::new(); + extra_props.insert(HMS_HIVE_LOCKS_DISABLED.to_string(), "true".to_string()); + get_catalog_with_props(extra_props).await +} + +async fn get_catalog_with_props(extra_props: HashMap) -> HmsCatalog { set_up(); let hms_endpoint = get_hms_endpoint(); let minio_endpoint = get_minio_endpoint(); - let props = HashMap::from([ + let mut props = HashMap::from([ (HMS_CATALOG_PROP_URI.to_string(), hms_endpoint), ( HMS_CATALOG_PROP_THRIFT_TRANSPORT.to_string(), @@ -62,6 +75,9 @@ async fn get_catalog() -> HmsCatalog { 
(S3_PATH_STYLE_ACCESS.to_string(), "true".to_string()), ]); + // Merge in extra properties + props.extend(extra_props); + // Wait for bucket to actually exist let file_io = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 { customized_credential_load: None, @@ -89,6 +105,30 @@ async fn get_catalog() -> HmsCatalog { .unwrap() } +async fn set_test_namespace(catalog: &HmsCatalog, namespace: &NamespaceIdent) -> Result<()> { + catalog.create_namespace(namespace, HashMap::new()).await?; + + Ok(()) +} + +fn set_table_creation(location: Option, name: impl ToString) -> Result { + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + let builder = TableCreation::builder() + .name(name.to_string()) + .properties(HashMap::new()) + .location_opt(location) + .schema(schema); + + Ok(builder.build()) +} + #[tokio::test] async fn test_get_default_namespace() -> Result<()> { let catalog = get_catalog().await; @@ -115,3 +155,174 @@ async fn test_get_default_namespace() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_namespace_exists() -> Result<()> { + let catalog = get_catalog().await; + + let ns_exists = Namespace::new(NamespaceIdent::new("default".into())); + let ns_not_exists = Namespace::new(NamespaceIdent::new("test_namespace_exists".into())); + + let result_exists = catalog.namespace_exists(ns_exists.name()).await?; + let result_not_exists = catalog.namespace_exists(ns_not_exists.name()).await?; + + assert!(result_exists); + assert!(!result_not_exists); + + Ok(()) +} + +#[tokio::test] +async fn test_update_namespace() -> Result<()> { + let catalog = get_catalog().await; + + let ns = NamespaceIdent::new("test_update_namespace".into()); + set_test_namespace(&catalog, &ns).await?; + let properties = HashMap::from([("comment".to_string(), "my_update".to_string())]); + 
+ catalog.update_namespace(&ns, properties).await?; + + let db = catalog.get_namespace(&ns).await?; + + assert_eq!( + db.properties().get("comment"), + Some(&"my_update".to_string()) + ); + + Ok(()) +} + +#[tokio::test] +async fn test_drop_namespace() -> Result<()> { + let catalog = get_catalog().await; + + let ns = Namespace::new(NamespaceIdent::new("delete_me".into())); + + catalog.create_namespace(ns.name(), HashMap::new()).await?; + + let result = catalog.namespace_exists(ns.name()).await?; + assert!(result); + + catalog.drop_namespace(ns.name()).await?; + + let result = catalog.namespace_exists(ns.name()).await?; + assert!(!result); + + Ok(()) +} + +#[tokio::test] +async fn test_update_table() -> Result<()> { + let catalog = get_catalog().await; + let creation = set_table_creation(None, "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("test_update_table".into())); + set_test_namespace(&catalog, namespace.name()).await?; + + let expected = catalog.create_table(namespace.name(), creation).await?; + + let table = catalog + .load_table(&TableIdent::new( + namespace.name().clone(), + "my_table".to_string(), + )) + .await?; + + assert_eq!(table.identifier(), expected.identifier()); + assert_eq!(table.metadata_location(), expected.metadata_location()); + assert_eq!(table.metadata(), expected.metadata()); + let original_metadata_location = table.metadata_location(); + let tx = Transaction::new(&table); + let tx = tx + .update_table_properties() + .set("test_property".to_string(), "test_value".to_string()) + .apply(tx)?; + + let updated_table = tx.commit(&catalog).await?; + + assert_eq!( + updated_table.metadata().properties().get("test_property"), + Some(&"test_value".to_string()) + ); + assert_ne!( + updated_table.metadata_location(), + original_metadata_location, + "Metadata location should be updated after commit" + ); + + let reloaded_table = catalog.load_table(table.identifier()).await?; + + assert_eq!( + 
reloaded_table.metadata().properties().get("test_property"), + Some(&"test_value".to_string()) + ); + assert_eq!( + reloaded_table.metadata_location(), + updated_table.metadata_location(), + "Reloaded table should have the same metadata location as the updated table" + ); + + Ok(()) +} + +#[tokio::test] +async fn test_update_table_with_optimistic_locking() -> Result<()> { + let catalog = get_catalog_with_optimistic_locking().await; + let creation = set_table_creation(None, "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("test_update_table_optimistic".into())); + set_test_namespace(&catalog, namespace.name()).await?; + + let expected = catalog.create_table(namespace.name(), creation).await?; + + let table = catalog + .load_table(&TableIdent::new( + namespace.name().clone(), + "my_table".to_string(), + )) + .await?; + + assert_eq!(table.identifier(), expected.identifier()); + assert_eq!(table.metadata_location(), expected.metadata_location()); + assert_eq!(table.metadata(), expected.metadata()); + let original_metadata_location = table.metadata_location(); + + let tx = Transaction::new(&table); + let tx = tx + .update_table_properties() + .set( + "test_property_optimistic".to_string(), + "test_value_optimistic".to_string(), + ) + .apply(tx)?; + + let updated_table = tx.commit(&catalog).await?; + + assert_eq!( + updated_table + .metadata() + .properties() + .get("test_property_optimistic"), + Some(&"test_value_optimistic".to_string()) + ); + + assert_ne!( + updated_table.metadata_location(), + original_metadata_location, + "Metadata location should be updated after commit with optimistic locking" + ); + + let reloaded_table = catalog.load_table(table.identifier()).await?; + assert_eq!( + reloaded_table + .metadata() + .properties() + .get("test_property_optimistic"), + Some(&"test_value_optimistic".to_string()) + ); + assert_eq!( + reloaded_table.metadata_location(), + updated_table.metadata_location(), + "Reloaded table should have the same 
metadata location as the updated table" + ); + + Ok(()) +} From b61cd81426acceb004a3c430e7fa791f11f3a25f Mon Sep 17 00:00:00 2001 From: Vladislav Bogomolov Date: Fri, 26 Dec 2025 09:32:26 +0000 Subject: [PATCH 2/5] Move whoami to the main Cargo.toml and use workspace in crate --- Cargo.toml | 1 + crates/catalog/hms/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f11112109a..4644fdf12c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,5 +152,6 @@ url = "2.5.7" uuid = { version = "1.18", features = ["v7"] } volo = "0.10.6" volo-thrift = "0.10.8" +whoami = "1.6.1" zeroize = "1.7" zstd = "0.13.3" diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index 4920ddf76f..3ee57c4e08 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -53,7 +53,7 @@ linkedbytes = { workspace = true } metainfo = { workspace = true } motore-macros = { workspace = true } volo = { workspace = true } -whoami = "1.6.1" +whoami = { workspace = true } [dev-dependencies] iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } From 47b1414f67ea5145c2db5fbda03055d740db08c4 Mon Sep 17 00:00:00 2001 From: Vladislav Bogomolov Date: Sat, 20 Jun 2026 18:39:12 +0100 Subject: [PATCH 3/5] Fix HMS metadata write after rebase --- crates/catalog/hms/src/catalog.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index ed30634b3c..088f106a2f 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::net::ToSocketAddrs; +use std::str::FromStr; use std::sync::Arc; use anyhow::anyhow; @@ -314,12 +315,11 @@ impl HmsCatalog { .build()?; let staged_table = commit.apply(cur_table)?; + let staged_metadata_location = + 
MetadataLocation::from_str(staged_table.metadata_location_result()?)?; staged_table .metadata() - .write_to( - staged_table.file_io(), - staged_table.metadata_location_result()?, - ) + .write_to(staged_table.file_io(), &staged_metadata_location) .await?; let new_hive_table = update_hive_table_from_table(hive_table, &staged_table)?; From d45ecf49f84765f0b8bafd917f8c74b4684689ce Mon Sep 17 00:00:00 2001 From: Vladislav Bogomolov Date: Sun, 21 Jun 2026 14:27:37 +0100 Subject: [PATCH 4/5] Address usage of runtime, remove whoami in favor of env vars and autogenerated strings --- Cargo.lock | 2 -- Cargo.toml | 1 - crates/catalog/hms/Cargo.toml | 1 - crates/catalog/hms/public-api.txt | 3 ++- crates/catalog/hms/src/catalog.rs | 1 + crates/catalog/hms/src/utils.rs | 34 +++++++++++++++++++++++++++++-- 6 files changed, 35 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7576675dad..0a2f0a8206 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3845,7 +3845,6 @@ dependencies = [ "tracing", "volo", "volo-thrift", - "whoami 1.6.1", ] [[package]] @@ -8771,7 +8770,6 @@ checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" dependencies = [ "libredox", "wasite 0.1.0", - "web-sys", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4644fdf12c..f11112109a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,6 +152,5 @@ url = "2.5.7" uuid = { version = "1.18", features = ["v7"] } volo = "0.10.6" volo-thrift = "0.10.8" -whoami = "1.6.1" zeroize = "1.7" zstd = "0.13.3" diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index 3ee57c4e08..cab9433a2f 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -53,7 +53,6 @@ linkedbytes = { workspace = true } metainfo = { workspace = true } motore-macros = { workspace = true } volo = { workspace = true } -whoami = { workspace = true } [dev-dependencies] iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } diff --git 
a/crates/catalog/hms/public-api.txt b/crates/catalog/hms/public-api.txt index 744819c437..3b947e8172 100644 --- a/crates/catalog/hms/public-api.txt +++ b/crates/catalog/hms/public-api.txt @@ -26,7 +26,7 @@ pub fn iceberg_catalog_hms::HmsCatalog::register_table<'life0, 'life1, 'async_tr pub fn iceberg_catalog_hms::HmsCatalog::rename_table<'life0, 'life1, 'life2, 'async_trait>(&'life0 self, src: &'life1 iceberg::catalog::TableIdent, dest: &'life2 iceberg::catalog::TableIdent) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait pub fn iceberg_catalog_hms::HmsCatalog::table_exists<'life0, 'life1, 'async_trait>(&'life0 self, table: &'life1 iceberg::catalog::TableIdent) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait pub fn iceberg_catalog_hms::HmsCatalog::update_namespace<'life0, 'life1, 'async_trait>(&'life0 self, namespace: &'life1 iceberg::catalog::NamespaceIdent, properties: std::collections::hash::map::HashMap) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait -pub fn iceberg_catalog_hms::HmsCatalog::update_table<'life0, 'async_trait>(&'life0 self, _commit: iceberg::catalog::TableCommit) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait +pub fn iceberg_catalog_hms::HmsCatalog::update_table<'life0, 'async_trait>(&'life0 self, commit: iceberg::catalog::TableCommit) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait pub struct iceberg_catalog_hms::HmsCatalogBuilder impl core::default::Default for iceberg_catalog_hms::HmsCatalogBuilder pub fn iceberg_catalog_hms::HmsCatalogBuilder::default() -> Self @@ -40,5 +40,6 @@ pub fn iceberg_catalog_hms::HmsCatalogBuilder::with_storage_factory(self, storag pub const 
iceberg_catalog_hms::HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str pub const iceberg_catalog_hms::HMS_CATALOG_PROP_URI: &str pub const iceberg_catalog_hms::HMS_CATALOG_PROP_WAREHOUSE: &str +pub const iceberg_catalog_hms::HMS_HIVE_LOCKS_DISABLED: &str pub const iceberg_catalog_hms::THRIFT_TRANSPORT_BUFFERED: &str pub const iceberg_catalog_hms::THRIFT_TRANSPORT_FRAMED: &str diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 088f106a2f..e6dd288798 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -312,6 +312,7 @@ impl HmsCatalog { NamespaceIdent::new(db_name.to_string()), tbl_name.to_string(), )) + .runtime(self.runtime.clone()) .build()?; let staged_table = commit.apply(cur_table)?; diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index 0e7c694b18..30445720e8 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -16,6 +16,9 @@ // under the License. use std::collections::HashMap; +use std::sync::OnceLock; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::{env, process}; use chrono::Utc; use hive_metastore::{Database, PrincipalType, SerDeInfo, StorageDescriptor}; @@ -358,6 +361,33 @@ fn get_current_time() -> Result { }) } +fn lock_user() -> String { + env::var("USER") + .or_else(|_| env::var("USERNAME")) + .unwrap_or_else(|_| format!("iceberg-rust-user-{}", lock_identity_suffix())) +} + +fn lock_hostname() -> String { + env::var("HOSTNAME") + .or_else(|_| env::var("COMPUTERNAME")) + .unwrap_or_else(|_| format!("iceberg-rust-host-{}", lock_identity_suffix())) +} + +fn lock_identity_suffix() -> &'static str { + static LOCK_IDENTITY_SUFFIX: OnceLock = OnceLock::new(); + + LOCK_IDENTITY_SUFFIX.get_or_init(|| { + let since_epoch = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_nanos()) + .unwrap_or_default(); + let stack_addr = &since_epoch as *const u128 as usize as u128; + let process_id = process::id() as 
u128; + + format!("{:x}", since_epoch ^ (process_id << 64) ^ stack_addr) + }) +} + pub(crate) fn create_lock_request(db_name: &str, tbl_name: &str) -> hive_metastore::LockRequest { let component = hive_metastore::LockComponent { r#type: hive_metastore::LockType::EXCLUSIVE, @@ -372,8 +402,8 @@ pub(crate) fn create_lock_request(db_name: &str, tbl_name: &str) -> hive_metasto hive_metastore::LockRequest { component: vec![component], txnid: None, - user: FastStr::from(whoami::username()), - hostname: FastStr::from(whoami::fallible::hostname().unwrap()), + user: FastStr::from_string(lock_user()), + hostname: FastStr::from_string(lock_hostname()), agent_info: None, } } From 142f9df0aa64bd3b7334c87849d57620b5e62bdf Mon Sep 17 00:00:00 2001 From: Vladislav Bogomolov Date: Sun, 21 Jun 2026 14:44:01 +0100 Subject: [PATCH 5/5] Modify the testsuite to acknowledge implementation of the update table --- crates/catalog/loader/tests/table_suite.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/catalog/loader/tests/table_suite.rs b/crates/catalog/loader/tests/table_suite.rs index cdc9b11043..45b108978d 100644 --- a/crates/catalog/loader/tests/table_suite.rs +++ b/crates/catalog/loader/tests/table_suite.rs @@ -168,10 +168,10 @@ async fn test_catalog_create_table_schema(#[case] kind: CatalogKind) -> Result<( } // Common behavior: updating table properties persists through the catalog. -// HMS is excluded because update_table is not supported yet. #[rstest] #[case::rest_catalog(CatalogKind::Rest)] #[case::glue_catalog(CatalogKind::Glue)] +#[case::hms_catalog(CatalogKind::Hms)] #[case::sql_catalog(CatalogKind::Sql)] #[case::s3tables_catalog(CatalogKind::S3Tables)] #[case::memory_catalog(CatalogKind::Memory)] @@ -211,10 +211,9 @@ async fn test_catalog_update_table_properties(#[case] kind: CatalogKind) -> Resu } // Common behavior: update_table_properties is rejected when unsupported. 
-#[rstest] -#[case::hms_catalog(CatalogKind::Hms)] -#[tokio::test] -async fn test_catalog_update_table_properties_unsupported(#[case] kind: CatalogKind) -> Result<()> { +// Keep this helper for future catalogs that do not implement update_table. +#[allow(dead_code)] +async fn assert_catalog_update_table_properties_unsupported(kind: CatalogKind) -> Result<()> { let Some(harness) = load_catalog(kind).await else { return Ok(()); };