From d01e15bae69daf9abc00060d241f8044f61a1f8d Mon Sep 17 00:00:00 2001 From: Robert Autenrieth Date: Wed, 18 Jun 2025 10:59:33 +0000 Subject: [PATCH 1/2] Revert "Reapply and fix backfill import updates (#18432)" This reverts commit b4cd510366922c641aaa4e58df3833866b8276df. Signed-off-by: Robert Autenrieth --- ...SynchronizerMigrationIntegrationTest.scala | 2 +- ...canHistoryBackfillingIntegrationTest.scala | 4 +- .../splice/util/UpdateHistoryTestUtil.scala | 6 +- .../V039__backfilling_import_updates2.sql | 13 - .../splice/store/HistoryBackfilling.scala | 5 - .../splice/store/HistoryMetrics.scala | 33 - .../store/ImportUpdatesBackfilling.scala | 239 ------- .../splice/store/UpdateHistory.scala | 644 +++--------------- .../store/UpdateHistoryBackfillingTest.scala | 157 +---- .../splice/store/UpdateHistoryTest.scala | 81 +-- .../splice/store/UpdateHistoryTestBase.scala | 30 +- apps/scan/src/main/openapi/scan.yaml | 62 +- .../client/BackfillingScanConnection.scala | 6 - .../admin/api/client/BftScanConnection.scala | 49 -- .../api/client/SingleScanConnection.scala | 14 - .../client/commands/HttpScanAppClient.scala | 38 -- .../scan/admin/http/HttpScanHandler.scala | 48 +- .../scan/automation/AcsSnapshotTrigger.scala | 14 +- .../ScanHistoryBackfillingTrigger.scala | 51 +- .../scan/store/ScanHistoryBackfilling.scala | 48 +- .../api/client/BftScanConnectionTest.scala | 158 +---- .../automation/AcsSnapshotTriggerTest.scala | 158 +---- .../store/ScanHistoryBackfillingTest.scala | 140 +--- .../store/db/AcsSnapshotStoreTest.scala | 141 +--- 24 files changed, 258 insertions(+), 1883 deletions(-) delete mode 100644 apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V039__backfilling_import_updates2.sql delete mode 100644 apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/ImportUpdatesBackfilling.scala diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DecentralizedSynchronizerMigrationIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DecentralizedSynchronizerMigrationIntegrationTest.scala index 78652dcc3d..8dc7b605a3 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DecentralizedSynchronizerMigrationIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DecentralizedSynchronizerMigrationIntegrationTest.scala @@ -1070,7 +1070,7 @@ class DecentralizedSynchronizerMigrationIntegrationTest val backfilledUpdates = sv1ScanLocalBackend.appState.store.updateHistory - .getAllUpdates(None, PageLimit.tryCreate(1000)) + .getUpdates(None, includeImportUpdates = true, PageLimit.tryCreate(1000)) .futureValue backfilledUpdates.collect { case TreeUpdateWithMigrationId(tree, migrationId) diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala index 1c759bc85b..ae97ccc243 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala @@ -284,7 +284,7 @@ class ScanHistoryBackfillingIntegrationTest sv2ScanBackend.appState.store.updateHistory .getBackfillingState() - .futureValue should be(BackfillingState.InProgress(false, false)) + 
.futureValue should be(BackfillingState.InProgress) sv2ScanBackend.getBackfillingStatus().complete shouldBe false assertThrowsAndLogsCommandFailures( readUpdateHistoryFromScan(sv2ScanBackend), @@ -518,7 +518,7 @@ class ScanHistoryBackfillingIntegrationTest private def allUpdatesFromScanBackend(scanBackend: ScanAppBackendReference) = { // Need to use the store directly, as the HTTP endpoint refuses to return data unless it's completely backfilled scanBackend.appState.store.updateHistory - .getAllUpdates(None, PageLimit.tryCreate(1000)) + .getUpdates(None, includeImportUpdates = true, PageLimit.tryCreate(1000)) .futureValue } diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/UpdateHistoryTestUtil.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/UpdateHistoryTestUtil.scala index cd58b41ad0..e63e22e3af 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/UpdateHistoryTestUtil.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/UpdateHistoryTestUtil.scala @@ -95,7 +95,7 @@ trait UpdateHistoryTestUtil extends TestCommon { updateHistoryFromParticipant(ledgerBegin, updateHistory.updateStreamParty, participant) val recordedUpdates = updateHistory - .getAllUpdates( + .getUpdates( Some( ( 0L, @@ -104,6 +104,7 @@ trait UpdateHistoryTestUtil extends TestCommon { actualUpdates.head.update.recordTime.addMicros(-1L), ) ), + includeImportUpdates = true, PageLimit.tryCreate(actualUpdates.size), ) .futureValue @@ -138,8 +139,9 @@ trait UpdateHistoryTestUtil extends TestCommon { scanClient: ScanAppClientReference, ): Assertion = { val historyFromStore = scanBackend.appState.store.updateHistory - .getAllUpdates( + .getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) .futureValue diff --git a/apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V039__backfilling_import_updates2.sql b/apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V039__backfilling_import_updates2.sql deleted file mode 100644 index 24ec7fc1fc..0000000000 --- a/apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V039__backfilling_import_updates2.sql +++ /dev/null @@ -1,13 +0,0 @@ --- Only SVs that joined the network at any point in the initial migration have all import updates --- This statement was already part of the migration script V036__backfilling_import_updates.sql. --- It needs to be executed again because Scala code for backfilling import updates --- was reverted and re-applied between these two migrations. -update update_history_backfilling as bf -set import_updates_complete = true -where - bf.complete = true and - bf.joining_migration_id = ( - select min(migration_id) - from update_history_transactions as tx - where bf.history_id = tx.history_id - ); diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryBackfilling.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryBackfilling.scala index dd602fb5fa..c0f710c033 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryBackfilling.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryBackfilling.scala @@ -285,20 +285,15 @@ object HistoryBackfilling { * None if the given migration id is the beginning of known history. 
* @param recordTimeRange All domains that produced history items in the given migration id, * along with the record time of the newest and oldest history item associated with each domain. - * @param lastImportUpdateId The id of the last import update (where import updates are sorted by update id) - * for the given migration id, if any. * @param complete True if the backfilling for the given migration id is complete, * i.e., the history knows the first item for each domain in the given migration id. * We need this to decide when the backfilling is complete, because it might be difficult to * identify the first item of a migration otherwise. - * @param importUpdatesComplete True if the import updates for the given migration id are complete. */ final case class SourceMigrationInfo( previousMigrationId: Option[Long], recordTimeRange: Map[SynchronizerId, DomainRecordTimeRange], - lastImportUpdateId: Option[String], complete: Boolean, - importUpdatesComplete: Boolean, ) /** Information about the point at which backfilling is currently inserting data. diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala index 897280050b..27c83ace32 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala @@ -102,39 +102,6 @@ class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit )(metricsContext) } - object ImportUpdatesBackfilling { - private val importUpdatesBackfillingPrefix: MetricName = prefix :+ "import-updates-backfilling" - - val latestMigrationId: Gauge[Long] = - metricsFactory.gauge( - MetricInfo( - name = importUpdatesBackfillingPrefix :+ "latest-migration-id", - summary = "The migration id of the latest backfilled import update", - Traffic, - ), - initial = -1L, - )(metricsContext) - - val contractCount: Counter = - metricsFactory.counter( - MetricInfo( - name = importUpdatesBackfillingPrefix :+ "contract-count", - summary = "The number of contracts that have been backfilled", - Traffic, - ) - )(metricsContext) - - val completed: Gauge[Int] = - metricsFactory.gauge( - MetricInfo( - name = importUpdatesBackfillingPrefix :+ "completed", - summary = "Whether it was completed (1) or not (0)", - Debug, - ), - initial = 0, - )(metricsContext) - } - object UpdateHistory { private val updateHistoryPrefix: MetricName = prefix :+ "updates" diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/ImportUpdatesBackfilling.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/ImportUpdatesBackfilling.scala deleted file mode 100644 index 2059923f39..0000000000 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/ImportUpdatesBackfilling.scala +++ /dev/null @@ -1,239 +0,0 @@ -// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. 
-// SPDX-License-Identifier: Apache-2.0 - -package org.lfdecentralizedtrust.splice.store - -import org.lfdecentralizedtrust.splice.store.ImportUpdatesBackfilling.{ - DestinationImportUpdates, - DestinationImportUpdatesBackfillingInfo, - Outcome, - SourceImportUpdates, -} -import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} -import com.digitalasset.canton.tracing.TraceContext -import org.lfdecentralizedtrust.splice.store.HistoryBackfilling.SourceMigrationInfo -import org.lfdecentralizedtrust.splice.store.ImportUpdatesBackfilling.Outcome.{ - BackfillingIsComplete, - MoreWorkAvailableLater, - MoreWorkAvailableNow, -} - -import scala.concurrent.{ExecutionContext, Future} - -/** Copies import updates from a source to a destination history. - * - * "Import updates" are updates that represent the ACS right before a hard domain migration, - * and are imported into the participant right after a hard domain migration. - * They all have a record time of [[CantonTimestamp.MinValue]]. - * - * Notes: - * - This should have been part of [[HistoryBackfilling]], but that backfilling had already finished - * in production, so we need a separate process to fill in the missing import updates. - */ -final class ImportUpdatesBackfilling[T]( - destination: DestinationImportUpdates[T], - source: SourceImportUpdates[T], - batchSize: Int, - override protected val loggerFactory: NamedLoggerFactory, -)(implicit - ec: ExecutionContext -) extends NamedLogging { - - /** Backfill a small part of the destination history, making sure that the destination history won't have any gaps. - * - * It is intended that this function is called repeatedly until it returns `BackfillingIsComplete`, - * e.g., from a PollingTrigger. - * - * This function is *NOT* safe to call concurrently from multiple threads. 
- */ - def backfillImportUpdates()(implicit tc: TraceContext): Future[Outcome] = { - if (!source.isReady) { - logger.info("Source import updates are not ready, skipping backfill for now") - Future.successful(MoreWorkAvailableLater) - } else if (!destination.isReady) { - logger.info("Destination import updates are not ready, skipping backfill for now") - Future.successful(MoreWorkAvailableLater) - } else { - for { - backfillingInfoO <- destination.importUpdatesBackfillingInfo - outcome <- backfillingInfoO match { - case None => - // Do not backfill while there's absolutely no data, otherwise we don't know where to start backfilling - logger.info( - "Destination import updates is not ready, skipping backfilling for now" - ) - Future.successful(MoreWorkAvailableLater) - case Some(backfillingInfo) => - backfillMigrationId(backfillingInfo) - } - } yield outcome - } - } - - private def backfillMigrationId( - destInfo: DestinationImportUpdatesBackfillingInfo - )(implicit tc: TraceContext): Future[Outcome] = { - for { - srcInfo <- source.migrationInfo(destInfo.migrationId) - migrationId = destInfo.migrationId - outcome <- srcInfo match { - case None => - logger.debug( - s"Source is not ready for migration id $migrationId, skipping backfill for now" - ) - Future.successful(MoreWorkAvailableLater) - case Some(srcInfo) => - logger.debug(s"Backfilling migration id $migrationId") - backfillMigrationId( - srcInfo, - destInfo, - ) - } - } yield outcome - } - - private def backfillMigrationId( - srcInfo: SourceMigrationInfo, - destInfo: DestinationImportUpdatesBackfillingInfo, - )(implicit tc: TraceContext): Future[Outcome] = { - logger.debug( - s"backfillMigrationId(srcInfo=${srcInfo}, destInfo=${destInfo})" - ) - val migrationId = destInfo.migrationId - - if (!srcInfo.importUpdatesComplete) { - logger.debug( - s"Source is not complete, not backfilling." - ) - Future.successful(MoreWorkAvailableLater) - } else { - (srcInfo.lastImportUpdateId, destInfo.lastUpdateId, srcInfo.previousMigrationId) match { - case (None, None, None) => - logger.info( - s"Source is complete for migration $migrationId, there are no import updates on that migration, and there is no previous migration. " + - "Considering import update backfilling as complete." 
- ) - destination.markImportUpdatesBackfillingComplete().map { _ => - BackfillingIsComplete - } - case (Some(srcLastUpdateId), Some(destLastUpdateId), Some(srcPrevMigrationId)) - if destLastUpdateId == srcLastUpdateId => - logger.info( - s"No more import updates to backfill for migration ${migrationId}, continuing with previous migration id ${srcPrevMigrationId}" - ) - backfillMigrationId( - DestinationImportUpdatesBackfillingInfo(srcPrevMigrationId, None) - ) - case (Some(srcLastUpdateId), Some(destLastUpdateId), _) - if destLastUpdateId < srcLastUpdateId => - logger.debug( - s"Continuing to backfill import updates for migration id $migrationId from $destLastUpdateId until $srcLastUpdateId" - ) - copyImportUpdates(migrationId, destLastUpdateId) - case (Some(srcLastUpdateId), None, _) => - logger.debug( - s"Starting to backfill import updates for migration id $migrationId from the beginning until $srcLastUpdateId" - ) - copyImportUpdates(migrationId, destLastUpdateId = "") - case _ => - logger.error( - s"Unexpected import update backfilling state: srcInfo=$srcInfo, destInfo=$destInfo" - ) - Future.successful(MoreWorkAvailableLater) - } - - } - } - - private def copyImportUpdates(migrationId: Long, destLastUpdateId: String)(implicit - tc: TraceContext - ): Future[Outcome] = { - for { - items <- source.importUpdates(migrationId, destLastUpdateId, batchSize) - insertResult <- destination.insertImportUpdates(migrationId, items) - } yield MoreWorkAvailableNow(insertResult) - } -} - -object ImportUpdatesBackfilling { - - sealed trait Outcome extends Product with Serializable - - object Outcome { - - /** Backfilling is not complete, and there is more work to do right now. - * Call backfill() again immediately. - */ - final case class MoreWorkAvailableNow( - workDone: ImportUpdatesBackfilling.DestinationImportUpdates.InsertResult - ) extends Outcome - - /** Backfilling is not complete, but cannot proceed right now. - * Call backfill() again after a short delay. - */ - final case object MoreWorkAvailableLater extends Outcome - - /** Backfilling is fully complete. - * Stop calling backfill(), it won't do anything useful ever again. - */ - final case object BackfillingIsComplete extends Outcome - } - - /** Information about the point at which backfilling is currently inserting data. - * - * @param migrationId The migration id that is currently being backfilled - * @param lastUpdateId The id of the last import update that exists on the given migration, - * where import updates are ordered by update id. - */ - final case class DestinationImportUpdatesBackfillingInfo( - migrationId: Long, - lastUpdateId: Option[String], - ) - - trait SourceImportUpdates[T] { - - def isReady: Boolean - - /** Returns metadata for the given migration id. - * Returns None if data for the given migration id is not yet available. - */ - def migrationInfo(migrationId: Long)(implicit - tc: TraceContext - ): Future[Option[SourceMigrationInfo]] - - /** A batch of import updates from the given migration, sorted by update id, - * having an update id strictly greater than `afterUpdateId` - */ - def importUpdates( - migrationId: Long, - afterUpdateId: String, - count: Int, - )(implicit tc: TraceContext): Future[Seq[T]] - } - - trait DestinationImportUpdates[T] { - - def isReady: Boolean - - /** Returns information about the point at which backfilling is currently inserting data. 
- */ - def importUpdatesBackfillingInfo(implicit - tc: TraceContext - ): Future[Option[DestinationImportUpdatesBackfillingInfo]] - - /** Insert the given sequence of import updates. - */ - def insertImportUpdates(migrationId: Long, items: Seq[T])(implicit - tc: TraceContext - ): Future[DestinationImportUpdates.InsertResult] - - /** Explicitly marks the backfilling as complete */ - def markImportUpdatesBackfillingComplete()(implicit tc: TraceContext): Future[Unit] - } - object DestinationImportUpdates { - case class InsertResult( - migrationId: Long, - backfilledContracts: Long, - ) - } -} diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala index 39c14b39a9..9a78bad476 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala @@ -54,10 +54,6 @@ import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInt import com.digitalasset.canton.resource.DbStorage.Implicits.BuilderChain.toSQLActionBuilderChain import com.digitalasset.canton.resource.DbStorage.SQLActionBuilderChain import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent -import org.lfdecentralizedtrust.splice.store.ImportUpdatesBackfilling.{ - DestinationImportUpdates, - DestinationImportUpdatesBackfillingInfo, -} import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.IngestionSink.IngestionStart import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement import slick.jdbc.canton.SQLActionBuilder @@ -87,20 +83,17 @@ class UpdateHistory( with NamedLogging { override lazy val profile: JdbcProfile = storage.api.jdbcProfile - import profile.api.jdbcActionExtensionMethods import UpdateHistory.* private[this] def domainMigrationId = domainMigrationInfo.currentMigrationId private val state = new AtomicReference[State](State.empty()) - def historyId: Long = state .get() .historyId .getOrElse(throw new RuntimeException("Using historyId before it was assigned")) - def isReady: Boolean = state.get().historyId.isDefined lazy val ingestionSink: MultiDomainAcsStore.IngestionSink = @@ -147,8 +140,7 @@ class UpdateHistory( d6 <- sqlu"delete from update_history_last_ingested_offsets where history_id = $newHistoryId" d7 <- sqlu"delete from update_history_descriptors where id = $newHistoryId" - _ <- - sqlu""" + _ <- sqlu""" update update_history_descriptors set store_name = ${lengthLimited(storeName)} where id = $oldHistoryId @@ -244,8 +236,6 @@ class UpdateHistory( .map(_.map(LegacyOffset.Api.assertFromStringToLong)) _ <- cleanUpDataAfterDomainMigration(newHistoryId) - - _ <- deleteInvalidAcsSnapshots(newHistoryId) } yield { state.updateAndGet( _.copy( @@ -646,91 +636,6 @@ class UpdateHistory( """ } - private[this] def deleteInvalidAcsSnapshots( - historyId: Long - )(implicit tc: TraceContext): Future[Unit] = { - def migrationsWithCorruptSnapshots(): Future[Set[Long]] = { - for { - migrationsWithImportUpdates <- storage - .query( - // The following is equivalent to: - // """select distinct migration_id - // from update_history_transactions - // where history_id = $historyId - // and record_time = ${CantonTimestamp.MinValue}""" - // but it uses a recursive CTE to implement a loose index scan - sql""" - with recursive t as ( - ( - select migration_id - from update_history_transactions - where history_id = $historyId and 
record_time = ${CantonTimestamp.MinValue} - order by migration_id limit 1 - ) - union all - select ( - select migration_id - from update_history_transactions - where migration_id > t.migration_id and history_id = $historyId and record_time = ${CantonTimestamp.MinValue} - order by migration_id limit 1 - ) - from t - where t.migration_id is not null - ) - select migration_id from t where migration_id is not null - """.as[Long], - "deleteInvalidAcsSnapshots.1", - ) - firstMigrationId <- getFirstMigrationId(historyId) - migrationsWithSnapshots <- storage - .query( - sql""" - select distinct migration_id - from acs_snapshot - where history_id = $historyId - """.as[Long], - "deleteInvalidAcsSnapshots.2", - ) - } yield { - val migrationsThatNeedImportUpdates: Set[Long] = - migrationsWithSnapshots.toSet - firstMigrationId.getOrElse( - throw new RuntimeException("No first migration found") - ) - migrationsThatNeedImportUpdates -- migrationsWithImportUpdates.toSet - } - } - - for { - state <- getBackfillingStateForHistory(historyId) - _ <- state match { - // Note: we want to handle the case where backfilling finished before import update backfilling was implemented, - // because in that case UpdateHistory signalled that history was complete when it fact it was missing import updates, - // which caused [[AcsSnapshotTrigger]] to compute corrupt snapshots. - // It is fine to run the code below on each application startup because it only deletes corrupt snapshots. - case BackfillingState.InProgress(_, _) => - logger.info( - s"This update history may be missing import updates, checking for corrupt ACS snapshots" - ) - migrationsWithCorruptSnapshots() - .flatMap { migrations => - Future.sequence(migrations.map { migrationId => - deleteAcsSnapshotsAfter(historyId, migrationId, CantonTimestamp.MinValue) - }) - } - .andThen { case _ => - logger.info(s"Finished checking for corrupt ACS snapshots") - } - case _ => - logger.debug( - s"History is in backfilling state $state, no need to check for corrupt ACS snapshots" - ) - Future.unit - } - } yield { - () - } - } - private[this] def cleanUpDataAfterDomainMigration( historyId: Long )(implicit tc: TraceContext): Future[Unit] = { @@ -1083,11 +988,12 @@ class UpdateHistory( } } - def getUpdatesWithoutImportUpdates( + def getUpdates( afterO: Option[(Long, CantonTimestamp)], + includeImportUpdates: Boolean, limit: PageLimit, )(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = { - val filters = afterFilters(afterO, includeImportUpdates = false) + val filters = afterFilters(afterO, includeImportUpdates) val orderBy = sql"migration_id, record_time, domain_id" for { txs <- getTxUpdates(filters, orderBy, limit) @@ -1098,23 +1004,6 @@ class UpdateHistory( } } - def getAllUpdates( - afterO: Option[(Long, CantonTimestamp)], - limit: PageLimit, - )(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = { - val filters = afterFilters(afterO, includeImportUpdates = true) - // With import updates, we have to include the update id to get a deterministic order. - // We don't have an index for this order, but this is only used in test code and deprecated scan endpoints. 
- val orderBy = sql"migration_id, record_time, domain_id, update_id" - for { - txs <- getTxUpdates(filters, orderBy, limit) - assignments <- getAssignmentUpdates(filters, orderBy, limit) - unassignments <- getUnassignmentUpdates(filters, orderBy, limit) - } yield { - (txs ++ assignments ++ unassignments).sorted.take(limit.limit) - } - } - def getUpdatesBefore( migrationId: Long, synchronizerId: SynchronizerId, @@ -1133,77 +1022,6 @@ class UpdateHistory( } } - /** Returns paginated import updates for the given migration id. - * - * Note: we store original import updates in the database (as we receive them from the ledger API), - * but we want this method to return updates that are consistent across SVs. - * Original import updates have an update id that differs across SVs, - * and we don't want to rely on the fact that each import update has exactly one create event. - * - * Therefore, we rewrite the import updates such that: - * - Each import update has exactly one create event - * - The update id is generated from the contract id - * - * Note: HttpScanHandler rewrites event ids in order to make them consistent across SVs. - * For import updates, we need to implement the rewrite here, as it needs to implemented - * in the database query. - */ - def getImportUpdates( - migrationId: Long, - afterUpdateId: String, - limit: PageLimit, - )(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = { - val query = - sql""" - select - tx.record_time, - tx.participant_offset, - tx.domain_id, - tx.migration_id, - tx.effective_at, - tx.workflow_id, - tx.command_id, - c.event_id, - c.contract_id, - c.created_at, - c.template_id_package_id, - c.template_id_module_name, - c.template_id_entity_name, - c.package_name, - c.create_arguments, - c.signatories, - c.observers, - c.contract_key - from - update_history_creates c, - update_history_transactions tx - where - c.history_id = $historyId and - c.migration_id = $migrationId and - c.record_time = ${CantonTimestamp.MinValue} and - c.contract_id > $afterUpdateId and - c.update_row_id = tx.row_id - order by c.contract_id asc - limit ${limit.limit} - """ - for { - rows <- storage - .query( - query.toActionBuilder.as[SelectFromImportUpdates], - "getImportUpdates", - ) - } yield { - rows.map { row => - TreeUpdateWithMigrationId( - decodeImportTransaction( - row - ), - row.migrationId, - ) - } - } - } - def getUpdate( updateId: String )(implicit tc: TraceContext): Future[Option[TreeUpdateWithMigrationId]] = { @@ -1352,53 +1170,6 @@ class UpdateHistory( } } - private def decodeImportTransaction( - updateRow: SelectFromImportUpdates - ): UpdateHistoryResponse = { - // The result should be consistent across SVs, so we generate a deterministic update id and event id. - // We don't use any prefix so that we can use an index on contract ids when fetching import updates. 
- val updateId = updateRow.contractId - val eventNodeId = 0 - - val createEvent = new CreatedEvent( - /*witnessParties = */ java.util.Collections.emptyList(), - /*offset = */ 0, // not populated - /*nodeId = */ eventNodeId, - /*templateId = */ tid( - updateRow.templatePackageId, - updateRow.templateModuleName, - updateRow.templateEntityName, - ), - /* packageName = */ updateRow.packageName, - /*contractId = */ updateRow.contractId, - /*arguments = */ ProtobufCodec.deserializeValue(updateRow.createArguments).asRecord().get(), - /*createdEventBlob = */ ByteString.EMPTY, - /*interfaceViews = */ java.util.Collections.emptyMap(), - /*failedInterfaceViews = */ java.util.Collections.emptyMap(), - /*contractKey = */ updateRow.contractKey.map(ProtobufCodec.deserializeValue).toJava, - /*signatories = */ updateRow.signatories.getOrElse(missingStringSeq).asJava, - /*observers = */ updateRow.observers.getOrElse(missingStringSeq).asJava, - /*createdAt = */ updateRow.createdAt.toInstant, - ) - - UpdateHistoryResponse( - update = TransactionTreeUpdate( - new TransactionTree( - /*updateId = */ updateId, - /*commandId = */ updateRow.commandId.getOrElse(missingString), - /*workflowId = */ updateRow.workflowId.getOrElse(missingString), - /*effectiveAt = */ updateRow.effectiveAt.toInstant, - /*offset = */ LegacyOffset.Api.assertFromStringToLong(updateRow.participantOffset), - /*eventsById = */ java.util.Map.of(eventNodeId, createEvent), - /*synchronizerId = */ updateRow.synchronizerId, - /*traceContext = */ TraceContextOuterClass.TraceContext.getDefaultInstance, - /*recordTime = */ updateRow.recordTime.toInstant, - ) - ), - synchronizerId = SynchronizerId.tryFromString(updateRow.synchronizerId), - ) - } - private def decodeTransaction( updateRow: SelectFromTransactions, createRows: Seq[SelectFromCreateEvents], @@ -1623,33 +1394,6 @@ class UpdateHistory( ) } - private implicit lazy val GetResultSelectFromImportUpdates: GetResult[SelectFromImportUpdates] = - GetResult { prs => - import prs.* - (SelectFromImportUpdates.apply _).tupled( - ( - <<[CantonTimestamp], - <<[String], - <<[String], - <<[Long], - <<[CantonTimestamp], - <<[Option[String]], - <<[Option[String]], - <<[String], - <<[String], - <<[CantonTimestamp], - <<[String], - <<[String], - <<[String], - <<[String], - <<[String], - <<[Option[Seq[String]]], - <<[Option[Seq[String]]], - <<[Option[String]], - ) - ) - } - /** Returns the record time range of sequenced events excluding ACS imports after a HDM. */ def getRecordTimeRange( @@ -1702,25 +1446,6 @@ class UpdateHistory( } } - def getLastImportUpdateId( - migrationId: Long - )(implicit tc: TraceContext): Future[Option[String]] = { - storage.query( - sql""" - select - -- Note: to make update ids consistent across SVs, we use the contract id as the update id. 
- max(c.contract_id) - from - update_history_creates c - where - history_id = $historyId and - migration_id = $migrationId and - record_time = ${CantonTimestamp.MinValue} - """.as[Option[String]].head, - s"getLastImportUpdateId", - ) - } - def getPreviousMigrationId(migrationId: Long)(implicit tc: TraceContext ): Future[Option[Long]] = { @@ -1758,7 +1483,7 @@ class UpdateHistory( } } - private[this] def getFirstMigrationId(historyId: Long)(implicit + private[this] def getFirstMigrationId()(implicit tc: TraceContext ): Future[Option[Long]] = { def previousId(table: String) = { @@ -1784,60 +1509,8 @@ class UpdateHistory( ).flatten.minOption } } - - /** Returns the migration id at which the import update backfilling should start */ - private[this] def getImportUpdateBackfillingMigrationId()(implicit - tc: TraceContext - ): Future[Option[Long]] = { - for { - importUpdateBackfillingComplete <- storage - .query( - sql""" - select import_updates_complete - from update_history_backfilling - where history_id = $historyId - """.as[Boolean].head, - "getImportUpdateBackfillingMigrationId.1", - ) - (firstMigration, lastMigration) <- storage.query( - sql""" - select min(migration_id), max(migration_id) - from update_history_transactions - where history_id = $historyId - """.as[(Option[Long], Option[Long])].head, - s"getImportUpdateBackfillingMigrationId.2", - ) - firstMigrationWithImportUpdates <- storage.query( - sql""" - select min(migration_id) - from update_history_transactions - where history_id = $historyId - and record_time = ${CantonTimestamp.MinValue} - """.as[Option[Long]].head, - s"getImportUpdateBackfillingMigrationId.3", - ) - } yield { - if (importUpdateBackfillingComplete) { - // Import updates backfilling complete, we know everything about import updates up to the very first migration. - firstMigration - } else { - // Import updates backfilling not complete yet, return the first migration that has any import updates, - // or, if there are no import updates whatsoever, the last migration. 
- if (firstMigrationWithImportUpdates.isDefined) { - firstMigrationWithImportUpdates - } else { - lastMigration - } - } - } - } - def getBackfillingState()(implicit tc: TraceContext - ): Future[BackfillingState] = getBackfillingStateForHistory(historyId) - - private[this] def getBackfillingStateForHistory(historyId: Long)(implicit - tc: TraceContext ): Future[BackfillingState] = { backfillingRequired match { case BackfillingRequirement.BackfillingNotRequired => @@ -1846,22 +1519,15 @@ class UpdateHistory( storage .query( sql""" - select complete, import_updates_complete + select complete from update_history_backfilling where history_id = $historyId - """.as[(Boolean, Boolean)].headOption, - "getBackfillingStateForHistory", + """.as[Boolean].headOption, + "getBackfillingState", ) .map { - case Some((updatesComplete, importUpdatesComplete)) => - if (updatesComplete && importUpdatesComplete) { - BackfillingState.Complete - } else { - BackfillingState.InProgress( - updatesComplete = updatesComplete, - importUpdatesComplete = importUpdatesComplete, - ) - } + case Some(true) => BackfillingState.Complete + case Some(false) => BackfillingState.InProgress case None => BackfillingState.NotInitialized } } @@ -1883,20 +1549,6 @@ class UpdateHistory( .map(_ => ()) } - private[this] def setBackfillingImportUpdatesComplete()(implicit - tc: TraceContext - ): Future[Unit] = - storage - .update( - sqlu""" - update update_history_backfilling - set import_updates_complete = true - where history_id = $historyId - """, - "setBackfillingImportUpdatesComplete", - ) - .map(_ => ()) - def initializeBackfilling( joiningMigrationId: Long, joiningSynchronizerId: SynchronizerId, @@ -1913,14 +1565,13 @@ class UpdateHistory( storage .update( sqlu""" - insert into update_history_backfilling (history_id, joining_migration_id, joining_domain_id, joining_update_id, complete, import_updates_complete) - values ($historyId, $joiningMigrationId, $joiningSynchronizerId, $safeUpdateId, $complete, $complete) + insert into update_history_backfilling (history_id, joining_migration_id, joining_domain_id, joining_update_id, complete) + values ($historyId, $joiningMigrationId, $joiningSynchronizerId, $safeUpdateId, $complete) on conflict (history_id) do update set joining_migration_id = $joiningMigrationId, joining_domain_id = $joiningSynchronizerId, joining_update_id = $safeUpdateId, - complete = $complete, - import_updates_complete = $complete + complete = $complete """, "initializeBackfilling", ) @@ -1954,34 +1605,17 @@ class UpdateHistory( )(implicit tc: TraceContext): Future[Option[SourceMigrationInfo]] = for { previousMigrationId <- getPreviousMigrationId(migrationId) recordTimeRange <- getRecordTimeRange(migrationId) - lastImportUpdateId <- getLastImportUpdateId(migrationId) state <- getBackfillingState() } yield { state match { case BackfillingState.NotInitialized => None - case BackfillingState.Complete => - Option.when(recordTimeRange.nonEmpty)( - SourceMigrationInfo( - previousMigrationId = previousMigrationId, - recordTimeRange = recordTimeRange, - lastImportUpdateId = lastImportUpdateId, - complete = true, - importUpdatesComplete = true, - ) - ) - case BackfillingState.InProgress(updatesComplete, importUpdatesComplete) => + case _ => Option.when(recordTimeRange.nonEmpty)( SourceMigrationInfo( previousMigrationId = previousMigrationId, recordTimeRange = recordTimeRange, - lastImportUpdateId = lastImportUpdateId, - // Note: this will only report this migration as "complete" if the backfilling process has completed for - // 
all migration ids (`state` contains global information, across all migrations). - // This is not wrong, but we could also report this migration as complete if there exists any data on - // the previous migration. - complete = updatesComplete, - importUpdatesComplete = importUpdatesComplete, + complete = state == BackfillingState.Complete, ) ) } @@ -2003,162 +1637,98 @@ class UpdateHistory( } } - class DestinationHistoryImplementation - extends HistoryBackfilling.DestinationHistory[UpdateHistoryResponse] - with ImportUpdatesBackfilling.DestinationImportUpdates[UpdateHistoryResponse] { - - override def isReady = state - .get() - .historyId - .isDefined - - override def backfillingInfo(implicit - tc: TraceContext - ): Future[Option[DestinationBackfillingInfo]] = (for { - state <- OptionT.liftF(getBackfillingState()) - if state != BackfillingState.NotInitialized - migrationId <- OptionT(getFirstMigrationId(historyId)) - recordTimeRange <- OptionT.liftF(getRecordTimeRange(migrationId)) - } yield DestinationBackfillingInfo( - migrationId = migrationId, - backfilledAt = recordTimeRange.view.mapValues(_.min).toMap, - )).value - - override def importUpdatesBackfillingInfo(implicit - tc: TraceContext - ): Future[Option[DestinationImportUpdatesBackfillingInfo]] = (for { - state <- OptionT.liftF(getBackfillingState()) - if state != BackfillingState.NotInitialized - migrationId <- OptionT(getImportUpdateBackfillingMigrationId()) - lastUpdateId <- OptionT.liftF(getLastImportUpdateId(migrationId)) - } yield DestinationImportUpdatesBackfillingInfo( - migrationId = migrationId, - lastUpdateId = lastUpdateId, - )).value - - override def insert( - migrationId: Long, - synchronizerId: SynchronizerId, - items: Seq[UpdateHistoryResponse], - )(implicit - tc: TraceContext - ): Future[DestinationHistory.InsertResult] = { - insertItems(migrationId, items).map(insertedItems => - DestinationHistory.InsertResult( - backfilledUpdates = insertedItems.size.toLong, - backfilledEvents = eventCount(insertedItems), - lastBackfilledRecordTime = insertedItems.last.update.recordTime, - ) - ) - } - - override def insertImportUpdates( - migrationId: Long, - items: Seq[UpdateHistoryResponse], - )(implicit - tc: TraceContext - ): Future[DestinationImportUpdates.InsertResult] = { - insertItems(migrationId, items).map(insertedItems => - DestinationImportUpdates.InsertResult( - migrationId = migrationId, - backfilledContracts = insertedItems.size.toLong, - ) - ) - } + lazy val destinationHistory: HistoryBackfilling.DestinationHistory[UpdateHistoryResponse] = + new HistoryBackfilling.DestinationHistory[UpdateHistoryResponse] { + override def isReady = state + .get() + .historyId + .isDefined - private def eventCount(updates: NonEmptyList[UpdateHistoryResponse]): Long = - updates - .map(_.update) - .collect { case TransactionTreeUpdate(tree) => - tree.getEventsById.size().toLong - } - .sum - - private def insertItems( - migrationId: Long, - items: Seq[UpdateHistoryResponse], - )(implicit - tc: TraceContext - ): Future[NonEmptyList[UpdateHistoryResponse]] = { - assert(backfillingRequired == BackfillingRequirement.NeedsBackfilling) - val nonEmpty = NonEmptyList - .fromFoldable(items) - .getOrElse( - throw new RuntimeException("insert() must not be called with an empty sequence") - ) - // Because DbStorage requires all actions to be idempotent, and we can't just slap a "ON CONFLICT DO NOTHING" - // onto all subqueries of ingestUpdate_() because they are using "RETURNING" which doesn't work with the above, - // we simply check whether one 
of the items was already inserted. - val (headItemTable, headItemRecordTime, headItemSynchronizerId, headItemUpdateId) = - nonEmpty.head.update match { - case TransactionTreeUpdate(tree) => - ( - "update_history_transactions", - CantonTimestamp.assertFromInstant(tree.getRecordTime), - SynchronizerId.tryFromString(tree.getSynchronizerId), - tree.getUpdateId, - ) - case ReassignmentUpdate(update) => - update.event match { - case _: ReassignmentEvent.Assign => - ( - "update_history_assignments", - update.recordTime, - update.event.target, - update.updateId, - ) - case _: ReassignmentEvent.Unassign => - ( - "update_history_unassignments", - update.recordTime, - update.event.source, - update.updateId, - ) - } - } + override def backfillingInfo(implicit + tc: TraceContext + ): Future[Option[DestinationBackfillingInfo]] = (for { + state <- OptionT.liftF(getBackfillingState()) + _ <- OptionT.when[Future, Unit](state != BackfillingState.NotInitialized)(()) + migrationId <- OptionT(getFirstMigrationId()) + recordTimeRange <- OptionT.liftF(getRecordTimeRange(migrationId)) + } yield DestinationBackfillingInfo( + migrationId = migrationId, + backfilledAt = recordTimeRange.view.mapValues(_.min).toMap, + )).value + + override def insert( + migrationId: Long, + synchronizerId: SynchronizerId, + items: Seq[UpdateHistoryResponse], + )(implicit + tc: TraceContext + ): Future[DestinationHistory.InsertResult] = { + assert(backfillingRequired == BackfillingRequirement.NeedsBackfilling) + val nonEmpty = NonEmptyList + .fromFoldable(items) + .getOrElse( + throw new RuntimeException("insert() must not be called with an empty sequence") + ) + // Because DbStorage requires all actions to be idempotent, and we can't just slap a "ON CONFLICT DO NOTHING" + // onto all subqueries of ingestUpdate_() because they are using "RETURNING" which doesn't work with the above, + // we simply check whether one of the items was already inserted. 
+ val (headItemTable, headItemRecordTime) = + nonEmpty.head.update match { + case TransactionTreeUpdate(tree) => + ("update_history_transactions", CantonTimestamp.assertFromInstant(tree.getRecordTime)) + case ReassignmentUpdate(update) => + update.event match { + case _: ReassignmentEvent.Assign => + ("update_history_assignments", update.recordTime) + case _: ReassignmentEvent.Unassign => + ("update_history_unassignments", update.recordTime) + } + } - val action = for { - itemExists <- - sql""" + val action = for { + itemExists <- sql""" select exists( select row_id from #$headItemTable where history_id = $historyId and migration_id = $migrationId and - domain_id = $headItemSynchronizerId and - record_time = $headItemRecordTime and - update_id = $headItemUpdateId + domain_id = $synchronizerId and + record_time = $headItemRecordTime ) """.as[Boolean].head - _ <- - if (!itemExists) { - DBIOAction - .sequence(items.map(item => ingestUpdate_(item.update, migrationId))) - } else { - DBIOAction.successful(()) - } - } yield nonEmpty - - storage - .queryAndUpdate( - action.transactionally, - "destinationHistory.insert", - ) - } - - override def markBackfillingComplete()(implicit - tc: TraceContext - ): Future[Unit] = setBackfillingComplete() + _ <- + if (!itemExists) { + DBIOAction + .sequence(items.map(item => ingestUpdate_(item.update, migrationId))) + } else { + DBIOAction.successful(()) + } + } yield () - override def markImportUpdatesBackfillingComplete()(implicit tc: TraceContext): Future[Unit] = - setBackfillingImportUpdatesComplete() - } + storage + .queryAndUpdate( + action.transactionally, + "destinationHistory.insert", + ) + .map(_ => + DestinationHistory.InsertResult( + backfilledUpdates = nonEmpty.size.toLong, + backfilledEvents = nonEmpty + .map(_.update) + .collect { case TransactionTreeUpdate(tree) => + tree.getEventsById.size().toLong + } + .sum, + lastBackfilledRecordTime = nonEmpty.last.update.recordTime, + ) + ) + } - lazy val destinationHistory: HistoryBackfilling.DestinationHistory[ - UpdateHistoryResponse - ] & ImportUpdatesBackfilling.DestinationImportUpdates[UpdateHistoryResponse] = - new DestinationHistoryImplementation() + override def markBackfillingComplete()(implicit + tc: TraceContext + ): Future[Unit] = setBackfillingComplete() + } } object UpdateHistory { @@ -2193,8 +1763,7 @@ object UpdateHistory { sealed trait BackfillingState object BackfillingState { case object Complete extends BackfillingState - case class InProgress(updatesComplete: Boolean, importUpdatesComplete: Boolean) - extends BackfillingState + case object InProgress extends BackfillingState case object NotInitialized extends BackfillingState } @@ -2345,27 +1914,6 @@ object UpdateHistory { contractId: String, ) - private case class SelectFromImportUpdates( - recordTime: CantonTimestamp, - participantOffset: String, - synchronizerId: String, - migrationId: Long, - effectiveAt: CantonTimestamp, - workflowId: Option[String], - commandId: Option[String], - eventId: String, - contractId: String, - createdAt: CantonTimestamp, - templatePackageId: String, - templateModuleName: String, - templateEntityName: String, - packageName: String, - createArguments: String, - signatories: Option[Seq[String]], - observers: Option[Seq[String]], - contractKey: Option[String], - ) - private def tid(packageName: String, moduleName: String, entityName: String) = new Identifier(packageName, moduleName, entityName) diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryBackfillingTest.scala 
b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryBackfillingTest.scala index 080e5807ac..e0c5b87632 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryBackfillingTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryBackfillingTest.scala @@ -1,6 +1,5 @@ package org.lfdecentralizedtrust.splice.store -import com.digitalasset.canton.util.MonadUtil import org.lfdecentralizedtrust.splice.store.HistoryBackfilling.SourceMigrationInfo import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement.BackfillingNotRequired import org.lfdecentralizedtrust.splice.util.DomainRecordTimeRange @@ -58,12 +57,14 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase { backfiller = mkBackfilling(source = storeA2, destination = storeB2, 2) _ <- backfillAll(backfiller) // Check that the updates are the same - updatesA <- storeA2.getAllUpdates( + updatesA <- storeA2.getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) - updatesB <- storeB2.getAllUpdates( + updatesB <- storeB2.getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) infoB2 <- storeB2.sourceHistory.migrationInfo(0) @@ -173,12 +174,14 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase { backfiller = mkBackfilling(source = storeA5, destination = storeB5, 5) _ <- backfillAll(backfiller) // Check that the updates are the same - updatesA <- storeA5.getAllUpdates( + updatesA <- storeA5.getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) - updatesB <- storeB5.getAllUpdates( + updatesB <- storeB5.getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) } yield { @@ -188,150 +191,6 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase { } } } - - "importUpdatesBackfillingInfo" should { - "return None if backfilling is not initialized" in { - val storeA0 = mkStore(domainMigrationId = 0, participantId = participant1) - for { - _ <- initStore(storeA0) - - info <- storeA0.destinationHistory.importUpdatesBackfillingInfo - } yield { - info shouldBe None - } - } - - "return correct value for (backfilling fully complete, now in first migration)" in { - val storeA0 = mkStore(domainMigrationId = 0, participantId = participant1) - for { - _ <- initStore(storeA0) - tx1 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1)) - _ <- storeA0.initializeBackfilling(0, domain1, tx1.getUpdateId, complete = true) - - info <- storeA0.destinationHistory.importUpdatesBackfillingInfo - } yield { - info.value.migrationId shouldBe 0 - info.value.lastUpdateId shouldBe None - } - } - - "return correct value for (backfilling fully complete, now in second migration)" in { - val storeA0 = mkStore(domainMigrationId = 0, participantId = participant1) - val storeA1 = mkStore(domainMigrationId = 1, participantId = participant1) - for { - _ <- initStore(storeA0) - _ <- initStore(storeA1) - - tx1 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1)) - _ <- importUpdate(tx1, validOffset(2), storeA1) - _ <- create(domain1, validContractId(3), validOffset(3), party1, storeA1, time(3)) - - _ <- storeA1.initializeBackfilling(0, domain1, tx1.getUpdateId, complete = true) - - info <- storeA0.destinationHistory.importUpdatesBackfillingInfo - } yield { - info.value.migrationId shouldBe 0 - info.value.lastUpdateId shouldBe None - } - } - - "return correct value for (update backfilling 
complete, now in first migration)" in { - val storeA0 = mkStore(domainMigrationId = 0, participantId = participant1) - for { - _ <- initStore(storeA0) - - _ <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1)) - tx2 <- create(domain1, validContractId(2), validOffset(2), party1, storeA0, time(2)) - - _ <- storeA0.initializeBackfilling(0, domain1, tx2.getUpdateId, complete = false) - _ <- storeA0.destinationHistory.markBackfillingComplete() - - info <- storeA0.destinationHistory.importUpdatesBackfillingInfo - } yield { - info.value.migrationId shouldBe 0 - info.value.lastUpdateId shouldBe None - } - } - - "return correct value for (update backfilling complete, now in second migration)" in { - val storeA0 = mkStore(domainMigrationId = 0, participantId = participant1) - val storeA1 = mkStore(domainMigrationId = 1, participantId = participant1) - for { - _ <- initStore(storeA0) - _ <- initStore(storeA1) - - // One update in each migration, but the import update is missing - tx1 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1)) - _ <- create(domain1, validContractId(3), validOffset(3), party1, storeA1, time(3)) - - _ <- storeA1.initializeBackfilling(0, domain1, tx1.getUpdateId, complete = false) - _ <- storeA1.destinationHistory.markBackfillingComplete() - - info <- storeA1.destinationHistory.importUpdatesBackfillingInfo - } yield { - info.value.migrationId shouldBe 1 - info.value.lastUpdateId shouldBe None - } - } - - "return correct value for (import backfilling in progress, now in second migration)" in { - val storeA0 = mkStore(domainMigrationId = 0, participantId = participant1) - val storeA1 = mkStore(domainMigrationId = 1, participantId = participant1) - for { - _ <- initStore(storeA0) - _ <- initStore(storeA1) - - // One update in each migration, with the corresponding import update, - // but the import backfilling is not marked complete, so we don't know if there are more import updates. 
- tx1 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1)) - itx <- importUpdate(tx1, validOffset(2), storeA1) - _ <- create(domain1, validContractId(3), validOffset(3), party1, storeA1, time(3)) - - _ <- storeA1.initializeBackfilling(0, domain1, tx1.getUpdateId, complete = false) - _ <- storeA1.destinationHistory.markBackfillingComplete() - - info <- storeA1.destinationHistory.importUpdatesBackfillingInfo - } yield { - info.value.migrationId shouldBe 1 - info.value.lastUpdateId shouldBe Some( - itx.getEventsById.values().iterator().next().getContractId - ) - } - } - - "return correct last update id" in { - val storeA0 = mkStore(domainMigrationId = 0, participantId = participant1) - val storeA1 = mkStore(domainMigrationId = 1, participantId = participant1) - for { - _ <- initStore(storeA0) - _ <- initStore(storeA1) - - // 10 updates in migration 0, with corresponding import updates in migration 1 - txs <- MonadUtil.sequentialTraverse((1 to 10).toList) { i => - create(domain1, validContractId(i), validOffset(i), party1, storeA0, time(i)) - } - itxs <- MonadUtil.sequentialTraverse(txs.zipWithIndex) { case (tx, i) => - importUpdate(tx, validOffset(10 + i), storeA1) - } - _ <- create(domain1, validContractId(30), validOffset(30), party1, storeA1, time(30)) - - _ <- storeA1.initializeBackfilling( - 0, - domain1, - txs.headOption.value.getUpdateId, - complete = false, - ) - _ <- storeA1.destinationHistory.markBackfillingComplete() - - info <- storeA1.destinationHistory.importUpdatesBackfillingInfo - } yield { - info.value.migrationId shouldBe 1 - info.value.lastUpdateId shouldBe Some( - itxs.map(_.getEventsById.values().iterator().next().getContractId).max - ) - } - } - } } private def mkBackfilling( diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala index 5988cf4663..b2336d219f 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala @@ -39,7 +39,7 @@ class UpdateHistoryTest extends UpdateHistoryTestBase { migrationId: Long = migration1, ): Future[Seq[UpdateHistoryResponse]] = { store - .getAllUpdates(None, PageLimit.tryCreate(1000)) + .getUpdates(None, includeImportUpdates = true, PageLimit.tryCreate(1000)) .map(_.filter(_.migrationId == migrationId).map(_.update)) } @@ -420,10 +420,11 @@ class UpdateHistoryTest extends UpdateHistoryTestBase { ): Seq[TransactionTreeUpdate] = { val result = store - .getAllUpdates( + .getUpdates( after.map { case (migrationId, recordTime) => (migrationId, CantonTimestamp.assertFromInstant(recordTime)) }, + includeImportUpdates = true, PageLimit.tryCreate(1), ) .futureValue @@ -471,12 +472,14 @@ class UpdateHistoryTest extends UpdateHistoryTestBase { }, maxCount = updates.size, ) - all <- storeMigrationId1.getAllUpdates( + all <- storeMigrationId1.getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) - all2 <- storeMigrationId2.getAllUpdates( + all2 <- storeMigrationId2.getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) } yield { @@ -620,75 +623,5 @@ class UpdateHistoryTest extends UpdateHistoryTestBase { } } - - "getImportUpdates" should { - - "return consistent ids" in { - val storeA1 = mkStore(party1, migration1, participant1) - val storeA2 = mkStore(party1, migration2, participant1) - val storeB1 = 
mkStore(party2, migration1, participant1) - val storeB2 = mkStore(party2, migration2, participant1) - for { - // Ingest the same update into two stores. - _ <- initStore(storeA1) - _ <- initStore(storeB1) - txA <- create(domain1, cid1, offset1, party1, storeA1, time(1), Seq(party2)) - txB <- create(domain1, cid1, offset1, party1, storeB1, time(1), Seq(party2)) - - // Ingest the import update corresponding to the above in the next migration. - _ <- initStore(storeA2) - _ <- initStore(storeB2) - itA <- importUpdate(txA, offset1, storeA2) - itB <- importUpdate(txB, offset1, storeB2) - - resultA <- storeA2.getImportUpdates(migration2, "", PageLimit.tryCreate(1000)) - resultB <- storeB2.getImportUpdates(migration2, "", PageLimit.tryCreate(1000)) - } yield { - // Updates generated by test code should have random update ids (otherwise this test is not doing anything) - txA.getUpdateId should not be txB.getUpdateId - itA.getUpdateId should not be itB.getUpdateId - - // Updates queried through the `getImportUpdates` method should have consistent update ids - val updateA = resultA.loneElement.update.update.asInstanceOf[TransactionTreeUpdate] - val updateB = resultB.loneElement.update.update.asInstanceOf[TransactionTreeUpdate] - updateA.updateId shouldBe updateB.updateId - - // ... and consistent event ids - val eventIdsA = updateA.tree.getRootNodeIds.asScala - val eventIdsB = updateB.tree.getRootNodeIds.asScala - eventIdsA shouldBe eventIdsB - } - } - - "sort and limit correctly" in { - val store1 = mkStore(party1, migration1, participant1) - val store2 = mkStore(party1, migration2, participant1) - for { - // Ingest some updates with random contract ids - _ <- initStore(store1) - tx3 <- create(domain1, "c3", validOffset(1), party1, store1, time(1)) - tx1 <- create(domain1, "c1", validOffset(2), party1, store1, time(2)) - tx4 <- create(domain1, "c4", validOffset(3), party1, store1, time(3)) - tx2 <- create(domain1, "c2", validOffset(4), party1, store1, time(4)) - - // Ingest the import update corresponding to the above in the next migration. 
- _ <- initStore(store2) - _ <- importUpdate(tx4, validOffset(1), store2) - _ <- importUpdate(tx1, validOffset(2), store2) - _ <- importUpdate(tx3, validOffset(3), store2) - _ <- importUpdate(tx2, validOffset(4), store2) - - result0 <- store2.getImportUpdates(migration2, "", PageLimit.tryCreate(1000)) - result2 <- store2.getImportUpdates(migration2, "c2", PageLimit.tryCreate(1000)) - result4 <- store2.getImportUpdates(migration2, "c4", PageLimit.tryCreate(1000)) - result5 <- store2.getImportUpdates(migration2, "c1", PageLimit.tryCreate(2)) - } yield { - result0.map(_.update.update.updateId) shouldBe Seq("c1", "c2", "c3", "c4") - result2.map(_.update.update.updateId) shouldBe Seq("c3", "c4") - result4 shouldBe empty - result5.map(_.update.update.updateId) shouldBe Seq("c2", "c3") - } - } - } } } diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala index fb4d9c5ff3..d875705b48 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala @@ -20,8 +20,6 @@ import com.digitalasset.canton.{HasActorSystem, HasExecutionContext} import com.google.protobuf.ByteString import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement import org.lfdecentralizedtrust.splice.util.EventId -import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet as amuletCodegen -import org.lfdecentralizedtrust.splice.util.Contract import org.scalatest.Assertion import scala.concurrent.Future @@ -47,7 +45,6 @@ abstract class UpdateHistoryTestBase party: PartyId, store: UpdateHistory, txEffectiveAt: CantonTimestamp, - otherSignatories: Seq[PartyId] = Seq.empty, ) = { DomainSyntax(domain).create( c = appRewardCoupon( @@ -57,37 +54,13 @@ abstract class UpdateHistoryTestBase ), offset = offset, txEffectiveAt = txEffectiveAt.toInstant, - createdEventSignatories = party +: otherSignatories, + createdEventSignatories = Seq(party), recordTime = txEffectiveAt.toInstant, )( store ) } - /** Ingests an import update for a transaction previously ingested using the [[create]] call */ - protected def importUpdate( - tx: TransactionTree, - offset: Long, - store: UpdateHistory, - ) = { - val createEvent = tx.getEventsById.asScala.loneElement._2 match { - case created: CreatedEvent => created - case _ => throw new RuntimeException("Unexpected event type") - } - val contract = - Contract.fromCreatedEvent(amuletCodegen.AppRewardCoupon.COMPANION)(createEvent).value - DomainSyntax(SynchronizerId.tryFromString(tx.getSynchronizerId)).create( - c = contract, - offset = offset, - txEffectiveAt = tx.getEffectiveAt, - createdEventSignatories = - createEvent.getSignatories.asScala.toSeq.map(PartyId.tryFromProtoPrimitive), - recordTime = importUpdateRecordTime.toInstant, - )( - store - ) - } - protected def assign( domainTo: SynchronizerId, domainFrom: SynchronizerId, @@ -174,7 +147,6 @@ abstract class UpdateHistoryTestBase i.toLong } - protected val importUpdateRecordTime: CantonTimestamp = CantonTimestamp.MinValue protected def time(i: Int): CantonTimestamp = CantonTimestamp.assertFromInstant(defaultEffectiveAt.plusMillis(i.toLong)) diff --git a/apps/scan/src/main/openapi/scan.yaml b/apps/scan/src/main/openapi/scan.yaml index f8d5e62e7b..0e2196696f 100644 --- a/apps/scan/src/main/openapi/scan.yaml +++ 
b/apps/scan/src/main/openapi/scan.yaml @@ -1530,27 +1530,6 @@ paths: "500": $ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/500" - /v0/backfilling/import-updates: - post: - tags: [scan] - x-jvm-package: scan - operationId: "getImportUpdates" - requestBody: - required: true - content: - application/json: - schema: - "$ref": "#/components/schemas/GetImportUpdatesRequest" - responses: - "200": - description: ok - content: - application/json: - schema: - "$ref": "#/components/schemas/GetImportUpdatesResponse" - "404": - $ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/404" - components: schemas: GetSpliceInstanceNamesResponse: @@ -3387,18 +3366,9 @@ components: type: array items: $ref: "#/components/schemas/RecordTimeRange" - last_import_update_id: - description: | - The update id of the last import update (where import updates are sorted by update id, ascending) - for the given migration id, if any - type: string complete: description: | - True if this scan has all non-import updates for given migration id - type: boolean - import_updates_complete: - description: | - True if this scan has all import updates for the given migration id + True if this scan has all updates for the given migration id type: boolean RecordTimeRange: @@ -3456,36 +3426,6 @@ components: items: $ref: "#/components/schemas/UpdateHistoryItem" - GetImportUpdatesRequest: - type: object - required: - - migration_id - - after_update_id - - limit - properties: - migration_id: - type: integer - format: int64 - after_update_id: - description: | - Only return updates with an update id strictly greater than this. - type: string - limit: - description: | - Return at most this many updates. The actual number of updates returned may be smaller.
- type: integer - format: int32 - - GetImportUpdatesResponse: - type: object - required: - - transactions - properties: - transactions: - type: array - items: - $ref: "#/components/schemas/UpdateHistoryItem" - DamlValueEncoding: type: string description: | diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BackfillingScanConnection.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BackfillingScanConnection.scala index b666ef98a0..0a64da17bf 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BackfillingScanConnection.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BackfillingScanConnection.scala @@ -25,10 +25,4 @@ trait BackfillingScanConnection extends FlagCloseableAsync { atOrAfter: Option[CantonTimestamp], count: Int, )(implicit tc: TraceContext): Future[Seq[UpdateHistoryResponse]] - - def getImportUpdates( - migrationId: Long, - afterUpdateId: String, - count: Int, - )(implicit tc: TraceContext): Future[Seq[UpdateHistoryResponse]] } diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala index 5c2c5ba996..a047ff9a90 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala @@ -286,10 +286,6 @@ class BftScanConnection( val completeResponses = responses.withData.filter { case (_, migrationInfo) => migrationInfo.complete } - val importUpdatesCompleteResponses = responses.withData.filter { - case (_, migrationInfo) => - migrationInfo.importUpdatesComplete - } for { // We already have the responses, use bftCall() to avoid re-implementing the consensus logic. // All non-malicious scans that have backfilled the input migrationId should return @@ -302,15 +298,6 @@ class BftScanConnection( // and instead rely on metrics to situations when backfilling is not progressing. Level.INFO, ) - lastImportUpdateId <- bftCall( - connection => - Future.successful(importUpdatesCompleteResponses(connection).lastImportUpdateId), - BftCallConfig.forAvailableData(connections, importUpdatesCompleteResponses.contains), - // This method is very sensitive to unavailable SVs. - // Do not log warnings for failures to reach consensus, as this would be too noisy, - // and instead rely on metrics to situations when backfilling is not progressing. 
- Level.INFO, - ) } yield { @SuppressWarnings(Array("org.wartremover.warts.IterableOps")) val unionOfRecordTimeRanges = @@ -319,9 +306,7 @@ class BftScanConnection( SourceMigrationInfo( previousMigrationId = previousMigrationId, recordTimeRange = unionOfRecordTimeRanges, - lastImportUpdateId = lastImportUpdateId, complete = completeResponses.nonEmpty, - importUpdatesComplete = importUpdatesCompleteResponses.nonEmpty, ) ) } @@ -403,40 +388,6 @@ class BftScanConnection( ) ) - override def getImportUpdates( - migrationId: Long, - afterUpdateId: String, - count: Int, - )(implicit tc: TraceContext): Future[Seq[UpdateHistoryResponse]] = { - val connections = scanList.scanConnections - for { - // Ask ALL scans for the migration info so that we can figure out who has the data - responses <- getMigrationInfoResponses(connections, migrationId) - // Filter out connections that don't have any data - withData = responses.withData.toList.filter { case (_, info) => - info.importUpdatesComplete - } - connectionsWithData = withData.map(_._1) - // Make a BFT call to connections that have the data - result <- bftCall( - connection => connection.getImportUpdates(migrationId, afterUpdateId, count), - BftCallConfig.forAvailableData(connections, connectionsWithData.contains), - // This method is very sensitive to unavailable SVs. - // Do not log warnings for failures to reach consensus, as this would be too noisy, - // and instead rely on metrics to situations when backfilling is not progressing. - Level.INFO, - // This call returns up to 100 full daml transaction trees. It's not feasible to log them all, - // so we only print their update ids. This is enough to investigate consensus failures if different - // scans return different updates. In the more unlikely case where scans disagree on the payload of - // a given update, we would need to fetch the update payload from the update history database. 
- shortenResponsesForLog = - (responses: Seq[UpdateHistoryResponse]) => responses.map(_.update.updateId), - ) - } yield { - result - } - } - override def getUpdatesBefore( migrationId: Long, synchronizerId: SynchronizerId, diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/SingleScanConnection.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/SingleScanConnection.scala index b9355c418d..6f61019b87 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/SingleScanConnection.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/SingleScanConnection.scala @@ -521,20 +521,6 @@ class SingleScanConnection private[client] ( ), ) - override def getImportUpdates( - migrationId: Long, - afterUpdateId: String, - count: Int, - )(implicit tc: TraceContext): Future[Seq[UpdateHistoryResponse]] = - runHttpCmd( - config.adminApi.url, - HttpScanAppClient.GetImportUpdates( - migrationId, - afterUpdateId, - count, - ), - ) - override def listDsoRulesVoteRequests()(implicit tc: TraceContext, ec: ExecutionContext, diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/commands/HttpScanAppClient.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/commands/HttpScanAppClient.scala index 10c5f51ebb..40a0f720fd 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/commands/HttpScanAppClient.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/commands/HttpScanAppClient.scala @@ -1437,10 +1437,7 @@ object HttpScanAppClient { SourceMigrationInfo( previousMigrationId = response.previousMigrationId, recordTimeRange = recordTimeRange, - lastImportUpdateId = response.lastImportUpdateId, complete = response.complete, - // This field was introduced in a later version of the API, consider all old remotes as not complete - importUpdatesComplete = response.importUpdatesComplete.getOrElse(false), ) ) case http.GetMigrationInfoResponse.NotFound(_) => @@ -2340,39 +2337,4 @@ object HttpScanAppClient { } - case class GetImportUpdates( - migrationId: Long, - afterUpdateId: String, - limit: Int, - ) extends InternalBaseCommand[ - http.GetImportUpdatesResponse, - Seq[UpdateHistoryResponse], - ] { - override def submitRequest( - client: http.ScanClient, - headers: List[HttpHeader], - ): EitherT[Future, Either[ - Throwable, - HttpResponse, - ], http.GetImportUpdatesResponse] = { - client.getImportUpdates( - definitions - .GetImportUpdatesRequest( - migrationId, - afterUpdateId, - limit, - ), - headers, - ) - } - - override def handleOk()(implicit decoder: TemplateJsonDecoder) = { - case http.GetImportUpdatesResponse.OK(response) => - Right( - response.transactions.map(http => - ProtobufJsonScanHttpEncodings.httpToLapiUpdate(http).update - ) - ) - } - } } diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala index cbc8440474..c66bd8de8b 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala @@ -734,7 +734,7 @@ class HttpScanHandler( "This scan instance has not yet loaded its updates history. Wait a short time and retry." 
) .asRuntimeException() - case BackfillingState.InProgress(_, _) => + case BackfillingState.InProgress => throw Status.UNAVAILABLE .withDescription( "This scan instance has not yet replicated all data. This process can take an extended period of time to complete. " + @@ -743,17 +743,11 @@ class HttpScanHandler( .asRuntimeException() case BackfillingState.Complete => for { - txs <- - if (includeImportUpdates) - updateHistory.getAllUpdates( - afterO, - PageLimit.tryCreate(pageSize), - ) - else - updateHistory.getUpdatesWithoutImportUpdates( - afterO, - PageLimit.tryCreate(pageSize), - ) + txs <- updateHistory.getUpdates( + afterO, + includeImportUpdates = includeImportUpdates, + PageLimit.tryCreate(pageSize), + ) } yield txs .map( ScanHttpEncodings.encodeUpdate( @@ -1823,8 +1817,6 @@ class HttpScanHandler( definitions.GetMigrationInfoResponse( previousMigrationId = info.previousMigrationId, complete = info.complete, - importUpdatesComplete = Some(info.importUpdatesComplete), - lastImportUpdateId = info.lastImportUpdateId, recordTimeRange = info.recordTimeRange.iterator.map { case (synchronizerId, range) => definitions.RecordTimeRange( synchronizerId = synchronizerId.toProtoPrimitive, @@ -1872,34 +1864,6 @@ class HttpScanHandler( } } } - - override def getImportUpdates(respond: ScanResource.GetImportUpdatesResponse.type)( - body: definitions.GetImportUpdatesRequest - )(extracted: TraceContext): Future[ScanResource.GetImportUpdatesResponse] = { - implicit val tc: TraceContext = extracted - withSpan(s"$workflowId.getImportUpdates") { _ => _ => - val updateHistory = store.updateHistory - updateHistory - .getImportUpdates( - migrationId = body.migrationId, - afterUpdateId = body.afterUpdateId, - limit = PageLimit.tryCreate(body.limit), - ) - .map { txs => - definitions.GetImportUpdatesResponse( - txs - .map( - ScanHttpEncodings.encodeUpdate( - _, - encoding = definitions.DamlValueEncoding.members.ProtobufJson, - version = ScanHttpEncodings.V1, - ) - ) - .toVector - ) - } - } - } override def getMemberTrafficStatus( respond: ScanResource.GetMemberTrafficStatusResponse.type )(synchronizerId: String, memberId: String)( diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala index 28830aa479..e4a0d5b700 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala @@ -65,9 +65,7 @@ class AcsSnapshotTrigger( * And also for past migrations, whether the SV was present in them or not. 
*/ private def isHistoryBackfilled(migrationId: Long)(implicit tc: TraceContext) = { - updateHistory.sourceHistory - .migrationInfo(migrationId) - .map(_.exists(i => i.complete && i.importUpdatesComplete)) + updateHistory.sourceHistory.migrationInfo(migrationId).map(_.exists(_.complete)) } private def retrieveTask()(implicit @@ -115,8 +113,9 @@ class AcsSnapshotTrigger( Future.successful(None) case Some(task) => updateHistory - .getUpdatesWithoutImportUpdates( + .getUpdates( Some((currentMigrationId, task.snapshotRecordTime)), + includeImportUpdates = true, PageLimit.tryCreate(1), ) .map(_.headOption) @@ -206,7 +205,6 @@ class AcsSnapshotTrigger( tc: TraceContext ): Future[TaskOutcome] = task match { case AcsSnapshotTrigger.Task(snapshotRecordTime, migrationId, lastSnapshot) => - assert(task.snapshotRecordTime > CantonTimestamp.MinValue) store .insertNewSnapshot(lastSnapshot, migrationId, snapshotRecordTime) .map { insertCount => @@ -234,13 +232,15 @@ class AcsSnapshotTrigger( tc: TraceContext, ): Future[Option[AcsSnapshotTrigger.Task]] = { updateHistory - .getUpdatesWithoutImportUpdates( + .getUpdates( Some( ( migrationId, - CantonTimestamp.MinValue, + // exclude ACS imports, which have record_time=MinValue + CantonTimestamp.MinValue.plusSeconds(1L), ) ), + includeImportUpdates = true, PageLimit.tryCreate(1), ) .map(_.headOption) diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala index 1fc6500c84..5c97abacb1 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala @@ -28,7 +28,6 @@ import org.lfdecentralizedtrust.splice.scan.store.{ScanHistoryBackfilling, ScanS import org.lfdecentralizedtrust.splice.store.{ HistoryBackfilling, HistoryMetrics, - ImportUpdatesBackfilling, PageLimit, TreeUpdateWithMigrationId, } @@ -94,19 +93,10 @@ class ScanHistoryBackfillingTrigger( store.updateHistory.getBackfillingState().map { case BackfillingState.Complete => historyMetrics.UpdateHistoryBackfilling.completed.updateValue(1) - historyMetrics.ImportUpdatesBackfilling.completed.updateValue(1) Seq.empty - case BackfillingState.InProgress(updatesComplete, _) => - if (!updatesComplete) { - historyMetrics.ImportUpdatesBackfilling.completed.updateValue(0) - Seq(ScanHistoryBackfillingTrigger.BackfillTask()) - } else { - historyMetrics.UpdateHistoryBackfilling.completed.updateValue(1) - Seq(ScanHistoryBackfillingTrigger.ImportUpdatesBackfillTask()) - } + case BackfillingState.InProgress => + Seq(ScanHistoryBackfillingTrigger.BackfillTask()) case BackfillingState.NotInitialized => - historyMetrics.UpdateHistoryBackfilling.completed.updateValue(0) - historyMetrics.ImportUpdatesBackfilling.completed.updateValue(0) Seq(ScanHistoryBackfillingTrigger.InitializeBackfillingTask(findHistoryStartAfter)) } } @@ -121,8 +111,6 @@ class ScanHistoryBackfillingTrigger( ): Future[TaskOutcome] = task match { case ScanHistoryBackfillingTrigger.InitializeBackfillingTask(_) => initializeBackfilling() - case ScanHistoryBackfillingTrigger.ImportUpdatesBackfillTask() => - performImportUpdatesBackfilling() case ScanHistoryBackfillingTrigger.BackfillTask() => performBackfilling() } @@ -194,8 +182,9 @@ class ScanHistoryBackfillingTrigger( synchronized { val batchSize = 100 for { - 
updates <- store.updateHistory.getUpdatesWithoutImportUpdates( + updates <- store.updateHistory.getUpdates( findHistoryStartAfter, + includeImportUpdates = false, PageLimit.tryCreate(batchSize), ) _ = updates.lastOption.foreach(u => @@ -284,35 +273,13 @@ class ScanHistoryBackfillingTrigger( TaskNoop case HistoryBackfilling.Outcome.BackfillingIsComplete => historyMetrics.UpdateHistoryBackfilling.completed.updateValue(1) - logger.info("UpdateHistory backfilling is complete") + logger.info( + "UpdateHistory backfilling is complete, this trigger should not do any work ever again" + ) TaskSuccess("Backfilling completed") } } yield outcome - private def performImportUpdatesBackfilling()(implicit - traceContext: TraceContext - ): Future[TaskOutcome] = for { - connection <- getOrCreateScanConnection() - backfilling = getOrCreateBackfilling(connection) - outcome <- backfilling.backfillImportUpdates().map { - case ImportUpdatesBackfilling.Outcome.MoreWorkAvailableNow(workDone) => - historyMetrics.ImportUpdatesBackfilling.completed.updateValue(0) - // Using MetricsContext.Empty is okay, because it's merged with the StoreMetrics context - historyMetrics.ImportUpdatesBackfilling.contractCount.inc( - workDone.backfilledContracts - )(MetricsContext.Empty) - historyMetrics.ImportUpdatesBackfilling.latestMigrationId.updateValue(workDone.migrationId) - TaskSuccess("Backfilling import updates step completed") - case ImportUpdatesBackfilling.Outcome.MoreWorkAvailableLater => - historyMetrics.ImportUpdatesBackfilling.completed.updateValue(0) - TaskNoop - case ImportUpdatesBackfilling.Outcome.BackfillingIsComplete => - historyMetrics.ImportUpdatesBackfilling.completed.updateValue(1) - logger.info("UpdateHistory backfilling import updates is complete") - TaskSuccess("Backfilling import updates completed") - } - } yield outcome - override def closeAsync(): Seq[AsyncOrSyncCloseable] = { connectionVar .map(connection => @@ -337,8 +304,4 @@ object ScanHistoryBackfillingTrigger { override def pretty: Pretty[this.type] = prettyOfClass() } - final case class ImportUpdatesBackfillTask() extends Task { - override def pretty: Pretty[this.type] = - prettyOfClass() - } } diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanHistoryBackfilling.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanHistoryBackfilling.scala index 1d13b7851f..0486b2d9d9 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanHistoryBackfilling.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanHistoryBackfilling.scala @@ -6,16 +6,12 @@ package org.lfdecentralizedtrust.splice.scan.store import com.daml.ledger.javaapi.data as javaApi import org.lfdecentralizedtrust.splice.environment.ledger.api.TransactionTreeUpdate import org.lfdecentralizedtrust.splice.scan.admin.api.client.BackfillingScanConnection -import org.lfdecentralizedtrust.splice.store.{ - HistoryBackfilling, - ImportUpdatesBackfilling, - TreeUpdateWithMigrationId, -} -import org.lfdecentralizedtrust.splice.store.HistoryBackfilling.SourceMigrationInfo +import org.lfdecentralizedtrust.splice.store.{HistoryBackfilling, TreeUpdateWithMigrationId} +import org.lfdecentralizedtrust.splice.store.HistoryBackfilling.{Outcome, SourceMigrationInfo} import org.lfdecentralizedtrust.splice.store.UpdateHistory.UpdateHistoryResponse import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} -import 
com.digitalasset.canton.topology.{PartyId, SynchronizerId} +import com.digitalasset.canton.topology.{SynchronizerId, PartyId} import com.digitalasset.canton.tracing.TraceContext import scala.concurrent.{ExecutionContextExecutor, Future} @@ -23,11 +19,15 @@ import scala.jdk.CollectionConverters.* import scala.jdk.OptionConverters.* /** Backfills the scan history by copying data from a remote source history to the destination history. + * + * The algorithm caches the set of all remote connections, along with "static data" for the remote history. + * Some of the data (such as the record time range) is technically not static because remote scans + * could be backfilling themselves, but the algorithm is designed to terminate correctly as long as the cache + * is eventually updated. */ class ScanHistoryBackfilling( connection: BackfillingScanConnection, - destinationHistory: HistoryBackfilling.DestinationHistory[UpdateHistoryResponse] - with ImportUpdatesBackfilling.DestinationImportUpdates[UpdateHistoryResponse], + destinationHistory: HistoryBackfilling.DestinationHistory[UpdateHistoryResponse], currentMigrationId: Long, batchSize: Int = 100, override val loggerFactory: NamedLoggerFactory, @@ -36,29 +36,21 @@ class ScanHistoryBackfilling( ) extends NamedLogging { private val sourceHistory = - new HistoryBackfilling.SourceHistory[UpdateHistoryResponse] - with ImportUpdatesBackfilling.SourceImportUpdates[UpdateHistoryResponse] { + new HistoryBackfilling.SourceHistory[UpdateHistoryResponse] { def isReady: Boolean = true - override def migrationInfo(migrationId: Long)(implicit + def migrationInfo(migrationId: Long)(implicit tc: TraceContext ): Future[Option[SourceMigrationInfo]] = connection.getMigrationInfo(migrationId) - override def items( + def items( migrationId: Long, synchronizerId: SynchronizerId, before: CantonTimestamp, count: Int, )(implicit tc: TraceContext): Future[Seq[UpdateHistoryResponse]] = connection.getUpdatesBefore(migrationId, synchronizerId, before, None, count) - - def importUpdates( - migrationId: Long, - afterUpdateId: String, - count: Int, - )(implicit tc: TraceContext): Future[Seq[UpdateHistoryResponse]] = - connection.getImportUpdates(migrationId, afterUpdateId, count) } private val backfilling = @@ -70,23 +62,9 @@ class ScanHistoryBackfilling( loggerFactory, ) - private val importUpdatesBackfilling = - new ImportUpdatesBackfilling( - destinationHistory, - sourceHistory, - batchSize = batchSize, - loggerFactory, - ) - - def backfill()(implicit tc: TraceContext): Future[HistoryBackfilling.Outcome] = { + def backfill()(implicit tc: TraceContext): Future[Outcome] = { backfilling.backfill() } - - def backfillImportUpdates()(implicit - tc: TraceContext - ): Future[ImportUpdatesBackfilling.Outcome] = { - importUpdatesBackfilling.backfillImportUpdates() - } } object ScanHistoryBackfilling { diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala index 59f6e3867f..d1ed91907b 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala @@ -146,16 +146,6 @@ class BftScanConnectionTest when(mock.getUpdatesBefore(migrationId, synchronizerId, before, Some(atOrAfter), count)) .thenReturn(Future.successful(updates)) } - def 
makeMockReturnImportUpdates( - mock: SingleScanConnection, - migrationId: Long, - after: String, - updates: Seq[UpdateHistoryResponse], - count: Int, - ): Unit = { - when(mock.getImportUpdates(migrationId, after, count)) - .thenReturn(Future.successful(updates)) - } def makeMockFailUpdatesBefore( mock: SingleScanConnection, before: CantonTimestamp, @@ -437,11 +427,9 @@ class BftScanConnectionTest val infoResponse = Some( SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = Map(synchronizerId -> DomainRecordTimeRange(ctime(1), ctime(2))), - lastImportUpdateId = Some("updateId1"), + None, + Map(synchronizerId -> DomainRecordTimeRange(ctime(1), ctime(2))), complete = true, - importUpdatesComplete = true, ) ) connections.foreach(makeMockReturnMigrationInfo(_, 0, infoResponse)) @@ -457,11 +445,9 @@ class BftScanConnectionTest def infoResponse(start: Int, complete: Boolean) = Some( SourceMigrationInfo( - previousMigrationId = if (complete) Some(0) else None, - recordTimeRange = Map(synchronizerId -> DomainRecordTimeRange(ctime(start), ctime(10))), - lastImportUpdateId = Some("updateId1"), + if (complete) Some(0) else None, + Map(synchronizerId -> DomainRecordTimeRange(ctime(start), ctime(10))), complete = complete, - importUpdatesComplete = complete, ) ) makeMockReturnMigrationInfo(connections(0), 1, None) @@ -475,11 +461,9 @@ class BftScanConnectionTest } yield migrationInfo should be( Some( SourceMigrationInfo( - previousMigrationId = Some(0), - recordTimeRange = Map(synchronizerId -> DomainRecordTimeRange(ctime(1), ctime(10))), - lastImportUpdateId = Some("updateId1"), + Some(0), + Map(synchronizerId -> DomainRecordTimeRange(ctime(1), ctime(10))), complete = true, - importUpdatesComplete = true, ) ) ) @@ -490,11 +474,9 @@ class BftScanConnectionTest val infoResponse = Some( SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = Map(synchronizerId -> DomainRecordTimeRange(ctime(1), ctime(2))), - lastImportUpdateId = Some("updateId1"), + None, + Map(synchronizerId -> DomainRecordTimeRange(ctime(1), ctime(2))), complete = true, - importUpdatesComplete = true, ) ) connections.foreach(makeMockReturnMigrationInfo(_, 0, infoResponse)) @@ -512,12 +494,9 @@ class BftScanConnectionTest def infoResponse(first: Int, last: Int, complete: Boolean) = Some( SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = - Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), - lastImportUpdateId = Some("updateId1"), + None, + Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), complete = complete, - importUpdatesComplete = complete, ) ) @@ -552,12 +531,9 @@ class BftScanConnectionTest def infoResponse(first: Int, last: Int, complete: Boolean) = Some( SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = - Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), - lastImportUpdateId = Some("updateId1"), + None, + Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), complete = complete, - importUpdatesComplete = complete, ) ) @@ -585,12 +561,9 @@ class BftScanConnectionTest def infoResponse(first: Int, last: Int, complete: Boolean) = Some( SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = - Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), - lastImportUpdateId = Some("updateId1"), + None, + Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), complete = complete, - importUpdatesComplete = complete, ) ) @@ -624,12 
+597,9 @@ class BftScanConnectionTest def infoResponse(first: Int, last: Int, complete: Boolean) = Some( SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = - Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), - lastImportUpdateId = Some("updateId1"), + None, + Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), complete = complete, - importUpdatesComplete = complete, ) ) @@ -666,57 +636,14 @@ class BftScanConnectionTest } } - "fail when consensus cannot be reached for import updates info" in { - val connections = getMockedConnections(n = 7) // f=2 - def infoResponse(last: Int, complete: Boolean) = - Some( - SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = Map(synchronizerId -> DomainRecordTimeRange(ctime(1), ctime(10))), - lastImportUpdateId = Some(s"updateId${last}"), - complete = complete, - importUpdatesComplete = complete, - ) - ) - - def mockResponses(connection: Int, last: Int) = { - makeMockReturnMigrationInfo(connections(connection), 0, infoResponse(last, true)) - } - - // Two scans return last id = 2 - mockResponses(0, 2) - mockResponses(1, 2) - // Two scans return last id = 3 - mockResponses(2, 3) - mockResponses(3, 3) - // Two scan returns last id = 4 - mockResponses(4, 4) - mockResponses(5, 4) - // One scan returns last id = 5 - mockResponses(6, 5) - - val bft = getBft(connections) - - // Note: getUpdatesBefore() doesn't produce WARN logs, so we don't need to suppress them - for { - failure <- bft.getMigrationInfo(0).failed - } yield inside(failure) { case HttpErrorWithHttpCode(code, message) => - code should be(StatusCodes.BadGateway) - message should include("Failed to reach consensus from 5 Scan nodes") - } - } - - "fail when consensus cannot be reached for updates" in { + "fail when consensus cannot be reached" in { val connections = getMockedConnections(n = 7) // f=2 def infoResponse(first: Int, last: Int, complete: Boolean) = Some( SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = - Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), - lastImportUpdateId = Some("updateId1"), + None, + Map(synchronizerId -> DomainRecordTimeRange(ctime(first), ctime(last))), complete = complete, - importUpdatesComplete = complete, ) ) @@ -754,53 +681,6 @@ class BftScanConnectionTest message should include("Failed to reach consensus from 5 Scan nodes") } } - - "fail when consensus cannot be reached for import updates" in { - val connections = getMockedConnections(n = 7) // f=2 - def infoResponse(last: Int, complete: Boolean) = - Some( - SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = Map(), - lastImportUpdateId = Some(s"updateId${last}"), - complete = complete, - importUpdatesComplete = complete, - ) - ) - - def mockResponses(connection: Int, last: Int, updates: Seq[Int]) = { - makeMockReturnMigrationInfo(connections(connection), 0, infoResponse(last, true)) - makeMockReturnImportUpdates( - connections(connection), - 0, - "", - updates.map(testUpdate), - 10, - ) - } - - // Two scans return updates [1,2,3,5] - mockResponses(0, 5, Seq(1, 2, 3, 5)) - mockResponses(1, 5, Seq(1, 2, 3, 5)) - // Two scans return updates [1,3,4,5] - mockResponses(2, 5, Seq(1, 3, 4, 5)) - mockResponses(3, 5, Seq(1, 3, 4, 5)) - // Two scans return updates [1,2,3,4,5] - mockResponses(4, 5, Seq(1, 2, 3, 4, 5)) - mockResponses(5, 5, Seq(1, 2, 3, 4, 5)) - // One scans returns updates [1,5] - mockResponses(6, 5, Seq(1, 5)) - - val bft = getBft(connections) - - // Note:
getImportUpdates() doesn't produce WARN logs, so we don't need to suppress them - for { - failure <- bft.getImportUpdates(0, "", 10).failed - } yield inside(failure) { case HttpErrorWithHttpCode(code, message) => - code should be(StatusCodes.BadGateway) - message should include("Failed to reach consensus from 5 Scan nodes") - } - } } "ScanAggregatesConnection" should { diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala index adf7e59f5a..1abb4d5f5f 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala @@ -54,8 +54,9 @@ class AcsSnapshotTriggerTest // but there's still updates pending when( - updateHistory.getUpdatesWithoutImportUpdates( + updateHistory.getUpdates( eqTo(Some((currentMigrationId, lastSnapshotTime.plusSeconds(3600L)))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn(Future.successful(Seq.empty)) @@ -95,8 +96,9 @@ class AcsSnapshotTriggerTest noPreviousSnapshot() when( - updateHistory.getUpdatesWithoutImportUpdates( - eqTo(Some((currentMigrationId, CantonTimestamp.MinValue))), + updateHistory.getUpdates( + eqTo(Some((currentMigrationId, CantonTimestamp.MinValue.plusSeconds(1L)))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn(Future.successful(Seq.empty)) @@ -111,8 +113,9 @@ class AcsSnapshotTriggerTest // data after ACS when( - updateHistory.getUpdatesWithoutImportUpdates( - eqTo(Some((currentMigrationId, CantonTimestamp.MinValue))), + updateHistory.getUpdates( + eqTo(Some((currentMigrationId, CantonTimestamp.MinValue.plusSeconds(1L)))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn( @@ -130,8 +133,9 @@ class AcsSnapshotTriggerTest // but there's still updates pending when( - updateHistory.getUpdatesWithoutImportUpdates( + updateHistory.getUpdates( eqTo(Some((currentMigrationId, firstSnapshotTime))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn(Future.successful(Seq.empty)) @@ -146,8 +150,9 @@ class AcsSnapshotTriggerTest // data after ACS when( - updateHistory.getUpdatesWithoutImportUpdates( - eqTo(Some((currentMigrationId, CantonTimestamp.MinValue))), + updateHistory.getUpdates( + eqTo(Some((currentMigrationId, CantonTimestamp.MinValue.plusSeconds(1L)))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn( @@ -166,8 +171,9 @@ class AcsSnapshotTriggerTest // no updates pending when( - updateHistory.getUpdatesWithoutImportUpdates( + updateHistory.getUpdates( eqTo(Some((currentMigrationId, firstSnapshotTime))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn( @@ -195,8 +201,9 @@ class AcsSnapshotTriggerTest // data after ACS when( - updateHistory.getUpdatesWithoutImportUpdates( - eqTo(Some((currentMigrationId, CantonTimestamp.MinValue))), + updateHistory.getUpdates( + eqTo(Some((currentMigrationId, CantonTimestamp.MinValue.plusSeconds(1L)))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn( @@ -215,69 +222,6 @@ class AcsSnapshotTriggerTest trigger.retrieveTasks().futureValue should be(empty) } - "when update history backfilling has not finished import updates: return no task" in new AcsSnapshotTriggerTestScope( - true - ) { - 
noPreviousSnapshot() - - // data after ACS - when( - updateHistory.getUpdatesWithoutImportUpdates( - eqTo(Some((currentMigrationId, CantonTimestamp.MinValue))), - eqTo(PageLimit.tryCreate(1)), - )(any[TraceContext]) - ).thenReturn( - Future.successful( - Seq( - TreeUpdateWithMigrationId( - UpdateHistoryResponse(treeUpdate(now.minusSeconds(1800L)), dummyDomain), - 1L, - ) - ) - ) - ) - - historyPartiallyBackfilled( - currentMigrationId, - complete = true, - importUpdatesComplete = false, - ) - - trigger.retrieveTasks().futureValue should be(empty) - } - - // Currently, import updates are always backfilled after regular updates, but we do not want to depend on that in the trigger - "when update history backfilling has not finished regular updates: return no task" in new AcsSnapshotTriggerTestScope( - true - ) { - noPreviousSnapshot() - - // data after ACS - when( - updateHistory.getUpdatesWithoutImportUpdates( - eqTo(Some((currentMigrationId, CantonTimestamp.MinValue))), - eqTo(PageLimit.tryCreate(1)), - )(any[TraceContext]) - ).thenReturn( - Future.successful( - Seq( - TreeUpdateWithMigrationId( - UpdateHistoryResponse(treeUpdate(now.minusSeconds(1800L)), dummyDomain), - 1L, - ) - ) - ) - ) - - historyPartiallyBackfilled( - currentMigrationId, - complete = false, - importUpdatesComplete = true, - ) - - trigger.retrieveTasks().futureValue should be(empty) - } - // this is the case of when an SV joins late (and the history is backfilled), // or was present at the beginning of a migration (no backfilling required). "when update history backfilling has finished: return the first task when due and no updates pending" in new AcsSnapshotTriggerTestScope( @@ -287,8 +231,9 @@ class AcsSnapshotTriggerTest // data after ACS when( - updateHistory.getUpdatesWithoutImportUpdates( - eqTo(Some((currentMigrationId, CantonTimestamp.MinValue))), + updateHistory.getUpdates( + eqTo(Some((currentMigrationId, CantonTimestamp.MinValue.plusSeconds(1L)))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn( @@ -309,8 +254,9 @@ class AcsSnapshotTriggerTest // no updates pending when( - updateHistory.getUpdatesWithoutImportUpdates( + updateHistory.getUpdates( eqTo(Some((currentMigrationId, firstSnapshotTime))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn( @@ -362,7 +308,7 @@ class AcsSnapshotTriggerTest noPreviousSnapshot(previousMigrationId) firstUpdateAt( previousMigrationId, - queryRecordTime = CantonTimestamp.MinValue, + queryRecordTime = CantonTimestamp.MinValue.plusSeconds(1L), // first update happened 10d ~3h before the last update updateRecordTime = cantonTimestamp("2007-01-03T07:55:30.00Z"), ) @@ -434,7 +380,7 @@ class AcsSnapshotTriggerTest // first update at 2007-11-20T10:15:30.00Z firstUpdateAt( previousMigrationId - 1, - queryRecordTime = CantonTimestamp.MinValue, + queryRecordTime = CantonTimestamp.MinValue.plusSeconds(1L), updateRecordTime = cantonTimestamp("2007-11-20T10:15:30.00Z"), ) @@ -488,8 +434,9 @@ class AcsSnapshotTriggerTest // data after ACS when( - updateHistory.getUpdatesWithoutImportUpdates( - eqTo(Some((currentMigrationId, CantonTimestamp.MinValue))), + updateHistory.getUpdates( + eqTo(Some((currentMigrationId, CantonTimestamp.MinValue.plusSeconds(1L)))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn( @@ -508,8 +455,9 @@ class AcsSnapshotTriggerTest // no updates pending when( - updateHistory.getUpdatesWithoutImportUpdates( + updateHistory.getUpdates( eqTo(Some((currentMigrationId, 
firstSnapshotTime))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn( @@ -587,15 +535,7 @@ class AcsSnapshotTriggerTest when(sourceHistory.migrationInfo(anyLong)(any[TraceContext])) .thenReturn( Future.successful( - Some( - HistoryBackfilling.SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = Map.empty, - lastImportUpdateId = None, - complete = true, - importUpdatesComplete = true, - ) - ) + Some(HistoryBackfilling.SourceMigrationInfo(None, Map.empty, complete = true)) ) ) when(updateHistory.sourceHistory).thenReturn(sourceHistory) @@ -646,8 +586,9 @@ class AcsSnapshotTriggerTest updateRecordTime: CantonTimestamp, ) = { when( - updateHistory.getUpdatesWithoutImportUpdates( + updateHistory.getUpdates( eqTo(Some((migrationId, queryRecordTime))), + eqTo(true), eqTo(PageLimit.tryCreate(1)), )(any[TraceContext]) ).thenReturn( @@ -671,38 +612,7 @@ class AcsSnapshotTriggerTest ) .thenReturn( Future.successful( - Some( - HistoryBackfilling.SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = Map.empty, - lastImportUpdateId = None, - complete = complete, - importUpdatesComplete = complete, - ) - ) - ) - ) - } - - def historyPartiallyBackfilled( - migrationId: Long, - complete: Boolean, - importUpdatesComplete: Boolean, - ): Unit = { - when( - updateHistory.sourceHistory.migrationInfo(eqTo(migrationId))(any[TraceContext]) - ) - .thenReturn( - Future.successful( - Some( - HistoryBackfilling.SourceMigrationInfo( - previousMigrationId = None, - recordTimeRange = Map.empty, - lastImportUpdateId = None, - complete = complete, - importUpdatesComplete = importUpdatesComplete, - ) - ) + Some(HistoryBackfilling.SourceMigrationInfo(None, Map.empty, complete)) ) ) } diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/ScanHistoryBackfillingTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/ScanHistoryBackfillingTest.scala index 841944e860..2c555face8 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/ScanHistoryBackfillingTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/ScanHistoryBackfillingTest.scala @@ -1,9 +1,5 @@ package org.lfdecentralizedtrust.splice.store -import org.lfdecentralizedtrust.splice.environment.ledger.api.{ - ReassignmentUpdate, - TransactionTreeUpdate, -} import org.lfdecentralizedtrust.splice.scan.store.ScanHistoryBackfilling import org.lfdecentralizedtrust.splice.util.DomainRecordTimeRange import org.lfdecentralizedtrust.splice.scan.admin.api.client.BackfillingScanConnection @@ -15,7 +11,6 @@ import com.digitalasset.canton.topology.SynchronizerId import com.digitalasset.canton.tracing.TraceContext import scala.concurrent.Future -import scala.jdk.CollectionConverters.* class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { @@ -24,7 +19,6 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { for { testData <- setup() // Backfill - backfillingTerminated <- backfillAll( testData.sourceHistory, testData.destinationHistory, @@ -34,60 +28,22 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { .migrationInfo(0) .map(_.value.complete) // Check that the updates are the same - allUpdatesA <- testData.sourceHistory.getAllUpdates( - None, - PageLimit.tryCreate(1000), - ) - allUpdatesB <- testData.destinationHistory.getAllUpdates( - None, - PageLimit.tryCreate(1000), - ) - stdUpdatesA <- testData.sourceHistory.getUpdatesWithoutImportUpdates( + updatesA <- testData.sourceHistory.getUpdates( 
None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) - stdUpdatesB <- testData.destinationHistory.getUpdatesWithoutImportUpdates( + updatesB <- testData.destinationHistory.getUpdates( None, - PageLimit.tryCreate(1000), - ) - importUpdatesA1 <- testData.sourceHistory.getImportUpdates( - 1, - "", - PageLimit.tryCreate(1000), - ) - importUpdatesB1 <- testData.destinationHistory.getImportUpdates( - 1, - "", - PageLimit.tryCreate(1000), - ) - importUpdatesA2 <- testData.sourceHistory.getImportUpdates( - 2, - "", - PageLimit.tryCreate(1000), - ) - importUpdatesB2 <- testData.destinationHistory.getImportUpdates( - 2, - "", + includeImportUpdates = true, PageLimit.tryCreate(1000), ) } yield { backfillingTerminated shouldBe true backfillingComplete shouldBe true - - // `getAllUpdates()` returns updates as they are stored in the database, but import updates backfilling - // rewrites them to be consistent across SVs. This transformation modifies update ids, so we can't use - // regular equality. Moreover, the order of import updates will differ across stores. - allUpdatesA should not contain theSameElementsAs(allUpdatesB) - allUpdatesA.map(testUpdateIdentifier) should contain theSameElementsAs allUpdatesB - .map(testUpdateIdentifier) - - // `getUpdatesWithoutImportUpdates()` returns updates as they are stored in the database. - // Regular update backfilling may modify event ids, but in this test code it doesn't. - stdUpdatesA should contain theSameElementsInOrderAs stdUpdatesB - - // `getImportUpdates()` returns updates in a form that is consistent across SVs - importUpdatesA1 should contain theSameElementsInOrderAs importUpdatesB1 - importUpdatesA2 should contain theSameElementsInOrderAs importUpdatesB2 + updatesA.map(_.update.update.updateId) should contain theSameElementsInOrderAs updatesB.map( + _.update.update.updateId + ) } } @@ -103,8 +59,9 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { ) migrationInfo0 <- testData.destinationHistory.sourceHistory .migrationInfo(0) - updatesB1 <- testData.destinationHistory.getAllUpdates( + updatesB1 <- testData.destinationHistory.getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) @@ -119,12 +76,14 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { .map(_.value.complete) // Check that the updates are the same - allUpdatesA <- testData.sourceHistory.getAllUpdates( + updatesA <- testData.sourceHistory.getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) - allUpdatesB2 <- testData.destinationHistory.getAllUpdates( + updatesB2 <- testData.destinationHistory.getUpdates( None, + includeImportUpdates = true, PageLimit.tryCreate(1000), ) } yield { @@ -132,15 +91,13 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { migrationInfo0 shouldBe None backfillingTerminated2 shouldBe true backfillingComplete2 shouldBe true - - // See comments in previous test for why we can't use regular equality here val updateIdsA1 = - allUpdatesA.filter(_.update.update.recordTime >= time(5)).map(testUpdateIdentifier) - val updateIdsA2 = allUpdatesA.map(testUpdateIdentifier) - val updateIdsB1 = updatesB1.map(testUpdateIdentifier) - val updateIdsB2 = allUpdatesB2.map(testUpdateIdentifier) - updateIdsB1 should contain theSameElementsAs updateIdsA1 - updateIdsB2 should contain theSameElementsAs updateIdsA2 + updatesA.filter(_.update.update.recordTime >= time(5)).map(_.update.update.updateId) + val updateIdsA2 = updatesA.map(_.update.update.updateId) + val updateIdsB1 = 
updatesB1.map(_.update.update.updateId) + val updateIdsB2 = updatesB2.map(_.update.update.updateId) + updateIdsB1 should contain theSameElementsInOrderAs updateIdsA1 + updateIdsB2 should contain theSameElementsInOrderAs updateIdsA2 } } } @@ -164,7 +121,7 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { // domain 1: 1 . 3 // domain 2: . 2 . _ <- initStore(storeA0) - tx01 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1)) + tx1 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1)) _ <- assign( domain2, domain1, @@ -191,22 +148,18 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { // domain 1: 4 5 // domain 2: . . _ <- initStore(storeA1) - _ <- importUpdate(tx01, validOffset(1), storeA1) - tx11 <- create(domain1, validContractId(4), validOffset(4), party1, storeA1, time(4)) - tx12 <- create(domain1, validContractId(5), validOffset(5), party1, storeA1, time(5)) + _ <- create(domain1, validContractId(4), validOffset(4), party1, storeA1, time(4)) + _ <- create(domain1, validContractId(5), validOffset(5), party1, storeA1, time(5)) // Migration 2: // domain 1: . // domain 2: 6 _ <- initStore(storeA2) - _ <- importUpdate(tx01, validOffset(1), storeA2) - _ <- importUpdate(tx11, validOffset(4), storeA2) - _ <- importUpdate(tx12, validOffset(5), storeA2) _ <- create(domain2, validContractId(6), validOffset(6), party1, storeA2, time(6)) // At this point, storeA2 joins and both continue with migration 2: // domain 1: 7 . // domain 2: . 8 _ <- initStore(storeB2) - tx22 <- createMulti( + tx2 <- createMulti( domain1, validContractId(7), validOffset(7), @@ -224,36 +177,19 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { ) _ <- storeA0.initializeBackfilling( 0, - SynchronizerId.tryFromString(tx01.getSynchronizerId), - tx01.getUpdateId, + SynchronizerId.tryFromString(tx1.getSynchronizerId), + tx1.getUpdateId, complete = true, ) _ <- storeB2.initializeBackfilling( 2, - SynchronizerId.tryFromString(tx22.getSynchronizerId), - tx22.getUpdateId, + SynchronizerId.tryFromString(tx2.getSynchronizerId), + tx2.getUpdateId, complete = false, ) } yield TestData(storeA2, storeB2) } - /** Returns a string that uniquely identifies the given update. - * We use this to determine whether two updates are the same, without comparing the full content. - * This approach is easier to read and debug than a custom Equality instance. 
- */ - private def testUpdateIdentifier(tx: TreeUpdateWithMigrationId): String = { - val update = tx.update.update match { - case TransactionTreeUpdate(tree) - if tree.getRecordTime == CantonTimestamp.MinValue.toInstant => - s"TransactionTreeImportUpdate(${tree.getEventsById.asScala.values.map(_.getContractId).mkString(", ")})" - case TransactionTreeUpdate(tree) => - s"TransactionTreeUpdate(${tree.getUpdateId})" - case ReassignmentUpdate(transfer) => - s"ReassignmentUpdate(${transfer.updateId})" - } - s"TreeUpdateWithMigrationId(GetTreeUpdatesResponse($update, ${tx.update.synchronizerId}), ${tx.migrationId})" - } - private def backfillAll( source: UpdateHistory, destination: UpdateHistory, @@ -276,15 +212,7 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { backfiller.backfill().flatMap { case HistoryBackfilling.Outcome.MoreWorkAvailableNow(_) => go(i + 1) case HistoryBackfilling.Outcome.MoreWorkAvailableLater => Future.successful(false) - case HistoryBackfilling.Outcome.BackfillingIsComplete => goImportUpdates(1) - } - } - def goImportUpdates(i: Int): Future[Boolean] = { - logger.debug(s"backfillImportUpdates() iteration $i") - backfiller.backfillImportUpdates().flatMap { - case ImportUpdatesBackfilling.Outcome.MoreWorkAvailableNow(_) => goImportUpdates(i + 1) - case ImportUpdatesBackfilling.Outcome.MoreWorkAvailableLater => Future.successful(false) - case ImportUpdatesBackfilling.Outcome.BackfillingIsComplete => Future.successful(true) + case HistoryBackfilling.Outcome.BackfillingIsComplete => Future.successful(true) } } go(1) @@ -357,17 +285,5 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase { excludeBefore.get(synchronizerId).fold(false)(b => u.update.recordTime >= b) ) ) - - override def getImportUpdates(migrationId: Long, afterUpdateId: String, count: Int)(implicit - tc: TraceContext - ): Future[Seq[UpdateHistoryResponse]] = history - .getImportUpdates( - migrationId, - afterUpdateId, - PageLimit.tryCreate(count), - )(tc) - .map( - _.map(_.update) - ) } } diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala index f3743edc06..73ebff56e4 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala @@ -18,7 +18,7 @@ import org.lfdecentralizedtrust.splice.util.{Contract, HoldingsSummary, PackageQ import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.lifecycle.FutureUnlessShutdown import com.digitalasset.canton.resource.DbStorage -import com.digitalasset.canton.topology.{PartyId, SynchronizerId} +import com.digitalasset.canton.topology.PartyId import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.util.MonadUtil import com.digitalasset.canton.{HasActorSystem, HasExecutionContext} @@ -726,101 +726,11 @@ class AcsSnapshotStoreTest } - "fix for corrupt snapshots" should { - "remove corrupt snapshots" in { - val firstMigration = 1L - val secondMigration = 2L - for { - // Initial migration - updateHistory1 <- mkUpdateHistory( - firstMigration, - backfillingRequired = BackfillingRequirement.NeedsBackfilling, - ) - store1 = mkStore(updateHistory1) - update1 <- ingestCreate( - updateHistory1, - amuletRules(), - timestamp1.minusSeconds(1L), - ) - _ <- store1.insertNewSnapshot(None, firstMigration, timestamp1) - 
snapshot1first <- store1.lookupSnapshotBefore(firstMigration, CantonTimestamp.MaxValue) - snapshot1second <- store1.lookupSnapshotBefore(secondMigration, CantonTimestamp.MaxValue) - - _ = snapshot1first.map(_.snapshotRecordTime).value should be(timestamp1) - _ = snapshot1second should be(None) - - // Second migration. This is missing the import update corresponding to the create above. - updateHistory2 <- mkUpdateHistory( - secondMigration, - backfillingRequired = BackfillingRequirement.NeedsBackfilling, - ) - store2 = mkStore(updateHistory2) - _ <- ingestCreate( - updateHistory2, - amuletRules(), - timestamp2.minusSeconds(1L), - ) - _ <- store2.insertNewSnapshot(None, secondMigration, timestamp2) - snapshot2first <- store2.lookupSnapshotBefore(firstMigration, CantonTimestamp.MaxValue) - snapshot2second <- store2.lookupSnapshotBefore(secondMigration, CantonTimestamp.MaxValue) - - _ = snapshot2first.map(_.snapshotRecordTime).value should be(timestamp1) - _ = snapshot2second.map(_.snapshotRecordTime).value should be(timestamp2) - _ = snapshot2second.map(s => s.lastRowId - s.firstRowId).value should be(0) - - // Mark backfilling as complete, but import update backfilling as not complete. - // This is the state affected SVs should be in. - _ <- updateHistory1.initializeBackfilling( - joiningMigrationId = firstMigration, - joiningSynchronizerId = SynchronizerId.tryFromString(update1.tree.getSynchronizerId), - joiningUpdateId = update1.tree.getUpdateId, - complete = false, - ) - _ <- updateHistory1.destinationHistory.markBackfillingComplete() - - // Re-initialize the store. This should delete the second snapshot. - updateHistory3 <- mkUpdateHistory( - secondMigration, - backfillingRequired = BackfillingRequirement.NeedsBackfilling, - ) - store3 = mkStore(updateHistory3) - snapshot3first <- store3.lookupSnapshotBefore(firstMigration, CantonTimestamp.MaxValue) - snapshot3second <- store3.lookupSnapshotBefore(secondMigration, CantonTimestamp.MaxValue) - - _ = snapshot3first.map(_.snapshotRecordTime).value should be(timestamp1) - _ = snapshot3second should be(None) - - // Ingest some import update to simulate the import update backfilling and re-create the snapshot - _ <- ingestCreate( - updateHistory3, - amuletRules(), - CantonTimestamp.MinValue, - ) - _ <- updateHistory1.destinationHistory.markImportUpdatesBackfillingComplete() - _ <- store3.insertNewSnapshot(None, secondMigration, timestamp2) - - // Re-initialize the store. The second snapshot should now not be deleted. 
- updateHistory4 <- mkUpdateHistory( - secondMigration, - backfillingRequired = BackfillingRequirement.NeedsBackfilling, - ) - store4 = mkStore(updateHistory4) - snapshot4first <- store4.lookupSnapshotBefore(firstMigration, CantonTimestamp.MaxValue) - snapshot4second <- store4.lookupSnapshotBefore(secondMigration, CantonTimestamp.MaxValue) - } yield { - snapshot4first.map(_.snapshotRecordTime).value should be(timestamp1) - snapshot4second.map(_.snapshotRecordTime).value should be(timestamp2) - snapshot4second.map(s => s.lastRowId - s.firstRowId).value should be(1) - } - } - } } private def mkUpdateHistory( migrationId: Long = DefaultMigrationId, participantId: String = "whatever", - // Default to backfilling being always complete, to avoid unnecessary complexity in the tests - backfillingRequired: BackfillingRequirement = BackfillingRequirement.BackfillingNotRequired, ): Future[UpdateHistory] = { val updateHistory = new UpdateHistory( storage.underlying, // not under test @@ -828,7 +738,7 @@ class AcsSnapshotStoreTest "update_history_acs_snapshot_test", mkParticipantId(participantId), dsoParty, - backfillingRequired, + BackfillingRequirement.BackfillingNotRequired, loggerFactory, true, ) @@ -854,33 +764,30 @@ class AcsSnapshotStoreTest create: Contract[TCid, T], recordTime: CantonTimestamp, signatories: Seq[PartyId] = Seq(dsoParty), - ): Future[TransactionTreeUpdate] = { - val update = TransactionTreeUpdate( - mkCreateTx( - nextOffset(), - Seq(create.copy(createdAt = recordTime.toInstant).asInstanceOf[Contract[TCid, T]]), - Instant.now(), - signatories, - dummyDomain, - "acs-snapshot-store-test", - recordTime.toInstant, - packageName = DarResources - .lookupPackageId(create.identifier.getPackageId) - .getOrElse( - throw new IllegalArgumentException( - s"No package found for template ${create.identifier}" + ): Future[Unit] = { + updateHistory.ingestionSink.ingestUpdate( + dummyDomain, + TransactionTreeUpdate( + mkCreateTx( + nextOffset(), + Seq(create.copy(createdAt = recordTime.toInstant).asInstanceOf[Contract[TCid, T]]), + Instant.now(), + signatories, + dummyDomain, + "acs-snapshot-store-test", + recordTime.toInstant, + packageName = DarResources + .lookupPackageId(create.identifier.getPackageId) + .getOrElse( + throw new IllegalArgumentException( + s"No package found for template ${create.identifier}" + ) ) - ) - .metadata - .name, - ) + .metadata + .name, + ) + ), ) - updateHistory.ingestionSink - .ingestUpdate( - dummyDomain, - update, - ) - .map(_ => update) } private def ingestArchive[TCid <: ContractId[T], T]( From d4db4467d3acae3ed87011167b43c5f2bee9cc9b Mon Sep 17 00:00:00 2001 From: Robert Autenrieth Date: Wed, 18 Jun 2025 15:31:04 +0000 Subject: [PATCH 2/2] Keep database migration ... 
as it was already applied to CILR [ci] Signed-off-by: Robert Autenrieth --- .../stable/V039__backfilling_import_updates2.sql | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V039__backfilling_import_updates2.sql diff --git a/apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V039__backfilling_import_updates2.sql b/apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V039__backfilling_import_updates2.sql new file mode 100644 index 0000000000..24ec7fc1fc --- /dev/null +++ b/apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V039__backfilling_import_updates2.sql @@ -0,0 +1,13 @@ +-- Only SVs that joined the network at any point in the initial migration have all import updates +-- This statement was already part of the migration script V036__backfilling_import_updates.sql. +-- It needs to be executed again because Scala code for backfilling import updates +-- was reverted and re-applied between these two migrations. +update update_history_backfilling as bf +set import_updates_complete = true +where + bf.complete = true and + bf.joining_migration_id = ( + select min(migration_id) + from update_history_transactions as tx + where bf.history_id = tx.history_id + );
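Note for reviewers on the resulting API surface: after both patches, scan history reads go back through a single `getUpdates` method (import updates selected by a flag instead of the dedicated `getAllUpdates`/`getUpdatesWithoutImportUpdates`/`getImportUpdates` methods removed above), and `BackfillingState.InProgress` no longer carries per-phase flags. A minimal sketch of the restored surface follows. It is illustrative only and not part of the patches; the call shapes are taken from the hunks above, while the import path of `BackfillingState` (assumed nested in `UpdateHistory`, matching the `BackfillingRequirement` import in UpdateHistoryTestBase) and the helper names are assumptions.

import scala.concurrent.Future
import com.digitalasset.canton.tracing.TraceContext
import org.lfdecentralizedtrust.splice.store.{PageLimit, TreeUpdateWithMigrationId, UpdateHistory}
import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingState // assumed nesting

// All updates, including ACS import updates (which have record_time = MinValue),
// as used by the integration tests above when reading the store directly.
def readAll(history: UpdateHistory)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] =
  history.getUpdates(None, includeImportUpdates = true, PageLimit.tryCreate(1000))

// Regular updates only, as used by ScanHistoryBackfillingTrigger when backfilling.
def readWithoutImports(history: UpdateHistory)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] =
  history.getUpdates(None, includeImportUpdates = false, PageLimit.tryCreate(1000))

// InProgress is matched as a bare value again (no per-phase flags), mirroring
// the matches restored in HttpScanHandler and ScanHistoryBackfillingTrigger.
def isServingHistory(state: BackfillingState): Boolean = state match {
  case BackfillingState.Complete => true
  case _ => false // InProgress or NotInitialized: HTTP reads are refused
}

This mirrors the serving behavior restored in HttpScanHandler: update-history endpoints only return data once the state is Complete, and callers that need data earlier (such as the tests above) read the store directly.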