diff --git a/Cargo.lock b/Cargo.lock index 171148905..7c08f0506 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3654,9 +3654,9 @@ dependencies = [ [[package]] name = "mpl-core" -version = "0.10.0" +version = "0.10.1-alpha.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73af4f9f58790d71570faa6cc6bd5be1b2816d6ec3f09d9a1554369a143a0e08" +checksum = "350891ff368c0bf447fcd0db7a2a4a4a2784095074f246dea58446a7da2420af" dependencies = [ "base64 0.22.1", "borsh 0.10.4", diff --git a/Cargo.toml b/Cargo.toml index 99e88e38c..1a355cf51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,7 @@ migration = { path = "migration" } mime_guess = "2.0.4" mpl-bubblegum = "2.0.0" mpl-account-compression = "0.4.2" -mpl-core = {version = "0.10.0", features = ["serde"]} +mpl-core = {version = "=0.10.1-alpha.2", features = ["serde"]} mpl-noop = "0.2.1" mpl-token-metadata = "4.1.1" nft_ingester = { path = "nft_ingester" } diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs index a684dfb2c..cc3ac7d2f 100644 --- a/digital_asset_types/src/dao/scopes/asset.rs +++ b/digital_asset_types/src/dao/scopes/asset.rs @@ -364,7 +364,17 @@ pub async fn get_related_for_assets( let grouping_base_query = asset_grouping::Entity::find() .filter(asset_grouping::Column::AssetId.is_in(ids.clone())) - .filter(asset_grouping::Column::GroupValue.is_not_null()) + .filter( + Condition::any() + // Include collection groupings only if they have a non-null value + .add( + asset_grouping::Column::GroupKey + .eq("collection") + .and(asset_grouping::Column::GroupValue.is_not_null()), + ) + // Include all other groupings (regardless of value) + .add(asset_grouping::Column::GroupKey.ne("collection")), + ) .filter(cond) .order_by_asc(asset_grouping::Column::AssetId); @@ -400,6 +410,11 @@ pub async fn get_related_for_assets( } }; + // Filter out stale groupings from each asset after groups have been populated. + for (_id, asset) in assets_map.iter_mut() { + filter_out_stale_asset_groupings(&mut asset.groups); + } + Ok(assets_map.into_iter().map(|(_, v)| v).collect()) } @@ -480,7 +495,17 @@ pub async fn get_by_id( let grouping_query = asset_grouping::Entity::find() .filter(asset_grouping::Column::AssetId.eq(asset.id.clone())) - .filter(asset_grouping::Column::GroupValue.is_not_null()) + .filter( + Condition::any() + // Include collection groupings only if they have a non-null value + .add( + asset_grouping::Column::GroupKey + .eq("collection") + .and(asset_grouping::Column::GroupValue.is_not_null()), + ) + // Include all other groupings (regardless of value) + .add(asset_grouping::Column::GroupKey.ne("collection")), + ) .filter( Condition::any() .add(asset_grouping::Column::Verified.eq(true)) @@ -490,7 +515,7 @@ pub async fn get_by_id( ) .order_by_asc(asset_grouping::Column::AssetId); - let groups = if options.show_collection_metadata { + let mut groups = if options.show_collection_metadata { grouping_query .find_also_related(asset_data::Entity) .all(conn) @@ -504,6 +529,8 @@ pub async fn get_by_id( .collect::>() }; + filter_out_stale_asset_groupings(&mut groups); + Ok(FullAsset { asset, data, @@ -636,6 +663,26 @@ fn filter_out_stale_creators(creators: &mut Vec) { } } +fn filter_out_stale_asset_groupings( + asset_groupings_with_date: &mut Vec<(asset_grouping::Model, Option)>, +) { + // For core assets, any asset groupings that do not have the max + // `slot_updated` value are stale and should be removed. + // However, only filter out groupings where group_key is "group". + // All other groupings should be kept. + let max_slot_updated = asset_groupings_with_date + .iter() + .filter(|ag| ag.0.group_key == "group") + .map(|ag| ag.0.slot_updated) + .max(); + if let Some(max_slot_updated) = max_slot_updated { + asset_groupings_with_date.retain(|ag| { + ag.0.group_key != "group" + || (ag.0.slot_updated == max_slot_updated && ag.0.group_value.is_some()) + }); + } +} + pub async fn get_token_accounts( conn: &impl ConnectionTrait, owner_address: Option>, diff --git a/integration_tests/.gitignore b/integration_tests/.gitignore new file mode 100644 index 000000000..2eea525d8 --- /dev/null +++ b/integration_tests/.gitignore @@ -0,0 +1 @@ +.env \ No newline at end of file diff --git a/migration/src/lib.rs b/migration/src/lib.rs index c2dd490e7..082e2d081 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -51,6 +51,7 @@ mod m20250313_105206_add_idx_ta_owner_amount_and_idx_ta_mint_amount; mod m20250321_120101_add_bgum_leaf_schema_v2_items; mod m20250327_120101_add_bubblegum_v2_ixs_to_enum; mod m20250702_120101_add_bubblegum_v2_enum_vals; +mod m20251008_145049_fix_asset_grouping_constraints; pub mod model; @@ -111,6 +112,7 @@ impl MigratorTrait for Migrator { Box::new(m20250321_120101_add_bgum_leaf_schema_v2_items::Migration), Box::new(m20250327_120101_add_bubblegum_v2_ixs_to_enum::Migration), Box::new(m20250702_120101_add_bubblegum_v2_enum_vals::Migration), + Box::new(m20251008_145049_fix_asset_grouping_constraints::Migration), ] } } diff --git a/migration/src/m20251008_145049_fix_asset_grouping_constraints.rs b/migration/src/m20251008_145049_fix_asset_grouping_constraints.rs new file mode 100644 index 000000000..e2e89f320 --- /dev/null +++ b/migration/src/m20251008_145049_fix_asset_grouping_constraints.rs @@ -0,0 +1,88 @@ +use sea_orm_migration::{ + prelude::*, + sea_orm::{ConnectionTrait, DatabaseBackend, Statement}, +}; + +use crate::model::table::AssetGrouping; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Drop the existing unique constraint on (asset_id, group_key) + manager + .drop_index( + sea_query::Index::drop() + .name("asset_grouping_key_unique") + .table(AssetGrouping::Table) + .to_owned(), + ) + .await?; + + // Create partial unique index for 'collection' group_key only + // This maintains the constraint that each asset can only have one collection + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + "CREATE UNIQUE INDEX IF NOT EXISTS asset_grouping_collection_unique \ + ON asset_grouping (asset_id, group_key) \ + WHERE group_key = 'collection'" + .to_string(), + )) + .await?; + + // Create unique constraint for non-collection group keys + // This allows multiple groups per asset but prevents duplicate group values + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + "CREATE UNIQUE INDEX IF NOT EXISTS asset_grouping_other_unique \ + ON asset_grouping (asset_id, group_key, group_value) \ + WHERE group_key != 'collection'" + .to_string(), + )) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Drop the new partial indexes + manager + .drop_index( + sea_query::Index::drop() + .name("asset_grouping_collection_unique") + .table(AssetGrouping::Table) + .to_owned(), + ) + .await?; + + manager + .drop_index( + sea_query::Index::drop() + .name("asset_grouping_other_unique") + .table(AssetGrouping::Table) + .to_owned(), + ) + .await?; + + // Recreate the original unique constraint + manager + .create_index( + sea_query::Index::create() + .unique() + .name("asset_grouping_key_unique") + .col(AssetGrouping::AssetId) + .col(AssetGrouping::GroupKey) + .table(AssetGrouping::Table) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/program_transformers/src/bubblegum/db.rs b/program_transformers/src/bubblegum/db.rs index ba5258f4c..e2c25bfcc 100644 --- a/program_transformers/src/bubblegum/db.rs +++ b/program_transformers/src/bubblegum/db.rs @@ -407,25 +407,18 @@ where ..Default::default() }; - let mut query = asset_grouping::Entity::insert(model) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .update_columns([ - asset_grouping::Column::GroupValue, - asset_grouping::Column::Verified, - asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); + let mut query = asset_grouping::Entity::insert(model).build(DbBackend::Postgres); + // Use index inference for partial unique indexes + // For group_key = 'collection', we use the partial unique index on (asset_id, group_key) WHERE group_key = 'collection' // Do not overwrite changes that happened after decompression (asset_grouping.group_info_seq = 0). query.sql = format!( - "{} WHERE (asset_grouping.group_info_seq != 0 AND excluded.group_info_seq >= asset_grouping.group_info_seq) OR asset_grouping.group_info_seq IS NULL", + "{} ON CONFLICT (asset_id, group_key) WHERE (group_key = 'collection') DO UPDATE SET \ + group_value = EXCLUDED.group_value, \ + verified = EXCLUDED.verified, \ + slot_updated = EXCLUDED.slot_updated, \ + group_info_seq = EXCLUDED.group_info_seq \ + WHERE (asset_grouping.group_info_seq != 0 AND excluded.group_info_seq >= asset_grouping.group_info_seq) OR asset_grouping.group_info_seq IS NULL", query.sql ); diff --git a/program_transformers/src/mpl_core_program/v1_asset.rs b/program_transformers/src/mpl_core_program/v1_asset.rs index 943f7b99b..06e90acdf 100644 --- a/program_transformers/src/mpl_core_program/v1_asset.rs +++ b/program_transformers/src/mpl_core_program/v1_asset.rs @@ -26,8 +26,7 @@ use { entity::{ActiveValue, ColumnTrait, EntityTrait}, prelude::*, query::{JsonValue, QueryFilter, QuerySelect, QueryTrait}, - sea_query::query::OnConflict, - sea_query::Expr, + sea_query::{query::OnConflict, Expr}, ConnectionTrait, CursorTrait, DbBackend, FromQueryResult, TransactionTrait, }, serde_json::{value::Value, Map}, @@ -448,7 +447,64 @@ pub async fn save_v1_asset( .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; } - // Commit the database transaction. + //----------------------- + // asset_grouping table - groups plugin + //----------------------- + + let plugin_groups = asset.plugins.get(&PluginType::Groups).and_then(|plugin| { + if let Plugin::Groups(g) = &plugin.data { + Some(&g.groups) + } else { + None + } + }); + + let empty_groups = Vec::new(); + let groups = plugin_groups.unwrap_or(&empty_groups); + + // If there are no groups or groups plugin is not present, insert a group with no value. + let group_entities = if groups.is_empty() { + vec![asset_grouping::ActiveModel { + asset_id: ActiveValue::Set(id_vec.clone()), + group_key: ActiveValue::Set("group".to_string()), + group_value: ActiveValue::Set(None), + slot_updated: ActiveValue::Set(Some(slot_i)), + verified: ActiveValue::Set(true), + group_info_seq: ActiveValue::Set(Some(0)), + ..Default::default() + }] + } else { + groups + .iter() + .map(|group| asset_grouping::ActiveModel { + asset_id: ActiveValue::Set(id_vec.clone()), + group_key: ActiveValue::Set("group".to_string()), + group_value: ActiveValue::Set(Some(group.to_string())), + slot_updated: ActiveValue::Set(Some(slot_i)), + verified: ActiveValue::Set(true), + group_info_seq: ActiveValue::Set(Some(0)), + ..Default::default() + }) + .collect::>() + }; + + let mut query = asset_grouping::Entity::insert_many(group_entities).build(DbBackend::Postgres); + + // Use index inference for partial unique indexes + // For group_key = 'group', we use the partial unique index on (asset_id, group_key, group_value) WHERE group_key != 'collection' + query.sql = format!( + "{} ON CONFLICT (asset_id, group_key, group_value) WHERE (group_key != 'collection') DO UPDATE SET \ + slot_updated = EXCLUDED.slot_updated, \ + verified = EXCLUDED.verified, \ + group_info_seq = EXCLUDED.group_info_seq \ + WHERE excluded.slot_updated >= asset_grouping.slot_updated OR asset_grouping.slot_updated IS NULL", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + txn.commit().await?; // Return early if there is no URI. diff --git a/program_transformers/src/token_metadata/v1_asset.rs b/program_transformers/src/token_metadata/v1_asset.rs index 70634f5bc..60eab012a 100644 --- a/program_transformers/src/token_metadata/v1_asset.rs +++ b/program_transformers/src/token_metadata/v1_asset.rs @@ -342,23 +342,17 @@ pub async fn save_v1_asset( slot_updated: ActiveValue::Set(Some(slot_i)), ..Default::default() }; - let mut query = asset_grouping::Entity::insert(model) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .update_columns([ - asset_grouping::Column::GroupValue, - asset_grouping::Column::Verified, - asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); + let mut query = asset_grouping::Entity::insert(model).build(DbBackend::Postgres); + + // Use index inference for partial unique indexes + // For group_key = 'collection', we use the partial unique index on (asset_id, group_key) WHERE group_key = 'collection' query.sql = format!( - "{} WHERE excluded.slot_updated > asset_grouping.slot_updated", + "{} ON CONFLICT (asset_id, group_key) WHERE (group_key = 'collection') DO UPDATE SET \ + group_value = EXCLUDED.group_value, \ + verified = EXCLUDED.verified, \ + slot_updated = EXCLUDED.slot_updated, \ + group_info_seq = EXCLUDED.group_info_seq \ + WHERE excluded.slot_updated > asset_grouping.slot_updated", query.sql ); txn.execute(query)