[backend/database] Move queue job queries into DatabaseContext

This commit is contained in:
Laura Hausmann 2024-03-26 17:33:21 +01:00
parent c235f1a586
commit f24c7dff08
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
2 changed files with 36 additions and 34 deletions

View file

@ -1289,6 +1289,39 @@ public class DatabaseContext(DbContextOptions<DatabaseContext> options)
public IQueryable<Note> Conversations(User user)
=> FromExpression(() => Conversations(user.Id));
public IQueryable<Job> GetJobs(string queue, string? workerId) =>
workerId == null ? GetJobs(queue) : GetWorkerJobs(queue, workerId);
public IQueryable<Job> GetJobs(string queue)
=> Database.SqlQuery<Job>($"""
UPDATE "jobs" SET "status" = 'running', "started_at" = now()
WHERE "id" = (
SELECT "id" FROM "jobs"
WHERE queue = {queue} AND status = 'queued'
ORDER BY COALESCE("delayed_until", "queued_at")
LIMIT 1
FOR UPDATE SKIP LOCKED)
RETURNING "jobs".*;
""");
public IQueryable<Job> GetWorkerJobs(string queue, string workerId)
=> Database.SqlQuery<Job>($"""
UPDATE "jobs" SET "status" = 'running', "started_at" = now(), "worker_id" = {workerId}::varchar
WHERE "id" = (
SELECT "id" FROM "jobs"
WHERE queue = {queue} AND
(status = 'queued' OR
(status = 'running' AND
"worker_id" IS NOT NULL AND NOT EXISTS
(SELECT FROM "worker"
WHERE "id" = "jobs"."worker_id" AND
"heartbeat" > now() - '45 seconds'::interval)))
ORDER BY COALESCE("delayed_until", "queued_at")
LIMIT 1
FOR UPDATE SKIP LOCKED)
RETURNING "jobs".*;
""");
}
[SuppressMessage("ReSharper", "UnusedType.Global",

View file

@ -390,41 +390,10 @@ public class PostgresJobQueue<T>(
private async Task ProcessJobAsync(IServiceScope scope, CancellationToken token)
{
await using var db = GetDbContext(scope);
await using var db = GetDbContext(scope);
var sql = _workerId == null
? (FormattableString)
$"""
UPDATE "jobs" SET "status" = 'running', "started_at" = now()
WHERE "id" = (
SELECT "id" FROM "jobs"
WHERE queue = {name} AND status = 'queued'
ORDER BY COALESCE("delayed_until", "queued_at")
LIMIT 1
FOR UPDATE SKIP LOCKED)
RETURNING "jobs".*;
"""
: $"""
UPDATE "jobs" SET "status" = 'running', "started_at" = now(), "worker_id" = {_workerId}::varchar
WHERE "id" = (
SELECT "id" FROM "jobs"
WHERE queue = {name} AND
(status = 'queued' OR
(status = 'running' AND
"worker_id" IS NOT NULL AND NOT EXISTS
(SELECT FROM "worker"
WHERE "id" = "jobs"."worker_id" AND
"heartbeat" > now() - '45 seconds'::interval)))
ORDER BY COALESCE("delayed_until", "queued_at")
LIMIT 1
FOR UPDATE SKIP LOCKED)
RETURNING "jobs".*;
""";
var res = await db.Database.SqlQuery<Job>(sql)
.ToListAsync(token);
if (res is not [{ } job]) return;
if (await db.GetJobs(name, _workerId).ToListAsync(token) is not [{ } job])
return;
var data = JsonSerializer.Deserialize<T>(job.Data);
if (data == null)