diff --git a/Iceshrimp.Backend/Core/Helpers/SemaphorePlus.cs b/Iceshrimp.Backend/Core/Helpers/SemaphorePlus.cs index 0cafb01d..20ed360e 100644 --- a/Iceshrimp.Backend/Core/Helpers/SemaphorePlus.cs +++ b/Iceshrimp.Backend/Core/Helpers/SemaphorePlus.cs @@ -4,4 +4,10 @@ public class SemaphorePlus(int maxCount) : SemaphoreSlim(maxCount, maxCount) { private readonly int _maxCount = maxCount; public int ActiveCount => _maxCount - CurrentCount; + + public async Task WaitAndReleaseAsync(CancellationToken token) + { + await WaitAsync(token); + Release(); + } } \ No newline at end of file diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index 0b166b9d..bd1421b8 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -188,24 +188,23 @@ public class PostgresJobQueue( await using var db = GetDbContext(scope); var queuedCount = await db.GetJobQueuedCount(name, token); - var actualParallelism = Math.Min(parallelism - _semaphore.ActiveCount, queuedCount); + var actualParallelism = Math.Min(_semaphore.CurrentCount, queuedCount); - if (actualParallelism <= 0) + if (actualParallelism == 0) { - await _queuedChannel.WaitAsync(token).SafeWaitAsync(queueToken); + if (_semaphore.CurrentCount == 0 && queuedCount > 0) + await _semaphore.WaitAndReleaseAsync(token).SafeWaitAsync(queueToken); + else + await _queuedChannel.WaitAsync(token).SafeWaitAsync(queueToken); continue; } // ReSharper disable MethodSupportsCancellation var queuedChannelCts = new CancellationTokenSource(); - if (_semaphore.ActiveCount + queuedCount < parallelism) + if (_semaphore.CurrentCount - queuedCount > 0) { _ = _queuedChannel.WaitWithoutResetAsync() - .ContinueWith(_ => - { - if (_semaphore.ActiveCount < parallelism) - queuedChannelCts.Cancel(); - }) + .ContinueWith(_ => queuedChannelCts.Cancel()) .SafeWaitAsync(queueToken); } // ReSharper restore MethodSupportsCancellation