[backend/queue] Add queue processor timeout

For now this is hardcoded per queue, but this will be configurable down the line.
This commit is contained in:
Laura Hausmann 2024-06-13 22:41:41 +02:00
parent a7dcfbe75a
commit 35148d7796
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
5 changed files with 19 additions and 10 deletions

View file

@ -13,7 +13,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues; namespace Iceshrimp.Backend.Core.Queues;
public class BackgroundTaskQueue(int parallelism) public class BackgroundTaskQueue(int parallelism)
: PostgresJobQueue<BackgroundTaskJobData>("background-task", BackgroundTaskQueueProcessorDelegateAsync, parallelism) : PostgresJobQueue<BackgroundTaskJobData>("background-task", BackgroundTaskQueueProcessorDelegateAsync,
parallelism, TimeSpan.FromSeconds(60))
{ {
private static async Task BackgroundTaskQueueProcessorDelegateAsync( private static async Task BackgroundTaskQueueProcessorDelegateAsync(
Job job, Job job,

View file

@ -8,7 +8,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues; namespace Iceshrimp.Backend.Core.Queues;
public class DeliverQueue(int parallelism) public class DeliverQueue(int parallelism)
: PostgresJobQueue<DeliverJobData>("deliver", DeliverQueueProcessorDelegateAsync, parallelism) : PostgresJobQueue<DeliverJobData>("deliver", DeliverQueueProcessorDelegateAsync,
parallelism, TimeSpan.FromSeconds(60))
{ {
private static async Task DeliverQueueProcessorDelegateAsync( private static async Task DeliverQueueProcessorDelegateAsync(
Job job, DeliverJobData jobData, IServiceProvider scope, CancellationToken token Job job, DeliverJobData jobData, IServiceProvider scope, CancellationToken token

View file

@ -11,7 +11,7 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues; namespace Iceshrimp.Backend.Core.Queues;
public class InboxQueue(int parallelism) public class InboxQueue(int parallelism)
: PostgresJobQueue<InboxJobData>("inbox", InboxQueueProcessorDelegateAsync, parallelism) : PostgresJobQueue<InboxJobData>("inbox", InboxQueueProcessorDelegateAsync, parallelism, TimeSpan.FromSeconds(60))
{ {
private static async Task InboxQueueProcessorDelegateAsync( private static async Task InboxQueueProcessorDelegateAsync(
Job job, Job job,

View file

@ -14,7 +14,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues; namespace Iceshrimp.Backend.Core.Queues;
public class PreDeliverQueue(int parallelism) public class PreDeliverQueue(int parallelism)
: PostgresJobQueue<PreDeliverJobData>("pre-deliver", PreDeliverQueueProcessorDelegateAsync, parallelism) : PostgresJobQueue<PreDeliverJobData>("pre-deliver", PreDeliverQueueProcessorDelegateAsync,
parallelism, TimeSpan.FromSeconds(60))
{ {
private static async Task PreDeliverQueueProcessorDelegateAsync( private static async Task PreDeliverQueueProcessorDelegateAsync(
Job job, PreDeliverJobData jobData, IServiceProvider scope, CancellationToken token Job job, PreDeliverJobData jobData, IServiceProvider scope, CancellationToken token

View file

@ -201,7 +201,8 @@ public interface IPostgresJobQueue
public class PostgresJobQueue<T>( public class PostgresJobQueue<T>(
string name, string name,
Func<Job, T, IServiceProvider, CancellationToken, Task> handler, Func<Job, T, IServiceProvider, CancellationToken, Task> handler,
int parallelism int parallelism,
TimeSpan timeout
) : IPostgresJobQueue where T : class ) : IPostgresJobQueue where T : class
{ {
private readonly AsyncAutoResetEvent _delayedChannel = new(); private readonly AsyncAutoResetEvent _delayedChannel = new();
@ -424,7 +425,7 @@ public class PostgresJobQueue<T>(
try try
{ {
await handler(job, data, scope.ServiceProvider, token); await handler(job, data, scope.ServiceProvider, token).WaitAsync(timeout, token);
} }
catch (Exception e) catch (Exception e)
{ {
@ -436,13 +437,18 @@ public class PostgresJobQueue<T>(
var queueName = data is BackgroundTaskJobData ? name + $" ({data.GetType().Name})" : name; var queueName = data is BackgroundTaskJobData ? name + $" ({data.GetType().Name})" : name;
if (e is GracefulException { Details: not null } ce) if (e is GracefulException { Details: not null } ce)
{ {
logger.LogError("Failed to process job {id} in {queue} queue: {error} - {details}", logger.LogError("Failed to process job {id} in queue {queue}: {error} - {details}",
queueName, job.Id.ToStringLower(), ce.Message, ce.Details); job.Id.ToStringLower(), queueName, ce.Message, ce.Details);
}
else if (e is TimeoutException)
{
logger.LogError("Job {id} in queue {queue} didn't complete within the configured timeout ({timeout} seconds)",
job.Id.ToStringLower(), queueName, (int)timeout.TotalSeconds);
} }
else else
{ {
logger.LogError(e, "Failed to process job {id} in {queue} queue: {error}", job.Id.ToStringLower(), logger.LogError(e, "Failed to process job {id} in queue {queue}: {error}",
queueName, e); job.Id.ToStringLower(), queueName, e);
} }
} }