diff --git a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/CurrencyL0App.scala b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/CurrencyL0App.scala index dcea76872..1d1aba5c2 100644 --- a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/CurrencyL0App.scala +++ b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/CurrencyL0App.scala @@ -209,7 +209,9 @@ abstract class CurrencyL0App( services.collateral ) - _ <- initializeGlobalSnapshotStorages[IO, Run](services, storages, sharedStorages).asResource + _ <- hasherSelectorAlwaysCurrent.withCurrent { implicit hasher => + initializeGlobalSnapshotStorages[IO, Run](services, storages, sharedStorages) + }.asResource program <- (method match { case m: CreateGenesis => diff --git a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/StoragesInitializer.scala b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/StoragesInitializer.scala index f730ea4e5..904892a02 100644 --- a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/StoragesInitializer.scala +++ b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/StoragesInitializer.scala @@ -1,8 +1,8 @@ package io.constellationnetwork.currency.l0 -import cats.MonadThrow import cats.effect.Async import cats.syntax.all._ +import cats.{MonadThrow, Parallel} import scala.concurrent.duration._ @@ -11,6 +11,7 @@ import io.constellationnetwork.currency.schema.currency.{CurrencyIncrementalSnap import io.constellationnetwork.node.shared.cli.CliMethod import io.constellationnetwork.node.shared.domain.snapshot.services.GlobalL0Service import io.constellationnetwork.node.shared.modules.SharedStorages +import io.constellationnetwork.schema.mpt.GlobalStateConverter.syntax._ import io.constellationnetwork.schema.{GlobalSnapshotWithState, GlobalSnapshotWithStateDeltas} import io.constellationnetwork.security.Hasher import io.constellationnetwork.security.signature.Signed @@ -34,7 +35,7 @@ object StoragesInitializer { } def initializeGlobalSnapshotStorages[ - F[_]: Async: Logger, + F[_]: Async: Logger: Parallel: Hasher, R <: CliMethod ]( services: Services[F, R], @@ -76,6 +77,9 @@ object StoragesInitializer { ) _ <- Logger[F].info(s"Successfully initialized lastGlobalSnapshot storage") + kvPairs <- state.allStateEntries[F] + _ <- sharedStorages.mptStore.syncFull(kvPairs) + _ <- Logger[F].info(s"Successfully initialized all global snapshot storages with ordinal=$ordinal") } yield () } diff --git a/modules/currency-l1/src/main/scala/io/constellationnetwork/currency/l1/modules/Services.scala b/modules/currency-l1/src/main/scala/io/constellationnetwork/currency/l1/modules/Services.scala index 0acdb41eb..2cf9b87bc 100644 --- a/modules/currency-l1/src/main/scala/io/constellationnetwork/currency/l1/modules/Services.scala +++ b/modules/currency-l1/src/main/scala/io/constellationnetwork/currency/l1/modules/Services.scala @@ -72,7 +72,8 @@ object Services { storages.transaction, storages.lastSnapshot, validators.transaction, - sharedStorages.mptStore + sharedStorages.mptStore, + shouldUseMptStore = false ) val allowSpend = AllowSpendService.make[F, CurrencySnapshotStateProof, CurrencyIncrementalSnapshot, CurrencySnapshotInfo]( storages.allowSpend, diff --git a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/Main.scala b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/Main.scala index 53e59f1a3..f45235848 100644 --- a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/Main.scala +++ b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/Main.scala @@ -229,9 +229,6 @@ object Main case (snapshotInfo, snapshot) => for { hashedSnapshot <- hasherSelector.withCurrent(implicit hasher => snapshot.toHashed[IO]) - kvPairs <- snapshotInfo.allStateEntries[IO] - _ <- sharedStorages.mptStore.sync(kvPairs) - result <- services.consensus.manager.startFacilitatingAfterRollback( snapshot.ordinal, GlobalConsensusOutcome( @@ -319,10 +316,9 @@ object Main sharedStorages.lastGlobalSnapshot, programs.download, hashedSnapshot, - hashedGenesis.info + hashedGenesis.info, + sharedStorages.mptStore ) - kvPairs <- GlobalSnapshotInfoV1.toGlobalSnapshotInfo(hashedGenesis.info).allStateEntries[IO] - _ <- sharedStorages.mptStore.sync(kvPairs) _ <- services.consensus.manager .startFacilitatingAfterRollback( signedFirstIncrementalSnapshot.ordinal, diff --git a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/StoragesInitializer.scala b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/StoragesInitializer.scala index 217e3bbd8..f3dc42a08 100644 --- a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/StoragesInitializer.scala +++ b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/StoragesInitializer.scala @@ -1,5 +1,6 @@ package io.constellationnetwork.dag.l0 +import cats.Parallel import cats.effect.Async import cats.syntax.all._ @@ -9,6 +10,8 @@ import io.constellationnetwork.node.shared.domain.collateral.LatestBalances import io.constellationnetwork.node.shared.domain.snapshot.programs.Download import io.constellationnetwork.node.shared.domain.snapshot.storage.{LastNGlobalSnapshotStorage, LastSnapshotStorage, SnapshotStorage} import io.constellationnetwork.node.shared.modules.SharedStorages +import io.constellationnetwork.schema.mpt.GlobalStateConverter.syntax._ +import io.constellationnetwork.schema.mpt.{GlobalStateKey, MptStore} import io.constellationnetwork.schema.{GlobalIncrementalSnapshot, GlobalSnapshotInfo} import io.constellationnetwork.security.{Hashed, Hasher} @@ -16,14 +19,15 @@ import org.typelevel.log4cats.Logger object StoragesInitializer { def initializeStorages[ - F[_]: Async: Logger: Hasher + F[_]: Async: Logger: Hasher: Parallel ]( globalSnapshotStorage: SnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo], lastNGlobalSnapshotStorage: LastNGlobalSnapshotStorage[F], lastGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo], download: Download[F, GlobalIncrementalSnapshot], hashedGlobalIncrementalSnapshot: Hashed[GlobalIncrementalSnapshot], - globalSnapshotInfo: GlobalSnapshotInfo + globalSnapshotInfo: GlobalSnapshotInfo, + mptStore: MptStore[F, GlobalStateKey] ): F[Unit] = { val ordinal = hashedGlobalIncrementalSnapshot.ordinal @@ -50,6 +54,9 @@ object StoragesInitializer { ) _ <- Logger[F].info(s"Successfully initialized lastGlobalSnapshot storage") + kvPairs <- globalSnapshotInfo.allStateEntries[F] + _ <- mptStore.syncFull(kvPairs) + _ <- Logger[F].info(s"Storage initialization completed successfully with ordinal=$ordinal") } yield () } diff --git a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/domain/snapshot/programs/Download.scala b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/domain/snapshot/programs/Download.scala index 64b6c1ee4..ff244e5cf 100644 --- a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/domain/snapshot/programs/Download.scala +++ b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/domain/snapshot/programs/Download.scala @@ -119,8 +119,8 @@ object Download { .flatMap { result => val ((snapshot, context), observationLimit) = result for { - kvPairs <- context.allStateEntries[F] - _ <- mptStore.sync(kvPairs) + kvPairs <- hasherSelector.withCurrent(implicit h => context.allStateEntries[F]) + _ <- mptStore.syncFull(kvPairs) _ <- consensus.manager.startFacilitatingAfterDownload(observationLimit, snapshot, context) } yield () } diff --git a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/GlobalSnapshotTraverse.scala b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/GlobalSnapshotTraverse.scala index bb9ec4ebc..8a7c0439f 100644 --- a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/GlobalSnapshotTraverse.scala +++ b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/GlobalSnapshotTraverse.scala @@ -14,6 +14,7 @@ import io.constellationnetwork.node.shared.domain.snapshot.SnapshotContextFuncti import io.constellationnetwork.node.shared.domain.snapshot.programs.Download import io.constellationnetwork.node.shared.domain.snapshot.storage.{LastNGlobalSnapshotStorage, LastSnapshotStorage, SnapshotStorage} import io.constellationnetwork.schema._ +import io.constellationnetwork.schema.mpt.{GlobalStateKey, MptStore} import io.constellationnetwork.security._ import io.constellationnetwork.security.hash.Hash import io.constellationnetwork.security.signature.Signed @@ -37,7 +38,8 @@ object GlobalSnapshotTraverse { globalSnapshotStorage: SnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo], lastNGlobalSnapshotStorage: LastNGlobalSnapshotStorage[F], lastGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo], - download: Download[F, GlobalIncrementalSnapshot] + download: Download[F, GlobalIncrementalSnapshot], + mptStore: MptStore[F, GlobalStateKey] ): GlobalSnapshotTraverse[F] = new GlobalSnapshotTraverse[F] { implicit val logger: SelfAwareStructuredLogger[F] = Slf4jLogger.getLoggerFromName[F](this.getClass.getName) @@ -122,7 +124,8 @@ object GlobalSnapshotTraverse { lastGlobalSnapshotStorage, download, hashedFirstInc, - firstInfo + firstInfo, + mptStore ) ) (info, lastInc) <- incHashesNec.tail.foldLeftM((firstInfo, firstInc)) { diff --git a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/programs/RollbackLoader.scala b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/programs/RollbackLoader.scala index 710c9c0d5..cee8d7d57 100644 --- a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/programs/RollbackLoader.scala +++ b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/programs/RollbackLoader.scala @@ -20,6 +20,7 @@ import io.constellationnetwork.node.shared.infrastructure.snapshot.GlobalSnapsho import io.constellationnetwork.node.shared.infrastructure.snapshot.storage._ import io.constellationnetwork.node.shared.modules.SharedStorages import io.constellationnetwork.schema._ +import io.constellationnetwork.schema.mpt.{GlobalStateKey, MptStore} import io.constellationnetwork.security._ import io.constellationnetwork.security.hash.Hash import io.constellationnetwork.security.signature.Signed @@ -43,7 +44,8 @@ object RollbackLoader { F, GlobalIncrementalSnapshot, GlobalSnapshotInfo - ] + ], + mptStore: MptStore[F, GlobalStateKey] ): RollbackLoader[F] = new RollbackLoader[F]( keyPair, @@ -56,7 +58,8 @@ object RollbackLoader { globalSnapshotStorage, lastNGlobalSnapshotStorage, lastGlobalSnapshotStorage, - combinedSnapshotCheckpointFileSystemStorage + combinedSnapshotCheckpointFileSystemStorage, + mptStore ) {} } @@ -71,7 +74,12 @@ sealed abstract class RollbackLoader[F[_]: Async: Parallel: KryoSerializer: Json globalSnapshotStorage: SnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo], lastNGlobalSnapshotStorage: LastNGlobalSnapshotStorage[F], lastGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo], - combinedSnapshotCheckpointFileSystemStorage: CombinedSnapshotCheckpointFileSystemStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo] + combinedSnapshotCheckpointFileSystemStorage: CombinedSnapshotCheckpointFileSystemStorage[ + F, + GlobalIncrementalSnapshot, + GlobalSnapshotInfo + ], + mptStore: MptStore[F, GlobalStateKey] ) { private val logger = Slf4jLogger.getLogger[F] @@ -97,7 +105,8 @@ sealed abstract class RollbackLoader[F[_]: Async: Parallel: KryoSerializer: Json globalSnapshotStorage, lastNGlobalSnapshotStorage, lastGlobalSnapshotStorage, - download + download, + mptStore ) snapshotTraverse.loadChain() } diff --git a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/modules/Programs.scala b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/modules/Programs.scala index b3cbf27be..665df4dba 100644 --- a/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/modules/Programs.scala +++ b/modules/dag-l0/src/main/scala/io/constellationnetwork/dag/l0/modules/Programs.scala @@ -77,7 +77,8 @@ object Programs { globalSnapshotStorage, lastNGlobalSnapshotStorage, lastGlobalSnapshotStorage, - storages.combinedGlobalSnapshotCheckpointStorage + storages.combinedGlobalSnapshotCheckpointStorage, + mptStore ) new Programs[F](sharedPrograms.peerDiscovery, sharedPrograms.joining, trustPush, download, rollbackLoader) {} diff --git a/modules/dag-l0/src/test/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/GlobalSnapshotTraverseSuite.scala b/modules/dag-l0/src/test/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/GlobalSnapshotTraverseSuite.scala index 3caf5e01d..523c7e396 100644 --- a/modules/dag-l0/src/test/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/GlobalSnapshotTraverseSuite.scala +++ b/modules/dag-l0/src/test/scala/io/constellationnetwork/dag/l0/infrastructure/snapshot/GlobalSnapshotTraverseSuite.scala @@ -467,7 +467,8 @@ object GlobalSnapshotTraverseSuite extends MutableIOSuite with Checkers { globalSnapshotStorage, lastNSnapshotStorage, lastSnapshotStorage, - download + download, + mptStore ) } diff --git a/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/domain/snapshot/programs/SnapshotProcessor.scala b/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/domain/snapshot/programs/SnapshotProcessor.scala index 3f3cde0b7..306b22e4b 100644 --- a/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/domain/snapshot/programs/SnapshotProcessor.scala +++ b/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/domain/snapshot/programs/SnapshotProcessor.scala @@ -84,7 +84,7 @@ abstract class SnapshotProcessor[ lastSnapshotStorage: LastSnapshotStorage[F, S, SI], addressStorage: AddressStorage[F], mptStore: MptStore[F, GlobalStateKey] - ): F[SnapshotProcessingResult] = + )(implicit hasher: Hasher[F]): F[SnapshotProcessingResult] = alignment match { case AlignedAtNewOrdinal( snapshot, diff --git a/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/domain/transaction/TransactionService.scala b/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/domain/transaction/TransactionService.scala index 80dc724d1..7c9996db3 100644 --- a/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/domain/transaction/TransactionService.scala +++ b/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/domain/transaction/TransactionService.scala @@ -8,6 +8,7 @@ import cats.syntax.all._ import io.constellationnetwork.dag.l1.domain.transaction.ContextualTransactionValidator.NonContextualValidationError import io.constellationnetwork.ext.cats.syntax.validated.validatedSyntax +import io.constellationnetwork.node.shared.domain.collateral.LatestBalances import io.constellationnetwork.node.shared.domain.snapshot.storage.LastSnapshotStorage import io.constellationnetwork.node.shared.domain.transaction.TransactionValidator import io.constellationnetwork.schema.SnapshotOrdinal @@ -19,6 +20,7 @@ import io.constellationnetwork.schema.transaction.Transaction import io.constellationnetwork.security.hash.Hash import io.constellationnetwork.security.{Hashed, Hasher} +import fs2.Stream import io.circe.Json import ContextualTransactionValidator.ContextualTransactionValidationError @@ -38,11 +40,37 @@ object TransactionService { SI <: SnapshotInfo[P] ]( transactionStorage: TransactionStorage[F], - lastSnapshotStorage: LastSnapshotStorage[F, S, SI], + lastSnapshotStorage: LastSnapshotStorage[F, S, SI] with LatestBalances[F], transactionValidator: TransactionValidator[F], - mptStore: MptStore[F, GlobalStateKey] + mptStore: MptStore[F, GlobalStateKey], + shouldUseMptStore: Boolean ): TransactionService[F] = new TransactionService[F] { + def useMptStore( + transaction: Hashed[Transaction] + )(implicit hasher: Hasher[F]) = + for { + maybeSnapshot <- lastSnapshotStorage.get + ordinal = maybeSnapshot.map(_.ordinal).getOrElse(SnapshotOrdinal.MinValue) + balance <- mptStore.getBalance(transaction.source).map(_.getOrElse(Balance.empty)) + result <- transactionStorage.tryPut(transaction, ordinal, balance) + } yield result + + def useGlobalSnapshotInfo( + transaction: Hashed[Transaction] + ) = + lastSnapshotStorage.getCombinedStream.map { + case Some((s, si)) => (s.ordinal, si.balances.getOrElse(transaction.source, Balance.empty)) + case None => (SnapshotOrdinal.MinValue, Balance.empty) + }.changes.switchMap { + case (latestOrdinal, balance) => Stream.eval(transactionStorage.tryPut(transaction, latestOrdinal, balance)) + }.head.compile.last.flatMap { + case Some(value) => value.pure[F] + case None => + new Exception(s"Unexpected state, stream should always emit the first snapshot") + .raiseError[F, Either[NonEmptyList[ContextualTransactionValidationError], Hash]] + } + def offer( transaction: Hashed[Transaction] )(implicit hasher: Hasher[F]): F[Either[NonEmptyList[ContextualTransactionValidationError], Hash]] = @@ -51,13 +79,11 @@ object TransactionService { .map(_.errorMap(NonContextualValidationError)) .flatMap { case Valid(_) => - for { - maybeSnapshot <- lastSnapshotStorage.get - ordinal = maybeSnapshot.map(_.ordinal).getOrElse(SnapshotOrdinal.MinValue) - balance <- mptStore.getBalance(transaction.source).map(_.getOrElse(Balance.empty)) - result <- transactionStorage.tryPut(transaction, ordinal, balance) - } yield result - + if (shouldUseMptStore) { + useMptStore(transaction) + } else { + useGlobalSnapshotInfo(transaction) + } case Invalid(e) => e.toNonEmptyList.asLeft[Hash].leftWiden[NonEmptyList[ContextualTransactionValidationError]].pure[F] } diff --git a/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/modules/Services.scala b/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/modules/Services.scala index 137e87ff5..aca2de85a 100644 --- a/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/modules/Services.scala +++ b/modules/dag-l1/src/main/scala/io/constellationnetwork/dag/l1/modules/Services.scala @@ -68,7 +68,13 @@ object Services { .make[F](p2PClient.l0GlobalSnapshot, globalL0Cluster, lastGlobalSnapshotStorage, None, maybeMajorityPeerIds) val session = sharedServices.session val transaction = - TransactionService.make[F, P, S, SI](storages.transaction, storages.lastSnapshot, validators.transaction, sharedStorages.mptStore) + TransactionService.make[F, P, S, SI]( + storages.transaction, + storages.lastSnapshot, + validators.transaction, + sharedStorages.mptStore, + shouldUseMptStore = true + ) val allowSpend = AllowSpendService.make[F, P, S, SI](storages.allowSpend, storages.lastSnapshot, validators.allowSpend) val allowSpendBlock = AllowSpendBlockService.make[F, P, S, SI]( diff --git a/modules/dag-l1/src/test/scala/io/constellationnetwork/dag/l1/domain/snapshot/programs/SnapshotProcessorSuite.scala b/modules/dag-l1/src/test/scala/io/constellationnetwork/dag/l1/domain/snapshot/programs/SnapshotProcessorSuite.scala index 34aafbcdf..7cac86694 100644 --- a/modules/dag-l1/src/test/scala/io/constellationnetwork/dag/l1/domain/snapshot/programs/SnapshotProcessorSuite.scala +++ b/modules/dag-l1/src/test/scala/io/constellationnetwork/dag/l1/domain/snapshot/programs/SnapshotProcessorSuite.scala @@ -992,8 +992,7 @@ object SnapshotProcessorSuite extends SimpleIOSuite with TransactionGenerator { ), srcKey ).flatMap(_.toHashedWithSignatureCheck.map(_.toOption.get)) - kv <- snapshotInfo.allStateEntries[IO] - _ <- mptStore.sync(kv) + _ <- mptStore.syncFromGlobalSnapshotInfo(snapshotInfo) _ <- lastSnapR.set((hashedLastSnapshot, snapshotInfo).some) lastN = (hashedLastSnapshot, snapshotInfo).some incLastN = SortedMap(hashedLastSnapshot.ordinal -> hashedLastSnapshot) @@ -1699,8 +1698,7 @@ object SnapshotProcessorSuite extends SimpleIOSuite with TransactionGenerator { ), srcKey ).flatMap(_.toHashedWithSignatureCheck.map(_.toOption.get)) - kv <- snapshotInfo.allStateEntries[IO] - _ <- mptStore.sync(kv) + _ <- mptStore.syncFromGlobalSnapshotInfo(snapshotInfo) _ <- lastSnapR.set((hashedLastSnapshot, snapshotInfo).some) lastN = (hashedLastSnapshot, snapshotInfo).some _ <- lastNSnapR.set(lastN) diff --git a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/AllowSpendStateManager.scala b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/AllowSpendStateManager.scala index b2a875e49..64521b060 100644 --- a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/AllowSpendStateManager.scala +++ b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/AllowSpendStateManager.scala @@ -15,6 +15,12 @@ import io.constellationnetwork.security.Hasher import io.constellationnetwork.security.signature.Signed import io.constellationnetwork.syntax.sortedCollection.sortedSetSyntax +/** Result of allow spend acceptance containing full state and deltas */ +case class AllowSpendAcceptanceResult( + fullState: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]], + deltas: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]] +) + trait AllowSpendStateManager[F[_]] { def acceptAllowSpends( epochProgress: EpochProgress, @@ -22,7 +28,7 @@ trait AllowSpendStateManager[F[_]] { globalAllowSpends: SortedMap[Address, SortedSet[Signed[AllowSpend]]], lastActiveAllowSpends: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]], allAcceptedSpendTxns: List[SpendTransaction] - )(implicit hasher: Hasher[F]): F[SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]]] + )(implicit hasher: Hasher[F]): F[AllowSpendAcceptanceResult] def acceptAllowSpendRefs( lastAllowSpendRefs: SortedMap[Address, AllowSpendReference], @@ -39,7 +45,7 @@ trait AllowSpendStateManager[F[_]] { currentBalances: SortedMap[Address, Balance], globalAllowSpends: SortedMap[Address, SortedSet[Signed[AllowSpend]]], lastActiveAllowSpends: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]] - ): Either[BalanceArithmeticError, SortedMap[Address, Balance]] + ): Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])] } object AllowSpendStateManager { @@ -52,7 +58,7 @@ object AllowSpendStateManager { globalAllowSpends: SortedMap[Address, SortedSet[Signed[AllowSpend]]], lastActiveAllowSpends: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]], allAcceptedSpendTxns: List[SpendTransaction] - )(implicit hasher: Hasher[F]): F[SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]]] = { + )(implicit hasher: Hasher[F]): F[AllowSpendAcceptanceResult] = { val allAcceptedSpendTxnsAllowSpendsRefs = allAcceptedSpendTxns .flatMap(_.allowSpendRef) @@ -67,7 +73,7 @@ object AllowSpendStateManager { acc + (address -> unexpired) } - val unexpiredGlobalWithoutSpendTransactions = + val unexpiredGlobalWithoutSpendTransactionsF = unexpiredGlobalAllowSpends.toList.foldLeftM(unexpiredGlobalAllowSpends) { case (acc, (address, allowSpends)) => allowSpends.toList.traverse(_.toHashed).map { hashedAllowSpends => @@ -83,8 +89,14 @@ object AllowSpendStateManager { def processMetagraphAllowSpends( metagraphId: Address, metagraphAllowSpends: SortedMap[Address, SortedSet[Signed[AllowSpend]]], - accAllowSpends: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]] - ): F[SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]]] = { + accAllowSpends: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]], + accDeltas: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]] + ): F[ + ( + SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]], + SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]] + ) + ] = { val lastActiveMetagraphAllowSpends = accAllowSpends.getOrElse(metagraphId.some, SortedMap.empty[Address, SortedSet[Signed[AllowSpend]]]) @@ -102,28 +114,62 @@ object AllowSpendStateManager { } .map(_.map(_.signed).toSortedSet) - unexpiredWithoutSpendTransactions.map(validAllowSpends => address -> validAllowSpends) + unexpiredWithoutSpendTransactions.map { validAllowSpends => + val hasChanged = lastAddressAllowSpends != validAllowSpends + (address, validAllowSpends, hasChanged) + } }.map { updatedMetagraphAllowSpends => - accAllowSpends + (metagraphId.some -> SortedMap(updatedMetagraphAllowSpends: _*)) + val fullStateMap = SortedMap(updatedMetagraphAllowSpends.map { case (addr, spends, _) => addr -> spends }: _*) + val deltasMap = SortedMap(updatedMetagraphAllowSpends.collect { + case (addr, spends, true) => addr -> spends + }: _*) + + val updatedFullState = accAllowSpends + (metagraphId.some -> fullStateMap) + val updatedDeltas = if (deltasMap.nonEmpty) { + accDeltas + (metagraphId.some -> deltasMap) + } else { + accDeltas + } + + (updatedFullState, updatedDeltas) } } - activeAllowSpendsFromCurrencySnapshots.toList - .foldLeft(lastActiveAllowSpends.pure[F]) { - case (accAllowSpendsF, (metagraphId, metagraphAllowSpends)) => + // Process metagraph allow spends and track deltas + val processedMetagraphsF = activeAllowSpendsFromCurrencySnapshots.toList + .foldLeft((lastActiveAllowSpends, SortedMap.empty[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]]).pure[F]) { + case (accF, (metagraphId, metagraphAllowSpends)) => for { - accAllowSpends <- accAllowSpendsF - updatedAllowSpends <- processMetagraphAllowSpends(metagraphId, metagraphAllowSpends, accAllowSpends) - } yield updatedAllowSpends + (accFullState, accDeltas) <- accF + (updatedFullState, updatedDeltas) <- processMetagraphAllowSpends(metagraphId, metagraphAllowSpends, accFullState, accDeltas) + } yield (updatedFullState, updatedDeltas) } - .flatMap { updatedCurrencyAllowSpends => - unexpiredGlobalWithoutSpendTransactions.map { validGlobalAllowSpends => - if (validGlobalAllowSpends.nonEmpty) - updatedCurrencyAllowSpends + (None -> validGlobalAllowSpends) - else - updatedCurrencyAllowSpends + + for { + (updatedCurrencyAllowSpends, currencyDeltas) <- processedMetagraphsF + validGlobalAllowSpends <- unexpiredGlobalWithoutSpendTransactionsF + } yield { + // Compute global deltas by comparing with previous state + val globalDeltas: SortedMap[Address, SortedSet[Signed[AllowSpend]]] = + validGlobalAllowSpends.filter { + case (address, allowSpends) => + !lastActiveGlobalAllowSpends.get(address).contains(allowSpends) } + + val fullState = if (validGlobalAllowSpends.nonEmpty) { + updatedCurrencyAllowSpends + (None -> validGlobalAllowSpends) + } else { + updatedCurrencyAllowSpends + } + + val deltas = if (globalDeltas.nonEmpty) { + currencyDeltas + (None -> globalDeltas) + } else { + currencyDeltas } + + AllowSpendAcceptanceResult(fullState, deltas) + } } def acceptAllowSpendRefs( @@ -143,44 +189,46 @@ object AllowSpendStateManager { currentBalances: SortedMap[Address, Balance], globalAllowSpends: SortedMap[Address, SortedSet[Signed[AllowSpend]]], lastActiveAllowSpends: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]] - ): Either[BalanceArithmeticError, SortedMap[Address, Balance]] = { + ): Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])] = { val lastActiveGlobalAllowSpends = lastActiveAllowSpends.getOrElse(None, SortedMap.empty[Address, SortedSet[Signed[AllowSpend]]]) val expiredGlobalAllowSpends = filterExpiredAllowSpends(lastActiveGlobalAllowSpends, epochProgress) - val result = (globalAllowSpends |+| expiredGlobalAllowSpends).foldLeft[Either[BalanceArithmeticError, SortedMap[Address, Balance]]]( - Right(currentBalances) - ) { - case (accEither, (address, allowSpends)) => - for { - acc <- accEither - initialBalance = acc.getOrElse(address, Balance.empty) - - unexpiredBalance <- { - val unexpired = allowSpends.filter(_.lastValidEpochProgress >= epochProgress) - - unexpired.foldLeft[Either[BalanceArithmeticError, Balance]](Right(initialBalance)) { (currentBalanceEither, allowSpend) => - for { - currentBalance <- currentBalanceEither - balanceAfterAmount <- currentBalance.minus(SwapAmount.toAmount(allowSpend.amount)) - balanceAfterFee <- balanceAfterAmount.minus(AllowSpendFee.toAmount(allowSpend.fee)) - } yield balanceAfterFee + val result = (globalAllowSpends |+| expiredGlobalAllowSpends) + .foldLeft[Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])]]( + Right((currentBalances, SortedMap.empty[Address, Balance])) + ) { + case (accEither, (address, allowSpends)) => + for { + (balances, balancesDelta) <- accEither + initialBalance = balances.getOrElse(address, Balance.empty) + + unexpiredBalance <- { + val unexpired = allowSpends.filter(_.lastValidEpochProgress >= epochProgress) + + unexpired.foldLeft[Either[BalanceArithmeticError, Balance]](Right(initialBalance)) { (currentBalanceEither, allowSpend) => + for { + currentBalance <- currentBalanceEither + balanceAfterAmount <- currentBalance.minus(SwapAmount.toAmount(allowSpend.amount)) + balanceAfterFee <- balanceAfterAmount.minus(AllowSpendFee.toAmount(allowSpend.fee)) + } yield balanceAfterFee + } } - } - expiredBalance <- { - val expired = allowSpends.filter(_.lastValidEpochProgress < epochProgress) + expiredBalance <- { + val expired = allowSpends.filter(_.lastValidEpochProgress < epochProgress) - expired.foldLeft[Either[BalanceArithmeticError, Balance]](Right(unexpiredBalance)) { (currentBalanceEither, allowSpend) => - for { - currentBalance <- currentBalanceEither - balanceAfterExpiredAmount <- currentBalance.plus(SwapAmount.toAmount(allowSpend.amount)) - } yield balanceAfterExpiredAmount + expired.foldLeft[Either[BalanceArithmeticError, Balance]](Right(unexpiredBalance)) { (currentBalanceEither, allowSpend) => + for { + currentBalance <- currentBalanceEither + balanceAfterExpiredAmount <- currentBalance.plus(SwapAmount.toAmount(allowSpend.amount)) + } yield balanceAfterExpiredAmount + } } - } - updatedAcc = acc.updated(address, expiredBalance) - } yield updatedAcc - } + updatedAcc = balances.updated(address, expiredBalance) + updatedBalancesDelta = balancesDelta.updated(address, expiredBalance) + } yield (updatedAcc, updatedBalancesDelta) + } result } } diff --git a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/GlobalSnapshotAcceptanceManager.scala b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/GlobalSnapshotAcceptanceManager.scala index 9b96265f5..d564ba70e 100644 --- a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/GlobalSnapshotAcceptanceManager.scala +++ b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/GlobalSnapshotAcceptanceManager.scala @@ -42,6 +42,7 @@ import io.constellationnetwork.schema.artifact._ import io.constellationnetwork.schema.balance.{Amount, Balance} import io.constellationnetwork.schema.delegatedStake._ import io.constellationnetwork.schema.epoch.EpochProgress +import io.constellationnetwork.schema.mpt.GlobalStateConverter.StateChangesAccumulator import io.constellationnetwork.schema.mpt.GlobalStateConverter.syntax._ import io.constellationnetwork.schema.mpt.{GlobalStateKey, MptStore} import io.constellationnetwork.schema.node.UpdateNodeParameters @@ -667,7 +668,7 @@ object GlobalSnapshotAcceptanceManager { getGlobalSnapshotByOrdinal ) - transactionsRefs = transactionReferenceManager.acceptTransactionRefs( + (transactionsRefs, transactionsRefsDeltas) = transactionReferenceManager.acceptTransactionRefs( lastSnapshotContext.lastTxRefs, initialData.blockResult.contextUpdate.lastTxRefs, acceptedTransactions @@ -709,7 +710,7 @@ object GlobalSnapshotAcceptanceManager { calculateRewardsFn ) - (updatedBalancesByRewards, acceptedRewardTxs) = rewardAcceptanceManager.acceptRewardTxs( + (updatedBalancesByRewards, acceptedRewardTxs, rewardBalancesDelta) = rewardAcceptanceManager.acceptRewardTxs( updatedGlobalBalances ++ currencyAcceptanceBalanceUpdate, withdrawalRewardTxs ++ nodeOperatorRewards ++ reservedAddressRewards ) @@ -810,7 +811,7 @@ object GlobalSnapshotAcceptanceManager { SortedMap.empty[Address, TokenLockReference] ) - updatedAllowSpends <- allowSpendStateManager.acceptAllowSpends( + AllowSpendAcceptanceResult(updatedAllowSpends, allowSpendsDeltas) <- allowSpendStateManager.acceptAllowSpends( epochProgress, activeAllowSpendsFromCurrencySnapshots, globalAllowSpends, @@ -823,7 +824,7 @@ object GlobalSnapshotAcceptanceManager { allowSpendBlockAcceptanceResult.contextUpdate.lastTxRefs ) - updatedBalancesByAllowSpends <- Async[F].fromEither( + (updatedBalancesByAllowSpends, updatedBalancesByAllowSpendsDeltas) <- Async[F].fromEither( allowSpendStateManager .updateGlobalBalancesByAllowSpends( epochProgress, @@ -861,7 +862,7 @@ object GlobalSnapshotAcceptanceManager { .leftMap(error => new RuntimeException(s"Error generating token unlocks: $error")) .liftTo[F] - updatedGlobalTokenLocks <- tokenLockStateManager.acceptTokenLocks( + TokenLockAcceptanceResult(updatedGlobalTokenLocks, tokenLocksDeltas) <- tokenLockStateManager.acceptTokenLocks( epochProgress, globalTokenLocks, globalActiveTokenLocks, @@ -873,12 +874,12 @@ object GlobalSnapshotAcceptanceManager { tokenLockBlockAcceptanceResult.contextUpdate.lastTokenLocksRefs ) - updatedTokenLockBalances = tokenLockStateManager.updateTokenLockBalances( + TokenLockBalanceResult(updatedTokenLockBalances, tokenLockBalancesDeltas) = tokenLockStateManager.updateTokenLockBalances( currencySnapshots, lastSnapshotContext.tokenLockBalances ) - updatedBalancesByTokenLocks = tokenLockStateManager.updateGlobalBalancesByTokenLocks( + (updatedBalancesByTokenLocks, updatedBalancesByTokenLocksDeltas) = tokenLockStateManager.updateGlobalBalancesByTokenLocks( epochProgress, updatedBalancesByAllowSpends, globalTokenLocks, @@ -938,11 +939,12 @@ object GlobalSnapshotAcceptanceManager { .filter(_.currencyId.isEmpty) }.toList - updatedBalancesBySpendTransactions = spendTransactionBalanceManager.updateGlobalBalancesBySpendTransactions( - updatedBalancesByTokenLocks, - allGlobalAllowSpends, - globalSpendTransactions - ) match { + (updatedBalancesBySpendTransactions, updatedBalancesBySpendTransactionsDeltas) = spendTransactionBalanceManager + .updateGlobalBalancesBySpendTransactions( + updatedBalancesByTokenLocks, + allGlobalAllowSpends, + globalSpendTransactions + ) match { case Right(balances) => balances case Left(error) => throw new RuntimeException(s"Balance arithmetic error updating balances by spend transactions: $error") } @@ -976,14 +978,15 @@ object GlobalSnapshotAcceptanceManager { epochProgress ) - updatedAcceptedMetagraphSyncData <- metagraphSyncManager.acceptMetagraphSyncData( - lastSnapshotContext, - incomingCurrencySnapshots, - globalSnapshotsProcessed, - acceptedSpendActions, - ordinal, - epochProgress - ) + MetagraphSyncAcceptanceResult(updatedAcceptedMetagraphSyncData, metagraphSyncDataDeltas) <- metagraphSyncManager + .acceptMetagraphSyncData( + lastSnapshotContext, + incomingCurrencySnapshots, + globalSnapshotsProcessed, + acceptedSpendActions, + ordinal, + epochProgress + ) gsi = buildGlobalSnapshotInfo( ordinal, @@ -1011,7 +1014,33 @@ object GlobalSnapshotAcceptanceManager { updatedAcceptedMetagraphSyncData ) - _ <- mptStore.syncFromGlobalSnapshotInfo(gsi) + balanceChanges: SortedMap[Address, Balance] = + initialData.blockResult.contextUpdate.balances.toSortedMap ++ + currencyAcceptanceBalanceUpdate.toSortedMap ++ + rewardBalancesDelta ++ + updatedBalancesByAllowSpendsDeltas ++ + updatedBalancesByTokenLocksDeltas ++ + updatedBalancesBySpendTransactionsDeltas + + stateChangesAccumulator = StateChangesAccumulator( + lastStateChannelSnapshotHashes = sCSnapshotHashes.toSortedMap, + lastTxRefs = transactionsRefsDeltas, + balances = balanceChanges, + lastCurrencySnapshots = currencySnapshots, + lastCurrencySnapshotsProofs = updatedLastCurrencySnapshotProofs, + activeAllowSpends = allowSpendsDeltas, + activeTokenLocks = tokenLocksDeltas, + tokenLockBalances = tokenLockBalancesDeltas, + lastAllowSpendRefs = allowSpendBlockAcceptanceResult.contextUpdate.lastTxRefs.toSortedMap, + lastTokenLockRefs = tokenLockBlockAcceptanceResult.contextUpdate.lastTokenLocksRefs.toSortedMap, + activeDelegatedStakes = updatedCreateDelegatedStakesCleaned, + delegatedStakesWithdrawals = updatedWithdrawDelegatedStakesCleaned, + activeNodeCollaterals = updatedCreateNodeCollateralsCleaned, + nodeCollateralWithdrawals = updatedWithdrawNodeCollateralsCleaned, + metagraphSyncData = metagraphSyncDataDeltas + ) + + _ <- mptStore.syncFromStateChanges(stateChangesAccumulator) stateProof <- gsi.stateProof(mptStore.underlying) (expiredAllowSpends, expiredTokenLocks) = ( diff --git a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/MetagraphSyncManager.scala b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/MetagraphSyncManager.scala index d59fe05f4..2f2dd94b8 100644 --- a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/MetagraphSyncManager.scala +++ b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/MetagraphSyncManager.scala @@ -17,6 +17,11 @@ import io.constellationnetwork.schema.swap._ import monocle.syntax.all._ +case class MetagraphSyncAcceptanceResult( + fullState: SortedMap[Address, MetagraphSyncDataInfo], + deltas: SortedMap[Address, MetagraphSyncDataInfo] +) + trait MetagraphSyncManager[F[_]] { def acceptMetagraphSyncData( lastSnapshotContext: GlobalSnapshotInfo, @@ -25,7 +30,7 @@ trait MetagraphSyncManager[F[_]] { acceptedSpendActions: Map[Address, List[SpendAction]], currentGlobalOrdinal: SnapshotOrdinal, currentGlobalEpochProgress: EpochProgress - ): F[SortedMap[Address, MetagraphSyncDataInfo]] + ): F[MetagraphSyncAcceptanceResult] } object MetagraphSyncManager { @@ -41,10 +46,10 @@ object MetagraphSyncManager { acceptedSpendActions: Map[Address, List[SpendAction]], currentGlobalOrdinal: SnapshotOrdinal, currentGlobalEpochProgress: EpochProgress - ): F[SortedMap[Address, MetagraphSyncDataInfo]] = + ): F[MetagraphSyncAcceptanceResult] = lastSnapshotContext.metagraphSyncData.map { existingData => for { - updatedFromSnapshots <- updateFromCurrencySnapshots( + (updatedFromSnapshots, snapshotDeltas) <- updateFromCurrencySnapshots( existingData, incomingCurrencySnapshots, globalSnapshotsProcessed, @@ -52,14 +57,17 @@ object MetagraphSyncManager { currentGlobalEpochProgress ) - updatedFromSpendActions <- updateFromSpendActions( + (updatedFromSpendActions, spendActionDeltas) <- updateFromSpendActions( updatedFromSnapshots, acceptedSpendActions, currentGlobalOrdinal ) - } yield updatedFromSpendActions - }.getOrElse(SortedMap.empty[Address, MetagraphSyncDataInfo].pure[F]) + } yield { + val mergedDeltas = snapshotDeltas ++ spendActionDeltas + MetagraphSyncAcceptanceResult(updatedFromSpendActions, mergedDeltas) + } + }.getOrElse(MetagraphSyncAcceptanceResult(SortedMap.empty, SortedMap.empty).pure[F]) private def updateFromCurrencySnapshots( existingData: SortedMap[Address, MetagraphSyncDataInfo], @@ -67,7 +75,7 @@ object MetagraphSyncManager { globalSnapshotsProcessed: Map[Address, List[GlobalSnapshotsProcessed]], currentOrdinal: SnapshotOrdinal, currentEpochProgress: EpochProgress - ): F[SortedMap[Address, MetagraphSyncDataInfo]] = + ): F[(SortedMap[Address, MetagraphSyncDataInfo], SortedMap[Address, MetagraphSyncDataInfo])] = incomingCurrencySnapshots.toList.parTraverse { case (address, _) => val currentInfo = existingData.getOrElse(address, MetagraphSyncDataInfo.empty) @@ -84,23 +92,25 @@ object MetagraphSyncManager { .focus(_.unappliedGlobalChangeOrdinals) .replace(updatedUnappliedGlobalChangeOrdinals) - (address -> updatedInfo).pure[F] + val hasChanged = currentInfo != updatedInfo + (address, updatedInfo, hasChanged).pure[F] }.map { updatedEntries => - val updatedMap = SortedMap.from(updatedEntries) - existingData ++ updatedMap + val updatedMap = SortedMap.from(updatedEntries.map { case (addr, info, _) => addr -> info }) + val deltasMap = SortedMap.from(updatedEntries.collect { case (addr, info, true) => addr -> info }) + (existingData ++ updatedMap, deltasMap) } private def updateFromSpendActions( currentData: SortedMap[Address, MetagraphSyncDataInfo], spendActions: Map[Address, List[SpendAction]], currentOrdinal: SnapshotOrdinal - ): F[SortedMap[Address, MetagraphSyncDataInfo]] = { + ): F[(SortedMap[Address, MetagraphSyncDataInfo], SortedMap[Address, MetagraphSyncDataInfo])] = { val allCurrencySpendTransactions = extractCurrencySpendTransactions(spendActions) val transactionsByMetagraph = allCurrencySpendTransactions.groupBy(_.currencyId.get.value) - transactionsByMetagraph.toList.foldM(currentData) { - case (acc, (metagraphId, _)) => + transactionsByMetagraph.toList.foldM((currentData, SortedMap.empty[Address, MetagraphSyncDataInfo])) { + case ((acc, deltas), (metagraphId, _)) => val currentInfo = acc.getOrElse(metagraphId, MetagraphSyncDataInfo.empty) val updatedUnappliedGlobalChangeOrdinals = @@ -110,7 +120,11 @@ object MetagraphSyncManager { .focus(_.unappliedGlobalChangeOrdinals) .replace(updatedUnappliedGlobalChangeOrdinals) - acc.updated(metagraphId, updatedInfo).pure[F] + val hasChanged = currentInfo != updatedInfo + val newAcc = acc.updated(metagraphId, updatedInfo) + val newDeltas = if (hasChanged) deltas.updated(metagraphId, updatedInfo) else deltas + + (newAcc, newDeltas).pure[F] } } diff --git a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/RewardAcceptanceManager.scala b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/RewardAcceptanceManager.scala index eb590a31f..d8d949914 100644 --- a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/RewardAcceptanceManager.scala +++ b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/RewardAcceptanceManager.scala @@ -10,7 +10,7 @@ trait RewardAcceptanceManager[F[_]] { def acceptRewardTxs( balances: SortedMap[Address, Balance], txs: SortedSet[RewardTransaction] - ): (SortedMap[Address, Balance], SortedSet[RewardTransaction]) + ): (SortedMap[Address, Balance], SortedSet[RewardTransaction], SortedMap[Address, Balance]) } object RewardAcceptanceManager { @@ -20,14 +20,18 @@ object RewardAcceptanceManager { def acceptRewardTxs( balances: SortedMap[Address, Balance], txs: SortedSet[RewardTransaction] - ): (SortedMap[Address, Balance], SortedSet[RewardTransaction]) = - txs.foldLeft((balances, SortedSet.empty[RewardTransaction])) { (acc, tx) => - val (updatedBalances, acceptedTxs) = acc + ): (SortedMap[Address, Balance], SortedSet[RewardTransaction], SortedMap[Address, Balance]) = + txs.foldLeft((balances, SortedSet.empty[RewardTransaction], SortedMap.empty[Address, Balance])) { (acc, tx) => + val (updatedBalances, acceptedTxs, balanceDeltas) = acc - updatedBalances + val updatedBalance = updatedBalances .getOrElse(tx.destination, Balance.empty) .plus(tx.amount) - .map(balance => (updatedBalances.updated(tx.destination, balance), acceptedTxs + tx)) + + updatedBalance + .map(balance => + (updatedBalances.updated(tx.destination, balance), acceptedTxs + tx, balanceDeltas.updated(tx.destination, balance)) + ) .getOrElse(acc) } } diff --git a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/SpendTransactionBalanceManager.scala b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/SpendTransactionBalanceManager.scala index 9902fa439..b10f2cb55 100644 --- a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/SpendTransactionBalanceManager.scala +++ b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/SpendTransactionBalanceManager.scala @@ -18,7 +18,7 @@ trait SpendTransactionBalanceManager[F[_]] { currentBalances: SortedMap[Address, Balance], allGlobalAllowSpends: SortedMap[Address, List[Hashed[AllowSpend]]], globalSpendTransactions: List[SpendTransaction] - ): Either[BalanceArithmeticError, SortedMap[Address, Balance]] + ): Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])] } object SpendTransactionBalanceManager { @@ -29,48 +29,55 @@ object SpendTransactionBalanceManager { currentBalances: SortedMap[Address, Balance], allGlobalAllowSpends: SortedMap[Address, List[Hashed[AllowSpend]]], globalSpendTransactions: List[SpendTransaction] - ): Either[BalanceArithmeticError, SortedMap[Address, Balance]] = - globalSpendTransactions.foldLeft[Either[BalanceArithmeticError, SortedMap[Address, Balance]]](Right(currentBalances)) { - (innerAccEither, spendTransaction) => - for { - innerAcc <- innerAccEither - destinationAddress = spendTransaction.destination - sourceAddress = spendTransaction.source + ): Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])] = + globalSpendTransactions.foldLeft[Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])]]( + Right((currentBalances, SortedMap.empty[Address, Balance])) + ) { (innerAccEither, spendTransaction) => + for { + (balances, balancesDelta) <- innerAccEither + destinationAddress = spendTransaction.destination + sourceAddress = spendTransaction.source - addressAllowSpends = allGlobalAllowSpends.getOrElse(sourceAddress, List.empty) - spendTransactionAmount = SwapAmount.toAmount(spendTransaction.amount) - currentDestinationBalance = innerAcc.getOrElse(destinationAddress, Balance.empty) + addressAllowSpends = allGlobalAllowSpends.getOrElse(sourceAddress, List.empty) + spendTransactionAmount = SwapAmount.toAmount(spendTransaction.amount) + currentDestinationBalance = balances.getOrElse(destinationAddress, Balance.empty) - updatedBalances <- spendTransaction.allowSpendRef.flatMap { allowSpendRef => - addressAllowSpends.find(_.hash === allowSpendRef) - } match { - case Some(allowSpend) => - val sourceAllowSpendAddress = allowSpend.source - val currentSourceBalance = innerAcc.getOrElse(sourceAllowSpendAddress, Balance.empty) - val balanceToReturnToAddress = allowSpend.amount.value.value - spendTransactionAmount.value.value + updatedBalances <- spendTransaction.allowSpendRef.flatMap { allowSpendRef => + addressAllowSpends.find(_.hash === allowSpendRef) + } match { + case Some(allowSpend) => + val sourceAllowSpendAddress = allowSpend.source + val currentSourceBalance = balances.getOrElse(sourceAllowSpendAddress, Balance.empty) + val balanceToReturnToAddress = allowSpend.amount.value.value - spendTransactionAmount.value.value - for { - updatedDestinationBalance <- currentDestinationBalance.plus(spendTransactionAmount) - updatedSourceBalance <- currentSourceBalance.plus( - Amount(NonNegLong.from(balanceToReturnToAddress).getOrElse(NonNegLong.MinValue)) - ) - } yield - innerAcc - .updated(destinationAddress, updatedDestinationBalance) - .updated(sourceAllowSpendAddress, updatedSourceBalance) + for { + updatedDestinationBalance <- currentDestinationBalance.plus(spendTransactionAmount) + updatedSourceBalance <- currentSourceBalance.plus( + Amount(NonNegLong.from(balanceToReturnToAddress).getOrElse(NonNegLong.MinValue)) + ) + balancesUpdated = balances + .updated(destinationAddress, updatedDestinationBalance) + .updated(sourceAllowSpendAddress, updatedSourceBalance) + balancesDeltaUpdated = balancesDelta + .updated(destinationAddress, updatedDestinationBalance) + .updated(sourceAllowSpendAddress, updatedSourceBalance) + } yield (balancesUpdated, balancesDeltaUpdated) - case None => - val currentSourceBalance = innerAcc.getOrElse(sourceAddress, Balance.empty) + case None => + val currentSourceBalance = balances.getOrElse(sourceAddress, Balance.empty) - for { - updatedDestinationBalance <- currentDestinationBalance.plus(spendTransactionAmount) - updatedSourceBalance <- currentSourceBalance.minus(spendTransactionAmount) - } yield - innerAcc - .updated(destinationAddress, updatedDestinationBalance) - .updated(sourceAddress, updatedSourceBalance) - } - } yield updatedBalances + for { + updatedDestinationBalance <- currentDestinationBalance.plus(spendTransactionAmount) + updatedSourceBalance <- currentSourceBalance.minus(spendTransactionAmount) + balancesUpdated = balances + .updated(destinationAddress, updatedDestinationBalance) + .updated(sourceAddress, updatedSourceBalance) + balancesDeltaUpdated = balancesDelta + .updated(destinationAddress, updatedDestinationBalance) + .updated(sourceAddress, updatedSourceBalance) + } yield (balancesUpdated, balancesDeltaUpdated) + } + } yield updatedBalances } } } diff --git a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TokenLockStateManager.scala b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TokenLockStateManager.scala index 799bc0c3f..4aee3f94f 100644 --- a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TokenLockStateManager.scala +++ b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TokenLockStateManager.scala @@ -20,13 +20,25 @@ import io.constellationnetwork.security.signature.Signed import eu.timepit.refined.types.numeric.NonNegLong +/** Result of token lock acceptance containing full state and deltas */ +case class TokenLockAcceptanceResult( + fullState: SortedMap[Address, SortedSet[Signed[TokenLock]]], + deltas: SortedMap[Address, SortedSet[Signed[TokenLock]]] +) + +/** Result of token lock balance update containing full state and deltas */ +case class TokenLockBalanceResult( + fullState: SortedMap[Address, SortedMap[Address, Balance]], + deltas: SortedMap[Address, SortedMap[Address, Balance]] +) + trait TokenLockStateManager[F[_]] { def acceptTokenLocks( epochProgress: EpochProgress, acceptedGlobalTokenLocks: SortedMap[Address, SortedSet[Signed[TokenLock]]], lastActiveGlobalTokenLocks: SortedMap[Address, SortedSet[Signed[TokenLock]]], generatedTokenUnlocksByAddress: Map[Address, List[TokenUnlock]] - )(implicit hasher: Hasher[F]): F[SortedMap[Address, SortedSet[Signed[TokenLock]]]] + )(implicit hasher: Hasher[F]): F[TokenLockAcceptanceResult] def acceptReplacementTokenLocks( acceptedTokenLocks: List[Signed[TokenLock]], @@ -46,7 +58,7 @@ trait TokenLockStateManager[F[_]] { def updateTokenLockBalances( currencySnapshots: SortedMap[Address, CurrencySnapshotWithState], maybeLastTokenLockBalances: Option[SortedMap[Address, SortedMap[Address, Balance]]] - ): SortedMap[Address, SortedMap[Address, Balance]] + ): TokenLockBalanceResult def updateGlobalBalancesByTokenLocks( epochProgress: EpochProgress, @@ -54,7 +66,7 @@ trait TokenLockStateManager[F[_]] { acceptedGlobalTokenLocks: SortedMap[Address, SortedSet[Signed[TokenLock]]], lastActiveGlobalTokenLocks: SortedMap[Address, SortedSet[Signed[TokenLock]]], generatedTokenUnlocksByAddress: Map[Address, List[TokenUnlock]] - ): Either[BalanceArithmeticError, SortedMap[Address, Balance]] + ): Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])] def generateTokenUnlocks( expiredWithdrawals: SortedMap[Address, SortedSet[PendingDelegatedStakeWithdrawal]], @@ -72,30 +84,39 @@ object TokenLockStateManager { acceptedGlobalTokenLocks: SortedMap[Address, SortedSet[Signed[TokenLock]]], lastActiveGlobalTokenLocks: SortedMap[Address, SortedSet[Signed[TokenLock]]], generatedTokenUnlocksByAddress: Map[Address, List[TokenUnlock]] - )(implicit hasher: Hasher[F]): F[SortedMap[Address, SortedSet[Signed[TokenLock]]]] = { + )(implicit hasher: Hasher[F]): F[TokenLockAcceptanceResult] = { val expiredGlobalTokenLocks = filterExpiredTokenLocks(lastActiveGlobalTokenLocks, epochProgress) (acceptedGlobalTokenLocks |+| expiredGlobalTokenLocks).toList - .foldM(lastActiveGlobalTokenLocks) { - case (acc, (address, tokenLocks)) => + .foldM((lastActiveGlobalTokenLocks, SortedMap.empty[Address, SortedSet[Signed[TokenLock]]])) { + case ((acc, deltas), (address, tokenLocks)) => val lastAddressTokenLocks = acc.getOrElse(address, SortedSet.empty[Signed[TokenLock]]) val unexpired = (lastAddressTokenLocks ++ tokenLocks).filter(_.unlockEpoch.forall(_ >= epochProgress)) val addressTokenUnlocks = generatedTokenUnlocksByAddress.getOrElse(address, List.empty) val unlocksRefs = addressTokenUnlocks.map(_.tokenLockRef) unexpired - .foldM(SortedSet.empty[Signed[TokenLock]]) { (acc, tokenLock) => + .foldM(SortedSet.empty[Signed[TokenLock]]) { (innerAcc, tokenLock) => tokenLock.toHashed.map { tlh => - if (unlocksRefs.contains(tlh.hash)) acc - else acc + tokenLock + if (unlocksRefs.contains(tlh.hash)) innerAcc + else innerAcc + tokenLock } } .map { updatedLocks => - acc.updated(address, updatedLocks) + val hasChanged = lastAddressTokenLocks != updatedLocks + val newAcc = acc.updated(address, updatedLocks) + val newDeltas = if (hasChanged) deltas.updated(address, updatedLocks) else deltas + (newAcc, newDeltas) } } - .map(updateTokenLocks => updateTokenLocks.filterNot(_._2.isEmpty)) + .map { + case (fullState, deltas) => + val cleanedFullState = fullState.filterNot(_._2.isEmpty) + val cleanedDeltas = deltas.filterNot(_._2.isEmpty) + TokenLockAcceptanceResult(cleanedFullState, cleanedDeltas) + } } + def acceptReplacementTokenLocks( acceptedTokenLocks: List[Signed[TokenLock]], lastSnapshotContext: GlobalSnapshotInfo @@ -136,25 +157,33 @@ object TokenLockStateManager { def updateTokenLockBalances( currencySnapshots: SortedMap[Address, CurrencySnapshotWithState], maybeLastTokenLockBalances: Option[SortedMap[Address, SortedMap[Address, Balance]]] - ): SortedMap[Address, SortedMap[Address, Balance]] = { + ): TokenLockBalanceResult = { val lastTokenLockBalances = maybeLastTokenLockBalances.getOrElse(SortedMap.empty[Address, SortedMap[Address, Balance]]) - currencySnapshots.foldLeft(lastTokenLockBalances) { - case (accTokenLockBalances, (metagraphId, currencySnapshotWithState)) => + val (fullState, deltas) = currencySnapshots.foldLeft((lastTokenLockBalances, SortedMap.empty[Address, SortedMap[Address, Balance]])) { + case ((accTokenLockBalances, accDeltas), (metagraphId, currencySnapshotWithState)) => val activeTokenLocks = currencySnapshotWithState match { case Left(_) => SortedMap.empty[Address, SortedSet[Signed[TokenLock]]] case Right((_, info)) => info.activeTokenLocks.getOrElse(SortedMap.empty[Address, SortedSet[Signed[TokenLock]]]) } val metagraphTokenLocksAmounts = activeTokenLocks.foldLeft(SortedMap.empty[Address, Balance]) { - case (accTokenLockBalances, addressTokenLocks) => + case (accBalances, addressTokenLocks) => val (address, tokenLocks) = addressTokenLocks val amount = NonNegLong.unsafeFrom(tokenLocks.toList.map(_.amount.value.value).sum) - accTokenLockBalances.updated(address, Balance(amount)) + accBalances.updated(address, Balance(amount)) } - accTokenLockBalances + (metagraphId -> metagraphTokenLocksAmounts) + val previousMetagraphBalances = accTokenLockBalances.getOrElse(metagraphId, SortedMap.empty[Address, Balance]) + val hasChanged = previousMetagraphBalances != metagraphTokenLocksAmounts + + val newAccTokenLockBalances = accTokenLockBalances + (metagraphId -> metagraphTokenLocksAmounts) + val newAccDeltas = if (hasChanged) accDeltas + (metagraphId -> metagraphTokenLocksAmounts) else accDeltas + + (newAccTokenLockBalances, newAccDeltas) } + + TokenLockBalanceResult(fullState, deltas) } def updateGlobalBalancesByTokenLocks( @@ -163,67 +192,69 @@ object TokenLockStateManager { acceptedGlobalTokenLocks: SortedMap[Address, SortedSet[Signed[TokenLock]]], lastActiveGlobalTokenLocks: SortedMap[Address, SortedSet[Signed[TokenLock]]], generatedTokenUnlocksByAddress: Map[Address, List[TokenUnlock]] - ): Either[BalanceArithmeticError, SortedMap[Address, Balance]] = { + ): Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])] = { val expiredGlobalTokenLocks = filterExpiredTokenLocks(lastActiveGlobalTokenLocks, epochProgress) // First, process all addresses that have token locks val balancesAfterTokenLocks = - (acceptedGlobalTokenLocks |+| expiredGlobalTokenLocks).foldLeft[Either[BalanceArithmeticError, SortedMap[Address, Balance]]]( - Right(currentBalances) - ) { - case (accEither, (address, tokenLocks)) => - for { - acc <- accEither - initialBalance = acc.getOrElse(address, Balance.empty) - - unexpiredBalance <- { - val unexpired = tokenLocks.filter(_.unlockEpoch.forall(_ >= epochProgress)) - - unexpired.foldLeft[Either[BalanceArithmeticError, Balance]](Right(initialBalance)) { (currentBalanceEither, tokenLock) => - for { - currentBalance <- currentBalanceEither - balanceAfterAmount <- currentBalance.minus(TokenLockAmount.toAmount(tokenLock.amount)) - balanceAfterFee <- balanceAfterAmount.minus(TokenLockFee.toAmount(tokenLock.fee)) - } yield balanceAfterFee + (acceptedGlobalTokenLocks |+| expiredGlobalTokenLocks) + .foldLeft[Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])]]( + Right((currentBalances, SortedMap.empty[Address, Balance])) + ) { + case (accEither, (address, tokenLocks)) => + for { + (balances, balancesDelta) <- accEither + initialBalance = balances.getOrElse(address, Balance.empty) + + unexpiredBalance <- { + val unexpired = tokenLocks.filter(_.unlockEpoch.forall(_ >= epochProgress)) + + unexpired.foldLeft[Either[BalanceArithmeticError, Balance]](Right(initialBalance)) { (currentBalanceEither, tokenLock) => + for { + currentBalance <- currentBalanceEither + balanceAfterAmount <- currentBalance.minus(TokenLockAmount.toAmount(tokenLock.amount)) + balanceAfterFee <- balanceAfterAmount.minus(TokenLockFee.toAmount(tokenLock.fee)) + } yield balanceAfterFee + } } - } - expiredBalance <- { - val expired = tokenLocks.filter(_.unlockEpoch.exists(_ < epochProgress)) + expiredBalance <- { + val expired = tokenLocks.filter(_.unlockEpoch.exists(_ < epochProgress)) - expired.foldLeft[Either[BalanceArithmeticError, Balance]](Right(unexpiredBalance)) { (currentBalanceEither, tokenLock) => - for { - currentBalance <- currentBalanceEither - balanceAfterExpiredAmount <- currentBalance.plus(TokenLockAmount.toAmount(tokenLock.amount)) - } yield balanceAfterExpiredAmount - } - } - addressTokenUnlocks = generatedTokenUnlocksByAddress.getOrElse(address, List.empty) - finalBalance <- - addressTokenUnlocks.foldLeft[Either[BalanceArithmeticError, Balance]](Right(expiredBalance)) { - case (currentBalanceEither, tokenUnlock) => + expired.foldLeft[Either[BalanceArithmeticError, Balance]](Right(unexpiredBalance)) { (currentBalanceEither, tokenLock) => for { currentBalance <- currentBalanceEither - balanceAfterUnlock <- currentBalance.plus(TokenLockAmount.toAmount(tokenUnlock.amount)) - } yield balanceAfterUnlock + balanceAfterExpiredAmount <- currentBalance.plus(TokenLockAmount.toAmount(tokenLock.amount)) + } yield balanceAfterExpiredAmount + } } - - updatedAcc = acc.updated(address, finalBalance) - } yield updatedAcc - } + addressTokenUnlocks = generatedTokenUnlocksByAddress.getOrElse(address, List.empty) + finalBalance <- + addressTokenUnlocks.foldLeft[Either[BalanceArithmeticError, Balance]](Right(expiredBalance)) { + case (currentBalanceEither, tokenUnlock) => + for { + currentBalance <- currentBalanceEither + balanceAfterUnlock <- currentBalance.plus(TokenLockAmount.toAmount(tokenUnlock.amount)) + } yield balanceAfterUnlock + } + + updatedAcc = balances.updated(address, finalBalance) + updatedBalancesDelta = balancesDelta.updated(address, finalBalance) + } yield (updatedAcc, updatedBalancesDelta) + } // Then, process token unlocks for addresses that don't have token locks - balancesAfterTokenLocks.flatMap { balances => - val addressesWithTokenLocks = (acceptedGlobalTokenLocks.keySet ++ expiredGlobalTokenLocks.keySet).toSet + balancesAfterTokenLocks.flatMap { balancesAfter => + val addressesWithTokenLocks = acceptedGlobalTokenLocks.keySet ++ expiredGlobalTokenLocks.keySet val addressesWithTokenUnlocksOnly = generatedTokenUnlocksByAddress.keySet -- addressesWithTokenLocks - addressesWithTokenUnlocksOnly.foldLeft[Either[BalanceArithmeticError, SortedMap[Address, Balance]]]( - Right(balances) + addressesWithTokenUnlocksOnly.foldLeft[Either[BalanceArithmeticError, (SortedMap[Address, Balance], SortedMap[Address, Balance])]]( + Right(balancesAfter) ) { case (accEither, address) => for { - acc <- accEither - initialBalance = acc.getOrElse(address, Balance.empty) + (balances, balancesDelta) <- accEither + initialBalance = balances.getOrElse(address, Balance.empty) addressTokenUnlocks = generatedTokenUnlocksByAddress.getOrElse(address, List.empty) finalBalance <- addressTokenUnlocks.foldLeft[Either[BalanceArithmeticError, Balance]](Right(initialBalance)) { @@ -233,8 +264,9 @@ object TokenLockStateManager { balanceAfterUnlock <- currentBalance.plus(TokenLockAmount.toAmount(tokenUnlock.amount)) } yield balanceAfterUnlock } - updatedAcc = acc.updated(address, finalBalance) - } yield updatedAcc + updatedAcc = balances.updated(address, finalBalance) + updatedBalancesDelta = balancesDelta.updated(address, finalBalance) + } yield (updatedAcc, updatedBalancesDelta) } } } diff --git a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TransactionReferenceManager.scala b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TransactionReferenceManager.scala index bb1c37e99..15a5d1f1a 100644 --- a/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TransactionReferenceManager.scala +++ b/modules/node-shared/src/main/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TransactionReferenceManager.scala @@ -6,13 +6,14 @@ import io.constellationnetwork.schema._ import io.constellationnetwork.schema.address.Address import io.constellationnetwork.schema.transaction.{Transaction, TransactionReference} import io.constellationnetwork.security.signature.Signed +import io.constellationnetwork.syntax.sortedCollection._ trait TransactionReferenceManager[F[_]] { def acceptTransactionRefs( lastTxRefs: SortedMap[Address, TransactionReference], lastTxRefsContextUpdate: Map[Address, TransactionReference], acceptedTransactions: SortedSet[Signed[Transaction]] - ): SortedMap[Address, TransactionReference] + ): (SortedMap[Address, TransactionReference], SortedMap[Address, TransactionReference]) } object TransactionReferenceManager { @@ -23,10 +24,13 @@ object TransactionReferenceManager { lastTxRefs: SortedMap[Address, TransactionReference], lastTxRefsContextUpdate: Map[Address, TransactionReference], acceptedTransactions: SortedSet[Signed[Transaction]] - ): SortedMap[Address, TransactionReference] = { + ): (SortedMap[Address, TransactionReference], SortedMap[Address, TransactionReference]) = { val updatedRefs = lastTxRefs ++ lastTxRefsContextUpdate val newDestinationAddresses = acceptedTransactions.map(_.destination) -- updatedRefs.keySet - updatedRefs ++ newDestinationAddresses.toList.map(_ -> TransactionReference.empty) + val newDestinationAddressesRefs = newDestinationAddresses.toList.map(_ -> TransactionReference.empty) + + ((updatedRefs ++ newDestinationAddressesRefs).toSortedMap, (lastTxRefsContextUpdate ++ newDestinationAddressesRefs).toSortedMap) + } } } diff --git a/modules/node-shared/src/test/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TokenLockStateManagerSuite.scala b/modules/node-shared/src/test/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TokenLockStateManagerSuite.scala index 045475344..40c6dcf15 100644 --- a/modules/node-shared/src/test/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TokenLockStateManagerSuite.scala +++ b/modules/node-shared/src/test/scala/io/constellationnetwork/node/shared/infrastructure/snapshot/managers/global/TokenLockStateManagerSuite.scala @@ -1078,7 +1078,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { for { result <- acceptanceManager.acceptTokenLocks(currentEpoch, emptyTokenLocks, emptyTokenLocks, emptyUnlocks) - } yield expect(result.isEmpty) + } yield expect(result.fullState.isEmpty) } test("acceptTokenLocks - should accept new token locks and filter out expired ones") { res => @@ -1146,7 +1146,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { ) } yield expect( - result == SortedMap( + result.fullState == SortedMap( testAddress -> SortedSet(signedNewTokenLock, signedActiveTokenLock) ) ) // Should include new token lock and active token lock, but not expired token lock @@ -1213,7 +1213,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { ) } yield expect( - result == SortedMap( + result.fullState == SortedMap( testAddress -> SortedSet(signedTokenLockToKeep) ) ) // Should only include token lock that is not being unlocked @@ -1274,7 +1274,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { ) } yield expect( - result == SortedMap( + result.fullState == SortedMap( address1 -> SortedSet(signedTokenLock1), address2 -> SortedSet(signedTokenLock2) ) @@ -1335,7 +1335,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { ) } yield expect( - result == SortedMap( + result.fullState == SortedMap( address1 -> SortedSet(signedTokenLock1) ) ) // Should only include address1, address2 should be filtered out due to empty set @@ -1380,7 +1380,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { ) } yield expect( - result == SortedMap( + result.fullState == SortedMap( testAddress -> SortedSet(signedIndefiniteTokenLock) ) ) // Should include indefinite token lock @@ -1439,7 +1439,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { ) } yield expect( - result == SortedMap( + result.fullState == SortedMap( testAddress -> SortedSet(signedAcceptedTokenLock) ) ) // Should only include accepted token lock, expired token lock is filtered out @@ -1532,7 +1532,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { ) } yield expect( - result == SortedMap( + result.fullState == SortedMap( testAddress -> SortedSet(signedNewTokenLock, signedActiveTokenLock) ) ) // Should include new and active token locks, but not expired or unlocked ones @@ -1557,7 +1557,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { ) (expect(result.isRight) && - expect(result.toOption.get.isEmpty)).pure[IO] + expect(result.toOption.get._1.isEmpty)).pure[IO] } test("updateGlobalBalancesByTokenLocks - should deduct amounts for new token locks") { res => @@ -1598,7 +1598,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { expectedBalance = Balance(890L) // 1000 - 100 - 10 } yield expect(result.isRight) && - expect(result.toOption.get(testAddress) == expectedBalance) + expect(result.toOption.flatMap(_._1.get(testAddress)).get == expectedBalance) } test("updateGlobalBalancesByTokenLocks - should add back amounts for expired token locks") { res => @@ -1639,7 +1639,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { expectedBalance = Balance(600L) // 500 + 100 (only amount, not fee) } yield expect(result.isRight) && - expect(result.toOption.get(testAddress) == expectedBalance) + expect(result.toOption.flatMap(_._1.get(testAddress)).get == expectedBalance) } test("updateGlobalBalancesByTokenLocks - should add amounts for generated token unlocks") { res => @@ -1671,7 +1671,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { expectedBalance = Balance(600L) // 500 + 100 } yield expect(result.isRight) && - expect(result.toOption.get(testAddress) == expectedBalance) + expect(result.toOption.flatMap(_._1.get(testAddress)).get == expectedBalance) } test("updateGlobalBalancesByTokenLocks - should handle balance arithmetic errors") { res => @@ -1767,7 +1767,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { expectedBalance = Balance(1140L) } yield expect(result.isRight) && - expect(result.toOption.get(testAddress) == expectedBalance) + expect(result.toOption.flatMap(_._1.get(testAddress)).get == expectedBalance) } test("updateGlobalBalancesByTokenLocks - should handle multiple addresses") { res => @@ -1828,8 +1828,8 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { expectedBalance2 = Balance(700L) // 500 + 200 } yield expect(result.isRight) && - expect(result.toOption.get(address1) == expectedBalance1) && - expect(result.toOption.get(address2) == expectedBalance2) + expect(result.toOption.flatMap(_._1.get(address1)).get == expectedBalance1) && + expect(result.toOption.flatMap(_._1.get(address2)).get == expectedBalance2) } test("updateGlobalBalancesByTokenLocks - should handle token locks without unlock epoch") { res => @@ -1871,7 +1871,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { expectedBalance = Balance(890L) // 1000 - 100 - 10 } yield expect(result.isRight) && - expect(result.toOption.get(testAddress) == expectedBalance) + expect(result.toOption.flatMap(_._1.get(testAddress)).get == expectedBalance) } test("updateGlobalBalancesByTokenLocks - should handle multiple token unlocks for same address") { res => @@ -1907,7 +1907,7 @@ object TokenLockStateManagerSuite extends MutableIOSuite with Checkers { expectedBalance = Balance(850L) // 500 + 100 + 200 + 50 } yield expect(result.isRight) && - expect(result.toOption.get(testAddress) == expectedBalance) + expect(result.toOption.flatMap(_._1.get(testAddress)).get == expectedBalance) } } diff --git a/modules/shared/src/main/scala/io/constellationnetwork/schema/mpt/GlobalStateConverter.scala b/modules/shared/src/main/scala/io/constellationnetwork/schema/mpt/GlobalStateConverter.scala index b9a201920..9359eb293 100644 --- a/modules/shared/src/main/scala/io/constellationnetwork/schema/mpt/GlobalStateConverter.scala +++ b/modules/shared/src/main/scala/io/constellationnetwork/schema/mpt/GlobalStateConverter.scala @@ -8,18 +8,20 @@ import scala.collection.immutable.{SortedMap, SortedSet} import io.constellationnetwork.currency.schema.currency.{CurrencyIncrementalSnapshot, CurrencySnapshot, CurrencySnapshotInfo} import io.constellationnetwork.merkletree.Proof -import io.constellationnetwork.schema.GlobalSnapshotInfo import io.constellationnetwork.schema.address.Address import io.constellationnetwork.schema.balance.Balance import io.constellationnetwork.schema.delegatedStake.{DelegatedStakeRecord, PendingDelegatedStakeWithdrawal} +import io.constellationnetwork.schema.mpt.MptStore import io.constellationnetwork.schema.mpt.PartitionNamespace.AddressNamespace import io.constellationnetwork.schema.nodeCollateral.{NodeCollateralRecord, PendingNodeCollateralWithdrawal} import io.constellationnetwork.schema.snapshot.MetagraphSyncDataInfo import io.constellationnetwork.schema.swap.{AllowSpend, AllowSpendReference} import io.constellationnetwork.schema.tokenLock.{TokenLock, TokenLockReference} import io.constellationnetwork.schema.transaction.TransactionReference +import io.constellationnetwork.schema.{GlobalSnapshotInfo, SnapshotOrdinal} import io.constellationnetwork.security.Hasher import io.constellationnetwork.security.hash.Hash +import io.constellationnetwork.security.mpt.producer.{MerklePatriciaError, StatefulMerklePatriciaProducer} import io.constellationnetwork.security.mpt.{MerklePatriciaTrie, MptRoot} import io.constellationnetwork.security.signature.Signed @@ -28,6 +30,26 @@ import io.circe.{Encoder, Json} object GlobalStateConverter { + case class StateChangesAccumulator( + lastStateChannelSnapshotHashes: SortedMap[Address, Hash] = SortedMap.empty, + lastTxRefs: SortedMap[Address, TransactionReference] = SortedMap.empty, + balances: SortedMap[Address, Balance] = SortedMap.empty, + lastCurrencySnapshots: SortedMap[Address, Either[Signed[ + CurrencySnapshot + ], (Signed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo)]] = SortedMap.empty, + lastCurrencySnapshotsProofs: SortedMap[Address, Proof] = SortedMap.empty, + activeAllowSpends: SortedMap[Option[Address], SortedMap[Address, SortedSet[Signed[AllowSpend]]]] = SortedMap.empty, + activeTokenLocks: SortedMap[Address, SortedSet[Signed[TokenLock]]] = SortedMap.empty, + tokenLockBalances: SortedMap[Address, SortedMap[Address, Balance]] = SortedMap.empty, + lastAllowSpendRefs: SortedMap[Address, AllowSpendReference] = SortedMap.empty, + lastTokenLockRefs: SortedMap[Address, TokenLockReference] = SortedMap.empty, + activeDelegatedStakes: SortedMap[Address, SortedSet[DelegatedStakeRecord]] = SortedMap.empty, + delegatedStakesWithdrawals: SortedMap[Address, SortedSet[PendingDelegatedStakeWithdrawal]] = SortedMap.empty, + activeNodeCollaterals: SortedMap[Address, SortedSet[NodeCollateralRecord]] = SortedMap.empty, + nodeCollateralWithdrawals: SortedMap[Address, SortedSet[PendingNodeCollateralWithdrawal]] = SortedMap.empty, + metagraphSyncData: SortedMap[Address, MetagraphSyncDataInfo] = SortedMap.empty + ) + private def convertRequiredHypergraph[F[_]: Sync: Parallel, A: Encoder]( data: SortedMap[Address, A], fieldId: GlobalStateFieldId @@ -60,14 +82,22 @@ object GlobalStateConverter { .getOrElse(Map.empty) .pure[F] - private def convertCurrencySnapshots[F[_]: Sync: Parallel]( + private def convertCurrencySnapshots[F[_]: Async: Parallel: Hasher]( data: SortedMap[Address, Either[Signed[CurrencySnapshot], (Signed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo)]] ): F[Map[GlobalStateKey, Json]] = data.toSeq.parTraverse { case (metagraphAddr, Left(fullSnapshot)) => - List( - GlobalStateKey.metagraph(metagraphAddr, GlobalStateFieldId.LastCurrencySnapshots) -> fullSnapshot.asJson - ).pure[F] + CurrencyIncrementalSnapshot.fromCurrencySnapshot(fullSnapshot.value).map { currencyIncrementalSnapshot => + List( + GlobalStateKey.metagraph(metagraphAddr, GlobalStateFieldId.LastIncrementalCurrencySnapshots) -> Signed( + currencyIncrementalSnapshot, + fullSnapshot.proofs + ).asJson, + GlobalStateKey + .metagraph(metagraphAddr, GlobalStateFieldId.LastCurrencySnapshotInfo) -> fullSnapshot.info.toCurrencySnapshotInfo.asJson + ) + } + case (metagraphAddr, Right((incrementalSnapshot, snapshotInfo))) => List( GlobalStateKey.metagraph(metagraphAddr, GlobalStateFieldId.LastIncrementalCurrencySnapshots) -> incrementalSnapshot.asJson, @@ -104,7 +134,54 @@ object GlobalStateConverter { .getOrElse(Map.empty) .pure[F] - def toAllStateKeyValuePairs[F[_]: Sync: Parallel]( + def toStateKeyValuePairsFromAccumulator[F[_]: Async: Parallel: Hasher]( + acc: StateChangesAccumulator + ): F[Map[GlobalStateKey, Json]] = + ( + convertRequiredMetagraph(acc.lastStateChannelSnapshotHashes, GlobalStateFieldId.LastStateChannelSnapshotHashes), + convertRequiredHypergraph(acc.lastTxRefs, GlobalStateFieldId.LastTxRefs), + convertRequiredHypergraph(acc.balances, GlobalStateFieldId.Balances), + convertCurrencySnapshots(acc.lastCurrencySnapshots), + convertRequiredMetagraph(acc.lastCurrencySnapshotsProofs, GlobalStateFieldId.LastCurrencySnapshotsProofs), + convertActiveAllowSpends(if (acc.activeAllowSpends.nonEmpty) acc.activeAllowSpends.some else none), + convertOptionalHypergraph( + if (acc.activeTokenLocks.nonEmpty) acc.activeTokenLocks.some else none, + GlobalStateFieldId.ActiveTokenLocks + ), + convertTokenLockBalances(if (acc.tokenLockBalances.nonEmpty) acc.tokenLockBalances.some else none), + convertOptionalHypergraph( + if (acc.lastAllowSpendRefs.nonEmpty) acc.lastAllowSpendRefs.some else none, + GlobalStateFieldId.LastAllowSpendRefs + ), + convertOptionalHypergraph( + if (acc.lastTokenLockRefs.nonEmpty) acc.lastTokenLockRefs.some else none, + GlobalStateFieldId.LastTokenLockRefs + ), + convertOptionalHypergraph( + if (acc.activeDelegatedStakes.nonEmpty) acc.activeDelegatedStakes.some else none, + GlobalStateFieldId.ActiveDelegatedStakes + ), + convertOptionalHypergraph( + if (acc.delegatedStakesWithdrawals.nonEmpty) acc.delegatedStakesWithdrawals.some else none, + GlobalStateFieldId.DelegatedStakesWithdrawals + ), + convertOptionalHypergraph( + if (acc.activeNodeCollaterals.nonEmpty) acc.activeNodeCollaterals.some else none, + GlobalStateFieldId.ActiveNodeCollaterals + ), + convertOptionalHypergraph( + if (acc.nodeCollateralWithdrawals.nonEmpty) acc.nodeCollateralWithdrawals.some else none, + GlobalStateFieldId.NodeCollateralWithdrawals + ), + convertOptionalHypergraph( + if (acc.metagraphSyncData.nonEmpty) acc.metagraphSyncData.some else none, + GlobalStateFieldId.MetagraphSyncData + ) + ).parMapN { (m1, m2, m3, m4, m5, m6, m7, m8, m9, m10, m11, m12, m13, m14, m15) => + m1 ++ m2 ++ m3 ++ m4 ++ m5 ++ m6 ++ m7 ++ m8 ++ m9 ++ m10 ++ m11 ++ m12 ++ m13 ++ m14 ++ m15 + } + + def toAllStateKeyValuePairs[F[_]: Async: Parallel: Hasher]( info: GlobalSnapshotInfo ): F[Map[GlobalStateKey, Json]] = ( @@ -138,10 +215,15 @@ object GlobalStateConverter { object syntax { implicit class GlobalSnapshotInfoMptOps(val info: GlobalSnapshotInfo) extends AnyVal { - def allStateEntries[F[_]: Sync: Parallel]: F[Map[GlobalStateKey, Json]] = + def allStateEntries[F[_]: Async: Parallel: Hasher]: F[Map[GlobalStateKey, Json]] = toAllStateKeyValuePairs(info) } + implicit class StateChangesAccumulatorMptOps(val acc: StateChangesAccumulator) extends AnyVal { + def toStateEntries[F[_]: Async: Parallel: Hasher]: F[Map[GlobalStateKey, Json]] = + toStateKeyValuePairsFromAccumulator(acc) + } + implicit class MptBuilderOps[F[_]: Parallel: Async: Hasher](kvPairsF: F[Map[GlobalStateKey, Json]]) { def buildMpt: F[MptRoot] = for { @@ -165,7 +247,10 @@ object GlobalStateConverter { val store: MptStore[F, GlobalStateKey] ) { def syncFromGlobalSnapshotInfo(info: GlobalSnapshotInfo): F[Unit] = - info.allStateEntries[F].flatMap(store.sync[Json]) + info.allStateEntries[F].flatMap(store.syncFull[Json]) + + def syncFromStateChanges(acc: StateChangesAccumulator): F[Unit] = + acc.toStateEntries[F].flatMap(store.sync[Json]) def getBalance(address: Address): F[Option[Balance]] = store diff --git a/modules/shared/src/main/scala/io/constellationnetwork/schema/mpt/MptStore.scala b/modules/shared/src/main/scala/io/constellationnetwork/schema/mpt/MptStore.scala index 9cb064392..14b07985a 100644 --- a/modules/shared/src/main/scala/io/constellationnetwork/schema/mpt/MptStore.scala +++ b/modules/shared/src/main/scala/io/constellationnetwork/schema/mpt/MptStore.scala @@ -34,6 +34,8 @@ trait MptStore[F[_], K] { def sync[V: Encoder](newState: Map[K, V]): F[Unit] + def syncFull[V: Encoder](newState: Map[K, V]): F[Unit] + def update[V: Encoder](toUpsert: Map[K, V], toRemove: Set[K]): F[Unit] def underlying: StatefulMerklePatriciaProducer[F] @@ -98,7 +100,7 @@ object MptStore { override def build: F[Either[MerklePatriciaError, MerklePatriciaTrie]] = producer.build - override def sync[V: Encoder](newState: Map[K, V]): F[Unit] = + override def syncFull[V: Encoder](newState: Map[K, V]): F[Unit] = if (newState.isEmpty) producer.clear else for { @@ -117,6 +119,25 @@ object MptStore { _ <- if (keysToUpsert.nonEmpty) producer.insert(keysToUpsert).void else Async[F].unit } yield () + override def sync[V: Encoder](updates: Map[K, V]): F[Unit] = + if (updates.isEmpty) Async[F].unit + else + for { + newEntries <- updates.toList.parTraverse { + case (k, v) => + toHex(k).map(_ -> v.asJson) + }.map(_.toMap) + + currentEntries <- producer.entries + + keysToUpsert = newEntries.filter { + case (k, v) => + !currentEntries.get(k).contains(v) + } + + _ <- if (keysToUpsert.nonEmpty) producer.insert(keysToUpsert) else Async[F].unit + } yield () + override def update[V: Encoder](toUpsert: Map[K, V], toRemove: Set[K]): F[Unit] = for { upsertHex <-