@@ -170,7 +170,7 @@ class RecoverExternalPartyIntegrationTest
       }
       val acsSnapshotFile = Files.createTempFile("acs", ".snapshot")
       Files.write(acsSnapshotFile, acsSnapshot.toByteArray())
-      bobValidatorBackend.participantClient.repair.import_acs_old(acsSnapshotFile.toString)
+      bobValidatorBackend.participantClient.repair.import_acs(acsSnapshotFile.toString)
       bobValidatorBackend.participantClient.synchronizers.reconnect_all()
     }
@@ -18,9 +18,14 @@ import com.digitalasset.canton.admin.api.client.data.{
   ParticipantStatus,
   PruningSchedule,
 }
-import com.digitalasset.canton.admin.participant.v30.{ExportAcsOldResponse, PruningServiceGrpc}
+import com.digitalasset.canton.participant.admin.data.ContractIdImportMode
+import com.digitalasset.canton.admin.participant.v30.{
+  ExportAcsResponse,
+  ExportAcsAtTimestampResponse,
+  PruningServiceGrpc,
+}
 import com.digitalasset.canton.admin.participant.v30.PruningServiceGrpc.PruningServiceStub
-import com.digitalasset.canton.config.RequireTypes.PositiveInt
+import com.digitalasset.canton.config.RequireTypes.{NonNegativeLong, PositiveInt}
 import com.digitalasset.canton.config.{ApiLoggingConfig, ClientConfig, PositiveDurationSeconds}
 import com.digitalasset.canton.discard.Implicits.DiscardOps
 import com.digitalasset.canton.logging.NamedLoggerFactory
@@ -283,27 +288,79 @@ class ParticipantAdminConnection(
     )
   }

-  def downloadAcsSnapshot(
+  def downloadAcsSnapshotForPartyMigration(
       parties: Set[PartyId],
-      filterSynchronizerId: Option[SynchronizerId] = None,
-      timestamp: Option[Instant] = None,
-      force: Boolean = false,
+      filterSynchronizerId: SynchronizerId,
+      timestamp: Instant,
   )(implicit traceContext: TraceContext): Future[ByteString] = {
     logger.debug(
       show"Downloading ACS snapshot from domain $filterSynchronizerId, for parties $parties at timestamp $timestamp"
     )
     val requestComplete = Promise[ByteString]()
     // TODO(DACH-NY/canton-network-node#3298) just concatenate the byteString here. Make it scale to 2M contracts.
-    val observer = new GrpcByteChunksToByteArrayObserver[ExportAcsOldResponse](requestComplete)
+    val observer =
+      new GrpcByteChunksToByteArrayObserver[ExportAcsAtTimestampResponse](requestComplete)
     runCmd(
-      ParticipantAdminCommands.ParticipantRepairManagement.ExportAcsOld(
+      ParticipantAdminCommands.PartyManagement.ExportAcsAtTimestamp(
         parties = parties,
-        partiesOffboarding = false,
         filterSynchronizerId,
         timestamp,
         observer,
-        Map.empty,
-        force,
       )
     ).discard
     requestComplete.future
   }
+
+  def downloadAcsSnapshotForSynchronizerMigration(
+      parties: Set[PartyId],
+      synchronizerId: SynchronizerId,
+      timestamp: Instant,
+      disasterRecovery: Boolean,
+  )(implicit traceContext: TraceContext): Future[ByteString] = {
+    // ExportAcsAtTimestamp only works if the timestamp corresponds to a PartyToParticipant
+    // change, so synchronizer migrations and disaster recovery must translate the timestamp
+    // to a ledger offset instead. Without the force flag, that translation fails until the
+    // participant has processed a timestamp >= the requested one. On migrations this is
+    // guaranteed to happen for all nodes due to time proofs. On disaster recovery we cannot
+    // guarantee this, so we use force=true, which does not wait for such a timestamp.
+    getHighestOffsetByTimestamp(synchronizerId, timestamp, force = disasterRecovery).flatMap {
+      offset =>
+        downloadAcsSnapshotAtOffset(parties, synchronizerId, offset.unwrap)
+    }
+  }
+
+  def getHighestOffsetByTimestamp(
+      synchronizerId: SynchronizerId,
+      timestamp: Instant,
+      force: Boolean,
+  )(implicit tc: TraceContext): Future[NonNegativeLong] = {
+    runCmd(
+      ParticipantAdminCommands.PartyManagement
+        .GetHighestOffsetByTimestamp(synchronizerId, timestamp, force)
+    ).map { offset =>
+      logger.debug(s"Translated $timestamp on $synchronizerId to $offset with force=$force")
+      offset
+    }
+  }
+
+  def downloadAcsSnapshotAtOffset(
+      parties: Set[PartyId],
+      filterSynchronizerId: SynchronizerId,
+      offset: Long,
+  )(implicit traceContext: TraceContext): Future[ByteString] = {
+    logger.debug(
+      show"Downloading ACS snapshot from domain $filterSynchronizerId, for parties $parties at offset $offset"
+    )
+    val requestComplete = Promise[ByteString]()
+    // TODO(#3298) just concatenate the byteString here. Make it scale to 2M contracts.
+    val observer =
+      new GrpcByteChunksToByteArrayObserver[ExportAcsResponse](requestComplete)
+    runCmd(
+      ParticipantAdminCommands.PartyManagement.ExportAcs(
+        parties = parties,
+        Some(filterSynchronizerId),
+        offset,
+        observer,
+        contractSynchronizerRenames = Map.empty,
+      )
+    ).discard
+    requestComplete.future
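
The comment in downloadAcsSnapshotForSynchronizerMigration above captures the contract of the new API: a timestamp is first translated to a ledger offset via GetHighestOffsetByTimestamp, and only the disaster-recovery path skips waiting for that timestamp to be processed. A minimal usage sketch of the two equivalent call shapes; the names connection, synchronizerId, migrationTimestamp, and dsoParty are illustrative assumptions, and implicit TraceContext and error handling are elided:

// Sketch only: one-shot variant that translates the timestamp and downloads in one call.
val snapshot: Future[ByteString] =
  connection.downloadAcsSnapshotForSynchronizerMigration(
    parties = Set(dsoParty),
    synchronizerId = synchronizerId,
    timestamp = migrationTimestamp,
    disasterRecovery = false, // force=false: wait until the timestamp has been processed
  )

// The same, spelled out as the two underlying steps.
val explicit: Future[ByteString] =
  connection
    .getHighestOffsetByTimestamp(synchronizerId, migrationTimestamp, force = false)
    .flatMap(offset =>
      connection.downloadAcsSnapshotAtOffset(Set(dsoParty), synchronizerId, offset.unwrap)
    )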
@@ -317,10 +374,10 @@ class ParticipantAdminConnection(
       "Imports the ACS into the participant",
       runCmd(
         ParticipantAdminCommands.ParticipantRepairManagement
-          .ImportAcsOld(
+          .ImportAcs(
             acsBytes,
-            IMPORT_ACS_WORKFLOW_ID_PREFIX,
-            allowContractIdSuffixRecomputation = false,
+            workflowIdPrefix = IMPORT_ACS_WORKFLOW_ID_PREFIX,
+            contractIdImportMode = ContractIdImportMode.Validation,
           )
       ).map(_ => ()),
       logger,
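
On the import side, ImportAcs replaces the old allowContractIdSuffixRecomputation flag with an explicit ContractIdImportMode, where Validation checks contract IDs instead of recomputing them. A sketch of the resulting export/import round trip, mirroring the integration test at the top of this diff; snapshotBytes is an assumed ByteString produced by one of the download methods above:

// Write the exported snapshot to a file and replay it through the repair service.
val acsSnapshotFile = Files.createTempFile("acs", ".snapshot")
Files.write(acsSnapshotFile, snapshotBytes.toByteArray)
// Console equivalent of the ImportAcs repair command above.
bobValidatorBackend.participantClient.repair.import_acs(acsSnapshotFile.toString)
bobValidatorBackend.participantClient.synchronizers.reconnect_all()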
@@ -39,16 +39,16 @@ class AcsExporter(
   def exportAcsAtTimestamp(
       domain: SynchronizerId,
       timestamp: Instant,
-      force: Boolean,
+      disasterRecovery: Boolean,
       parties: PartyId*
   )(implicit
       tc: TraceContext
   ): Future[ByteString] = {
-    participantAdminConnection.downloadAcsSnapshot(
+    participantAdminConnection.downloadAcsSnapshotForSynchronizerMigration(
       parties = parties.toSet,
-      filterSynchronizerId = Some(domain),
-      timestamp = Some(timestamp),
-      force = force,
+      synchronizerId = domain,
+      timestamp = timestamp,
+      disasterRecovery = disasterRecovery,
     )
   }

@@ -90,11 +90,11 @@ class AcsExporter(
       )
       acsSnapshotTimestamp = domainParamsStateTopology.base.validFrom
       snapshot <- EitherT.liftF[Future, AcsExportFailure, ByteString](
-        participantAdminConnection.downloadAcsSnapshot(
+        participantAdminConnection.downloadAcsSnapshotForSynchronizerMigration(
           parties = parties.toSet,
-          filterSynchronizerId = Some(domain),
-          timestamp = Some(acsSnapshotTimestamp),
-          force = true,
+          synchronizerId = domain,
+          timestamp = acsSnapshotTimestamp,
+          disasterRecovery = false,
         )
       )
     } yield {
@@ -249,7 +249,7 @@ class ScanApp(
       config.spliceInstanceNames,
       participantAdminConnection,
       sequencerAdminConnection,
-      store,
+      automation,
       acsSnapshotStore,
       dsoAnsResolver,
       config.miningRoundsCacheTimeToLiveOverride,
@@ -44,6 +44,7 @@ import org.lfdecentralizedtrust.splice.http.v0.definitions.{
 import org.lfdecentralizedtrust.splice.http.v0.scan.ScanResource
 import org.lfdecentralizedtrust.splice.http.v0.{definitions, scan as v0}
 import org.lfdecentralizedtrust.splice.scan.store.{AcsSnapshotStore, ScanStore, TxLogEntry}
+import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion
 import org.lfdecentralizedtrust.splice.util.{
   Codec,
   Contract,
@@ -53,9 +54,10 @@ import org.lfdecentralizedtrust.splice.util.{
 }
 import org.lfdecentralizedtrust.splice.util.PrettyInstances.*
 import com.digitalasset.canton.logging.NamedLoggerFactory
-import com.digitalasset.canton.participant.admin.data.ActiveContractOld as ActiveContract
+import com.digitalasset.canton.participant.admin.data.ActiveContract
 import com.digitalasset.canton.topology.{Member, PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.TraceContext
+import com.digitalasset.canton.util.{ByteStringUtil, GrpcStreamingUtils, ResourceUtil}
 import com.digitalasset.canton.util.ShowUtil.*
 import com.google.protobuf.ByteString
 import io.grpc.Status
@@ -65,6 +67,7 @@ import scala.concurrent.{ExecutionContextExecutor, Future}
 import scala.jdk.CollectionConverters.*
 import scala.jdk.OptionConverters.*
 import scala.util.{Try, Using}
+import java.io.ByteArrayInputStream
 import java.util.Base64
 import java.util.zip.GZIPOutputStream
 import java.time.{Instant, OffsetDateTime, ZoneOffset}
@@ -98,7 +101,7 @@ class HttpScanHandler(
     spliceInstanceNames: SpliceInstanceNamesConfig,
     participantAdminConnection: ParticipantAdminConnection,
     sequencerAdminConnection: SequencerAdminConnection,
-    protected val store: ScanStore,
+    protected val storeWithIngestion: AppStoreWithIngestion[ScanStore],
     snapshotStore: AcsSnapshotStore,
     dsoAnsResolver: DsoAnsResolver,
     miningRoundsCacheTimeToLiveOverride: Option[NonNegativeFiniteDuration],
@@ -115,6 +118,7 @@ class HttpScanHandler(
     with HttpValidatorLicensesHandler
     with HttpFeatureSupportHandler {

+  private val store = storeWithIngestion.store
   override protected val workflowId: String = this.getClass.getSimpleName
   override protected val votesStore: VotesStore = store
   override protected val validatorLicensesStore: AppStore = store
@@ -1147,25 +1151,45 @@ class HttpScanHandler(
   /** Filter the given ACS snapshot to contracts the given party is a stakeholder on */
   // TODO(#828) Move this logic inside a Canton gRPC API.
   private def filterAcsSnapshot(input: ByteString, stakeholder: PartyId): ByteString = {
-    val contracts = ActiveContract
-      .loadFromByteString(input)
-      .valueOr(error =>
-        throw Status.INTERNAL
-          .withDescription(s"Failed to read ACS snapshot: ${error}")
-          .asRuntimeException()
-      )
+    val decompressedBytes =
+      ByteStringUtil
+        .decompressGzip(input, None)
+        .valueOr(err =>
+          throw Status.INVALID_ARGUMENT
+            .withDescription(s"Failed to decompress bytes: $err")
+            .asRuntimeException
+        )
+    val contracts = ResourceUtil.withResource(
+      new ByteArrayInputStream(decompressedBytes.toByteArray)
+    ) { inputSource =>
+      GrpcStreamingUtils
+        .parseDelimitedFromTrusted[ActiveContract](
+          inputSource,
+          ActiveContract,
+        )
+        .valueOr(err =>
+          throw Status.INVALID_ARGUMENT
+            .withDescription(s"Failed to parse contracts in acs snapshot: $err")
+            .asRuntimeException
+        )
+    }
     val output = ByteString.newOutput
     Using.resource(new GZIPOutputStream(output)) { outputStream =>
-      contracts.filter(c => c.contract.metadata.stakeholders.contains(stakeholder.toLf)).foreach {
-        c =>
+      contracts
+        .filter(c =>
+          c.contract.getCreatedEvent.signatories.contains(
+            stakeholder.toLf
+          ) || c.contract.getCreatedEvent.observers.contains(stakeholder.toLf)
+        )
+        .foreach { c =>
           c.writeDelimitedTo(outputStream) match {
             case Left(error) =>
               throw Status.INTERNAL
                 .withDescription(s"Failed to write ACS snapshot: ${error}")
                 .asRuntimeException()
             case Right(_) => outputStream.flush()
           }
-      }
+        }
     }
     output.toByteString
   }
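
Two things change in filterAcsSnapshot: the snapshot is now treated as a gzip stream of length-delimited ActiveContract messages, and the stakeholder check is expressed on the created event. Since a contract's stakeholders are by definition its signatories plus its observers, the new predicate is equivalent to the old metadata.stakeholders lookup; a standalone sketch of that predicate (hypothetical helper, not part of this diff):

// A party is a stakeholder of a contract iff it is a signatory or an observer.
def isStakeholder(party: String, signatories: Seq[String], observers: Seq[String]): Boolean =
  signatories.contains(party) || observers.contains(party)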
@@ -1180,6 +1204,20 @@ class HttpScanHandler(
       withSpan(s"$workflowId.getAcsSnapshot") { _ => _ =>
         val partyId = PartyId.tryFromProtoPrimitive(party)
         for {
+          synchronizerId <- store
+            .lookupAmuletRules()
+            .map(
+              _.getOrElse(
+                throw io.grpc.Status.FAILED_PRECONDITION
+                  .withDescription("No amulet rules.")
+                  .asRuntimeException()
+              ).state.fold(
+                identity,
+                throw io.grpc.Status.FAILED_PRECONDITION
+                  .withDescription("Amulet rules are in flight.")
+                  .asRuntimeException(),
+              )
+            )
           // The DSO party is a stakeholder on all "important" contracts, in particular, all amulet holdings and ANS entries.
           // This means the SV participants ingest data for that party and we can take a snapshot for that party.
           // To make sure the snapshot is the same regardless of which SV is queried, we filter it down to
@@ -1188,10 +1226,24 @@ class HttpScanHandler(
           // that users backup their own ACS.
           // As the DSO party is hosted on all SVs, an arbitrary scan instance can be chosen for the ACS snapshot.
           // BFT reads are usually not required since ACS commitments act as a check that the ACS was correct.
-          acsSnapshot <- participantAdminConnection.downloadAcsSnapshot(
-            Set(partyId),
-            timestamp = recordTime.map(_.toInstant),
-          )
+          acsSnapshot <- recordTime match {
+            case None =>
+              storeWithIngestion.connection.ledgerEnd().flatMap { offset =>
+                participantAdminConnection.downloadAcsSnapshotAtOffset(
+                  Set(partyId),
+                  offset = offset,
+                  filterSynchronizerId = synchronizerId,
+                )
+              }
+            case Some(time) =>
+              // To support arbitrary timestamps we use the synchronizer-migration
+              // variant instead of the party-migration one.
+              participantAdminConnection.downloadAcsSnapshotForSynchronizerMigration(
+                Set(partyId),
+                timestamp = time.toInstant,
+                synchronizerId = synchronizerId,
+                disasterRecovery = false,
+              )
+          }
         } yield {
           val filteredAcsSnapshot =
             filterAcsSnapshot(acsSnapshot, store.key.dsoParty)
@@ -518,7 +518,6 @@ class SvApp(
       localSynchronizerNode,
       retryProvider,
       new DsoPartyMigration(
-        svAutomation,
         dsoAutomation,
         participantAdminConnection,
         retryProvider,
@@ -560,7 +560,7 @@ class HttpSvAdminHandler(
       .getDomainDataSnapshot(
         Instant.parse(timestamp),
         partyId.map(Codec.tryDecode(Codec.Party)(_)),
-        force.getOrElse(false),
+        disasterRecovery = true,
       )
       .map { response =>
         val responseHttp = response.toHttp
@@ -45,7 +45,7 @@ class DomainDataSnapshotGenerator(
   def getDomainDataSnapshot(
       timestamp: Instant,
       partyId: Option[PartyId],
-      force: Boolean,
+      disasterRecovery: Boolean,
   )(implicit
       ec: ExecutionContext,
       tc: TraceContext,
@@ -57,7 +57,7 @@ class DomainDataSnapshotGenerator(
         .exportAcsAtTimestamp(
           decentralizedSynchronizer,
           timestamp,
-          force,
+          disasterRecovery,
           partyId.fold(Seq(dsoStore.key.dsoParty, dsoStore.key.svParty))(Seq(_))*
         )
       dars <- darExporter.exportAllDars()