Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 77 additions & 85 deletions contracts/dex_aggregator/src/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,34 +97,29 @@ fn handle_swap_reply(
.map_err(|e| ContractError::SubmessageResultError { error: e })?
.events;

// Find the specific DEX event. This is our source of truth for the amount.
let swap_event_opt = events.iter().rev().find(|e| {
e.ty.starts_with("wasm")
&& (e.attributes.iter().any(|a| a.key == "return_amount")
|| e.attributes.iter().any(|a| a.key == "swap_final_amount"))
});

// If there is no swap event, we assume the output was zero.
// In this case, the path cannot continue, so we treat it as "complete" with a zero value.
if swap_event_opt.is_none() {
exec_state.replies_expected -= 1; // Mutate exec_state
exec_state.replies_expected -= 1;

let response = if exec_state.replies_expected > 0 {
EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; // Save exec_state
EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?;
Response::new()
.add_attribute("action", "accumulating_path_outputs")
.add_attribute("info", "zero_value_path_completed")
.add_attribute("info", "zero_value_path_completed_or_irrelevant_reply")
} else {
exec_state.current_stage_index += 1; // Mutate exec_state
// Call proceeds with exec_state and plan
exec_state.current_stage_index += 1;
proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id)?
};
return Ok(response);
}

let swap_event = swap_event_opt.unwrap();

// Get the address of the contract that emitted this specific event.
let replying_pool_addr_str = swap_event
.attributes
.iter()
Expand All @@ -139,7 +134,6 @@ fn handle_swap_reply(
.get(exec_state.current_stage_index as usize)
.ok_or(ContractError::EmptyRoute {})?;

// Now, find the operation that matches this validated address.
let mut replied_path_info = None;
'outer: for (split_idx, split) in current_stage.splits.iter().enumerate() {
for (op_idx, op) in split.path.iter().enumerate() {
Expand All @@ -150,87 +144,85 @@ fn handle_swap_reply(
}
}

let ((split_index, op_index), replied_op) = replied_path_info.ok_or_else(|| {
StdError::generic_err(format!(
"Could not find a split/operation matching the replying contract: {}",
replying_pool_addr
))
})?;

// Since we know the event exists, we can now safely parse the amount from the original message.
let received_amount = parse_amount_from_swap_reply(&msg)?;
let received_asset_info = get_operation_output(replied_op)?;
if let Some(((split_index, op_index), replied_op)) = replied_path_info {
let received_amount = parse_amount_from_swap_reply(&msg)?;
let received_asset_info = get_operation_output(replied_op)?;

let replied_path = &current_stage.splits[split_index].path;
let replied_path = &current_stage.splits[split_index].path;

if let Some(next_op) = replied_path.get(op_index + 1) {
let required_input_info = get_operation_input(next_op)?;
let offer_asset_for_next_op = amm::Asset {
info: received_asset_info,
amount: received_amount,
};
if offer_asset_for_next_op.info != required_input_info {
exec_state.awaiting = Awaiting::PathConversion; // Mutate exec_state
exec_state.pending_path_op = Some(PendingPathOp {
// Mutate exec_state
operation: next_op.clone(),
if let Some(next_op) = replied_path.get(op_index + 1) {
let required_input_info = get_operation_input(next_op)?;
let offer_asset_for_next_op = amm::Asset {
info: received_asset_info,
amount: received_amount,
});
let config = CONFIG.load(deps.storage)?;
let conversion_msg = create_conversion_msg(&offer_asset_for_next_op, &config, &env)?;
let sub_msg = SubMsg::reply_on_success(conversion_msg, master_reply_id);
EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; // Save exec_state
return Ok(Response::new()
.add_submessage(sub_msg)
.add_attribute("action", "performing_path_conversion"));
}
let next_msg = create_swap_cosmos_msg(
&mut deps,
next_op,
&offer_asset_for_next_op.info,
offer_asset_for_next_op.amount,
&env,
)?;
let sub_msg = SubMsg::reply_on_success(next_msg, master_reply_id);
EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; // Save exec_state
Ok(Response::new()
.add_submessage(sub_msg)
.add_attribute("action", "proceeding_to_next_op_in_path")
.add_attribute("split_index", split_index.to_string())
.add_attribute("op_index", (op_index + 1).to_string()))
} else {
let fee = match FEE_MAP.may_load(deps.storage, &replying_pool_addr)? {
Some(fee_percent) => {
let numerator = fee_percent.atomics();
let denominator = Uint128::new(1_000_000_000_000_000_000u128);
received_amount.multiply_ratio(numerator, denominator)
};
if offer_asset_for_next_op.info != required_input_info {
exec_state.awaiting = Awaiting::PathConversion;
exec_state.pending_path_op = Some(PendingPathOp {
operation: next_op.clone(),
amount: received_amount,
});
let config = CONFIG.load(deps.storage)?;
let conversion_msg =
create_conversion_msg(&offer_asset_for_next_op, &config, &env)?;
let sub_msg = SubMsg::reply_on_success(conversion_msg, master_reply_id);
EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?;
return Ok(Response::new()
.add_submessage(sub_msg)
.add_attribute("action", "performing_path_conversion"));
}
None => Uint128::zero(),
};
let amount_after_fee = received_amount.checked_sub(fee).map_err(StdError::from)?;
exec_state.accumulated_assets.push(amm::Asset {
// Mutate exec_state
info: received_asset_info.clone(),
amount: amount_after_fee,
});
exec_state.replies_expected -= 1;
let mut response;
if exec_state.replies_expected > 0 {
EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; // Save exec_state
response = Response::new().add_attribute("action", "accumulating_path_outputs");
let next_msg = create_swap_cosmos_msg(
&mut deps,
next_op,
&offer_asset_for_next_op.info,
offer_asset_for_next_op.amount,
&env,
)?;
let sub_msg = SubMsg::reply_on_success(next_msg, master_reply_id);
EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?;
Ok(Response::new()
.add_submessage(sub_msg)
.add_attribute("action", "proceeding_to_next_op_in_path")
.add_attribute("split_index", split_index.to_string())
.add_attribute("op_index", (op_index + 1).to_string()))
} else {
exec_state.current_stage_index += 1; // Mutate exec_state
response = proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id)?;
}
if !fee.is_zero() {
let config = CONFIG.load(deps.storage)?;
let fee_send_msg = create_send_msg(&config.fee_collector, &received_asset_info, fee)?;
response = response
.add_message(fee_send_msg)
.add_attribute("fee_collected", fee.to_string())
.add_attribute("fee_pool", replying_pool_addr.to_string());
let fee = match FEE_MAP.may_load(deps.storage, &replying_pool_addr)? {
Some(fee_percent) => {
let numerator = fee_percent.atomics();
let denominator = Uint128::new(1_000_000_000_000_000_000u128);
received_amount.multiply_ratio(numerator, denominator)
}
None => Uint128::zero(),
};
let amount_after_fee = received_amount.checked_sub(fee).map_err(StdError::from)?;
exec_state.accumulated_assets.push(amm::Asset {
info: received_asset_info.clone(),
amount: amount_after_fee,
});
exec_state.replies_expected -= 1;
let mut response;
if exec_state.replies_expected > 0 {
EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?;
response = Response::new().add_attribute("action", "accumulating_path_outputs");
} else {
exec_state.current_stage_index += 1;
response = proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id)?;
}
if !fee.is_zero() {
let config = CONFIG.load(deps.storage)?;
let fee_send_msg =
create_send_msg(&config.fee_collector, &received_asset_info, fee)?;
response = response
.add_message(fee_send_msg)
.add_attribute("fee_collected", fee.to_string())
.add_attribute("fee_pool", replying_pool_addr.to_string());
}
Ok(response)
}
Ok(response)
} else {
Ok(Response::new()
.add_attribute("action", "ignored_unmatched_reply")
.add_attribute("source", replying_pool_addr))
}
}

Expand Down