[backend/queue] Add a detailed explanation of the queue system algorithm

This commit is contained in:
Laura Hausmann 2024-07-27 01:54:07 +02:00
parent ec6b6ee17f
commit 7b415d4063
No known key found for this signature in database
GPG key ID: D044E84C5BE01605

View file

@ -170,6 +170,47 @@ public class PostgresJobQueue<T>(
public string Name => name;
public TimeSpan Timeout => timeout;
/*
* This is the main queue processor loop. The algorithm aims to only ever have as many workers running as needed,
* conserving resources.
*
* Before we explain the algorithm, a couple components need an explanation:
*
* The queuedChannel fires when a new task is queued, while the _delayedChannel fires when a task is delayed.
* These events are AsyncAutoResetEvents, meaning if they fire while no task is waiting for them to fire,
* they will "remember" that they fired until one runs .WaitAsync() on them, at which point they return immediately.
*
* There are two different CancellationTokens passed to this function. They are related to the two-step shutdown process.
* - 'queueToken' fires when the application begins shutting down.
* This gives the queue processor a grace period, where it can finish currently running jobs,
* without starting any new workers.
* - 'token' fires after the grace period, terminating any jobs that are still running.
* They get reset to the 'queued' state at a later point during the shutdown process.
*
* The _semaphore object tracks how many jobs are currently running / how many more can be started before reaching
* the configured maximum concurrency for this queue.
*
* With that out of the way, the actual loop runs the following algorithm:
* 1. If either 'token' or 'queueToken' are canceled, wait for all remaining tasks to finish before returning.
* 2. Obtain a service scope & a corresponding database context
* 3. Compute the actualParallelism from the number of queued jobs & the available worker slots
* 4. If actualParallelism is 0, enter a branch:
* 4.1 If there is at least one queued job, and there are no available worker slots,
* wait for a slot to become available and reset the loop
* 4.2 Otherwise, wait for a job to become queued and reset the loop
* 5. If it is likely that there are free worker slots after queueing actualParallelism jobs,
* cancel 'queuedChannelCts' when _queuedChannel fires (using a special function which will *not* reset the event)
* 6. Start actualParallelism tasks (each containing one worker processing one job)
* 7. Reset the loop if any of the following conditions are met:
* 7.1 Any of the queued tasks finish
* 7.2 'queuedChannelCts' is canceled (see 5.)
* 7.3 'queueToken' is canceled, though not before:
* 7.3.1 All queued tasks finish, or 'token' is canceled (allowing for a two-step graceful/hard shutdown process)
*
* The shutdown process functions as follows:
* 1. While 'token' is not yet canceled, and there are still running jobs:
* 2.1 Wait for any job them to finish, or for 'token' to be canceled
*/
public async Task ExecuteAsync(
IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken
)