diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index 75bc2ac4..9b208caf 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -170,6 +170,47 @@ public class PostgresJobQueue( public string Name => name; public TimeSpan Timeout => timeout; + /* + * This is the main queue processor loop. The algorithm aims to only ever have as many workers running as needed, + * conserving resources. + * + * Before we explain the algorithm, a couple of components need explanation: + * + * The _queuedChannel fires when a new task is queued, while the _delayedChannel fires when a task is delayed. + * These events are AsyncAutoResetEvents, meaning if they fire while no task is waiting for them to fire, + * they will "remember" that they fired until something calls .WaitAsync() on them, at which point they return immediately. + * + * There are two different CancellationTokens passed to this function. They are related to the two-step shutdown process. + * - 'queueToken' fires when the application begins shutting down. + * This gives the queue processor a grace period, where it can finish currently running jobs, + * without starting any new workers. + * - 'token' fires after the grace period, terminating any jobs that are still running. + * They get reset to the 'queued' state at a later point during the shutdown process. + * + * The _semaphore object tracks how many jobs are currently running / how many more can be started before reaching + * the configured maximum concurrency for this queue. + * + * With that out of the way, the actual loop runs the following algorithm: + * 1. If either 'token' or 'queueToken' is canceled, wait for all remaining tasks to finish before returning. + * 2. Obtain a service scope & a corresponding database context + * 3. Compute the actualParallelism from the number of queued jobs & the available worker slots + * 4. 
If actualParallelism is 0, enter a branch: + * 4.1 If there is at least one queued job, and there are no available worker slots, + * wait for a slot to become available and reset the loop + * 4.2 Otherwise, wait for a job to become queued and reset the loop + * 5. If it is likely that there are free worker slots after queueing actualParallelism jobs, + * cancel 'queuedChannelCts' when _queuedChannel fires (using a special function which will *not* reset the event) + * 6. Start actualParallelism tasks (each containing one worker processing one job) + * 7. Reset the loop if any of the following conditions are met: + * 7.1 Any of the queued tasks finishes + * 7.2 'queuedChannelCts' is canceled (see 5.) + * 7.3 'queueToken' is canceled, though not before: + * 7.3.1 All queued tasks finish, or 'token' is canceled (allowing for a two-step graceful/hard shutdown process) + * + * The shutdown process functions as follows: + * 1. While 'token' is not yet canceled, and there are still running jobs: + * 1.1 Wait for any of them to finish, or for 'token' to be canceled + */ public async Task ExecuteAsync( IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken )