Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
bin
.gradle/
**/build
/*/ignite/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,12 @@ class NodeAdapterJS(val node: INode) : INodeJS_ {
}

override fun setReferenceTargetNode(role: ReferenceRole, target: INodeJS?) {
val unwrappedTarget = if (target == null) null else (target as NodeAdapterJS).node
node.asWritableNode().setReferenceTarget(toReferenceLink(role), unwrappedTarget?.asWritableNode())
if (target is NodeAdapterJS) {
node.asWritableNode().setReferenceTarget(toReferenceLink(role), target.node.asWritableNode())
} else {
val targetRef = target?.getReference()
setReferenceTargetRef(role, targetRef)
}
}

override fun setReferenceTargetRef(role: ReferenceRole, target: INodeReferenceJS?) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
Expand All @@ -20,6 +21,7 @@ import org.modelix.model.api.INodeReference
import org.modelix.model.api.runSynchronized
import org.modelix.model.lazy.BranchReference
import org.modelix.model.lazy.CLVersion
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.mutable.IGenericMutableModelTree
import org.modelix.model.mutable.IMutableModelTree
import org.modelix.model.mutable.INodeIdGenerator
Expand Down Expand Up @@ -56,43 +58,80 @@ import org.modelix.model.mutable.asModel
* Dispose should be called on this, as otherwise a regular polling will go on.
*
* @property client the model client to connect to the model server
* @property branchRef the model server branch to fetch the data from
* @property providedScope the CoroutineScope to use for the suspendable tasks
* @property initialRemoteVersion the last version on the server from which we want to start the synchronization
* @param branchRefOrNull branch reference, null if strictly version-based
* @property idGenerator the generator for node IDs
* @param providedScope the CoroutineScope to use for the suspendable tasks
* @param initialRemoteVersion the last version on the server from which we want to start the synchronization
* @param repositoryId the repository ID, required if [versionHash] is provided
* @param versionHash the version hash to replicate, if not using a branch reference
*/
class ReplicatedModel(
val client: IModelClientV2,
val branchRef: BranchReference,
private val branchRefOrNull: BranchReference?,
val idGenerator: (TreeId) -> INodeIdGenerator<INodeReference>,
private val providedScope: CoroutineScope? = null,
initialRemoteVersion: CLVersion? = null,
repositoryId: RepositoryId? = null,
versionHash: String? = null,
) : Closeable {

constructor(
client: IModelClientV2,
branchRef: BranchReference,
idGenerator: (TreeId) -> INodeIdGenerator<INodeReference>,
providedScope: CoroutineScope? = null,
initialRemoteVersion: CLVersion? = null,
) : this(client, branchRef, idGenerator, providedScope, initialRemoteVersion, null, null)

val branchRef: BranchReference get() = branchRefOrNull ?: error("ReplicatedModel is in read-only version mode")

private val scope = providedScope ?: CoroutineScope(Dispatchers.Default)
private var state = State.New
private var localModel: LocalModel? = null
private val remoteVersion = RemoteVersion(client, branchRef, initialRemoteVersion)

private val remoteVersion: IRemoteVersion

private var pollingJob: Job? = null

init {
if (initialRemoteVersion != null) {
localModel = LocalModel(initialRemoteVersion, client.getIdGenerator(), idGenerator(initialRemoteVersion.getModelTree().getId())) { client.getUserId() }
}

if (branchRefOrNull != null) {
check(versionHash == null) { "Cannot provide both branchRef and versionHash" }
remoteVersion = RemoteVersionFromBranch(client, branchRefOrNull, initialRemoteVersion)
} else {
require(versionHash != null) { "Either branchRef or versionHash must be provided" }
require(repositoryId != null) { "repositoryId is required when versionHash is provided" }
remoteVersion = RemoteVersionFromHash(client, repositoryId, versionHash)
}
}

private fun getLocalModel(): LocalModel = checkNotNull(localModel) { "Model is not initialized yet" }

/**
* Returns the model as an [IMutableModel].
*/
fun getModel(): IMutableModel {
return getLocalModel().versionedModelTree.asModel()
}

/**
* Returns the underlying [VersionedModelTree].
*/
fun getVersionedModelTree(): IMutableModelTree = getLocalModel().versionedModelTree

/**
* Starts the synchronization process.
* Use [dispose] to stop it.
*/
suspend fun start(): IMutableModelTree {
if (state != State.New) throw IllegalStateException("already started")
check(state == State.New) { "already started" }
state = State.Starting

if (localModel == null) {
val initialVersion = remoteVersion.pull()
val initialVersion = remoteVersion.getInitialVersion()
localModel = LocalModel(initialVersion, client.getIdGenerator(), idGenerator(initialVersion.getModelTree().getId())) { client.getUserId() }
}

Expand All @@ -106,11 +145,11 @@ class ReplicatedModel(
remoteVersionReceived(newRemoteVersion, null)
nextDelayMs = 0
} catch (ex: CancellationException) {
LOG.debug { "Stop polling branch $branchRef after disposing." }
LOG.debug { "Stop polling after disposing." }
throw ex
} catch (ex: Throwable) {
LOG.error(ex) { "Failed polling branch $branchRef" }
nextDelayMs = (nextDelayMs * 3 / 2).coerceIn(1000, 30000)
LOG.error(ex) { "Failed polling" }
nextDelayMs = (nextDelayMs * 3 / 2).coerceIn(POLLING_MIN_DELAY, POLLING_MAX_DELAY)
}
}
}
Expand All @@ -133,16 +172,23 @@ class ReplicatedModel(
return getVersionedModelTree()
}

/**
* Resets the local model to the latest version on the server.
*/
suspend fun resetToServerVersion() {
getLocalModel().resetToVersion(client.pull(branchRef, lastKnownVersion = null).upcast())
// This delegates to remoteVersion which handles pull/load
val version = remoteVersion.getInitialVersion()
getLocalModel().resetToVersion(version)
}

/**
* Returns true if this [ReplicatedModel] has been disposed.
*/
fun isDisposed(): Boolean = state == State.Disposed

private fun checkDisposed() {
if (state == State.Disposed) throw IllegalStateException("disposed")
}

/**
* Stops the synchronization and releases resources.
*/
fun dispose() {
if (state == State.Disposed) return
pollingJob?.cancel("disposed")
Expand Down Expand Up @@ -179,16 +225,19 @@ class ReplicatedModel(
private suspend fun pushLocalChanges() {
if (isDisposed()) return

for (attempt in 1..10) {
repeat(PUSH_MAX_ATTEMPTS) {
val version = getLocalModel().createNewLocalVersion() ?: getLocalModel().getCurrentVersion()
val received = remoteVersion.push(version)
if (received.getContentHash() == version.getContentHash()) return
remoteVersionReceived(received, version)
}

throw IllegalStateException("Failed to push local changes after 10 attempts")
error("Failed to push local changes after $PUSH_MAX_ATTEMPTS attempts")
}

/**
* Returns the current version of the local model.
*/
fun getCurrentVersion(): CLVersion {
return getLocalModel().getCurrentVersion()
}
Expand All @@ -202,13 +251,22 @@ class ReplicatedModel(

companion object {
private val LOG = KotlinLogging.logger { }
private const val POLLING_MIN_DELAY = 1000L
private const val POLLING_MAX_DELAY = 30000L
private const val PUSH_MAX_ATTEMPTS = 10
}
}

/**
* Creates a [ReplicatedModel] for the given branch.
*/
fun IModelClientV2.getReplicatedModel(branchRef: BranchReference, idGenerator: (TreeId) -> INodeIdGenerator<INodeReference>): ReplicatedModel {
return ReplicatedModel(this, branchRef, idGenerator)
}

/**
* Creates a [ReplicatedModel] for the given branch with a provided scope.
*/
fun IModelClientV2.getReplicatedModel(branchRef: BranchReference, idGenerator: (TreeId) -> INodeIdGenerator<INodeReference>, scope: CoroutineScope): ReplicatedModel {
return ReplicatedModel(this, branchRef, idGenerator, scope)
}
Expand Down Expand Up @@ -308,16 +366,22 @@ private class LocalModel(initialVersion: CLVersion, val versionIdGenerator: IIdG
}
}

private class RemoteVersion(
private interface IRemoteVersion {
suspend fun getInitialVersion(): CLVersion
suspend fun poll(): CLVersion
suspend fun push(version: CLVersion): CLVersion
}

private class RemoteVersionFromBranch(
val client: IModelClientV2,
val branchRef: BranchReference,
private var lastKnownRemoteVersion: CLVersion? = null,
) {
) : IRemoteVersion {
private val unconfirmedVersions: MutableSet<String> = LinkedHashSet()

fun getNumberOfUnconfirmed() = runSynchronized(unconfirmedVersions) { unconfirmedVersions.size }

suspend fun pull(): CLVersion {
override suspend fun getInitialVersion(): CLVersion {
return versionReceived(
client.pull(
branchRef,
Expand All @@ -332,11 +396,11 @@ private class RemoteVersion(
)
}

suspend fun poll(): CLVersion {
override suspend fun poll(): CLVersion {
return versionReceived(client.poll(branchRef, lastKnownVersion = lastKnownRemoteVersion).upcast())
}

suspend fun push(version: CLVersion): CLVersion {
override suspend fun push(version: CLVersion): CLVersion {
if (lastKnownRemoteVersion?.getContentHash() == version.getContentHash()) return version
runSynchronized(unconfirmedVersions) {
if (!unconfirmedVersions.add(version.getContentHash())) return version
Expand All @@ -359,4 +423,23 @@ private class RemoteVersion(
}
}

private class RemoteVersionFromHash(
val client: IModelClientV2,
val repositoryId: RepositoryId,
val versionHash: String,
) : IRemoteVersion {
override suspend fun getInitialVersion(): CLVersion {
return client.lazyLoadVersion(repositoryId, versionHash) as CLVersion
}

override suspend fun poll(): CLVersion {
// let's pretent to do something. The version is actually immutable and won't ever change…
awaitCancellation()
}

override suspend fun push(version: CLVersion): CLVersion {
throw UnsupportedOperationException("Read-only model")
}
}

private fun IVersion.upcast(): CLVersion = this as CLVersion
Loading
Loading