Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 252 additions & 12 deletions magicblock-chainlink/src/chainlink/fetch_cloner/ata_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use dlp_api::state::DelegationRecord;
use futures_util::future::join_all;
use magicblock_accounts_db::traits::AccountsBank;
use magicblock_core::token_programs::{
is_ata, try_derive_ata_address_and_bump, try_derive_eata_address_and_bump,
AtaInfo,
is_ata, try_derive_eata_address_and_bump, try_derive_supported_ata_pubkeys,
AtaInfo, EphemeralAta, MaybeIntoAta, EATA_PROGRAM_ID,
};
use magicblock_metrics::metrics;
use solana_account::{AccountSharedData, ReadableAccount};
Expand All @@ -22,8 +22,8 @@ use super::{
use crate::{
cloner::{AccountCloneRequest, Cloner, DelegationActions},
remote_account_provider::{
ChainPubsubClient, ChainRpcClient, ResolvedAccountSharedData,
SubscriptionReason,
ChainPubsubClient, ChainRpcClient, MatchSlotsConfig, RemoteAccount,
ResolvedAccountSharedData, SubscriptionReason,
},
};

Expand Down Expand Up @@ -58,18 +58,257 @@ fn ata_info_from_layout(

let mint = Pubkey::new_from_array(data[0..32].try_into().ok()?);
let wallet_owner = Pubkey::new_from_array(data[32..64].try_into().ok()?);
let (derived_ata, _) =
try_derive_ata_address_and_bump(&wallet_owner, &mint)?;
if derived_ata != *ata_pubkey {
let ata_pubkeys = try_derive_supported_ata_pubkeys(&wallet_owner, &mint);
if ata_pubkeys.contains(ata_pubkey) {
return Some(AtaInfo {
mint,
owner: wallet_owner,
});
}

None
}

pub(crate) fn is_known_empty_eata<T, U, V, C>(
this: &FetchCloner<T, U, V, C>,
eata_pubkey: &Pubkey,
) -> bool
where
T: ChainRpcClient,
U: ChainPubsubClient,
V: AccountsBank,
C: Cloner,
{
this.known_empty_eatas.lock().get(eata_pubkey).is_some()
}

pub(crate) fn mark_eata_empty<T, U, V, C>(
this: &FetchCloner<T, U, V, C>,
eata_pubkey: Pubkey,
) where
T: ChainRpcClient,
U: ChainPubsubClient,
V: AccountsBank,
C: Cloner,
{
this.known_empty_eatas.lock().put(eata_pubkey, ());
}

pub(crate) fn maybe_build_projected_ata_clone_request_from_eata_sub_update<
T,
U,
V,
C,
>(
this: &FetchCloner<T, U, V, C>,
eata_pubkey: Pubkey,
eata_account: &AccountSharedData,
deleg_record: Option<&DelegationRecord>,
delegation_actions: &DelegationActions,
) -> Option<AccountCloneRequest>
where
T: ChainRpcClient,
U: ChainPubsubClient,
V: AccountsBank,
C: Cloner,
{
let deleg_record = deleg_record?;

if deleg_record.authority != this.validator_pubkey {
return None;
}
let (wallet_owner, mint) = delegation::parse_raw_eata_pda(
&eata_pubkey,
eata_account.data(),
deleg_record.owner,
)?;
let ata_pubkeys = try_derive_supported_ata_pubkeys(&wallet_owner, &mint);

Some(AtaInfo {
mint,
owner: wallet_owner,
// eATA updates only carry the projected balance fields. The in-bank ATA is
// required as the base so the clone preserves the actual token program
// owner and any Token-2022 account layout extensions.
let mut ata_pubkey = None;
let mut in_bank_ata = None;
for candidate_pubkey in ata_pubkeys.token_2022_first().into_iter().flatten()
{
if let Some(candidate_account) =
this.accounts_bank.get_account(&candidate_pubkey)
{
ata_pubkey = Some(candidate_pubkey);
in_bank_ata = Some(candidate_account);
break;
}
}
let in_bank_ata = in_bank_ata.as_ref()?;
let ata_pubkey = ata_pubkey?;
if in_bank_ata.delegated() || in_bank_ata.undelegating() {
return None;
}
let projected_ata = maybe_project_delegated_ata_from_eata(
this,
in_bank_ata,
eata_account,
deleg_record,
)?;
Some(AccountCloneRequest {
pubkey: ata_pubkey,
account: projected_ata,
commit_frequency_ms: None,
delegation_actions: delegation_actions.clone(),
delegated_to_other: None,
})
}

pub(crate) async fn maybe_project_ata_from_subscription_update<T, U, V, C>(
this: &FetchCloner<T, U, V, C>,
ata_pubkey: Pubkey,
ata_account: AccountSharedData,
) -> (
AccountSharedData,
Option<(DelegationRecord, Option<DelegationActions>)>,
)
where
T: ChainRpcClient,
U: ChainPubsubClient,
V: AccountsBank,
C: Cloner,
{
let Some(ata_info) = is_ata(&ata_pubkey, &ata_account) else {
return (ata_account, None);
};

let Some((eata_pubkey, _)) =
try_derive_eata_address_and_bump(&ata_info.owner, &ata_info.mint)
else {
return (ata_account, None);
};

let was_watching = this.remote_account_provider.is_watching(&eata_pubkey);

// Ensure before cache checks; this keeps the subscription LRU warm
// without refcounting the projection reason on every ATA update.
let subscribed = match this
.ensure_subscription(&eata_pubkey, SubscriptionReason::AtaProjection)
.await
{
Ok(()) => true,
Err(err) => {
warn!(
pubkey = %eata_pubkey,
error = ?err,
"Failed to subscribe to derived eATA"
);
false
}
};

// Known-empty eATAs skip the fetch only if the subscription was already live.
if was_watching && subscribed && is_known_empty_eata(this, &eata_pubkey) {
return (ata_account, None);
}

let (eata_account, definitively_not_found) = match this
.remote_account_provider
.try_get_multi_until_slots_match(
&[eata_pubkey],
Some(MatchSlotsConfig {
min_context_slot: Some(ata_account.remote_slot()),
..Default::default()
}),
metrics::AccountFetchOrigin::ProjectAta,
)
.await
{
Ok(mut accounts) => {
let popped = accounts.pop();
// Only `NotFound` proves absence; stale, missing, or failed fetches retry later.
let nf = matches!(popped, Some(RemoteAccount::NotFound(_)));
let fresh = popped.and_then(|a| a.fresh_account());
(fresh, nf)
}
Err(err) => {
debug!(
pubkey = %eata_pubkey,
error = ?err,
"Failed to fetch eATA for projection"
);
(None, false)
}
};

let Some(eata_account) = eata_account else {
// Cache absence only after a confirmed NotFound and live subscription.
if definitively_not_found && subscribed {
mark_eata_empty(this, eata_pubkey);
}
return (ata_account, None);
};

let deleg_record = delegation::fetch_and_parse_delegation_record(
this,
eata_pubkey,
ata_account.remote_slot().max(eata_account.remote_slot()),
metrics::AccountFetchOrigin::ProjectAta,
)
.await;

let Some(deleg_record) = deleg_record else {
return (ata_account, None);
};
let (deleg_record, delegation_actions) = deleg_record;

if let Some(projected_ata) = maybe_project_delegated_ata_from_eata(
this,
&ata_account,
&eata_account,
&deleg_record,
) {
return (projected_ata, Some((deleg_record, delegation_actions)));
}
(ata_account, Some((deleg_record, delegation_actions)))
}

pub(crate) fn maybe_project_delegated_ata_from_eata<T, U, V, C>(
this: &FetchCloner<T, U, V, C>,
ata_account: &AccountSharedData,
eata_account: &AccountSharedData,
deleg_record: &DelegationRecord,
) -> Option<AccountSharedData>
where
T: ChainRpcClient,
U: ChainPubsubClient,
V: AccountsBank,
C: Cloner,
{
if deleg_record.authority != this.validator_pubkey {
return None;
}

// Projecting from eATA must preserve the base ATA's owner and data length.
// That is what keeps Token-2022 accounts from being rebuilt as legacy SPL
// Token accounts when the eATA itself only stores owner, mint, and amount.
let projected_from_base_ata = if deleg_record.owner == EATA_PROGRAM_ID {
EphemeralAta::try_from_account_data(eata_account.data())
.and_then(|eata| eata.project_into_ata_account(ata_account))
} else {
None
};

let mut projected_ata = match projected_from_base_ata
.or_else(|| eata_account.maybe_into_ata(deleg_record.owner))
{
Some(projected_ata) => projected_ata,
None => {
return None;
}
};
let projected_slot =
ata_account.remote_slot().max(eata_account.remote_slot());
projected_ata.set_remote_slot(projected_slot);
projected_ata.set_delegated(true);
Some(projected_ata)
}

/// Resolves ATAs with eATA projection.
/// For each detected ATA, we derive the eATA PDA, subscribe to both,
/// and, if the ATA is delegated to us and the eATA exists, we clone the eATA data
Expand Down Expand Up @@ -230,8 +469,9 @@ where
delegation::get_delegated_to_other(this, &deleg_record);
commit_frequency_ms = Some(deleg_record.commit_frequency_ms);

if let Some(projected_ata) = this
.maybe_project_delegated_ata_from_eata(
if let Some(projected_ata) =
maybe_project_delegated_ata_from_eata(
this,
input.ata_account.account_shared_data(),
eata_shared,
&deleg_record,
Expand Down
15 changes: 9 additions & 6 deletions magicblock-chainlink/src/chainlink/fetch_cloner/delegation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use dlp_api::{
pda::delegation_record_pda_from_delegated_account, state::DelegationRecord,
};
use magicblock_accounts_db::traits::AccountsBank;
use magicblock_core::token_programs::{derive_eata, EATA_PROGRAM_ID};
use magicblock_core::token_programs::{
try_derive_eata_address_and_bump, EphemeralAta, EATA_PROGRAM_ID,
};
use magicblock_metrics::metrics;
use solana_account::ReadableAccount;
use solana_keypair::Keypair;
Expand Down Expand Up @@ -136,14 +138,15 @@ pub(crate) fn parse_raw_eata_pda(
data: &[u8],
owner_program: Pubkey,
) -> Option<(Pubkey, Pubkey)> {
if owner_program != EATA_PROGRAM_ID || data.len() < 72 {
if owner_program != EATA_PROGRAM_ID {
return None;
}

let wallet_owner = Pubkey::new_from_array(data[0..32].try_into().ok()?);
let mint = Pubkey::new_from_array(data[32..64].try_into().ok()?);
(derive_eata(&wallet_owner, &mint) == *account_pubkey)
.then_some((wallet_owner, mint))
let eata = EphemeralAta::try_from_account_data(data)?;
let (derived_eata, bump) =
try_derive_eata_address_and_bump(&eata.owner, &eata.mint)?;
(derived_eata == *account_pubkey && bump == eata.bump)
.then_some((eata.owner, eata.mint))
}

pub(crate) fn get_delegated_to_other<T, U, V, C>(
Expand Down
Loading
Loading