[backend/queue] Refactor queue job count functions into DatabaseContext
This commit is contained in:
parent
6ba374a681
commit
ca8ce9137d
2 changed files with 30 additions and 22 deletions
|
@ -1251,10 +1251,10 @@ public class DatabaseContext(DbContextOptions<DatabaseContext> options)
|
|||
public IQueryable<Note> Conversations(User user)
|
||||
=> FromExpression(() => Conversations(user.Id));
|
||||
|
||||
public IQueryable<Job> GetJobs(string queue, string? workerId) =>
|
||||
workerId == null ? GetJobs(queue) : GetWorkerJobs(queue, workerId);
|
||||
public IQueryable<Job> GetJob(string queue, string? workerId) =>
|
||||
workerId == null ? GetJob(queue) : GetWorkerJob(queue, workerId);
|
||||
|
||||
private IQueryable<Job> GetJobs(string queue)
|
||||
private IQueryable<Job> GetJob(string queue)
|
||||
=> Database.SqlQuery<Job>($"""
|
||||
UPDATE "jobs" SET "status" = 'running', "started_at" = now()
|
||||
WHERE "id" = (
|
||||
|
@ -1266,7 +1266,7 @@ public class DatabaseContext(DbContextOptions<DatabaseContext> options)
|
|||
RETURNING "jobs".*;
|
||||
""");
|
||||
|
||||
private IQueryable<Job> GetWorkerJobs(string queue, string workerId)
|
||||
private IQueryable<Job> GetWorkerJob(string queue, string workerId)
|
||||
=> Database.SqlQuery<Job>($"""
|
||||
UPDATE "jobs" SET "status" = 'running', "started_at" = now(), "worker_id" = {workerId}::varchar
|
||||
WHERE "id" = (
|
||||
|
@ -1284,6 +1284,29 @@ public class DatabaseContext(DbContextOptions<DatabaseContext> options)
|
|||
RETURNING "jobs".*;
|
||||
""");
|
||||
|
||||
public Task<int> GetJobRunningCount(string queue, string? workerId, CancellationToken token) =>
|
||||
workerId == null ? GetJobRunningCount(queue, token) : GetWorkerJobRunningCount(queue, workerId, token);
|
||||
|
||||
private Task<int> GetJobRunningCount(string queue, CancellationToken token) =>
|
||||
Jobs.CountAsync(p => p.Queue == queue && p.Status == Job.JobStatus.Running, token);
|
||||
|
||||
private Task<int> GetWorkerJobRunningCount(string queue, string workerId, CancellationToken token) =>
|
||||
Jobs.CountAsync(p => p.Queue == queue && p.Status == Job.JobStatus.Running && p.WorkerId == workerId, token);
|
||||
|
||||
public Task<int> GetJobQueuedCount(string queue, string? workerId, CancellationToken token) =>
|
||||
workerId == null ? GetJobQueuedCount(queue, token) : GetWorkerJobQueuedCount(queue, token);
|
||||
|
||||
private Task<int> GetJobQueuedCount(string queue, CancellationToken token) =>
|
||||
Jobs.CountAsync(p => p.Queue == queue && p.Status == Job.JobStatus.Queued, token);
|
||||
|
||||
private Task<int> GetWorkerJobQueuedCount(string queue, CancellationToken token) =>
|
||||
Jobs.CountAsync(p => p.Queue == queue &&
|
||||
(p.Status == Job.JobStatus.Queued ||
|
||||
(p.Status == Job.JobStatus.Running &&
|
||||
p.WorkerId != null &&
|
||||
!Workers.Any(w => w.Id == p.WorkerId &&
|
||||
w.Heartbeat > DateTime.UtcNow - TimeSpan.FromSeconds(90)))), token);
|
||||
|
||||
public async Task<bool> IsDatabaseEmpty()
|
||||
=> !await Database.SqlQuery<object>($"""
|
||||
select s.nspname from pg_class c
|
||||
|
|
|
@ -311,23 +311,8 @@ public class PostgresJobQueue<T>(
|
|||
await using var scope = GetScope();
|
||||
await using var db = GetDbContext(scope);
|
||||
|
||||
var runningCount = _workerId == null
|
||||
? await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Running, token)
|
||||
: await db.Jobs.CountAsync(p => p.Queue == name &&
|
||||
p.Status == Job.JobStatus.Running &&
|
||||
p.WorkerId != null &&
|
||||
p.WorkerId == _workerId, token);
|
||||
var queuedCount = _workerId == null
|
||||
? await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Queued, token)
|
||||
: await
|
||||
db.Jobs.CountAsync(p => p.Queue == name &&
|
||||
(p.Status == Job.JobStatus.Queued ||
|
||||
(p.Status == Job.JobStatus.Running &&
|
||||
p.WorkerId != null &&
|
||||
!db.Workers.Any(w => w.Id == p.WorkerId &&
|
||||
w.Heartbeat >
|
||||
DateTime.UtcNow - TimeSpan.FromSeconds(90)))),
|
||||
token);
|
||||
var runningCount = await db.GetJobRunningCount(name, _workerId, token);
|
||||
var queuedCount = await db.GetJobQueuedCount(name, _workerId, token);
|
||||
|
||||
var actualParallelism = Math.Min(parallelism - runningCount, queuedCount);
|
||||
if (actualParallelism <= 0)
|
||||
|
@ -497,7 +482,7 @@ public class PostgresJobQueue<T>(
|
|||
{
|
||||
await using var db = GetDbContext(processorScope);
|
||||
|
||||
if (await db.GetJobs(name, _workerId).ToListAsync(token) is not [{ } job])
|
||||
if (await db.GetJob(name, _workerId).ToListAsync(token) is not [{ } job])
|
||||
return;
|
||||
|
||||
_logger.LogTrace("Processing {queue} job {id}", name, job.Id);
|
||||
|
|
Loading…
Add table
Reference in a new issue