[backend/queue] Code cleanup

This commit is contained in:
Laura Hausmann 2024-07-25 00:09:13 +02:00
parent 0f400e8023
commit 09777fe6c4
No known key found for this signature in database
GPG key ID: D044E84C5BE01605

View file

@ -135,7 +135,7 @@ public class QueueService(
.SetProperty(i => i.ExceptionSource, _ => null) .SetProperty(i => i.ExceptionSource, _ => null)
.SetProperty(i => i.StackTrace, _ => null)); .SetProperty(i => i.StackTrace, _ => null));
if (cnt <= 0) return; if (cnt <= 0) return;
_queues.FirstOrDefault(p => p.Name == job.Queue)?.RaiseJobQueuedEvent(db); _queues.FirstOrDefault(p => p.Name == job.Queue)?.RaiseJobQueuedEvent();
} }
} }
} }
@ -148,8 +148,8 @@ public interface IPostgresJobQueue
public Task ExecuteAsync(IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken); public Task ExecuteAsync(IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken);
public Task RecoverOrPrepareForExitAsync(); public Task RecoverOrPrepareForExitAsync();
public void RaiseJobQueuedEvent(DatabaseContext db); public void RaiseJobQueuedEvent();
public void RaiseJobDelayedEvent(DatabaseContext db); public void RaiseJobDelayedEvent();
} }
public class PostgresJobQueue<T>( public class PostgresJobQueue<T>(
@ -167,9 +167,6 @@ public class PostgresJobQueue<T>(
public string Name => name; public string Name => name;
public TimeSpan Timeout => timeout; public TimeSpan Timeout => timeout;
public void RaiseLocalJobQueuedEvent() => QueuedChannelEvent?.Invoke(null, EventArgs.Empty);
public void RaiseLocalJobDelayedEvent() => DelayedChannelEvent?.Invoke(null, EventArgs.Empty);
public async Task ExecuteAsync( public async Task ExecuteAsync(
IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken
) )
@ -238,17 +235,14 @@ public class PostgresJobQueue<T>(
var cnt = await db.Jobs.Where(p => p.Status == Job.JobStatus.Running) var cnt = await db.Jobs.Where(p => p.Status == Job.JobStatus.Running)
.ExecuteUpdateAsync(p => p.SetProperty(i => i.Status, i => Job.JobStatus.Queued)); .ExecuteUpdateAsync(p => p.SetProperty(i => i.Status, i => Job.JobStatus.Queued));
if (cnt > 0) RaiseJobQueuedEvent(db); if (cnt > 0) RaiseJobQueuedEvent();
} }
private event EventHandler? QueuedChannelEvent; private event EventHandler? QueuedChannelEvent;
private event EventHandler? DelayedChannelEvent; private event EventHandler? DelayedChannelEvent;
// ReSharper disable once SuggestBaseTypeForParameter public void RaiseJobQueuedEvent() => QueuedChannelEvent?.Invoke(null, EventArgs.Empty);
public void RaiseJobQueuedEvent(DatabaseContext db) => QueuedChannelEvent?.Invoke(null, EventArgs.Empty); public void RaiseJobDelayedEvent() => DelayedChannelEvent?.Invoke(null, EventArgs.Empty);
// ReSharper disable once SuggestBaseTypeForParameter
public void RaiseJobDelayedEvent(DatabaseContext db) => DelayedChannelEvent?.Invoke(null, EventArgs.Empty);
private AsyncServiceScope GetScope() => _scopeFactory.CreateAsyncScope(); private AsyncServiceScope GetScope() => _scopeFactory.CreateAsyncScope();
@ -274,7 +268,7 @@ public class PostgresJobQueue<T>(
if (count > 0) if (count > 0)
{ {
RaiseJobQueuedEvent(db); RaiseJobQueuedEvent();
continue; continue;
} }
@ -312,7 +306,7 @@ public class PostgresJobQueue<T>(
if (ts.Value < DateTime.UtcNow) if (ts.Value < DateTime.UtcNow)
{ {
RaiseJobDelayedEvent(db); RaiseJobDelayedEvent();
} }
else else
{ {
@ -322,7 +316,7 @@ public class PostgresJobQueue<T>(
await using var bgScope = GetScope(); await using var bgScope = GetScope();
await using var bgDb = GetDbContext(bgScope); await using var bgDb = GetDbContext(bgScope);
await Task.Delay(trigger - DateTime.UtcNow, token); await Task.Delay(trigger - DateTime.UtcNow, token);
RaiseJobDelayedEvent(bgDb); RaiseJobDelayedEvent();
}, token); }, token);
} }
} }
@ -419,7 +413,7 @@ public class PostgresJobQueue<T>(
db.ChangeTracker.Clear(); db.ChangeTracker.Clear();
db.Update(job); db.Update(job);
await db.SaveChangesAsync(token); await db.SaveChangesAsync(token);
RaiseJobDelayedEvent(db); RaiseJobDelayedEvent();
return; return;
} }
} }
@ -458,7 +452,7 @@ public class PostgresJobQueue<T>(
}; };
db.Add(job); db.Add(job);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
RaiseJobQueuedEvent(db); RaiseJobQueuedEvent();
} }
public async Task ScheduleAsync(T jobData, DateTime triggerAt) public async Task ScheduleAsync(T jobData, DateTime triggerAt)
@ -476,6 +470,6 @@ public class PostgresJobQueue<T>(
}; };
db.Add(job); db.Add(job);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
RaiseJobDelayedEvent(db); RaiseJobDelayedEvent();
} }
} }