[backend/queue] Fix AsyncAutoResetEvent handling, fix stalled queue workers causing no new workers to be spawned

This commit is contained in:
Laura Hausmann 2024-06-16 16:53:27 +02:00
parent 364d0c54a0
commit 7ed39d0c47
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
3 changed files with 135 additions and 7 deletions

View file

@ -2,7 +2,9 @@ namespace Iceshrimp.Backend.Core.Helpers;
public sealed class AsyncAutoResetEvent(bool signaled = false) public sealed class AsyncAutoResetEvent(bool signaled = false)
{ {
private readonly List<TaskCompletionSource<bool>> _taskCompletionSources = []; private readonly List<TaskCompletionSource<bool>> _taskCompletionSources = [];
private readonly List<TaskCompletionSource<bool>> _noResetTaskCompletionSources = [];
public bool Signaled => signaled;
public Task<bool> WaitAsync(CancellationToken cancellationToken = default) public Task<bool> WaitAsync(CancellationToken cancellationToken = default)
{ {
@ -21,19 +23,39 @@ public sealed class AsyncAutoResetEvent(bool signaled = false)
} }
} }
public Task<bool> WaitWithoutResetAsync(CancellationToken cancellationToken = default)
{
lock (_taskCompletionSources)
{
if (signaled)
return Task.FromResult(true);
var tcs = new TaskCompletionSource<bool>();
cancellationToken.Register(Callback, (this, tcs));
_noResetTaskCompletionSources.Add(tcs);
return tcs.Task;
}
}
public void Set() public void Set()
{ {
lock (_taskCompletionSources) lock (_taskCompletionSources)
{ {
if (_taskCompletionSources.Count > 0) signaled = true;
foreach (var tcs in _noResetTaskCompletionSources.ToList())
{ {
var tcs = _taskCompletionSources[0]; _noResetTaskCompletionSources.Remove(tcs);
_taskCompletionSources.RemoveAt(0);
tcs.TrySetResult(true); tcs.TrySetResult(true);
return;
} }
signaled = true; if (_taskCompletionSources.Count == 0) return;
signaled = false;
foreach (var tcs in _taskCompletionSources.ToList())
{
_taskCompletionSources.Remove(tcs);
tcs.TrySetResult(true);
}
} }
} }
@ -45,6 +67,7 @@ public sealed class AsyncAutoResetEvent(bool signaled = false)
if (tcs.Task.IsCompleted) return; if (tcs.Task.IsCompleted) return;
tcs.TrySetCanceled(); tcs.TrySetCanceled();
ev._taskCompletionSources.Remove(tcs); ev._taskCompletionSources.Remove(tcs);
ev._noResetTaskCompletionSources.Remove(tcs);
} }
} }
} }

View file

@ -259,8 +259,20 @@ public class PostgresJobQueue<T>(
continue; continue;
} }
// ReSharper disable MethodSupportsCancellation
var queuedChannelCts = new CancellationTokenSource();
if (actualParallelism < parallelism)
{
_ = _queuedChannel.WaitWithoutResetAsync()
.ContinueWith(_ => { queuedChannelCts.Cancel(); })
.SafeWaitAsync(queueToken);
}
// ReSharper restore MethodSupportsCancellation
var tasks = TaskExtensions.QueueMany(() => AttemptProcessJobAsync(token), actualParallelism); var tasks = TaskExtensions.QueueMany(() => AttemptProcessJobAsync(token), actualParallelism);
await Task.WhenAny(tasks).SafeWaitAsync(queueToken, () => Task.WhenAll(tasks).WaitAsync(token)); await Task.WhenAny(tasks)
.SafeWaitAsync(queuedChannelCts.Token)
.SafeWaitAsync(queueToken, () => Task.WhenAll(tasks).WaitAsync(token));
} }
catch (Exception e) catch (Exception e)
{ {

View file

@ -0,0 +1,93 @@
using Iceshrimp.Backend.Core.Helpers;
namespace Iceshrimp.Tests.Concurrency;
[TestClass]
public class EventTests
{
[TestMethod]
public async Task TestAsyncAutoResetEvent()
{
var autoResetEvent = new AsyncAutoResetEvent();
var pre = DateTime.Now;
var task = autoResetEvent.WaitAsync();
_ = Task.Run(async () =>
{
await Task.Delay(50);
autoResetEvent.Set();
});
await task;
(DateTime.Now - pre).Should().BeGreaterThan(TimeSpan.FromMilliseconds(50));
autoResetEvent.Signaled.Should().BeFalse();
}
[TestMethod]
public async Task TestAsyncAutoResetEventWithoutReset()
{
var autoResetEvent = new AsyncAutoResetEvent();
var pre = DateTime.Now;
var task = autoResetEvent.WaitWithoutResetAsync();
_ = Task.Run(async () =>
{
await Task.Delay(50);
autoResetEvent.Set();
});
await task;
(DateTime.Now - pre).Should().BeGreaterThan(TimeSpan.FromMilliseconds(50));
autoResetEvent.Signaled.Should().BeTrue();
}
[TestMethod]
public async Task TestAsyncAutoResetEventMulti()
{
var autoResetEvent = new AsyncAutoResetEvent();
var pre = DateTime.Now;
Task[] tasks = [autoResetEvent.WaitAsync(), autoResetEvent.WaitAsync()];
_ = Task.Run(async () =>
{
await Task.Delay(50);
autoResetEvent.Set();
});
await Task.WhenAll(tasks);
(DateTime.Now - pre).Should().BeGreaterThan(TimeSpan.FromMilliseconds(50));
autoResetEvent.Signaled.Should().BeFalse();
}
[TestMethod]
public async Task TestAsyncAutoResetEventWithoutResetMulti()
{
var autoResetEvent = new AsyncAutoResetEvent();
var pre = DateTime.Now;
Task[] tasks = [autoResetEvent.WaitWithoutResetAsync(), autoResetEvent.WaitWithoutResetAsync()];
_ = Task.Run(async () =>
{
await Task.Delay(50);
autoResetEvent.Set();
});
await Task.WhenAll(tasks);
(DateTime.Now - pre).Should().BeGreaterThan(TimeSpan.FromMilliseconds(50));
autoResetEvent.Signaled.Should().BeTrue();
}
[TestMethod]
public async Task TestAsyncAutoResetEventPre()
{
var autoResetEvent = new AsyncAutoResetEvent();
autoResetEvent.Set();
await autoResetEvent.WaitAsync();
autoResetEvent.Signaled.Should().BeFalse();
await Assert.ThrowsExceptionAsync<TimeoutException>(() => autoResetEvent.WaitAsync()
.WaitAsync(TimeSpan.FromMilliseconds(50)));
}
[TestMethod]
public async Task TestAsyncAutoResetEventPreMulti()
{
var autoResetEvent = new AsyncAutoResetEvent();
autoResetEvent.Set();
await autoResetEvent.WaitWithoutResetAsync();
autoResetEvent.Signaled.Should().BeTrue();
await autoResetEvent.WaitWithoutResetAsync();
autoResetEvent.Signaled.Should().BeTrue();
}
}