diff --git a/core/src/main/scala/com/phaller/rasync/Cell.scala b/core/src/main/scala/com/phaller/rasync/Cell.scala index 27d9525..ae940d2 100644 --- a/core/src/main/scala/com/phaller/rasync/Cell.scala +++ b/core/src/main/scala/com/phaller/rasync/Cell.scala @@ -1,6 +1,6 @@ package com.phaller.rasync -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } import java.util.concurrent.{ CountDownLatch, ExecutionException } import scala.annotation.tailrec @@ -26,7 +26,7 @@ trait Cell[K <: Key[V], V] { def isComplete: Boolean /** - * Adds a dependency on some `other` cell. + * Adds a dependency on some `other` cell, if such dependency does not exist yet. * * Example: * {{{ @@ -38,12 +38,13 @@ trait Cell[K <: Key[V], V] { * * @param other Cell that `this` Cell depends on. * @param valueCallback Callback that receives the final value of `other` and returns an `Outcome` for `this` cell. + * @return Returns true, iff the dependency was registered successfully. */ - def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit - def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit + def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean + def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean /** - * Adds a dependency on some `other` cell. + * Adds a dependency on some `other` cell, if such dependency does not exist yet. * * Example: * {{{ @@ -55,9 +56,10 @@ trait Cell[K <: Key[V], V] { * * @param other Cell that `this` Cell depends on. * @param valueCallback Callback that receives the new value of `other` and returns an `Outcome` for `this` cell. + * @return Returns true, iff the dependency was registered successfully. */ - def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit - def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit + def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean + def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean /** * Adds a dependency on some `other` cell. @@ -73,9 +75,10 @@ trait Cell[K <: Key[V], V] { * * @param other Cell that `this` Cell depends on. * @param valueCallback Callback that receives the new value of `other` and returns an `Outcome` for `this` cell. + * @return Returns true, iff the dependency was registered successfully. */ - def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit - def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit + def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Boolean + def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Boolean def zipFinal(that: Cell[K, V]): Cell[DefaultKey[(V, V)], (V, V)] @@ -118,6 +121,9 @@ trait Cell[K <: Key[V], V] { private[rasync] def removeAllCallbacks(cells: Seq[Cell[K, V]]): Unit def isADependee(): Boolean + + private[rasync] def incIncomingCallbacks(): Unit + private[rasync] def decIncomingCallbacks(): Unit } object Cell { @@ -197,6 +203,11 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U */ private val state = new AtomicReference[AnyRef](State.empty[K, V](updater)) + /* first element is the number of incoming callbacks, + * second element is the value that has been propagated to dependent cells (or initially bottom) + */ + private val numIncomingCallbacks = new AtomicReference[(Int, V)]((0, updater.bottom)) + // `CellCompleter` and corresponding `Cell` are the same run-time object. override def cell: Cell[K, V] = this @@ -333,90 +344,87 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U this.putFinal(value) } - override def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit = { - this.whenNext(other, valueCallback(_, false)) - this.whenComplete(other, valueCallback(_, true)) + override def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Boolean = { + this.whenNext(other, valueCallback(_, false)) && this.whenComplete(other, valueCallback(_, true)) } - override def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit = { - this.whenNextSequential(other, valueCallback(_, false)) - this.whenCompleteSequential(other, valueCallback(_, true)) + override def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Boolean = { + this.whenNextSequential(other, valueCallback(_, false)) && this.whenCompleteSequential(other, valueCallback(_, true)) } - override def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = { + override def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean = { this.whenNext(other, valueCallback, sequential = false) } - override def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = { + override def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean = { this.whenNext(other, valueCallback, sequential = true) } - private def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Unit = { - var success = false - while (!success) { - state.get() match { - case finalRes: Try[_] => // completed with final result - // do not add dependency - // in fact, do nothing - success = true - - case raw: State[_, _] => // not completed - val newDep: NextDepRunnable[K, V] = - if (sequential) new NextSequentialDepRunnable(pool, this, other, valueCallback) - else new NextConcurrentDepRunnable(pool, this, other, valueCallback) - - val current = raw.asInstanceOf[State[K, V]] - val depRegistered = - if (current.nextDeps.contains(other)) true - else { - val newState = new State(current.res, current.tasksActive, current.completeDeps, current.completeCallbacks, current.nextDeps + other, current.nextCallbacks) - state.compareAndSet(current, newState) - } - if (depRegistered) { - success = true - other.addNextCallback(newDep, this) - pool.triggerExecution(other) - } + @tailrec + private def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Boolean = state.get() match { + case finalRes: Try[_] => // completed with final result + // do not add dependency + // in fact, do nothing + false + + case raw: State[_, _] => // not completed + val current = raw.asInstanceOf[State[K, V]] + + if (current.nextDeps.contains(other)) + // Do not register a dependency on the same cell twice + false + else { + val newDep: NextDepRunnable[K, V] = + if (sequential) new NextSequentialDepRunnable(pool, this, other, valueCallback) + else new NextConcurrentDepRunnable(pool, this, other, valueCallback) + + val newState = new State(current.res, current.tasksActive, current.completeDeps, current.completeCallbacks, current.nextDeps + other, current.nextCallbacks) + + if (state.compareAndSet(current, newState)) { + other.addNextCallback(newDep, this) + pool.triggerExecution(other) + true + } else { + whenNext(other, valueCallback, sequential) + } } - } } - override def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = { + override def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean = { this.whenComplete(other, valueCallback, false) } - override def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = { + override def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean = { this.whenComplete(other, valueCallback, true) } - private def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Unit = { - var success = false - while (!success) { - state.get() match { - case finalRes: Try[_] => // completed with final result - // do not add dependency - // in fact, do nothing - success = true - - case raw: State[_, _] => // not completed - val newDep: CompleteDepRunnable[K, V] = - if (sequential) new CompleteSequentialDepRunnable(pool, this, other, valueCallback) - else new CompleteConcurrentDepRunnable(pool, this, other, valueCallback) - - val current = raw.asInstanceOf[State[K, V]] - val depRegistered = - if (current.completeDeps.contains(other)) true - else { - val newState = new State(current.res, current.tasksActive, current.completeDeps + other, current.completeCallbacks, current.nextDeps, current.nextCallbacks) - state.compareAndSet(current, newState) - } - if (depRegistered) { - success = true - other.addCompleteCallback(newDep, this) - pool.triggerExecution(other) - } + @tailrec + private def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Boolean = state.get() match { + case finalRes: Try[_] => // completed with final result + // do not add dependency + // in fact, do nothing + false + + case raw: State[_, _] => // not completed + val current = raw.asInstanceOf[State[K, V]] + + if (current.completeDeps.contains(other)) + // Do not register a dependency on the same cell twice + false + else { + val newDep: CompleteDepRunnable[K, V] = + if (sequential) new CompleteSequentialDepRunnable(pool, this, other, valueCallback) + else new CompleteConcurrentDepRunnable(pool, this, other, valueCallback) + val newState = new State(current.res, current.tasksActive, current.completeDeps + other, current.completeCallbacks, current.nextDeps, current.nextCallbacks) + + if (state.compareAndSet(current, newState)) { + other.addCompleteCallback(newDep, this) + pool.triggerExecution(other) + true + } else { + whenComplete(other, valueCallback, sequential) + } } - } } override private[rasync] def addCompleteCallback(callback: CompleteCallbackRunnable[K, V], cell: Cell[K, V]): Unit = { @@ -461,10 +469,11 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U if (!state.compareAndSet(current, newState)) { tryNewState(value) } else { - // CAS was successful, so there was a point in time where `newVal` was in the cell - current.nextCallbacks.values.foreach { callbacks => - callbacks.foreach(callback => callback.execute()) - } + // If we came here via a direct putNext (instead of Outcome of a whenNextCallback) + // this incoming change has not been counted. So we need to manually start outgoing callbacks. + // (This might lead to duplicate invocation.) + if (numIncomingCallbacks.get()._1 <= 0) + triggerNextCallbacks() true } } else true @@ -503,21 +512,12 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U res case (pre: State[K, V], newVal: Try[V]) => - val nextCallbacks = pre.nextCallbacks - val completeCallbacks = pre.completeCallbacks - - if (nextCallbacks.nonEmpty) - nextCallbacks.values.foreach { callbacks => - callbacks.foreach(callback => callback.execute()) - } - if (completeCallbacks.nonEmpty) - completeCallbacks.values.foreach { callbacks => - callbacks.foreach(callback => callback.execute()) - } + triggerCompleteCallbacks(pre) val depsCells = pre.completeDeps val nextDepsCells = pre.nextDeps + // Other cells do not need to call us any more if (depsCells.nonEmpty) depsCells.foreach(_.removeCompleteCallbacks(this)) if (nextDepsCells.nonEmpty) @@ -531,6 +531,29 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U res } + // We need to pass the list of callbacks here, because they + // might have already been deleted in `state` while the cell has + // been set to its final value. + private def triggerCompleteCallbacks(pre: State[K, V]): Unit = { + pre.completeCallbacks.values.foreach { callbacks => + callbacks.foreach(callback => callback.execute()) + } + pre.nextCallbacks.values.foreach { callbacks => + callbacks.foreach(callback => callback.execute()) + } + } + + private def triggerNextCallbacks(): Unit = { + state.get() match { + case _: Try[_] => /* Meanwhile, cell has been completed. No need to trigger `next` callbacks any more. */ + case raw: State[_, _] => + val current = raw.asInstanceOf[State[K, V]] + current.nextCallbacks.values.foreach { callbacks => + callbacks.foreach(callback => callback.execute()) + } + } + } + @tailrec override private[rasync] final def removeDep(cell: Cell[K, V]): Unit = { state.get() match { @@ -544,7 +567,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U else if (newDeps.isEmpty) nodepslatch.countDown() - case _ => /* do nothing */ + case _ => /* `this` has been completed and therefore does not have any deps stored. */ } } @@ -561,7 +584,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U else if (newNextDeps.isEmpty) nonextdepslatch.countDown() - case _ => /* do nothing */ + case _ => /* `this` has been completed and therefore does not have any deps stored. */ } } @@ -735,4 +758,36 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U numCompleteCallbacks > 0 || numNextCallbacks > 0 } + /** Called, when a CallbackRunnable r with r.dependentCell == this has been started. */ + @tailrec + override final private[rasync] def incIncomingCallbacks(): Unit = { + val current = numIncomingCallbacks.get() + val next = (current._1 + 1, current._2) + + if (!numIncomingCallbacks.compareAndSet(current, next)) + incIncomingCallbacks() + } + + /** + * Called, when a CallbackRunnable r with r.dependentCell == this has been completed. + * Triggers all outgoing callbacks, if no more incoming callbacks are running. + */ + @tailrec + override final private[rasync] def decIncomingCallbacks(): Unit = { + val current = numIncomingCallbacks.get() + + // If we drop to zero, store the current result as the latest propagated value + val next = + if (current._1 == 1) + (0, getResult()) + else + (current._1 - 1, current._2) + + if (numIncomingCallbacks.compareAndSet(current, next)) { + // CAS was successfull. Call dependent cells, if we dropped to zero and + // have new information to propagated + if (current._1 == 1 && next._2 != current._2) + triggerNextCallbacks() + } else decIncomingCallbacks() + } } diff --git a/core/src/main/scala/com/phaller/rasync/HandlerPool.scala b/core/src/main/scala/com/phaller/rasync/HandlerPool.scala index 595f279..5bdaab3 100644 --- a/core/src/main/scala/com/phaller/rasync/HandlerPool.scala +++ b/core/src/main/scala/com/phaller/rasync/HandlerPool.scala @@ -311,6 +311,8 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable => // Submit task to the pool incSubmittedTasks() + // println(s"added task $task") + // Run the task pool.execute(new Runnable { def run(): Unit = { diff --git a/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala b/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala index bf1646a..e93f1ab 100644 --- a/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala +++ b/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala @@ -65,6 +65,7 @@ private[rasync] trait Dependency[K <: Key[V], V] { /** * To be run when `otherCell` gets its final update. + * * @param pool The handler pool that runs the callback function * @param dependentCell The cell, that depends on `otherCell`. * @param otherCell Cell that triggers this callback. @@ -83,7 +84,14 @@ private[rasync] abstract class CompleteCallbackRunnable[K <: Key[V], V]( def run(): Unit = { require(!started) // can't complete it twice started = true - callback(Success(otherCell.getResult())) + + if (dependentCell != null) // dependentCell == null for oncomplete callbacks + dependentCell.incIncomingCallbacks() + try + callback(Success(otherCell.getResult())) + finally + if (dependentCell != null) + dependentCell.decIncomingCallbacks() } } @@ -166,7 +174,13 @@ private[rasync] abstract class NextCallbackRunnable[K <: Key[V], V]( extends CallbackRunnable[K, V] { def run(): Unit = { - callback(Success(otherCell.getResult())) + if (dependentCell != null) // dependetCell == null for onnext callbacks + dependentCell.incIncomingCallbacks() + try + callback(Success(otherCell.getResult())) + finally + if (dependentCell != null) + dependentCell.decIncomingCallbacks() } } @@ -199,7 +213,8 @@ private[rasync] abstract class NextDepRunnable[K <: Key[V], V]( override val valueCallback: V => Outcome[V]) extends NextCallbackRunnable[K, V](pool, dependentCompleter.cell, otherCell, t => { t match { case Success(_) => - valueCallback(otherCell.getResult()) match { + val callbackResult = valueCallback(otherCell.getResult()) + callbackResult match { case NextOutcome(v) => dependentCompleter.putNext(v) case FinalOutcome(v) => diff --git a/core/src/test/scala/com/phaller/rasync/test/base.scala b/core/src/test/scala/com/phaller/rasync/test/base.scala index 4abf9fb..6891993 100644 --- a/core/src/test/scala/com/phaller/rasync/test/base.scala +++ b/core/src/test/scala/com/phaller/rasync/test/base.scala @@ -778,14 +778,20 @@ class BaseSuite extends FunSuite { val cell1 = completer1.cell cell1.trigger() - pool.execute(() => cell1.whenComplete(completer2.cell, x => { - NoOutcome - })) + pool.execute(() => { + cell1.whenComplete(completer2.cell, x => { + NoOutcome + }) + () + }) - pool.execute(() => cell1.whenNext(completer2.cell, x => { - if (x == Mutable) NextOutcome(Mutable) - else NoOutcome - })) + pool.execute(() => { + cell1.whenNext(completer2.cell, x => { + if (x == Mutable) NextOutcome(Mutable) + else NoOutcome + }) + () + }) pool.execute(() => completer2.putFinal(Mutable)) val fut = pool.quiescentResolveCell @@ -808,14 +814,20 @@ class BaseSuite extends FunSuite { val cell1 = completer1.cell - pool.execute(() => cell1.whenCompleteSequential(completer2.cell, x => { - NoOutcome - })) + pool.execute(() => { + cell1.whenCompleteSequential(completer2.cell, x => { + NoOutcome + }) + () + }) - pool.execute(() => cell1.whenNextSequential(completer2.cell, x => { - if (x == Mutable) NextOutcome(Mutable) - else NoOutcome - })) + pool.execute(() => { + cell1.whenNextSequential(completer2.cell, x => { + if (x == Mutable) NextOutcome(Mutable) + else NoOutcome + }) + () + }) pool.execute(() => completer2.putFinal(Mutable)) val fut = pool.quiescentResolveCycles @@ -864,10 +876,13 @@ class BaseSuite extends FunSuite { val cell1 = completer1.cell - pool.execute(() => cell1.whenNext(completer2.cell, x => { - if (x == Mutable) NextOutcome(Mutable) - else NoOutcome - })) + pool.execute(() => { + cell1.whenNext(completer2.cell, x => { + if (x == Mutable) NextOutcome(Mutable) + else NoOutcome + }) + () + }) pool.execute(() => completer2.putNext(Mutable)) val fut = pool.quiescentResolveDefaults @@ -1225,7 +1240,7 @@ class BaseSuite extends FunSuite { assert(cell1.isComplete) - pool.shutdown() + pool.onQuiescenceShutdown() } test("put: isFinal == true") { @@ -1626,10 +1641,51 @@ class BaseSuite extends FunSuite { pool.onQuiescenceShutdown() } + // test("whenNext: 2-cSCC with constant resolution") { + // val latch = new CountDownLatch(4) + // + // implicit val pool = new HandlerPool(1) + // + // val completer1 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey) + // val cell1 = completer1.cell + // val completer2 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey) + // val cell2 = completer2.cell + // + // + // + // // create a cSCC, assert that none of the callbacks get called again. + // cell1.whenNext(cell2, v => if (v != 10) { assert(false); NextOutcome(20) } else NoOutcome) + // cell2.whenNext(cell1, v => if (v != 10) { assert(false); NextOutcome(20) } else NoOutcome) + // + // // set unwanted values: + // completer1.putNext(10) + // completer2.putNext(10) + // + // for (c <- List(cell1, cell2)) + // c.onComplete { + // case Success(v) => + // assert(v === 0) + // assert(c.numNextDependencies === 0) + // latch.countDown() + // case Failure(e) => + // assert(false) + // latch.countDown() + // } + // + // pool.triggerExecution(cell1) + // + // // resolve cells + // val fut = pool.quiescentResolveCell + // Await.result(fut, 2.seconds) + // latch.await() + // + // pool.shutdown() + // } + test("whenNext: cSCC with constant resolution") { val latch = new CountDownLatch(4) - implicit val pool = new HandlerPool + implicit val pool = new HandlerPool(1) val completer1 = CellCompleter[StringIntKey, Int]("somekey1") val cell1 = completer1.cell @@ -1761,7 +1817,6 @@ class BaseSuite extends FunSuite { } // resolve cells - pool.whileQuiescentResolveDefault val fut = pool.quiescentResolveDefaults Await.result(fut, 2.second) latch.await()