diff --git a/Iceshrimp.Backend/Core/Extensions/TaskExtensions.cs b/Iceshrimp.Backend/Core/Extensions/TaskExtensions.cs index 9f8f59d0..c794fa1f 100644 --- a/Iceshrimp.Backend/Core/Extensions/TaskExtensions.cs +++ b/Iceshrimp.Backend/Core/Extensions/TaskExtensions.cs @@ -13,7 +13,7 @@ public static class TaskExtensions // ignored } } - + public static async Task SafeWaitAsync(this Task task, CancellationToken token) { try @@ -25,7 +25,7 @@ public static class TaskExtensions // ignored } } - + public static async Task SafeWaitAsync(this Task task, CancellationToken token, Action action) { try @@ -37,7 +37,7 @@ public static class TaskExtensions action(); } } - + public static async Task SafeWaitAsync(this Task task, CancellationToken token, Func action) { try diff --git a/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs b/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs index 70c27c17..562e062f 100644 --- a/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs +++ b/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs @@ -1,57 +1,50 @@ namespace Iceshrimp.Backend.Core.Helpers; -public sealed class AsyncAutoResetEvent(bool signaled) +public sealed class AsyncAutoResetEvent(bool signaled = false) { - private readonly Queue _queue = new(); + private readonly List> _taskCompletionSources = []; - public Task WaitAsync(CancellationToken cancellationToken = default) + public Task WaitAsync(CancellationToken cancellationToken = default) { - lock (_queue) + lock (_taskCompletionSources) { if (signaled) { signaled = false; - return Task.CompletedTask; + return Task.FromResult(true); } - var tcs = new TaskCompletionSource(); - if (cancellationToken.CanBeCanceled) - { - // If the token is cancelled, cancel the waiter. - var registration = - cancellationToken.Register(() => tcs.TrySetCanceled(), false); - - // If the waiter completes or faults, unregister our interest in cancellation. - tcs.Task.ContinueWith( - _ => registration.Unregister(), - cancellationToken, - TaskContinuationOptions.OnlyOnRanToCompletion | - TaskContinuationOptions.NotOnFaulted, - TaskScheduler.Default); - } - - _queue.Enqueue(tcs); + var tcs = new TaskCompletionSource(); + cancellationToken.Register(Callback, (this, tcs)); + _taskCompletionSources.Add(tcs); return tcs.Task; } } public void Set() { - TaskCompletionSource? toRelease = null; - - lock (_queue) + lock (_taskCompletionSources) { - if (_queue.Count > 0) + if (_taskCompletionSources.Count > 0) { - toRelease = _queue.Dequeue(); + var tcs = _taskCompletionSources[0]; + _taskCompletionSources.RemoveAt(0); + tcs.TrySetResult(true); + return; } - else if (!signaled) - { - signaled = true; - } - } - // It's possible that the TCS has already been cancelled. - toRelease?.TrySetResult(); + signaled = true; + } + } + + private static void Callback(object? state) + { + var (ev, tcs) = ((AsyncAutoResetEvent, TaskCompletionSource))state!; + lock (ev._taskCompletionSources) + { + if (tcs.Task.IsCompleted) return; + tcs.TrySetCanceled(); + ev._taskCompletionSources.Remove(tcs); + } } } \ No newline at end of file diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index 152b1d27..0b99900d 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -203,8 +203,8 @@ public class PostgresJobQueue( int parallelism ) : IPostgresJobQueue where T : class { - private readonly AsyncAutoResetEvent _delayedChannel = new(false); - private readonly AsyncAutoResetEvent _queuedChannel = new(false); + private readonly AsyncAutoResetEvent _delayedChannel = new(); + private readonly AsyncAutoResetEvent _queuedChannel = new(); private IServiceScopeFactory _scopeFactory = null!; public string Name => name; private string? _workerId;