[backend/queue] Clean completed jobs in a cron task instead of doing it at job completion

This prevents database deadlocks that can occur when many jobs are being processed simultaneously.
This commit is contained in:
Laura Hausmann 2024-05-14 15:05:01 +02:00
parent 61d6f73e90
commit 97532c2b22
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
2 changed files with 40 additions and 15 deletions

View file

@ -20,12 +20,14 @@ public class QueueService(
IOptions<Config.WorkerSection> config
) : BackgroundService
{
private readonly List<IPostgresJobQueue> _queues = [];
public readonly BackgroundTaskQueue BackgroundTaskQueue = new();
public readonly DeliverQueue DeliverQueue = new();
private readonly List<IPostgresJobQueue> _queues = [];
public readonly InboxQueue InboxQueue = new();
public readonly PreDeliverQueue PreDeliverQueue = new();
public IEnumerable<string> QueueNames => _queues.Select(p => p.Name);
public readonly BackgroundTaskQueue BackgroundTaskQueue = new();
public readonly DeliverQueue DeliverQueue = new();
public readonly InboxQueue InboxQueue = new();
public readonly PreDeliverQueue PreDeliverQueue = new();
private static async Task<NpgsqlConnection> GetNpgsqlConnection(IServiceScope scope)
{
@ -493,16 +495,6 @@ public class PostgresJobQueue<T>(
db.ChangeTracker.Clear();
db.Update(job);
await db.SaveChangesAsync(token);
await db.Jobs.Where(p => p.Queue == name && p.Status == Job.JobStatus.Completed)
.OrderByDescending(p => p.FinishedAt)
.Skip(10)
.ExecuteDeleteAsync(token);
await db.Jobs.Where(p => p.Queue == name && p.Status == Job.JobStatus.Failed)
.OrderByDescending(p => p.FinishedAt)
.Skip(100)
.ExecuteDeleteAsync(token);
}
public async Task EnqueueAsync(T jobData)

View file

@ -0,0 +1,33 @@
using System.Diagnostics.CodeAnalysis;
using Iceshrimp.Backend.Core.Database;
using Iceshrimp.Backend.Core.Database.Tables;
using Iceshrimp.Backend.Core.Services;
using Microsoft.EntityFrameworkCore;
namespace Iceshrimp.Backend.Core.Tasks;
[SuppressMessage("ReSharper", "UnusedType.Global", Justification = "Instantiated at runtime by CronService")]
public class JobCleanupTask : ICronTask
{
public async Task Invoke(IServiceProvider provider)
{
var db = provider.GetRequiredService<DatabaseContext>();
var queue = provider.GetRequiredService<QueueService>();
foreach (var name in queue.QueueNames)
{
await db.Jobs.Where(p => p.Queue == name && p.Status == Job.JobStatus.Completed)
.OrderByDescending(p => p.FinishedAt)
.Skip(10)
.ExecuteDeleteAsync();
await db.Jobs.Where(p => p.Queue == name && p.Status == Job.JobStatus.Failed)
.OrderByDescending(p => p.FinishedAt)
.Skip(100)
.ExecuteDeleteAsync();
}
}
public CronTaskType Type => CronTaskType.Interval;
public TimeSpan Trigger => TimeSpan.FromMinutes(15);
}