Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ abstract class CurrencyL0App(
services.collateral
)

_ <- initializeGlobalSnapshotStorages[IO, Run](services, storages, sharedStorages).asResource
_ <- hasherSelectorAlwaysCurrent.withCurrent { implicit hasher =>
initializeGlobalSnapshotStorages[IO, Run](services, storages, sharedStorages)
}.asResource

program <- (method match {
case m: CreateGenesis =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.constellationnetwork.currency.l0

import cats.MonadThrow
import cats.effect.Async
import cats.syntax.all._
import cats.{MonadThrow, Parallel}

import scala.concurrent.duration._

Expand All @@ -11,6 +11,7 @@ import io.constellationnetwork.currency.schema.currency.{CurrencyIncrementalSnap
import io.constellationnetwork.node.shared.cli.CliMethod
import io.constellationnetwork.node.shared.domain.snapshot.services.GlobalL0Service
import io.constellationnetwork.node.shared.modules.SharedStorages
import io.constellationnetwork.schema.mpt.GlobalStateConverter.syntax._
import io.constellationnetwork.schema.{GlobalSnapshotWithState, GlobalSnapshotWithStateDeltas}
import io.constellationnetwork.security.Hasher
import io.constellationnetwork.security.signature.Signed
Expand All @@ -34,7 +35,7 @@ object StoragesInitializer {
}

def initializeGlobalSnapshotStorages[
F[_]: Async: Logger,
F[_]: Async: Logger: Parallel: Hasher,
R <: CliMethod
](
services: Services[F, R],
Expand Down Expand Up @@ -76,6 +77,9 @@ object StoragesInitializer {
)
_ <- Logger[F].info(s"Successfully initialized lastGlobalSnapshot storage")

kvPairs <- state.allStateEntries[F]
_ <- sharedStorages.mptStore.syncFull(kvPairs)

_ <- Logger[F].info(s"Successfully initialized all global snapshot storages with ordinal=$ordinal")
} yield ()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ object Services {
storages.transaction,
storages.lastSnapshot,
validators.transaction,
sharedStorages.mptStore
sharedStorages.mptStore,
shouldUseMptStore = false
)
val allowSpend = AllowSpendService.make[F, CurrencySnapshotStateProof, CurrencyIncrementalSnapshot, CurrencySnapshotInfo](
storages.allowSpend,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,6 @@ object Main
case (snapshotInfo, snapshot) =>
for {
hashedSnapshot <- hasherSelector.withCurrent(implicit hasher => snapshot.toHashed[IO])
kvPairs <- snapshotInfo.allStateEntries[IO]
_ <- sharedStorages.mptStore.sync(kvPairs)

result <- services.consensus.manager.startFacilitatingAfterRollback(
snapshot.ordinal,
GlobalConsensusOutcome(
Expand Down Expand Up @@ -319,10 +316,9 @@ object Main
sharedStorages.lastGlobalSnapshot,
programs.download,
hashedSnapshot,
hashedGenesis.info
hashedGenesis.info,
sharedStorages.mptStore
)
kvPairs <- GlobalSnapshotInfoV1.toGlobalSnapshotInfo(hashedGenesis.info).allStateEntries[IO]
_ <- sharedStorages.mptStore.sync(kvPairs)
_ <- services.consensus.manager
.startFacilitatingAfterRollback(
signedFirstIncrementalSnapshot.ordinal,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.constellationnetwork.dag.l0

import cats.Parallel
import cats.effect.Async
import cats.syntax.all._

Expand All @@ -9,21 +10,24 @@ import io.constellationnetwork.node.shared.domain.collateral.LatestBalances
import io.constellationnetwork.node.shared.domain.snapshot.programs.Download
import io.constellationnetwork.node.shared.domain.snapshot.storage.{LastNGlobalSnapshotStorage, LastSnapshotStorage, SnapshotStorage}
import io.constellationnetwork.node.shared.modules.SharedStorages
import io.constellationnetwork.schema.mpt.GlobalStateConverter.syntax._
import io.constellationnetwork.schema.mpt.{GlobalStateKey, MptStore}
import io.constellationnetwork.schema.{GlobalIncrementalSnapshot, GlobalSnapshotInfo}
import io.constellationnetwork.security.{Hashed, Hasher}

import org.typelevel.log4cats.Logger

object StoragesInitializer {
def initializeStorages[
F[_]: Async: Logger: Hasher
F[_]: Async: Logger: Hasher: Parallel
](
globalSnapshotStorage: SnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo],
lastNGlobalSnapshotStorage: LastNGlobalSnapshotStorage[F],
lastGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo],
download: Download[F, GlobalIncrementalSnapshot],
hashedGlobalIncrementalSnapshot: Hashed[GlobalIncrementalSnapshot],
globalSnapshotInfo: GlobalSnapshotInfo
globalSnapshotInfo: GlobalSnapshotInfo,
mptStore: MptStore[F, GlobalStateKey]
): F[Unit] = {
val ordinal = hashedGlobalIncrementalSnapshot.ordinal

Expand All @@ -50,6 +54,9 @@ object StoragesInitializer {
)
_ <- Logger[F].info(s"Successfully initialized lastGlobalSnapshot storage")

kvPairs <- globalSnapshotInfo.allStateEntries[F]
_ <- mptStore.syncFull(kvPairs)

_ <- Logger[F].info(s"Storage initialization completed successfully with ordinal=$ordinal")
} yield ()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ object Download {
.flatMap { result =>
val ((snapshot, context), observationLimit) = result
for {
kvPairs <- context.allStateEntries[F]
_ <- mptStore.sync(kvPairs)
kvPairs <- hasherSelector.withCurrent(implicit h => context.allStateEntries[F])
_ <- mptStore.syncFull(kvPairs)
_ <- consensus.manager.startFacilitatingAfterDownload(observationLimit, snapshot, context)
} yield ()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import io.constellationnetwork.node.shared.domain.snapshot.SnapshotContextFuncti
import io.constellationnetwork.node.shared.domain.snapshot.programs.Download
import io.constellationnetwork.node.shared.domain.snapshot.storage.{LastNGlobalSnapshotStorage, LastSnapshotStorage, SnapshotStorage}
import io.constellationnetwork.schema._
import io.constellationnetwork.schema.mpt.{GlobalStateKey, MptStore}
import io.constellationnetwork.security._
import io.constellationnetwork.security.hash.Hash
import io.constellationnetwork.security.signature.Signed
Expand All @@ -37,7 +38,8 @@ object GlobalSnapshotTraverse {
globalSnapshotStorage: SnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo],
lastNGlobalSnapshotStorage: LastNGlobalSnapshotStorage[F],
lastGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo],
download: Download[F, GlobalIncrementalSnapshot]
download: Download[F, GlobalIncrementalSnapshot],
mptStore: MptStore[F, GlobalStateKey]
): GlobalSnapshotTraverse[F] =
new GlobalSnapshotTraverse[F] {
implicit val logger: SelfAwareStructuredLogger[F] = Slf4jLogger.getLoggerFromName[F](this.getClass.getName)
Expand Down Expand Up @@ -122,7 +124,8 @@ object GlobalSnapshotTraverse {
lastGlobalSnapshotStorage,
download,
hashedFirstInc,
firstInfo
firstInfo,
mptStore
)
)
(info, lastInc) <- incHashesNec.tail.foldLeftM((firstInfo, firstInc)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.constellationnetwork.node.shared.infrastructure.snapshot.GlobalSnapsho
import io.constellationnetwork.node.shared.infrastructure.snapshot.storage._
import io.constellationnetwork.node.shared.modules.SharedStorages
import io.constellationnetwork.schema._
import io.constellationnetwork.schema.mpt.{GlobalStateKey, MptStore}
import io.constellationnetwork.security._
import io.constellationnetwork.security.hash.Hash
import io.constellationnetwork.security.signature.Signed
Expand All @@ -43,7 +44,8 @@ object RollbackLoader {
F,
GlobalIncrementalSnapshot,
GlobalSnapshotInfo
]
],
mptStore: MptStore[F, GlobalStateKey]
): RollbackLoader[F] =
new RollbackLoader[F](
keyPair,
Expand All @@ -56,7 +58,8 @@ object RollbackLoader {
globalSnapshotStorage,
lastNGlobalSnapshotStorage,
lastGlobalSnapshotStorage,
combinedSnapshotCheckpointFileSystemStorage
combinedSnapshotCheckpointFileSystemStorage,
mptStore
) {}
}

Expand All @@ -71,7 +74,12 @@ sealed abstract class RollbackLoader[F[_]: Async: Parallel: KryoSerializer: Json
globalSnapshotStorage: SnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo],
lastNGlobalSnapshotStorage: LastNGlobalSnapshotStorage[F],
lastGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo],
combinedSnapshotCheckpointFileSystemStorage: CombinedSnapshotCheckpointFileSystemStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo]
combinedSnapshotCheckpointFileSystemStorage: CombinedSnapshotCheckpointFileSystemStorage[
F,
GlobalIncrementalSnapshot,
GlobalSnapshotInfo
],
mptStore: MptStore[F, GlobalStateKey]
) {

private val logger = Slf4jLogger.getLogger[F]
Expand All @@ -97,7 +105,8 @@ sealed abstract class RollbackLoader[F[_]: Async: Parallel: KryoSerializer: Json
globalSnapshotStorage,
lastNGlobalSnapshotStorage,
lastGlobalSnapshotStorage,
download
download,
mptStore
)
snapshotTraverse.loadChain()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ object Programs {
globalSnapshotStorage,
lastNGlobalSnapshotStorage,
lastGlobalSnapshotStorage,
storages.combinedGlobalSnapshotCheckpointStorage
storages.combinedGlobalSnapshotCheckpointStorage,
mptStore
)

new Programs[F](sharedPrograms.peerDiscovery, sharedPrograms.joining, trustPush, download, rollbackLoader) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ object GlobalSnapshotTraverseSuite extends MutableIOSuite with Checkers {
globalSnapshotStorage,
lastNSnapshotStorage,
lastSnapshotStorage,
download
download,
mptStore
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ abstract class SnapshotProcessor[
lastSnapshotStorage: LastSnapshotStorage[F, S, SI],
addressStorage: AddressStorage[F],
mptStore: MptStore[F, GlobalStateKey]
): F[SnapshotProcessingResult] =
)(implicit hasher: Hasher[F]): F[SnapshotProcessingResult] =
alignment match {
case AlignedAtNewOrdinal(
snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import cats.syntax.all._

import io.constellationnetwork.dag.l1.domain.transaction.ContextualTransactionValidator.NonContextualValidationError
import io.constellationnetwork.ext.cats.syntax.validated.validatedSyntax
import io.constellationnetwork.node.shared.domain.collateral.LatestBalances
import io.constellationnetwork.node.shared.domain.snapshot.storage.LastSnapshotStorage
import io.constellationnetwork.node.shared.domain.transaction.TransactionValidator
import io.constellationnetwork.schema.SnapshotOrdinal
Expand All @@ -19,6 +20,7 @@ import io.constellationnetwork.schema.transaction.Transaction
import io.constellationnetwork.security.hash.Hash
import io.constellationnetwork.security.{Hashed, Hasher}

import fs2.Stream
import io.circe.Json

import ContextualTransactionValidator.ContextualTransactionValidationError
Expand All @@ -38,11 +40,37 @@ object TransactionService {
SI <: SnapshotInfo[P]
](
transactionStorage: TransactionStorage[F],
lastSnapshotStorage: LastSnapshotStorage[F, S, SI],
lastSnapshotStorage: LastSnapshotStorage[F, S, SI] with LatestBalances[F],
transactionValidator: TransactionValidator[F],
mptStore: MptStore[F, GlobalStateKey]
mptStore: MptStore[F, GlobalStateKey],
shouldUseMptStore: Boolean
): TransactionService[F] = new TransactionService[F] {

def useMptStore(
transaction: Hashed[Transaction]
)(implicit hasher: Hasher[F]) =
for {
maybeSnapshot <- lastSnapshotStorage.get
ordinal = maybeSnapshot.map(_.ordinal).getOrElse(SnapshotOrdinal.MinValue)
balance <- mptStore.getBalance(transaction.source).map(_.getOrElse(Balance.empty))
result <- transactionStorage.tryPut(transaction, ordinal, balance)
} yield result

def useGlobalSnapshotInfo(
transaction: Hashed[Transaction]
) =
lastSnapshotStorage.getCombinedStream.map {
case Some((s, si)) => (s.ordinal, si.balances.getOrElse(transaction.source, Balance.empty))
case None => (SnapshotOrdinal.MinValue, Balance.empty)
}.changes.switchMap {
case (latestOrdinal, balance) => Stream.eval(transactionStorage.tryPut(transaction, latestOrdinal, balance))
}.head.compile.last.flatMap {
case Some(value) => value.pure[F]
case None =>
new Exception(s"Unexpected state, stream should always emit the first snapshot")
.raiseError[F, Either[NonEmptyList[ContextualTransactionValidationError], Hash]]
}

def offer(
transaction: Hashed[Transaction]
)(implicit hasher: Hasher[F]): F[Either[NonEmptyList[ContextualTransactionValidationError], Hash]] =
Expand All @@ -51,13 +79,11 @@ object TransactionService {
.map(_.errorMap(NonContextualValidationError))
.flatMap {
case Valid(_) =>
for {
maybeSnapshot <- lastSnapshotStorage.get
ordinal = maybeSnapshot.map(_.ordinal).getOrElse(SnapshotOrdinal.MinValue)
balance <- mptStore.getBalance(transaction.source).map(_.getOrElse(Balance.empty))
result <- transactionStorage.tryPut(transaction, ordinal, balance)
} yield result

if (shouldUseMptStore) {
useMptStore(transaction)
} else {
useGlobalSnapshotInfo(transaction)
}
case Invalid(e) =>
e.toNonEmptyList.asLeft[Hash].leftWiden[NonEmptyList[ContextualTransactionValidationError]].pure[F]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ object Services {
.make[F](p2PClient.l0GlobalSnapshot, globalL0Cluster, lastGlobalSnapshotStorage, None, maybeMajorityPeerIds)
val session = sharedServices.session
val transaction =
TransactionService.make[F, P, S, SI](storages.transaction, storages.lastSnapshot, validators.transaction, sharedStorages.mptStore)
TransactionService.make[F, P, S, SI](
storages.transaction,
storages.lastSnapshot,
validators.transaction,
sharedStorages.mptStore,
shouldUseMptStore = true
)
val allowSpend =
AllowSpendService.make[F, P, S, SI](storages.allowSpend, storages.lastSnapshot, validators.allowSpend)
val allowSpendBlock = AllowSpendBlockService.make[F, P, S, SI](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,8 +992,7 @@ object SnapshotProcessorSuite extends SimpleIOSuite with TransactionGenerator {
),
srcKey
).flatMap(_.toHashedWithSignatureCheck.map(_.toOption.get))
kv <- snapshotInfo.allStateEntries[IO]
_ <- mptStore.sync(kv)
_ <- mptStore.syncFromGlobalSnapshotInfo(snapshotInfo)
_ <- lastSnapR.set((hashedLastSnapshot, snapshotInfo).some)
lastN = (hashedLastSnapshot, snapshotInfo).some
incLastN = SortedMap(hashedLastSnapshot.ordinal -> hashedLastSnapshot)
Expand Down Expand Up @@ -1699,8 +1698,7 @@ object SnapshotProcessorSuite extends SimpleIOSuite with TransactionGenerator {
),
srcKey
).flatMap(_.toHashedWithSignatureCheck.map(_.toOption.get))
kv <- snapshotInfo.allStateEntries[IO]
_ <- mptStore.sync(kv)
_ <- mptStore.syncFromGlobalSnapshotInfo(snapshotInfo)
_ <- lastSnapR.set((hashedLastSnapshot, snapshotInfo).some)
lastN = (hashedLastSnapshot, snapshotInfo).some
_ <- lastNSnapR.set(lastN)
Expand Down
Loading
Loading