From ca8ce9137d9938d6c3f8a9e8996318ebc89edf16 Mon Sep 17 00:00:00 2001 From: Laura Hausmann Date: Fri, 28 Jun 2024 18:20:59 +0200 Subject: [PATCH] [backend/queue] Refactor queue job count functions into DatabaseContext --- .../Core/Database/DatabaseContext.cs | 31 ++++++++++++++++--- .../Core/Services/QueueService.cs | 21 ++----------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/Iceshrimp.Backend/Core/Database/DatabaseContext.cs b/Iceshrimp.Backend/Core/Database/DatabaseContext.cs index 56186e3c..309f18f2 100644 --- a/Iceshrimp.Backend/Core/Database/DatabaseContext.cs +++ b/Iceshrimp.Backend/Core/Database/DatabaseContext.cs @@ -1251,10 +1251,10 @@ public class DatabaseContext(DbContextOptions options) public IQueryable Conversations(User user) => FromExpression(() => Conversations(user.Id)); - public IQueryable GetJobs(string queue, string? workerId) => - workerId == null ? GetJobs(queue) : GetWorkerJobs(queue, workerId); + public IQueryable GetJob(string queue, string? workerId) => + workerId == null ? GetJob(queue) : GetWorkerJob(queue, workerId); - private IQueryable GetJobs(string queue) + private IQueryable GetJob(string queue) => Database.SqlQuery($""" UPDATE "jobs" SET "status" = 'running', "started_at" = now() WHERE "id" = ( @@ -1266,7 +1266,7 @@ public class DatabaseContext(DbContextOptions options) RETURNING "jobs".*; """); - private IQueryable GetWorkerJobs(string queue, string workerId) + private IQueryable GetWorkerJob(string queue, string workerId) => Database.SqlQuery($""" UPDATE "jobs" SET "status" = 'running', "started_at" = now(), "worker_id" = {workerId}::varchar WHERE "id" = ( @@ -1284,6 +1284,29 @@ public class DatabaseContext(DbContextOptions options) RETURNING "jobs".*; """); + public Task GetJobRunningCount(string queue, string? workerId, CancellationToken token) => + workerId == null ? GetJobRunningCount(queue, token) : GetWorkerJobRunningCount(queue, workerId, token); + + private Task GetJobRunningCount(string queue, CancellationToken token) => + Jobs.CountAsync(p => p.Queue == queue && p.Status == Job.JobStatus.Running, token); + + private Task GetWorkerJobRunningCount(string queue, string workerId, CancellationToken token) => + Jobs.CountAsync(p => p.Queue == queue && p.Status == Job.JobStatus.Running && p.WorkerId == workerId, token); + + public Task GetJobQueuedCount(string queue, string? workerId, CancellationToken token) => + workerId == null ? GetJobQueuedCount(queue, token) : GetWorkerJobQueuedCount(queue, token); + + private Task GetJobQueuedCount(string queue, CancellationToken token) => + Jobs.CountAsync(p => p.Queue == queue && p.Status == Job.JobStatus.Queued, token); + + private Task GetWorkerJobQueuedCount(string queue, CancellationToken token) => + Jobs.CountAsync(p => p.Queue == queue && + (p.Status == Job.JobStatus.Queued || + (p.Status == Job.JobStatus.Running && + p.WorkerId != null && + !Workers.Any(w => w.Id == p.WorkerId && + w.Heartbeat > DateTime.UtcNow - TimeSpan.FromSeconds(90)))), token); + public async Task IsDatabaseEmpty() => !await Database.SqlQuery($""" select s.nspname from pg_class c diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index 11bc75ce..f296be98 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -311,23 +311,8 @@ public class PostgresJobQueue( await using var scope = GetScope(); await using var db = GetDbContext(scope); - var runningCount = _workerId == null - ? await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Running, token) - : await db.Jobs.CountAsync(p => p.Queue == name && - p.Status == Job.JobStatus.Running && - p.WorkerId != null && - p.WorkerId == _workerId, token); - var queuedCount = _workerId == null - ? await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Queued, token) - : await - db.Jobs.CountAsync(p => p.Queue == name && - (p.Status == Job.JobStatus.Queued || - (p.Status == Job.JobStatus.Running && - p.WorkerId != null && - !db.Workers.Any(w => w.Id == p.WorkerId && - w.Heartbeat > - DateTime.UtcNow - TimeSpan.FromSeconds(90)))), - token); + var runningCount = await db.GetJobRunningCount(name, _workerId, token); + var queuedCount = await db.GetJobQueuedCount(name, _workerId, token); var actualParallelism = Math.Min(parallelism - runningCount, queuedCount); if (actualParallelism <= 0) @@ -497,7 +482,7 @@ public class PostgresJobQueue( { await using var db = GetDbContext(processorScope); - if (await db.GetJobs(name, _workerId).ToListAsync(token) is not [{ } job]) + if (await db.GetJob(name, _workerId).ToListAsync(token) is not [{ } job]) return; _logger.LogTrace("Processing {queue} job {id}", name, job.Id);