diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs index 5221045a9..284fb348e 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs @@ -613,29 +613,20 @@ public partial class AsyncObserver var nullGate = new object(); var nullGroup = default(IAsyncSubject); + bool observerComplete = false; async ValueTask OnErrorAsync(Exception ex) { - var nullGroupLocal = default(IAsyncSubject); - - lock (nullGate) - { - nullGroupLocal = nullGroup; - } - - if (nullGroupLocal != null) - { - await nullGroupLocal.OnErrorAsync(ex).ConfigureAwait(false); - } - - foreach (var group in groups.Values) - { - await group.OnErrorAsync(ex).ConfigureAwait(false); - } + await ErrorAndRemoveNullGroupIfPresentAsync(ex); + await ErrorAndRemoveAllGroupsIfPresentAsync(ex); using (await gate.LockAsync().ConfigureAwait(false)) { - await observer.OnErrorAsync(ex).ConfigureAwait(false); + if (!observerComplete) + { + observerComplete = true; + await observer.OnErrorAsync(ex).ConfigureAwait(false); + } } } @@ -714,7 +705,10 @@ async ValueTask OnErrorAsync(Exception ex) using (await gate.LockAsync().ConfigureAwait(false)) { - await observer.OnNextAsync(g).ConfigureAwait(false); + if (!observerComplete) + { + await observer.OnNextAsync(g).ConfigureAwait(false); + } } var durationSubscription = new SingleAssignmentAsyncDisposable(); @@ -723,18 +717,7 @@ async ValueTask Expire() { if (key == null) { - var oldNullGroup = default(IAsyncSubject); - - lock (nullGate) - { - oldNullGroup = nullGroup; - nullGroup = null; - } - - if (oldNullGroup != null) - { - await oldNullGroup.OnCompletedAsync().ConfigureAwait(false); - } + await CompleteAndRemoveNullGroupIfPresentAsync(); } else { @@ -778,22 +761,78 @@ async ValueTask Expire() { if (nullGroup != null) { - await nullGroup.OnCompletedAsync().ConfigureAwait(false); + await CompleteAndRemoveNullGroupIfPresentAsync(); } - foreach (var group in groups.Values) - { - await group.OnCompletedAsync().ConfigureAwait(false); - } + await CompleteAndRemoveAllGroupsIfPresentAsync(); using (await gate.LockAsync().ConfigureAwait(false)) { - await observer.OnCompletedAsync().ConfigureAwait(false); + if (!observerComplete) + { + observerComplete = true; + await observer.OnCompletedAsync().ConfigureAwait(false); + } } } ), refCount ); + + ValueTask CompleteAndRemoveNullGroupIfPresentAsync() => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(null); + ValueTask ErrorAndRemoveNullGroupIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(x); + async ValueTask CompleteOrErrorAndRemoveNullGroupIfPresentAsync(Exception x) + { + var oldNullGroup = default(IAsyncSubject); + + lock (nullGate) + { + oldNullGroup = nullGroup; + nullGroup = null; + } + + if (oldNullGroup != null) + { + if (x is null) + { + await oldNullGroup.OnCompletedAsync().ConfigureAwait(false); + } + else + { + await oldNullGroup.OnErrorAsync(x).ConfigureAwait(false); + } + } + } + + ValueTask CompleteAndRemoveAllGroupsIfPresentAsync() => CompleteOrErrorAndRemoveAllGroupsAsync(null); + ValueTask ErrorAndRemoveAllGroupsIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveAllGroupsAsync(x); + async ValueTask CompleteOrErrorAndRemoveAllGroupsAsync(Exception x) + { + foreach (var key in groups.Keys) + { + // The ConcurrentDictionary's Keys property is a snapshot, so + // although this TryRemove should always succeed for the first + // key in the dictionary (as long as our upstream observable is + // obeying the rules, and not making multiple concurrent calls + // to our observer) each await in this loop offers an opportunity + // for one of the group duration observables to complete, which + // will cause the Expire method above to run, meaning that an + // entry that was present when we retrieved Keys at the start of + // this loop might already have been completed and removed by the + // time this loop reaches it. + if (groups.TryRemove(key, out var group)) + { + if (x is null) + { + await group.OnCompletedAsync().ConfigureAwait(false); + } + else + { + await group.OnErrorAsync(x).ConfigureAwait(false); + } + } + } + } } } } diff --git a/azure-pipelines.asyncrx.yml b/azure-pipelines.asyncrx.yml index 5ff9cbb58..437a55270 100644 --- a/azure-pipelines.asyncrx.yml +++ b/azure-pipelines.asyncrx.yml @@ -33,12 +33,27 @@ stages: DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true steps: + - task: UseDotNet@2 + displayName: Use .NET 8.0.x SDK + inputs: + version: 8.0.x + performMultiLevelLookup: true + + # We need .NET 7.0 and 6.0 to be able to run all tests. + # For .NET 7.0, the runtime package is sufficient because we don't need to build anything. + # That doesn't work for 6.0, because we need the desktop framework, and the only way to + # get that into a build agent seems to be to install the SDK. - task: UseDotNet@2 displayName: Use .NET Core 7.0.x SDK inputs: version: 7.0.x performMultiLevelLookup: true + - task: UseDotNet@2 + displayName: Use .NET 6.0 SDK + inputs: + version: '6.0.x' + - task: DotNetCoreCLI@2 inputs: command: custom