From 7ed39d0c47d01f5a184233ad426c9afb7fec01d0 Mon Sep 17 00:00:00 2001 From: Laura Hausmann Date: Sun, 16 Jun 2024 16:53:27 +0200 Subject: [PATCH] [backend/queue] Fix AsyncAutoResetEvent handling, fix stalled queue workers causing no new workers to be spawned --- .../Core/Helpers/EventHelpers.cs | 35 +++++-- .../Core/Services/QueueService.cs | 14 ++- Iceshrimp.Tests/Concurrency/EventTests.cs | 93 +++++++++++++++++++ 3 files changed, 135 insertions(+), 7 deletions(-) create mode 100644 Iceshrimp.Tests/Concurrency/EventTests.cs diff --git a/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs b/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs index 562e062f..c0310977 100644 --- a/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs +++ b/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs @@ -2,7 +2,9 @@ namespace Iceshrimp.Backend.Core.Helpers; public sealed class AsyncAutoResetEvent(bool signaled = false) { - private readonly List> _taskCompletionSources = []; + private readonly List> _taskCompletionSources = []; + private readonly List> _noResetTaskCompletionSources = []; + public bool Signaled => signaled; public Task WaitAsync(CancellationToken cancellationToken = default) { @@ -21,19 +23,39 @@ public sealed class AsyncAutoResetEvent(bool signaled = false) } } + public Task WaitWithoutResetAsync(CancellationToken cancellationToken = default) + { + lock (_taskCompletionSources) + { + if (signaled) + return Task.FromResult(true); + + var tcs = new TaskCompletionSource(); + cancellationToken.Register(Callback, (this, tcs)); + _noResetTaskCompletionSources.Add(tcs); + return tcs.Task; + } + } + public void Set() { lock (_taskCompletionSources) { - if (_taskCompletionSources.Count > 0) + signaled = true; + foreach (var tcs in _noResetTaskCompletionSources.ToList()) { - var tcs = _taskCompletionSources[0]; - _taskCompletionSources.RemoveAt(0); + _noResetTaskCompletionSources.Remove(tcs); tcs.TrySetResult(true); - return; } - signaled = true; + if (_taskCompletionSources.Count == 0) return; + + signaled = false; + foreach (var tcs in _taskCompletionSources.ToList()) + { + _taskCompletionSources.Remove(tcs); + tcs.TrySetResult(true); + } } } @@ -45,6 +67,7 @@ public sealed class AsyncAutoResetEvent(bool signaled = false) if (tcs.Task.IsCompleted) return; tcs.TrySetCanceled(); ev._taskCompletionSources.Remove(tcs); + ev._noResetTaskCompletionSources.Remove(tcs); } } } \ No newline at end of file diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index ce1f12eb..8b3e52a8 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -259,8 +259,20 @@ public class PostgresJobQueue( continue; } + // ReSharper disable MethodSupportsCancellation + var queuedChannelCts = new CancellationTokenSource(); + if (actualParallelism < parallelism) + { + _ = _queuedChannel.WaitWithoutResetAsync() + .ContinueWith(_ => { queuedChannelCts.Cancel(); }) + .SafeWaitAsync(queueToken); + } + // ReSharper restore MethodSupportsCancellation + var tasks = TaskExtensions.QueueMany(() => AttemptProcessJobAsync(token), actualParallelism); - await Task.WhenAny(tasks).SafeWaitAsync(queueToken, () => Task.WhenAll(tasks).WaitAsync(token)); + await Task.WhenAny(tasks) + .SafeWaitAsync(queuedChannelCts.Token) + .SafeWaitAsync(queueToken, () => Task.WhenAll(tasks).WaitAsync(token)); } catch (Exception e) { diff --git a/Iceshrimp.Tests/Concurrency/EventTests.cs b/Iceshrimp.Tests/Concurrency/EventTests.cs new file mode 100644 index 00000000..03887607 --- /dev/null +++ b/Iceshrimp.Tests/Concurrency/EventTests.cs @@ -0,0 +1,93 @@ +using Iceshrimp.Backend.Core.Helpers; + +namespace Iceshrimp.Tests.Concurrency; + +[TestClass] +public class EventTests +{ + [TestMethod] + public async Task TestAsyncAutoResetEvent() + { + var autoResetEvent = new AsyncAutoResetEvent(); + var pre = DateTime.Now; + var task = autoResetEvent.WaitAsync(); + _ = Task.Run(async () => + { + await Task.Delay(50); + autoResetEvent.Set(); + }); + await task; + (DateTime.Now - pre).Should().BeGreaterThan(TimeSpan.FromMilliseconds(50)); + autoResetEvent.Signaled.Should().BeFalse(); + } + + [TestMethod] + public async Task TestAsyncAutoResetEventWithoutReset() + { + var autoResetEvent = new AsyncAutoResetEvent(); + var pre = DateTime.Now; + var task = autoResetEvent.WaitWithoutResetAsync(); + _ = Task.Run(async () => + { + await Task.Delay(50); + autoResetEvent.Set(); + }); + await task; + (DateTime.Now - pre).Should().BeGreaterThan(TimeSpan.FromMilliseconds(50)); + autoResetEvent.Signaled.Should().BeTrue(); + } + + [TestMethod] + public async Task TestAsyncAutoResetEventMulti() + { + var autoResetEvent = new AsyncAutoResetEvent(); + var pre = DateTime.Now; + Task[] tasks = [autoResetEvent.WaitAsync(), autoResetEvent.WaitAsync()]; + _ = Task.Run(async () => + { + await Task.Delay(50); + autoResetEvent.Set(); + }); + await Task.WhenAll(tasks); + (DateTime.Now - pre).Should().BeGreaterThan(TimeSpan.FromMilliseconds(50)); + autoResetEvent.Signaled.Should().BeFalse(); + } + + [TestMethod] + public async Task TestAsyncAutoResetEventWithoutResetMulti() + { + var autoResetEvent = new AsyncAutoResetEvent(); + var pre = DateTime.Now; + Task[] tasks = [autoResetEvent.WaitWithoutResetAsync(), autoResetEvent.WaitWithoutResetAsync()]; + _ = Task.Run(async () => + { + await Task.Delay(50); + autoResetEvent.Set(); + }); + await Task.WhenAll(tasks); + (DateTime.Now - pre).Should().BeGreaterThan(TimeSpan.FromMilliseconds(50)); + autoResetEvent.Signaled.Should().BeTrue(); + } + + [TestMethod] + public async Task TestAsyncAutoResetEventPre() + { + var autoResetEvent = new AsyncAutoResetEvent(); + autoResetEvent.Set(); + await autoResetEvent.WaitAsync(); + autoResetEvent.Signaled.Should().BeFalse(); + await Assert.ThrowsExceptionAsync(() => autoResetEvent.WaitAsync() + .WaitAsync(TimeSpan.FromMilliseconds(50))); + } + + [TestMethod] + public async Task TestAsyncAutoResetEventPreMulti() + { + var autoResetEvent = new AsyncAutoResetEvent(); + autoResetEvent.Set(); + await autoResetEvent.WaitWithoutResetAsync(); + autoResetEvent.Signaled.Should().BeTrue(); + await autoResetEvent.WaitWithoutResetAsync(); + autoResetEvent.Signaled.Should().BeTrue(); + } +} \ No newline at end of file