diff --git a/contracts/dex_aggregator/src/reply.rs b/contracts/dex_aggregator/src/reply.rs index e8838ef..4b61b3c 100644 --- a/contracts/dex_aggregator/src/reply.rs +++ b/contracts/dex_aggregator/src/reply.rs @@ -97,26 +97,22 @@ fn handle_swap_reply( .map_err(|e| ContractError::SubmessageResultError { error: e })? .events; - // Find the specific DEX event. This is our source of truth for the amount. let swap_event_opt = events.iter().rev().find(|e| { e.ty.starts_with("wasm") && (e.attributes.iter().any(|a| a.key == "return_amount") || e.attributes.iter().any(|a| a.key == "swap_final_amount")) }); - // If there is no swap event, we assume the output was zero. - // In this case, the path cannot continue, so we treat it as "complete" with a zero value. if swap_event_opt.is_none() { - exec_state.replies_expected -= 1; // Mutate exec_state + exec_state.replies_expected -= 1; let response = if exec_state.replies_expected > 0 { - EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; // Save exec_state + EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; Response::new() .add_attribute("action", "accumulating_path_outputs") - .add_attribute("info", "zero_value_path_completed") + .add_attribute("info", "zero_value_path_completed_or_irrelevant_reply") } else { - exec_state.current_stage_index += 1; // Mutate exec_state - // Call proceeds with exec_state and plan + exec_state.current_stage_index += 1; proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id)? }; return Ok(response); @@ -124,7 +120,6 @@ fn handle_swap_reply( let swap_event = swap_event_opt.unwrap(); - // Get the address of the contract that emitted this specific event. let replying_pool_addr_str = swap_event .attributes .iter() @@ -139,7 +134,6 @@ fn handle_swap_reply( .get(exec_state.current_stage_index as usize) .ok_or(ContractError::EmptyRoute {})?; - // Now, find the operation that matches this validated address. let mut replied_path_info = None; 'outer: for (split_idx, split) in current_stage.splits.iter().enumerate() { for (op_idx, op) in split.path.iter().enumerate() { @@ -150,87 +144,85 @@ fn handle_swap_reply( } } - let ((split_index, op_index), replied_op) = replied_path_info.ok_or_else(|| { - StdError::generic_err(format!( - "Could not find a split/operation matching the replying contract: {}", - replying_pool_addr - )) - })?; - - // Since we know the event exists, we can now safely parse the amount from the original message. - let received_amount = parse_amount_from_swap_reply(&msg)?; - let received_asset_info = get_operation_output(replied_op)?; + if let Some(((split_index, op_index), replied_op)) = replied_path_info { + let received_amount = parse_amount_from_swap_reply(&msg)?; + let received_asset_info = get_operation_output(replied_op)?; - let replied_path = ¤t_stage.splits[split_index].path; + let replied_path = ¤t_stage.splits[split_index].path; - if let Some(next_op) = replied_path.get(op_index + 1) { - let required_input_info = get_operation_input(next_op)?; - let offer_asset_for_next_op = amm::Asset { - info: received_asset_info, - amount: received_amount, - }; - if offer_asset_for_next_op.info != required_input_info { - exec_state.awaiting = Awaiting::PathConversion; // Mutate exec_state - exec_state.pending_path_op = Some(PendingPathOp { - // Mutate exec_state - operation: next_op.clone(), + if let Some(next_op) = replied_path.get(op_index + 1) { + let required_input_info = get_operation_input(next_op)?; + let offer_asset_for_next_op = amm::Asset { + info: received_asset_info, amount: received_amount, - }); - let config = CONFIG.load(deps.storage)?; - let conversion_msg = create_conversion_msg(&offer_asset_for_next_op, &config, &env)?; - let sub_msg = SubMsg::reply_on_success(conversion_msg, master_reply_id); - EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; // Save exec_state - return Ok(Response::new() - .add_submessage(sub_msg) - .add_attribute("action", "performing_path_conversion")); - } - let next_msg = create_swap_cosmos_msg( - &mut deps, - next_op, - &offer_asset_for_next_op.info, - offer_asset_for_next_op.amount, - &env, - )?; - let sub_msg = SubMsg::reply_on_success(next_msg, master_reply_id); - EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; // Save exec_state - Ok(Response::new() - .add_submessage(sub_msg) - .add_attribute("action", "proceeding_to_next_op_in_path") - .add_attribute("split_index", split_index.to_string()) - .add_attribute("op_index", (op_index + 1).to_string())) - } else { - let fee = match FEE_MAP.may_load(deps.storage, &replying_pool_addr)? { - Some(fee_percent) => { - let numerator = fee_percent.atomics(); - let denominator = Uint128::new(1_000_000_000_000_000_000u128); - received_amount.multiply_ratio(numerator, denominator) + }; + if offer_asset_for_next_op.info != required_input_info { + exec_state.awaiting = Awaiting::PathConversion; + exec_state.pending_path_op = Some(PendingPathOp { + operation: next_op.clone(), + amount: received_amount, + }); + let config = CONFIG.load(deps.storage)?; + let conversion_msg = + create_conversion_msg(&offer_asset_for_next_op, &config, &env)?; + let sub_msg = SubMsg::reply_on_success(conversion_msg, master_reply_id); + EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; + return Ok(Response::new() + .add_submessage(sub_msg) + .add_attribute("action", "performing_path_conversion")); } - None => Uint128::zero(), - }; - let amount_after_fee = received_amount.checked_sub(fee).map_err(StdError::from)?; - exec_state.accumulated_assets.push(amm::Asset { - // Mutate exec_state - info: received_asset_info.clone(), - amount: amount_after_fee, - }); - exec_state.replies_expected -= 1; - let mut response; - if exec_state.replies_expected > 0 { - EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; // Save exec_state - response = Response::new().add_attribute("action", "accumulating_path_outputs"); + let next_msg = create_swap_cosmos_msg( + &mut deps, + next_op, + &offer_asset_for_next_op.info, + offer_asset_for_next_op.amount, + &env, + )?; + let sub_msg = SubMsg::reply_on_success(next_msg, master_reply_id); + EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; + Ok(Response::new() + .add_submessage(sub_msg) + .add_attribute("action", "proceeding_to_next_op_in_path") + .add_attribute("split_index", split_index.to_string()) + .add_attribute("op_index", (op_index + 1).to_string())) } else { - exec_state.current_stage_index += 1; // Mutate exec_state - response = proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id)?; - } - if !fee.is_zero() { - let config = CONFIG.load(deps.storage)?; - let fee_send_msg = create_send_msg(&config.fee_collector, &received_asset_info, fee)?; - response = response - .add_message(fee_send_msg) - .add_attribute("fee_collected", fee.to_string()) - .add_attribute("fee_pool", replying_pool_addr.to_string()); + let fee = match FEE_MAP.may_load(deps.storage, &replying_pool_addr)? { + Some(fee_percent) => { + let numerator = fee_percent.atomics(); + let denominator = Uint128::new(1_000_000_000_000_000_000u128); + received_amount.multiply_ratio(numerator, denominator) + } + None => Uint128::zero(), + }; + let amount_after_fee = received_amount.checked_sub(fee).map_err(StdError::from)?; + exec_state.accumulated_assets.push(amm::Asset { + info: received_asset_info.clone(), + amount: amount_after_fee, + }); + exec_state.replies_expected -= 1; + let mut response; + if exec_state.replies_expected > 0 { + EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; + response = Response::new().add_attribute("action", "accumulating_path_outputs"); + } else { + exec_state.current_stage_index += 1; + response = proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id)?; + } + if !fee.is_zero() { + let config = CONFIG.load(deps.storage)?; + let fee_send_msg = + create_send_msg(&config.fee_collector, &received_asset_info, fee)?; + response = response + .add_message(fee_send_msg) + .add_attribute("fee_collected", fee.to_string()) + .add_attribute("fee_pool", replying_pool_addr.to_string()); + } + Ok(response) } - Ok(response) + } else { + Ok(Response::new() + .add_attribute("action", "ignored_unmatched_reply") + .add_attribute("source", replying_pool_addr)) } }