[backend/queue] Improved AsyncAutoResetEvent implementation

This implementation improves the memory behavior of AsyncAutoResetEvents that fire in rapid succession.
This commit is contained in:
Laura Hausmann 2024-05-25 13:34:06 +02:00
parent c80630f66f
commit 35153a9080
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
3 changed files with 32 additions and 39 deletions

View file

@ -13,7 +13,7 @@ public static class TaskExtensions
// ignored // ignored
} }
} }
public static async Task SafeWaitAsync(this Task task, CancellationToken token) public static async Task SafeWaitAsync(this Task task, CancellationToken token)
{ {
try try
@ -25,7 +25,7 @@ public static class TaskExtensions
// ignored // ignored
} }
} }
public static async Task SafeWaitAsync(this Task task, CancellationToken token, Action action) public static async Task SafeWaitAsync(this Task task, CancellationToken token, Action action)
{ {
try try
@ -37,7 +37,7 @@ public static class TaskExtensions
action(); action();
} }
} }
public static async Task SafeWaitAsync(this Task task, CancellationToken token, Func<Task> action) public static async Task SafeWaitAsync(this Task task, CancellationToken token, Func<Task> action)
{ {
try try

View file

@ -1,57 +1,50 @@
namespace Iceshrimp.Backend.Core.Helpers; namespace Iceshrimp.Backend.Core.Helpers;
public sealed class AsyncAutoResetEvent(bool signaled) public sealed class AsyncAutoResetEvent(bool signaled = false)
{ {
private readonly Queue<TaskCompletionSource> _queue = new(); private readonly List<TaskCompletionSource<bool>> _taskCompletionSources = [];
public Task WaitAsync(CancellationToken cancellationToken = default) public Task<bool> WaitAsync(CancellationToken cancellationToken = default)
{ {
lock (_queue) lock (_taskCompletionSources)
{ {
if (signaled) if (signaled)
{ {
signaled = false; signaled = false;
return Task.CompletedTask; return Task.FromResult(true);
} }
var tcs = new TaskCompletionSource(); var tcs = new TaskCompletionSource<bool>();
if (cancellationToken.CanBeCanceled) cancellationToken.Register(Callback, (this, tcs));
{ _taskCompletionSources.Add(tcs);
// If the token is cancelled, cancel the waiter.
var registration =
cancellationToken.Register(() => tcs.TrySetCanceled(), false);
// If the waiter completes or faults, unregister our interest in cancellation.
tcs.Task.ContinueWith(
_ => registration.Unregister(),
cancellationToken,
TaskContinuationOptions.OnlyOnRanToCompletion |
TaskContinuationOptions.NotOnFaulted,
TaskScheduler.Default);
}
_queue.Enqueue(tcs);
return tcs.Task; return tcs.Task;
} }
} }
public void Set() public void Set()
{ {
TaskCompletionSource? toRelease = null; lock (_taskCompletionSources)
lock (_queue)
{ {
if (_queue.Count > 0) if (_taskCompletionSources.Count > 0)
{ {
toRelease = _queue.Dequeue(); var tcs = _taskCompletionSources[0];
_taskCompletionSources.RemoveAt(0);
tcs.TrySetResult(true);
return;
} }
else if (!signaled)
{
signaled = true;
}
}
// It's possible that the TCS has already been cancelled. signaled = true;
toRelease?.TrySetResult(); }
}
private static void Callback(object? state)
{
var (ev, tcs) = ((AsyncAutoResetEvent, TaskCompletionSource<bool>))state!;
lock (ev._taskCompletionSources)
{
if (tcs.Task.IsCompleted) return;
tcs.TrySetCanceled();
ev._taskCompletionSources.Remove(tcs);
}
} }
} }

View file

@ -203,8 +203,8 @@ public class PostgresJobQueue<T>(
int parallelism int parallelism
) : IPostgresJobQueue where T : class ) : IPostgresJobQueue where T : class
{ {
private readonly AsyncAutoResetEvent _delayedChannel = new(false); private readonly AsyncAutoResetEvent _delayedChannel = new();
private readonly AsyncAutoResetEvent _queuedChannel = new(false); private readonly AsyncAutoResetEvent _queuedChannel = new();
private IServiceScopeFactory _scopeFactory = null!; private IServiceScopeFactory _scopeFactory = null!;
public string Name => name; public string Name => name;
private string? _workerId; private string? _workerId;