diff --git a/signals/src/main/java/com/vaadin/signals/AbstractSignal.java b/signals/src/main/java/com/vaadin/signals/AbstractSignal.java index dbe13be2389..e3064d9b51e 100644 --- a/signals/src/main/java/com/vaadin/signals/AbstractSignal.java +++ b/signals/src/main/java/com/vaadin/signals/AbstractSignal.java @@ -135,13 +135,7 @@ public T value() { return value; } - /** - * Reads the value without setting up any dependencies. This method returns - * the same value as {@link #value()} but without creating a dependency when - * used inside a transaction, effect or computed signal. - * - * @return the signal value - */ + @Override public T peek() { return extractValue(data(Transaction.getCurrent())); } diff --git a/signals/src/main/java/com/vaadin/signals/NumberSignal.java b/signals/src/main/java/com/vaadin/signals/NumberSignal.java index f08647346a3..fbddf8bebbc 100644 --- a/signals/src/main/java/com/vaadin/signals/NumberSignal.java +++ b/signals/src/main/java/com/vaadin/signals/NumberSignal.java @@ -148,6 +148,11 @@ public NumberSignal withValidator(Predicate validator) { * @return the new readonly number signal, not null */ public NumberSignal asReadonly() { + /* + * While this method could semantically be declared to return a less + * specific type that doesn't provide mutator methods, that would also + * remove access to e.g. the valueAsInt method. + */ return withValidator(anything -> false); } diff --git a/signals/src/main/java/com/vaadin/signals/ReferenceSignal.java b/signals/src/main/java/com/vaadin/signals/ReferenceSignal.java new file mode 100644 index 00000000000..84dcf6a2970 --- /dev/null +++ b/signals/src/main/java/com/vaadin/signals/ReferenceSignal.java @@ -0,0 +1,296 @@ +/* + * Copyright 2000-2025 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.vaadin.signals; + +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.UnaryOperator; + +import com.vaadin.signals.impl.Transaction; +import com.vaadin.signals.impl.TransientListener; +import com.vaadin.signals.impl.UsageTracker; +import com.vaadin.signals.impl.UsageTracker.Usage; +import com.vaadin.signals.operations.CancelableOperation; +import com.vaadin.signals.operations.SignalOperation; + +/** + * A writable signal that holds a reference to an object. + *

+ * Changing the signal to reference another immutable value is an atomic + * operation. It is safe to concurrently read and write the signal value from + * multiple threads. + *

+ * The signal can also be used with mutable values in which case no thread + * safety is provided. Mutations must be done through {@link #modify(Consumer)} + * to ensure dependents are informed after the modification is applied. + *

+ * Reference signals can't be used inside signal transactions. + *

+ * All operation objects returned from methods on this class are resolved + * immediately. + * + * @param + * the signal value type + */ +public class ReferenceSignal implements WritableSignal { + + private T value; + private int version; + private boolean modifyRunning = false; + + private final List listeners = new ArrayList<>(); + // package-protected for testing + final ReentrantLock lock = new ReentrantLock(); + + public ReferenceSignal(T initialValue) { + this.value = initialValue; + } + + public ReferenceSignal() { + this(null); + } + + private void checkPreconditions() { + assert lock.isHeldByCurrentThread(); + + if (Transaction.inTransaction()) { + throw new IllegalStateException( + "ReferenceSignal cannot be used inside signal transactions."); + } + + if (modifyRunning) { + throw new ConcurrentModificationException(); + } + } + + @Override + public T value() { + lock.lock(); + try { + checkPreconditions(); + + if (UsageTracker.isActive()) { + UsageTracker.registerUsage(createUsage(version)); + } + + return value; + } finally { + lock.unlock(); + } + } + + private Usage createUsage(int originalVersion) { + return new Usage() { + @Override + public boolean hasChanges() { + lock.lock(); + boolean hasChanges = version != originalVersion; + lock.unlock(); + + return hasChanges; + } + + @Override + public Runnable onNextChange(TransientListener listener) { + lock.lock(); + try { + if (hasChanges()) { + boolean keep = listener.invoke(true); + if (!keep) { + return () -> { + }; + } + } + + listeners.add(listener); + return () -> { + lock.lock(); + try { + listeners.remove(listener); + } finally { + lock.unlock(); + } + }; + + } finally { + lock.unlock(); + } + } + }; + } + + @Override + public T peek() { + lock.lock(); + try { + checkPreconditions(); + + return value; + } finally { + lock.unlock(); + } + } + + private void setAndNotify(T newValue) { + assert lock.isHeldByCurrentThread(); + + this.value = newValue; + + version++; + + List copy = List.copyOf(listeners); + listeners.clear(); + for (var listener : copy) { + boolean keep = listener.invoke(false); + + if (keep) { + listeners.add(listener); + } + } + } + + @Override + public SignalOperation value(T value) { + lock.lock(); + try { + checkPreconditions(); + + T oldValue = this.value; + + setAndNotify(value); + + return new SignalOperation<>( + new SignalOperation.Result<>(oldValue)); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + *

+ * Comparison between the expected value and the new value is performed + * using {@link #equals(Object)}. + */ + @Override + public SignalOperation replace(T expectedValue, T newValue) { + lock.lock(); + try { + checkPreconditions(); + + if (Objects.equals(expectedValue, value)) { + setAndNotify(newValue); + return new SignalOperation<>( + new SignalOperation.Result<>(null)); + } else { + return new SignalOperation<>( + new SignalOperation.Error<>("Unexpected value")); + } + } finally { + lock.unlock(); + } + } + + /** + * Updates the signal value based on the given callback. The callback + * receives the current signal value and returns the new value to use. This + * implementation acquires a lock while running the updater which means that + * it's never necessary to run the callback again. This also means that + * canceling the returned operation will never have any effect. + *

+ * The result of the returned operation is resolved with the same value that + * was passed to the updater callback. + * + * @param updater + * the value update callback, not null + * @return an operation containing the result + */ + @Override + public synchronized CancelableOperation update( + UnaryOperator updater) { + Objects.requireNonNull(updater); + lock.lock(); + try { + checkPreconditions(); + + T oldValue = this.value; + T newValue = updater.apply(oldValue); + if (newValue != oldValue) { + setAndNotify(newValue); + } + + CancelableOperation operation = new CancelableOperation<>(); + operation.result().complete(new SignalOperation.Result<>(oldValue)); + return operation; + } finally { + lock.unlock(); + } + } + + /** + * Runs the given callback to apply changes to a mutable referenced value + * and then notifies dependents. + *

+ * This method is only intended for cases where concurrency is limited + * through other means, such as Vaadin's session lock. Using this method + * concurrently with any other methods on the same instance may, but is not + * guaranteed to, cause an {@link ConcurrentModificationException}. The + * exception can be thrown either from this method or from the other invoked + * method. This can happen even if the other method is safe for concurrent + * use. + * + * @param modifier + * a callback that receives the current value to modify, not + * null + */ + public void modify(Consumer modifier) { + Objects.requireNonNull(modifier); + + if (!lock.tryLock()) { + throw new ConcurrentModificationException(); + } + try { + checkPreconditions(); + + modifyRunning = true; + } finally { + lock.unlock(); + } + + boolean completed = false; + try { + modifier.accept(value); + + completed = true; + } finally { + lock.lock(); + try { + modifyRunning = false; + + if (completed) { + setAndNotify(value); + } + + } finally { + lock.unlock(); + } + } + } +} diff --git a/signals/src/main/java/com/vaadin/signals/Signal.java b/signals/src/main/java/com/vaadin/signals/Signal.java index 1511407c701..8303a5c3b0e 100644 --- a/signals/src/main/java/com/vaadin/signals/Signal.java +++ b/signals/src/main/java/com/vaadin/signals/Signal.java @@ -65,6 +65,21 @@ public interface Signal { */ T value(); + /** + * Reads the value without setting up any dependencies. This method returns + * the same value as {@link #value()} but without creating a dependency when + * used inside a transaction, effect or computed signal. + * + * @return the signal value + */ + default T peek() { + /* + * Subclasses are encouraged to use an approach with less overhead than + * what this very generic implementation can do. + */ + return untracked(() -> value()); + } + /** * Creates a simple computed signal based on a mapper function that is * passed the value of this signal. If the mapper function accesses other diff --git a/signals/src/main/java/com/vaadin/signals/ValueSignal.java b/signals/src/main/java/com/vaadin/signals/ValueSignal.java index 35d21fa9095..4387e30bc87 100644 --- a/signals/src/main/java/com/vaadin/signals/ValueSignal.java +++ b/signals/src/main/java/com/vaadin/signals/ValueSignal.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.function.UnaryOperator; @@ -37,7 +36,8 @@ * @param * the signal value type */ -public class ValueSignal extends AbstractSignal { +public class ValueSignal extends AbstractSignal + implements WritableSignal { private final Class valueType; /** @@ -89,15 +89,7 @@ protected ValueSignal(SignalTree tree, Id id, this.valueType = Objects.requireNonNull(valueType); } - /** - * Sets the value of this signal. The result of the returned operation will - * be resolved with the previous value at the time when this operation was - * confirmed. - * - * @param value - * the value to set - * @return an operation containing the eventual result - */ + @Override public SignalOperation value(T value) { assert value == null || valueType.isInstance(value); @@ -121,20 +113,7 @@ protected Object usageChangeValue(Data data) { return data.value(); } - /** - * Sets the value of this signal if and only if the signal has the expected - * value at the time when the operation is confirmed. This is the signal - * counterpart to {@link AtomicReference#compareAndSet(Object, Object)}. The - * result of the returned operation will be resolved as successful if the - * expected value was present and resolved as unsuccessful if any other - * value was present when the operation is processed. - * - * @param expectedValue - * the expected value - * @param newValue - * the new value - * @return an operation containing the eventual result - */ + @Override public SignalOperation replace(T expectedValue, T newValue) { var condition = new SignalCommand.ValueCondition(Id.random(), id(), toJson(expectedValue)); @@ -145,30 +124,7 @@ public SignalOperation replace(T expectedValue, T newValue) { List.of(condition, set))); } - /** - * Updates the signal value based on the given callback. The callback - * receives the current signal value and returns the new value to use. If - * the original value has changed by the time this change is confirmed, then - * the returned value is ignored and the callback is run again with the new - * value as input. This process is repeated until cancelled or until the - * update succeeds without conflicting changes. - *

- * The process can be cancelled through the returned operation instance. - * Note that canceling will only prevent further retries but the change will - * still be made if the currently running attempt succeeds. - *

- * The result of the returned operation will be resolved with the previous - * value at the time when a successful update operation was confirmed. - *

- * Update operations cannot participate in transactions since any retry - * would occur after the original transaction has already been committed. - * For this reason, the whole operation completely bypasses all transaction - * handling. - * - * @param updater - * the value update callback, not null - * @return an operation containing the eventual result - */ + @Override public CancelableOperation update(UnaryOperator updater) { CancelableOperation operation = new CancelableOperation<>(); @@ -244,15 +200,13 @@ public ValueSignal withValidator(Predicate validator) { valueType); } - /** - * Wraps this signal to not accept changes. - *

- * This signal will keep its current configuration and changes applied - * through this instance will be visible through the wrapped instance. - * - * @return the new readonly value signal, not null - */ + @Override public ValueSignal asReadonly() { + /* + * While this method could semantically be declared to return a less + * specific type that doesn't provide mutator methods, that would also + * remove access to e.g. the verifyValue method. + */ return withValidator(anything -> false); } diff --git a/signals/src/main/java/com/vaadin/signals/WritableSignal.java b/signals/src/main/java/com/vaadin/signals/WritableSignal.java new file mode 100644 index 00000000000..c711ca31277 --- /dev/null +++ b/signals/src/main/java/com/vaadin/signals/WritableSignal.java @@ -0,0 +1,95 @@ +/* + * Copyright 2000-2025 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.vaadin.signals; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; + +import com.vaadin.signals.operations.CancelableOperation; +import com.vaadin.signals.operations.SignalOperation; + +/** + * A signal to which a new value can be directly written. + * + * @param + * the signal value type + */ +public interface WritableSignal extends Signal { + /** + * Sets the value of this signal. The result of the returned operation will + * be resolved with the previous value at the time when this operation was + * confirmed. + * + * @param value + * the value to set + * @return an operation containing the eventual result + */ + SignalOperation value(T value); + + /** + * Sets the value of this signal if and only if the signal has the expected + * value at the time when the operation is confirmed. This is the signal + * counterpart to {@link AtomicReference#compareAndSet(Object, Object)}. The + * result of the returned operation will be resolved as successful if the + * expected value was present and resolved as unsuccessful if any other + * value was present when the operation is processed. + * + * @param expectedValue + * the expected value + * @param newValue + * the new value + * @return an operation containing the eventual result + */ + SignalOperation replace(T expectedValue, T newValue); + + /** + * Updates the signal value based on the given callback. The callback + * receives the current signal value and returns the new value to use. If + * the original value has changed by the time this change is confirmed, then + * the returned value is ignored and the callback is run again with the new + * value as input. This process is repeated until cancelled or until the + * update succeeds without conflicting changes. + *

+ * The process can be cancelled through the returned operation instance. + * Note that canceling will only prevent further retries but the change will + * still be made if the currently running attempt succeeds. + *

+ * The result of the returned operation will be resolved with the previous + * value at the time when a successful update operation was confirmed. + *

+ * Update operations cannot participate in transactions since any retry + * would occur after the original transaction has already been committed. + * For this reason, the whole operation completely bypasses all transaction + * handling. + * + * @param updater + * the value update callback, not null + * @return an operation containing the eventual result + */ + CancelableOperation update(UnaryOperator updater); + + /** + * Wraps this signal to not accept changes. + *

+ * This signal will keep its current configuration and changes applied + * through this instance will be visible through the wrapped instance. + * + * @return the new readonly signal, not null + */ + default Signal asReadonly() { + return () -> value(); + } +} diff --git a/signals/src/main/java/com/vaadin/signals/operations/SignalOperation.java b/signals/src/main/java/com/vaadin/signals/operations/SignalOperation.java index 406f313f9bd..3b6a36aa104 100644 --- a/signals/src/main/java/com/vaadin/signals/operations/SignalOperation.java +++ b/signals/src/main/java/com/vaadin/signals/operations/SignalOperation.java @@ -73,12 +73,24 @@ public boolean successful() { } } - private final CompletableFuture> result = new CompletableFuture<>(); + private final CompletableFuture> result; /** - * Creates a new signal operation. + * Creates a new signal operation that will get a result later. */ public SignalOperation() { + result = new CompletableFuture<>(); + } + + /** + * Creates a new signal operation that already has a result. + * + * @param resultOrError + * the result of the operation, not null + */ + public SignalOperation(ResultOrError resultOrError) { + assert resultOrError != null; + result = CompletableFuture.completedFuture(resultOrError); } /** diff --git a/signals/src/test/java/com/vaadin/signals/ReferenceSignalTest.java b/signals/src/test/java/com/vaadin/signals/ReferenceSignalTest.java new file mode 100644 index 00000000000..8a5e036ff8f --- /dev/null +++ b/signals/src/test/java/com/vaadin/signals/ReferenceSignalTest.java @@ -0,0 +1,451 @@ +/* + * Copyright 2000-2025 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.vaadin.signals; + +import java.util.ConcurrentModificationException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; + +import org.junit.jupiter.api.Test; + +import com.vaadin.signals.impl.UsageTracker; +import com.vaadin.signals.impl.UsageTracker.Usage; +import com.vaadin.signals.operations.CancelableOperation; +import com.vaadin.signals.operations.SignalOperation; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ReferenceSignalTest extends SignalTestBase { + + @Test + void constructor_noArgs_nullValue() { + ReferenceSignal signal = new ReferenceSignal<>(); + + assertNull(signal.value()); + } + + @Test + void constructor_initialValue_initialValueUsed() { + ReferenceSignal signal = new ReferenceSignal<>("value"); + + assertEquals("value", signal.value()); + } + + @Test + void setValue_valueUsed() { + ReferenceSignal signal = new ReferenceSignal<>(); + signal.value("value"); + + assertEquals("value", signal.value()); + } + + @Test + void setValue_oldValueInResult() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + SignalOperation operation = signal.value("update"); + + String resultValue = TestUtil.assertSuccess(operation); + assertEquals("initial", resultValue); + } + + @Test + void replace_expectedValue_valueUpdated() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + SignalOperation operation = signal.replace("initial", "update"); + + TestUtil.assertSuccess(operation); + assertEquals("update", signal.value()); + } + + @Test + void replace_otherValue_valueNotUpdated() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + SignalOperation operation = signal.replace("other", "update"); + + TestUtil.assertFailure(operation); + assertEquals("initial", signal.value()); + } + + @Test + void update_updatesTheValue() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + CancelableOperation operation = signal.update(oldValue -> { + assertEquals("initial", oldValue); + return "update"; + }); + + String oldValue = TestUtil.assertSuccess(operation); + assertEquals("initial", oldValue); + + assertEquals("update", signal.value()); + } + + @Test + void update_callbackThrows_exceptionPropagated() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + RuntimeException theException = new RuntimeException(); + + RuntimeException caught = assertThrows(RuntimeException.class, () -> { + signal.update(ignore -> { + throw theException; + }); + }); + + assertSame(theException, caught); + } + + @Test + void modify_modifiesValue_valueModified() { + String[] holder = new String[] { "initial" }; + ReferenceSignal signal = new ReferenceSignal<>(holder); + + signal.modify(value -> { + assertSame(holder, value); + holder[0] = "update"; + }); + + assertEquals("update", holder[0]); + assertSame(holder, signal.value()); + } + + @Test + void asReadonly_notWritable() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + Signal readonly = signal.asReadonly(); + + assertFalse(readonly instanceof WritableSignal); + } + + @Test + void asReadonly_changeSignal_readonlyUpdated() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + Signal readonly = signal.asReadonly(); + + signal.value("update"); + assertEquals("update", readonly.value()); + } + + @Test + void usageTracker_setNewValue_changeDetected() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Usage usage = UsageTracker.track(() -> { + signal.value(); + }); + + assertFalse(usage.hasChanges()); + + AtomicBoolean invoked = new AtomicBoolean(false); + usage.onNextChange(initial -> { + assertFalse(initial); + invoked.set(true); + return false; + }); + + signal.value("update"); + + assertTrue(usage.hasChanges()); + assertTrue(invoked.get()); + } + + @Test + void usageTracker_updateSameValue_noChangeDetected() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Usage usage = UsageTracker.track(() -> { + signal.value(); + }); + + AtomicBoolean invoked = new AtomicBoolean(false); + usage.onNextChange(initial -> { + assertFalse(initial); + invoked.set(true); + return false; + }); + + signal.update(x -> x); + + assertFalse(usage.hasChanges()); + assertFalse(invoked.get()); + } + + @Test + void usageTracker_listenToChangedUsage_initialFlagSet() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Usage usage = UsageTracker.track(() -> { + signal.value(); + }); + + signal.value("update"); + + AtomicBoolean invoked = new AtomicBoolean(false); + usage.onNextChange(initial -> { + assertTrue(initial); + invoked.set(true); + return false; + }); + + assertTrue(invoked.get()); + } + + @Test + void usageTracker_keepListening_listenerKept() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Usage usage = UsageTracker.track(() -> { + signal.value(); + }); + + signal.value("update1"); + + AtomicInteger count = new AtomicInteger(); + usage.onNextChange(ignore -> { + count.incrementAndGet(); + return true; + }); + + // Verify preserving after initial and trigger subsequent update + signal.value("update2"); + assertEquals(2, count.get()); + + // Verify subsequent update + signal.value("update3"); + assertEquals(3, count.get()); + } + + @Test + void usageTracker_stopAfterInitial_stopped() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Usage usage = UsageTracker.track(() -> { + signal.value(); + }); + + signal.value("update1"); + + AtomicInteger count = new AtomicInteger(); + usage.onNextChange(ignore -> { + count.incrementAndGet(); + return false; + }); + + assertEquals(1, count.intValue()); + + signal.value("update2"); + assertEquals(1, count.intValue()); + } + + @Test + void usageTracker_stopAfterSubsequent_stopped() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Usage usage = UsageTracker.track(() -> { + signal.value(); + }); + + AtomicInteger count = new AtomicInteger(); + usage.onNextChange(ignore -> { + count.incrementAndGet(); + return false; + }); + + signal.value("update1"); + assertEquals(1, count.intValue()); + + signal.value("update2"); + assertEquals(1, count.intValue()); + } + + @Test + void usageTracker_anyModify_detectedAsAChange() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Usage usage = UsageTracker.track(() -> { + signal.value(); + }); + + signal.modify(value -> { + }); + assertTrue(usage.hasChanges()); + } + + @Test + void usageTracker_peek_noUsageDetected() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Usage usage = UsageTracker.track(() -> { + signal.peek(); + }); + + assertSame(UsageTracker.NO_USAGE, usage); + } + + @Test + void concurrency_updateHoldsLock() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + signal.update(value -> { + assertTrue(signal.lock.isHeldByCurrentThread()); + return value; + }); + assertFalse(signal.lock.isHeldByCurrentThread()); + } + + @Test + void concurrency_lockHeld_operationsAreBlocked() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + signal.lock.lock(); + + AtomicInteger completed = new AtomicInteger(); + + Thread.startVirtualThread(() -> { + signal.value(); + completed.incrementAndGet(); + }); + + Thread.startVirtualThread(() -> { + signal.peek(); + completed.incrementAndGet(); + }); + + Thread.startVirtualThread(() -> { + signal.value("update"); + completed.incrementAndGet(); + }); + + Thread.startVirtualThread(() -> { + signal.replace("foo", "bar"); + completed.incrementAndGet(); + }); + + Thread.startVirtualThread(() -> { + signal.update(x -> x); + completed.incrementAndGet(); + }); + + // Wait for all threads to start + assertEventually(() -> signal.lock.getQueueLength() == 5); + assertEquals(0, completed.get()); + + signal.lock.unlock(); + + // Wait for all threads to complete + assertEventually(() -> completed.get() == 5); + assertEquals(0, signal.lock.getQueueLength()); + } + + @Test + void concurrency_modifyWhileLocked_modifyThrowsEagerly() + throws InterruptedException { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Thread lockThread = Thread.startVirtualThread(() -> signal.lock.lock()); + // Wait until locked + lockThread.join(); + + assertThrows(ConcurrentModificationException.class, () -> { + signal.modify(x -> { + fail("Should never get here"); + }); + }); + } + + @Test + void concurrency_otherUsageWhileModifying_otherUsageThrows() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + Semaphore modifyStarted = new Semaphore(0); + Semaphore modifyCanProceed = new Semaphore(0); + + // Modify on another thread + Thread.startVirtualThread(() -> { + signal.modify(value -> { + modifyStarted.release(); + // Block until all assertions are done + modifyCanProceed.acquireUninterruptibly(); + }); + }); + + // Wait until other thread is inside the modify method + modifyStarted.acquireUninterruptibly(); + + assertThrows(ConcurrentModificationException.class, + () -> signal.value()); + assertThrows(ConcurrentModificationException.class, + () -> signal.peek()); + assertThrows(ConcurrentModificationException.class, + () -> signal.value("update")); + assertThrows(ConcurrentModificationException.class, + () -> signal.replace("foo", "bar")); + assertThrows(ConcurrentModificationException.class, + () -> signal.update(x -> x)); + assertThrows(ConcurrentModificationException.class, + () -> signal.modify(x -> { + })); + + modifyCanProceed.release(); + } + + @Test + void transactions_readSignalInTransaction_throws() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + assertThrows(IllegalStateException.class, () -> { + Signal.runInTransaction(() -> { + signal.value(); + }); + }); + } + + @Test + void transactions_writeSignalInTransaction_throws() { + ReferenceSignal signal = new ReferenceSignal<>("initial"); + + assertThrows(IllegalStateException.class, () -> { + Signal.runInTransaction(() -> { + signal.value("update"); + }); + }); + } + + private static void assertEventually(BooleanSupplier test) { + for (int i = 0; i < 10; i++) { + if (test.getAsBoolean()) { + return; + } + try { + Thread.sleep(i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + fail(); + } +}