diff --git a/Iceshrimp.Backend/Core/Database/DatabaseContext.cs b/Iceshrimp.Backend/Core/Database/DatabaseContext.cs index 65210deb..26a27aec 100644 --- a/Iceshrimp.Backend/Core/Database/DatabaseContext.cs +++ b/Iceshrimp.Backend/Core/Database/DatabaseContext.cs @@ -1289,6 +1289,39 @@ public class DatabaseContext(DbContextOptions options) public IQueryable Conversations(User user) => FromExpression(() => Conversations(user.Id)); + + public IQueryable GetJobs(string queue, string? workerId) => + workerId == null ? GetJobs(queue) : GetWorkerJobs(queue, workerId); + + public IQueryable GetJobs(string queue) + => Database.SqlQuery($""" + UPDATE "jobs" SET "status" = 'running', "started_at" = now() + WHERE "id" = ( + SELECT "id" FROM "jobs" + WHERE queue = {queue} AND status = 'queued' + ORDER BY COALESCE("delayed_until", "queued_at") + LIMIT 1 + FOR UPDATE SKIP LOCKED) + RETURNING "jobs".*; + """); + + public IQueryable GetWorkerJobs(string queue, string workerId) + => Database.SqlQuery($""" + UPDATE "jobs" SET "status" = 'running', "started_at" = now(), "worker_id" = {workerId}::varchar + WHERE "id" = ( + SELECT "id" FROM "jobs" + WHERE queue = {queue} AND + (status = 'queued' OR + (status = 'running' AND + "worker_id" IS NOT NULL AND NOT EXISTS + (SELECT FROM "worker" + WHERE "id" = "jobs"."worker_id" AND + "heartbeat" > now() - '45 seconds'::interval))) + ORDER BY COALESCE("delayed_until", "queued_at") + LIMIT 1 + FOR UPDATE SKIP LOCKED) + RETURNING "jobs".*; + """); } [SuppressMessage("ReSharper", "UnusedType.Global", diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index b3db476b..f8e238c6 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -390,41 +390,10 @@ public class PostgresJobQueue( private async Task ProcessJobAsync(IServiceScope scope, CancellationToken token) { - await using var db = GetDbContext(scope); + await using var db = GetDbContext(scope); - var sql = _workerId == null - ? (FormattableString) - $""" - UPDATE "jobs" SET "status" = 'running', "started_at" = now() - WHERE "id" = ( - SELECT "id" FROM "jobs" - WHERE queue = {name} AND status = 'queued' - ORDER BY COALESCE("delayed_until", "queued_at") - LIMIT 1 - FOR UPDATE SKIP LOCKED) - RETURNING "jobs".*; - """ - : $""" - UPDATE "jobs" SET "status" = 'running', "started_at" = now(), "worker_id" = {_workerId}::varchar - WHERE "id" = ( - SELECT "id" FROM "jobs" - WHERE queue = {name} AND - (status = 'queued' OR - (status = 'running' AND - "worker_id" IS NOT NULL AND NOT EXISTS - (SELECT FROM "worker" - WHERE "id" = "jobs"."worker_id" AND - "heartbeat" > now() - '45 seconds'::interval))) - ORDER BY COALESCE("delayed_until", "queued_at") - LIMIT 1 - FOR UPDATE SKIP LOCKED) - RETURNING "jobs".*; - """; - - var res = await db.Database.SqlQuery(sql) - .ToListAsync(token); - - if (res is not [{ } job]) return; + if (await db.GetJobs(name, _workerId).ToListAsync(token) is not [{ } job]) + return; var data = JsonSerializer.Deserialize(job.Data); if (data == null)