diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala
index 6498803c19..5f1f1bcdf9 100644
--- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala
+++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala
@@ -16,7 +16,10 @@ import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.{
   SpliceTestConsoleEnvironment,
 }
 import org.lfdecentralizedtrust.splice.scan.admin.http.ProtobufJsonScanHttpEncodings
-import org.lfdecentralizedtrust.splice.scan.automation.ScanHistoryBackfillingTrigger
+import org.lfdecentralizedtrust.splice.scan.automation.{
+  DeleteCorruptAcsSnapshotTrigger,
+  ScanHistoryBackfillingTrigger,
+}
 import org.lfdecentralizedtrust.splice.store.{PageLimit, TreeUpdateWithMigrationId}
 import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.AdvanceOpenMiningRoundTrigger
 import org.lfdecentralizedtrust.splice.util.{EventId, UpdateHistoryTestUtil, WalletTestUtil}
@@ -57,6 +60,7 @@ class ScanHistoryBackfillingIntegrationTest
         updateAutomationConfig(ConfigurableApp.Scan)(
           _.withPausedTrigger[ScanHistoryBackfillingTrigger]
             .withPausedTrigger[TxLogBackfillingTrigger[TxLogEntry]]
+            .withPausedTrigger[DeleteCorruptAcsSnapshotTrigger]
         )(config)
       )
       .addConfigTransforms((_, config) =>
@@ -258,6 +262,28 @@ class ScanHistoryBackfillingIntegrationTest
       }
     }
 
+    clue(
+      "Backfilling triggers state. All idle while waiting for corrupt snapshots to be deleted."
+    ) {
+      sv1BackfillTrigger.retrieveTasks().futureValue should be(empty)
+      sv2BackfillTrigger.retrieveTasks().futureValue should be(empty)
+      sv1ScanTxLogBackfillTrigger.retrieveTasks().futureValue should be(empty)
+      sv2ScanTxLogBackfillTrigger.retrieveTasks().futureValue should be(empty)
+    }
+
+    actAndCheck(
+      "Run trigger that checks for corrupt snapshots once on all scans", {
+        sv1DeleteSnapshotsTrigger.runOnce().futureValue
+        sv2DeleteSnapshotsTrigger.runOnce().futureValue
+      },
+    )(
+      "History marked as free of corrupt snapshots",
+      _ => {
+        sv1ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
+        sv2ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
+      },
+    )
+
     clue(
       "Backfilling triggers state. SV1+SV2: update is about to initialize, txlog is not doing anything yet"
     ) {
@@ -521,6 +547,10 @@ class ScanHistoryBackfillingIntegrationTest
     sv1ScanBackend.automation.trigger[TxLogBackfillingTrigger[TxLogEntry]]
   private def sv2ScanTxLogBackfillTrigger(implicit env: SpliceTestConsoleEnvironment) =
     sv2ScanBackend.automation.trigger[TxLogBackfillingTrigger[TxLogEntry]]
+  private def sv1DeleteSnapshotsTrigger(implicit env: SpliceTestConsoleEnvironment) =
+    sv1ScanBackend.automation.trigger[DeleteCorruptAcsSnapshotTrigger]
+  private def sv2DeleteSnapshotsTrigger(implicit env: SpliceTestConsoleEnvironment) =
+    sv2ScanBackend.automation.trigger[DeleteCorruptAcsSnapshotTrigger]
 
   private def allUpdatesFromScanBackend(scanBackend: ScanAppBackendReference) = {
     // Need to use the store directly, as the HTTP endpoint refuses to return data unless it's completely backfilled
diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala
index 5dd2754a5f..f576f5a3da 100644
--- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala
+++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala
@@ -135,6 +135,41 @@ class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit
     )(metricsContext)
   }
 
+  object CorruptAcsSnapshots {
+    private val corruptAcsSnapshotsPrefix: MetricName = prefix :+ "corrupt-acs-snapshots"
+
+    type CantonTimestampMicros =
+      Long // OpenTelemetry Gauges only allow numeric types, so record times are exported as microseconds
+    val latestRecordTime: Gauge[CantonTimestampMicros] =
+      metricsFactory.gauge(
+        MetricInfo(
+          name = corruptAcsSnapshotsPrefix :+ "latest-record-time",
+          summary = "The record time of the latest corrupt snapshot that has been deleted",
+          Traffic,
+        ),
+        initial = CantonTimestamp.MinValue.toMicros,
+      )(metricsContext)
+
+    val count: Counter =
+      metricsFactory.counter(
+        MetricInfo(
+          name = corruptAcsSnapshotsPrefix :+ "count",
+          summary = "The number of corrupt ACS snapshots deleted",
+          Traffic,
+        )
+      )(metricsContext)
+
+    val completed: Gauge[Int] =
+      metricsFactory.gauge(
+        MetricInfo(
+          name = corruptAcsSnapshotsPrefix :+ "completed",
+          summary = "Whether all corrupt snapshots are deleted (1) or not (0)",
+          Debug,
+        ),
+        initial = 0,
+      )(metricsContext)
+  }
+
   object UpdateHistory {
     private val updateHistoryPrefix: MetricName = prefix :+ "updates"
diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala
index 23b3ea501a..56cdf8aec1 100644
--- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala
+++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala
@@ -68,6 +68,23 @@ import scala.jdk.CollectionConverters.*
 import scala.jdk.OptionConverters.*
 import org.lfdecentralizedtrust.splice.util.FutureUnlessShutdownUtil.futureUnlessShutdownToFuture
 
+/** Stores all original daml updates visible to `updateStreamParty`.
+  *
+  * ==Related triggers==
+  *
+  * The following triggers perform long-running background tasks related to [[UpdateHistory]],
+  * and must complete in the following order:
+  *
+  * 1. [[DeleteCorruptAcsSnapshotTrigger]] deletes all ACS snapshots that were computed from this UpdateHistory while
+  *    it was missing import updates. Such snapshots are easily identified by the trigger.
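+  *    A snapshot counts as corrupt when its migration is not the first known migration and the
+  *    history contains no import updates for it, i.e. no updates at record time
+  *    `CantonTimestamp.MinValue` (see [[migrationsWithCorruptSnapshots]]).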
+  *    UpdateHistory has an in-memory flag (as part of [[UpdateHistory.State]]) that stores
+  *    whether all corrupt snapshots have been deleted.
+  *    See [[corruptAcsSnapshotsDeleted]] and [[markCorruptAcsSnapshotsDeleted]].
+  * 1. [[ScanHistoryBackfillingTrigger]] backfills missing updates from peer scan applications.
+  *    Information on the progress of this backfilling process is stored in the database.
+  *    See [[destinationHistory.markBackfillingComplete]] and [[destinationHistory.markImportUpdatesBackfillingComplete]].
+  * 1. [[AcsSnapshotTrigger]] backfills ACS snapshots.
+  */
 class UpdateHistory(
     storage: DbStorage,
     val domainMigrationInfo: DomainMigrationInfo,
@@ -246,14 +263,6 @@ class UpdateHistory(
           .map(_.map(LegacyOffset.Api.assertFromStringToLong))
 
         _ <- cleanUpDataAfterDomainMigration(newHistoryId)
-
-        _ <-
-          if (enableImportUpdateBackfill) {
-            deleteInvalidAcsSnapshots(newHistoryId)
-          } else {
-            logger.info(s"Not deleting invalid ACS snapshots for history $newHistoryId")
-            Future.unit
-          }
       } yield {
         state.updateAndGet(
           _.copy(
@@ -654,21 +663,17 @@ class UpdateHistory(
     """
   }
 
-  private[this] def deleteInvalidAcsSnapshots(
-      historyId: Long
-  )(implicit tc: TraceContext): Future[Unit] = {
-    assert(enableImportUpdateBackfill)
-    def migrationsWithCorruptSnapshots(): Future[Set[Long]] = {
-      for {
-        migrationsWithImportUpdates <- storage
-          .query(
-            // The following is equivalent to:
-            // """select distinct migration_id
-            // from update_history_transactions
-            // where history_id = $historyId
-            // and record_time = ${CantonTimestamp.MinValue}"""
-            // but it uses a recursive CTE to implement a loose index scan
-            sql"""
+  def migrationsWithCorruptSnapshots()(implicit tc: TraceContext): Future[Set[Long]] = {
+    for {
+      migrationsWithImportUpdates <- storage
+        .query(
+          // The following is equivalent to:
+          // """select distinct migration_id
+          // from update_history_transactions
+          // where history_id = $historyId
+          // and record_time = ${CantonTimestamp.MinValue}"""
+          // but it uses a recursive CTE to implement a loose index scan
+          sql"""
            with recursive t as (
              (
                select migration_id
                from update_history_transactions
                where history_id = $historyId and record_time = ${CantonTimestamp.MinValue}
                order by migration_id
                limit 1
              )
              union all
              select (
                select migration_id
                from update_history_transactions
                where history_id = $historyId and record_time = ${CantonTimestamp.MinValue}
                  and migration_id > t.migration_id
                order by migration_id
                limit 1
              )
              from t
              where t.migration_id is not null
            )
            select migration_id from t where migration_id is not null
           """.as[Long],
-            "deleteInvalidAcsSnapshots.1",
-          )
-        firstMigrationId <- getFirstMigrationId(historyId)
-        migrationsWithSnapshots <- storage
-          .query(
-            sql"""
+          "deleteInvalidAcsSnapshots.1",
+        )
+      firstMigrationIdO <- getFirstMigrationId(historyId)
+      migrationsWithSnapshots <- storage
+        .query(
+          sql"""
            select distinct migration_id
            from acs_snapshot
            where history_id = $historyId
           """.as[Long],
-            "deleteInvalidAcsSnapshots.2",
-          )
-      } yield {
-        val migrationsThatNeedImportUpdates: Set[Long] =
-          migrationsWithSnapshots.toSet - firstMigrationId.getOrElse(
-            throw new RuntimeException("No first migration found")
-          )
-        migrationsThatNeedImportUpdates -- migrationsWithImportUpdates.toSet
+          "deleteInvalidAcsSnapshots.2",
+        )
+    } yield {
+      firstMigrationIdO match {
+        case None =>
+          Set.empty
+        case Some(firstMigrationId) =>
+          val migrationsThatNeedImportUpdates: Set[Long] =
+            migrationsWithSnapshots.toSet - firstMigrationId
+          migrationsThatNeedImportUpdates -- migrationsWithImportUpdates.toSet
       }
     }
+  }
 
-    for {
-      state <- getBackfillingStateForHistory(historyId)
-      _ <- state match {
-        // Note: we want to handle the case where backfilling finished before import update backfilling was implemented,
-        //   because in that case UpdateHistory signalled that history was complete when it fact it was missing import updates,
-        //   which caused [[AcsSnapshotTrigger]] to compute corrupt snapshots.
-        //   It is fine to run the code below on each application startup because it only deletes corrupt snapshots.
-        case BackfillingState.InProgress(_, _) =>
-          logger.info(
-            s"This update history may be missing import updates, checking for corrupt ACS snapshots"
-          )
-          migrationsWithCorruptSnapshots()
-            .flatMap { migrations =>
-              Future.sequence(migrations.map { migrationId =>
-                deleteAcsSnapshotsAfter(historyId, migrationId, CantonTimestamp.MinValue)
-              })
-            }
-            .andThen { case _ =>
-              logger.info(s"Finished checking for corrupt ACS snapshots")
-            }
-        case _ =>
-          logger.debug(
-            s"History is in backfilling state $state, no need to check for corrupt ACS snapshots"
-          )
-          Future.unit
-      }
-    } yield {
-      ()
-    }
+  def corruptAcsSnapshotsDeleted: Boolean = state.get.corruptSnapshotsDeleted
+  def markCorruptAcsSnapshotsDeleted(): Unit = {
+    state.updateAndGet(_.copy(corruptSnapshotsDeleted = true))
+    ()
   }
 
   private[this] def cleanUpDataAfterDomainMigration(
@@ -2232,11 +2215,15 @@ object UpdateHistory {
   )
 
   case class State(
-      historyId: Option[Long]
+      historyId: Option[Long],
+      corruptSnapshotsDeleted: Boolean,
   ) {}
 
   object State {
-    def empty(): State = State(None)
+    def empty(): State = State(
+      historyId = None,
+      corruptSnapshotsDeleted = false,
+    )
   }
 
   sealed trait BackfillingState
diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/DeleteCorruptAcsSnapshotTrigger.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/DeleteCorruptAcsSnapshotTrigger.scala
new file mode 100644
index 0000000000..d162de4418
--- /dev/null
+++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/DeleteCorruptAcsSnapshotTrigger.scala
@@ -0,0 +1,96 @@
+// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.scan.automation + +import com.daml.metrics.api.MetricsContext +import org.lfdecentralizedtrust.splice.automation.{ + PollingParallelTaskExecutionTrigger, + TaskOutcome, + TaskSuccess, + TriggerContext, +} +import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore +import org.lfdecentralizedtrust.splice.store.{HistoryMetrics, UpdateHistory} +import com.digitalasset.canton.data.CantonTimestamp +import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting} +import com.digitalasset.canton.tracing.TraceContext +import io.opentelemetry.api.trace.Tracer +import org.apache.pekko.stream.Materializer + +import scala.concurrent.{ExecutionContext, Future} + +class DeleteCorruptAcsSnapshotTrigger( + store: AcsSnapshotStore, + updateHistory: UpdateHistory, + protected val context: TriggerContext, +)(implicit + ec: ExecutionContext, + tracer: Tracer, + mat: Materializer, + // we always return 1 task, so PollingParallelTaskExecutionTrigger in effect does nothing in parallel +) extends PollingParallelTaskExecutionTrigger[DeleteCorruptAcsSnapshotTrigger.Task] { + + private val historyMetrics = new HistoryMetrics(context.metricsFactory)(MetricsContext.Empty) + + override def retrieveTasks()(implicit + tc: TraceContext + ): Future[Seq[DeleteCorruptAcsSnapshotTrigger.Task]] = { + if (!updateHistory.isReady) { + Future.successful(Seq.empty) + } else if (updateHistory.corruptAcsSnapshotsDeleted) { + Future.successful(Seq.empty) + } else { + for { + migrations <- updateHistory.migrationsWithCorruptSnapshots() + } yield migrations.lastOption match { + case Some(migrationToClean) => + historyMetrics.CorruptAcsSnapshots.completed.updateValue(0) + Seq(DeleteCorruptAcsSnapshotTrigger.Task(migrationToClean)) + case None => + updateHistory.markCorruptAcsSnapshotsDeleted() + historyMetrics.CorruptAcsSnapshots.completed.updateValue(1) + Seq.empty + } + } + } + + override protected def completeTask(task: DeleteCorruptAcsSnapshotTrigger.Task)(implicit + tc: TraceContext + ): Future[TaskOutcome] = task match { + case DeleteCorruptAcsSnapshotTrigger.Task(migrationId) => + for { + lastSnapshotO <- store.lookupSnapshotBefore(migrationId, CantonTimestamp.MaxValue) + lastSnapshot = lastSnapshotO.getOrElse( + throw new RuntimeException("Task should never become stale") + ) + _ <- store.deleteSnapshot(lastSnapshot) + } yield { + historyMetrics.CorruptAcsSnapshots.count.inc() + historyMetrics.CorruptAcsSnapshots.latestRecordTime.updateValue( + lastSnapshot.snapshotRecordTime.toMicros + ) + TaskSuccess( + s"Successfully deleted snapshot $lastSnapshot." 
+ ) + } + } + + override protected def isStaleTask(task: DeleteCorruptAcsSnapshotTrigger.Task)(implicit + tc: TraceContext + ): Future[Boolean] = Future.successful(false) +} + +object DeleteCorruptAcsSnapshotTrigger { + + case class Task( + migrationId: Long + ) extends PrettyPrinting { + import org.lfdecentralizedtrust.splice.util.PrettyInstances.* + + override def pretty: Pretty[this.type] = prettyOfClass( + param("migrationId", _.migrationId) + ) + } + +} diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanAutomationService.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanAutomationService.scala index 01e4ef21ff..d32a6e7bf0 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanAutomationService.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanAutomationService.scala @@ -93,6 +93,15 @@ class ScanAutomationService( triggerContext, ) ) + if (config.updateHistoryBackfillImportUpdatesEnabled) { + registerTrigger( + new DeleteCorruptAcsSnapshotTrigger( + snapshotStore, + store.updateHistory, + triggerContext, + ) + ) + } if (config.txLogBackfillEnabled) { registerTrigger( new TxLogBackfillingTrigger( diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala index e886241da9..0889357595 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala @@ -91,6 +91,9 @@ class ScanHistoryBackfillingTrigger( if (!store.updateHistory.isReady) { logger.debug("UpdateHistory is not yet ready") Future.successful(Seq.empty) + } else if (importUpdateBackfillingEnabled && !store.updateHistory.corruptAcsSnapshotsDeleted) { + logger.debug("There may be corrupt ACS snapshots that need to be deleted") + Future.successful(Seq.empty) } else { store.updateHistory.getBackfillingState().map { case BackfillingState.Complete => diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala index d2dddaff5b..9fe6082d92 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala @@ -23,6 +23,7 @@ import com.digitalasset.canton.resource.DbStorage.Implicits.BuilderChain.toSQLAc import com.digitalasset.canton.topology.PartyId import com.digitalasset.canton.tracing.TraceContext import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent +import slick.dbio.DBIOAction import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton import slick.jdbc.{GetResult, JdbcProfile} @@ -39,10 +40,10 @@ class AcsSnapshotStore( with AcsQueries with LimitHelpers with NamedLogging { - import org.lfdecentralizedtrust.splice.util.FutureUnlessShutdownUtil.futureUnlessShutdownToFuture override val profile: JdbcProfile = storage.profile.jdbc + import profile.api.jdbcActionExtensionMethods private def historyId = updateHistory.historyId @@ -152,6 +153,18 @@ class AcsSnapshotStore( } } + def deleteSnapshot( + snapshot: AcsSnapshot + )(implicit 
+ tc: TraceContext + ): Future[Unit] = { + val statement = DBIOAction.seq( + sqlu"""delete from acs_snapshot where snapshot_record_time = ${snapshot.snapshotRecordTime}""", + sqlu"""delete from acs_snapshot_data where row_id between ${snapshot.firstRowId} and ${snapshot.lastRowId}""", + ) + storage.update(statement.transactionally, "deleteSnapshot") + } + def queryAcsSnapshot( migrationId: Long, snapshot: CantonTimestamp, diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala index ece1c32fbc..7d8b0b0adb 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala @@ -18,7 +18,7 @@ import org.lfdecentralizedtrust.splice.util.{Contract, HoldingsSummary, PackageQ import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.lifecycle.FutureUnlessShutdown import com.digitalasset.canton.resource.DbStorage -import com.digitalasset.canton.topology.{PartyId, SynchronizerId} +import com.digitalasset.canton.topology.PartyId import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.util.MonadUtil import com.digitalasset.canton.{HasActorSystem, HasExecutionContext} @@ -789,17 +789,14 @@ class AcsSnapshotStoreTest backfillingRequired = BackfillingRequirement.NeedsBackfilling, ) store1 = mkStore(updateHistory1) + migrationsWithCorruptSnapshots1 <- store1.updateHistory.migrationsWithCorruptSnapshots() + _ = migrationsWithCorruptSnapshots1 shouldBe Set.empty update1 <- ingestCreate( updateHistory1, amuletRules(), timestamp1.minusSeconds(1L), ) _ <- store1.insertNewSnapshot(None, firstMigration, timestamp1) - snapshot1first <- store1.lookupSnapshotBefore(firstMigration, CantonTimestamp.MaxValue) - snapshot1second <- store1.lookupSnapshotBefore(secondMigration, CantonTimestamp.MaxValue) - - _ = snapshot1first.map(_.snapshotRecordTime).value should be(timestamp1) - _ = snapshot1second should be(None) // Second migration. This is missing the import update corresponding to the create above. updateHistory2 <- mkUpdateHistory( @@ -812,58 +809,34 @@ class AcsSnapshotStoreTest amuletRules(), timestamp2.minusSeconds(1L), ) + + // This snapshot on the second migration is corrupt, it should be possible to detect and delete it _ <- store2.insertNewSnapshot(None, secondMigration, timestamp2) - snapshot2first <- store2.lookupSnapshotBefore(firstMigration, CantonTimestamp.MaxValue) - snapshot2second <- store2.lookupSnapshotBefore(secondMigration, CantonTimestamp.MaxValue) - _ = snapshot2first.map(_.snapshotRecordTime).value should be(timestamp1) - _ = snapshot2second.map(_.snapshotRecordTime).value should be(timestamp2) - _ = snapshot2second.map(s => s.lastRowId - s.firstRowId).value should be(0) + migrationsWithCorruptSnapshots2 <- store2.updateHistory.migrationsWithCorruptSnapshots() + _ = migrationsWithCorruptSnapshots2 shouldBe Set(secondMigration) - // Mark backfilling as complete, but import update backfilling as not complete. - // This is the state affected SVs should be in. 
- _ <- updateHistory1.initializeBackfilling( - joiningMigrationId = firstMigration, - joiningSynchronizerId = SynchronizerId.tryFromString(update1.tree.getSynchronizerId), - joiningUpdateId = update1.tree.getUpdateId, - complete = false, - ) - _ <- updateHistory1.destinationHistory.markBackfillingComplete() + corruptSnapshot <- store2.lookupSnapshotBefore(secondMigration, CantonTimestamp.MaxValue) + _ = corruptSnapshot.value.snapshotRecordTime shouldBe timestamp2 - // Re-initialize the store. This should delete the second snapshot. - updateHistory3 <- mkUpdateHistory( + _ <- store2.deleteSnapshot(corruptSnapshot.value) + corruptSnapshotAfterDelete <- store2.lookupSnapshotBefore( secondMigration, - backfillingRequired = BackfillingRequirement.NeedsBackfilling, + CantonTimestamp.MaxValue, ) - store3 = mkStore(updateHistory3) - snapshot3first <- store3.lookupSnapshotBefore(firstMigration, CantonTimestamp.MaxValue) - snapshot3second <- store3.lookupSnapshotBefore(secondMigration, CantonTimestamp.MaxValue) - - _ = snapshot3first.map(_.snapshotRecordTime).value should be(timestamp1) - _ = snapshot3second should be(None) + _ = corruptSnapshotAfterDelete should be(empty) // Ingest some import update to simulate the import update backfilling and re-create the snapshot _ <- ingestCreate( - updateHistory3, + updateHistory2, amuletRules(), CantonTimestamp.MinValue, ) - _ <- updateHistory1.destinationHistory.markImportUpdatesBackfillingComplete() - _ <- store3.insertNewSnapshot(None, secondMigration, timestamp2) + _ <- store2.insertNewSnapshot(None, secondMigration, timestamp2) - // Re-initialize the store. The second snapshot should now not be deleted. - updateHistory4 <- mkUpdateHistory( - secondMigration, - backfillingRequired = BackfillingRequirement.NeedsBackfilling, - ) - store4 = mkStore(updateHistory4) - snapshot4first <- store4.lookupSnapshotBefore(firstMigration, CantonTimestamp.MaxValue) - snapshot4second <- store4.lookupSnapshotBefore(secondMigration, CantonTimestamp.MaxValue) - } yield { - snapshot4first.map(_.snapshotRecordTime).value should be(timestamp1) - snapshot4second.map(_.snapshotRecordTime).value should be(timestamp2) - snapshot4second.map(s => s.lastRowId - s.firstRowId).value should be(1) - } + migrationsWithCorruptSnapshots3 <- store2.updateHistory.migrationsWithCorruptSnapshots() + _ = migrationsWithCorruptSnapshots3 shouldBe Set.empty + } yield succeed } } }
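Note (illustration only, not part of the diff above): the deletion loop implemented by DeleteCorruptAcsSnapshotTrigger processes one snapshot per polling round. retrieveTasks picks one migration that still has corrupt snapshots, completeTask deletes that migration's newest snapshot, and once no corrupt migration remains the in-memory flag is set, which unblocks ScanHistoryBackfillingTrigger. The following self-contained Scala sketch models that loop with hypothetical in-memory stand-ins for UpdateHistory and AcsSnapshotStore (the real trigger uses migrationsWithCorruptSnapshots, lookupSnapshotBefore, deleteSnapshot, and markCorruptAcsSnapshotsDeleted); the sorted pick is an assumption for determinism.

// Hypothetical stand-ins; not the real UpdateHistory / AcsSnapshotStore API.
object DeleteCorruptSnapshotLoopSketch {
  final case class Snapshot(migrationId: Long, recordTimeMicros: Long)

  // Migration 2 has snapshots but no import updates (corrupt); migration 3 is fine.
  private var snapshots: Vector[Snapshot] =
    Vector(Snapshot(2L, 10L), Snapshot(2L, 20L), Snapshot(3L, 30L))
  private var corruptMigrations: Set[Long] = Set(2L)
  private var corruptAcsSnapshotsDeleted: Boolean = false

  // One polling round: delete the newest snapshot of one corrupt migration
  // (the real code looks it up via lookupSnapshotBefore(migrationId, MaxValue));
  // when no corrupt migration is left, mark the history as clean.
  def runOnce(): Unit =
    corruptMigrations.toSeq.sorted.lastOption match {
      case Some(migrationId) =>
        val newest = snapshots.filter(_.migrationId == migrationId).maxBy(_.recordTimeMicros)
        snapshots = snapshots.filterNot(_ == newest)
        if (!snapshots.exists(_.migrationId == migrationId))
          corruptMigrations -= migrationId
      case None =>
        corruptAcsSnapshotsDeleted = true // markCorruptAcsSnapshotsDeleted()
    }

  def main(args: Array[String]): Unit = {
    while (!corruptAcsSnapshotsDeleted) runOnce()
    assert(snapshots == Vector(Snapshot(3L, 30L))) // only the healthy snapshot survives
    println(s"remaining snapshots: $snapshots")
  }
}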