[backend/queue] Make queue concurrency configurable

This commit is contained in:
Laura Hausmann 2024-06-10 17:58:11 +02:00
parent 5771e43dda
commit aa21e4b05a
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
8 changed files with 33 additions and 12 deletions

View file

@ -269,6 +269,16 @@ public sealed class Config
public sealed class PerformanceSection
{
[Range(0, int.MaxValue)] public int FederationRequestHandlerConcurrency { get; init; } = 0;
public QueueConcurrencySection QueueConcurrency { get; init; } = new();
[Range(0, int.MaxValue)] public int FederationRequestHandlerConcurrency { get; init; } = 0;
}
public sealed class QueueConcurrencySection
{
[Range(1, int.MaxValue)] public int Inbox { get; init; } = 4;
[Range(1, int.MaxValue)] public int Deliver { get; init; } = 20;
[Range(1, int.MaxValue)] public int PreDeliver { get; init; } = 4;
[Range(1, int.MaxValue)] public int BackgroundTask { get; init; } = 4;
}
}

View file

@ -105,6 +105,7 @@ public static class ServiceExtensions
.ConfigureWithValidation<Config.WorkerSection>(configuration, "Worker")
.ConfigureWithValidation<Config.SecuritySection>(configuration, "Security")
.ConfigureWithValidation<Config.PerformanceSection>(configuration, "Performance")
.ConfigureWithValidation<Config.QueueConcurrencySection>(configuration, "Performance:QueueConcurrency")
.ConfigureWithValidation<Config.DatabaseSection>(configuration, "Database")
.ConfigureWithValidation<Config.StorageSection>(configuration, "Storage")
.ConfigureWithValidation<Config.LocalStorageSection>(configuration, "Storage:Local")

View file

@ -12,8 +12,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues;
public class BackgroundTaskQueue()
: PostgresJobQueue<BackgroundTaskJobData>("background-task", BackgroundTaskQueueProcessorDelegateAsync, 4)
public class BackgroundTaskQueue(int parallelism)
: PostgresJobQueue<BackgroundTaskJobData>("background-task", BackgroundTaskQueueProcessorDelegateAsync, parallelism)
{
private static async Task BackgroundTaskQueueProcessorDelegateAsync(
Job job,

View file

@ -7,7 +7,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues;
public class DeliverQueue() : PostgresJobQueue<DeliverJobData>("deliver", DeliverQueueProcessorDelegateAsync, 20)
public class DeliverQueue(int parallelism)
: PostgresJobQueue<DeliverJobData>("deliver", DeliverQueueProcessorDelegateAsync, parallelism)
{
private static async Task DeliverQueueProcessorDelegateAsync(
Job job, DeliverJobData jobData, IServiceProvider scope, CancellationToken token

View file

@ -10,7 +10,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues;
public class InboxQueue() : PostgresJobQueue<InboxJobData>("inbox", InboxQueueProcessorDelegateAsync, 4)
public class InboxQueue(int parallelism)
: PostgresJobQueue<InboxJobData>("inbox", InboxQueueProcessorDelegateAsync, parallelism)
{
private static async Task InboxQueueProcessorDelegateAsync(
Job job,

View file

@ -13,8 +13,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues;
public class PreDeliverQueue()
: PostgresJobQueue<PreDeliverJobData>("pre-deliver", PreDeliverQueueProcessorDelegateAsync, 4)
public class PreDeliverQueue(int parallelism)
: PostgresJobQueue<PreDeliverJobData>("pre-deliver", PreDeliverQueueProcessorDelegateAsync, parallelism)
{
private static async Task PreDeliverQueueProcessorDelegateAsync(
Job job, PreDeliverJobData jobData, IServiceProvider scope, CancellationToken token

View file

@ -17,17 +17,18 @@ namespace Iceshrimp.Backend.Core.Services;
public class QueueService(
IServiceScopeFactory scopeFactory,
ILogger<QueueService> logger,
IOptions<Config.WorkerSection> config
IOptions<Config.WorkerSection> config,
IOptions<Config.QueueConcurrencySection> queueConcurrency
) : BackgroundService
{
private readonly List<IPostgresJobQueue> _queues = [];
public IEnumerable<string> QueueNames => _queues.Select(p => p.Name);
public readonly BackgroundTaskQueue BackgroundTaskQueue = new();
public readonly DeliverQueue DeliverQueue = new();
public readonly InboxQueue InboxQueue = new();
public readonly PreDeliverQueue PreDeliverQueue = new();
public readonly InboxQueue InboxQueue = new(queueConcurrency.Value.Inbox);
public readonly DeliverQueue DeliverQueue = new(queueConcurrency.Value.Deliver);
public readonly PreDeliverQueue PreDeliverQueue = new(queueConcurrency.Value.PreDeliver);
public readonly BackgroundTaskQueue BackgroundTaskQueue = new(queueConcurrency.Value.BackgroundTask);
private static async Task<NpgsqlConnection> GetNpgsqlConnection(IServiceScope scope)
{

View file

@ -74,6 +74,13 @@ PublicPreview = Public
;; (0 = no limit)
FederationRequestHandlerConcurrency = 0
;; Maximum number of jobs to process concurrently, for each queue.
[Performance:QueueConcurrency]
Inbox = 4
Deliver = 20
PreDeliver = 4
BackgroundTask = 4
[Database]
;; Hostname, IP address or path to unix socket directory (specifying port is required even for unix sockets)
Host = localhost