diff --git a/Iceshrimp.Backend/Core/Queues/BackgroundTaskQueue.cs b/Iceshrimp.Backend/Core/Queues/BackgroundTaskQueue.cs index d032cc64..f12ce3d9 100644 --- a/Iceshrimp.Backend/Core/Queues/BackgroundTaskQueue.cs +++ b/Iceshrimp.Backend/Core/Queues/BackgroundTaskQueue.cs @@ -13,7 +13,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute; namespace Iceshrimp.Backend.Core.Queues; public class BackgroundTaskQueue(int parallelism) - : PostgresJobQueue("background-task", BackgroundTaskQueueProcessorDelegateAsync, parallelism) + : PostgresJobQueue("background-task", BackgroundTaskQueueProcessorDelegateAsync, + parallelism, TimeSpan.FromSeconds(60)) { private static async Task BackgroundTaskQueueProcessorDelegateAsync( Job job, diff --git a/Iceshrimp.Backend/Core/Queues/DeliverQueue.cs b/Iceshrimp.Backend/Core/Queues/DeliverQueue.cs index 9db57bf9..468533a7 100644 --- a/Iceshrimp.Backend/Core/Queues/DeliverQueue.cs +++ b/Iceshrimp.Backend/Core/Queues/DeliverQueue.cs @@ -8,7 +8,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute; namespace Iceshrimp.Backend.Core.Queues; public class DeliverQueue(int parallelism) - : PostgresJobQueue("deliver", DeliverQueueProcessorDelegateAsync, parallelism) + : PostgresJobQueue("deliver", DeliverQueueProcessorDelegateAsync, + parallelism, TimeSpan.FromSeconds(60)) { private static async Task DeliverQueueProcessorDelegateAsync( Job job, DeliverJobData jobData, IServiceProvider scope, CancellationToken token diff --git a/Iceshrimp.Backend/Core/Queues/InboxQueue.cs b/Iceshrimp.Backend/Core/Queues/InboxQueue.cs index 79e510b4..4a8a47d5 100644 --- a/Iceshrimp.Backend/Core/Queues/InboxQueue.cs +++ b/Iceshrimp.Backend/Core/Queues/InboxQueue.cs @@ -11,7 +11,7 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute; namespace Iceshrimp.Backend.Core.Queues; public class InboxQueue(int parallelism) - : PostgresJobQueue("inbox", InboxQueueProcessorDelegateAsync, parallelism) + : PostgresJobQueue("inbox", InboxQueueProcessorDelegateAsync, parallelism, TimeSpan.FromSeconds(60)) { private static async Task InboxQueueProcessorDelegateAsync( Job job, diff --git a/Iceshrimp.Backend/Core/Queues/PreDeliverQueue.cs b/Iceshrimp.Backend/Core/Queues/PreDeliverQueue.cs index f3c2a820..e17553d6 100644 --- a/Iceshrimp.Backend/Core/Queues/PreDeliverQueue.cs +++ b/Iceshrimp.Backend/Core/Queues/PreDeliverQueue.cs @@ -14,7 +14,8 @@ using JR = System.Text.Json.Serialization.JsonRequiredAttribute; namespace Iceshrimp.Backend.Core.Queues; public class PreDeliverQueue(int parallelism) - : PostgresJobQueue("pre-deliver", PreDeliverQueueProcessorDelegateAsync, parallelism) + : PostgresJobQueue("pre-deliver", PreDeliverQueueProcessorDelegateAsync, + parallelism, TimeSpan.FromSeconds(60)) { private static async Task PreDeliverQueueProcessorDelegateAsync( Job job, PreDeliverJobData jobData, IServiceProvider scope, CancellationToken token diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index 30563966..ce1f12eb 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -201,7 +201,8 @@ public interface IPostgresJobQueue public class PostgresJobQueue( string name, Func handler, - int parallelism + int parallelism, + TimeSpan timeout ) : IPostgresJobQueue where T : class { private readonly AsyncAutoResetEvent _delayedChannel = new(); @@ -424,7 +425,7 @@ public class PostgresJobQueue( try { - await handler(job, data, scope.ServiceProvider, token); + await handler(job, data, scope.ServiceProvider, token).WaitAsync(timeout, token); } catch (Exception e) { @@ -436,13 +437,18 @@ public class PostgresJobQueue( var queueName = data is BackgroundTaskJobData ? name + $" ({data.GetType().Name})" : name; if (e is GracefulException { Details: not null } ce) { - logger.LogError("Failed to process job {id} in {queue} queue: {error} - {details}", - queueName, job.Id.ToStringLower(), ce.Message, ce.Details); + logger.LogError("Failed to process job {id} in queue {queue}: {error} - {details}", + job.Id.ToStringLower(), queueName, ce.Message, ce.Details); + } + else if (e is TimeoutException) + { + logger.LogError("Job {id} in queue {queue} didn't complete within the configured timeout ({timeout} seconds)", + job.Id.ToStringLower(), queueName, (int)timeout.TotalSeconds); } else { - logger.LogError(e, "Failed to process job {id} in {queue} queue: {error}", job.Id.ToStringLower(), - queueName, e); + logger.LogError(e, "Failed to process job {id} in queue {queue}: {error}", + job.Id.ToStringLower(), queueName, e); } }