[backend/queue] Refactor cluster mode event methods to have less confusing method signatures

This commit is contained in:
Laura Hausmann 2024-07-24 18:01:26 +02:00
parent 35b42852e2
commit d7cfc24320
No known key found for this signature in database
GPG key ID: D044E84C5BE01605

View file

@ -88,9 +88,9 @@ public class QueueService(
if (queue == null) return; if (queue == null) return;
if (args.Channel == "queued") if (args.Channel == "queued")
queue.RaiseJobQueuedEvent(); queue.RaiseLocalJobQueuedEvent();
else else
queue.RaiseJobDelayedEvent(); queue.RaiseLocalJobDelayedEvent();
} }
catch catch
{ {
@ -253,7 +253,8 @@ public class QueueService(
.SetProperty(i => i.ExceptionSource, _ => null) .SetProperty(i => i.ExceptionSource, _ => null)
.SetProperty(i => i.StackTrace, _ => null)); .SetProperty(i => i.StackTrace, _ => null));
if (cnt <= 0) return; if (cnt <= 0) return;
_queues.FirstOrDefault(p => p.Name == job.Queue)?.RaiseJobQueuedEvent(); var queue = _queues.FirstOrDefault(p => p.Name == job.Queue);
if (queue != null) await queue.RaiseJobQueuedEvent(db);
} }
} }
} }
@ -266,9 +267,11 @@ public interface IPostgresJobQueue
public Task ExecuteAsync(IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken); public Task ExecuteAsync(IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken);
public Task RecoverOrPrepareForExitAsync(); public Task RecoverOrPrepareForExitAsync();
public Task RaiseJobQueuedEvent(DatabaseContext db);
public Task RaiseJobDelayedEvent(DatabaseContext db);
public void RaiseJobQueuedEvent(); public void RaiseLocalJobQueuedEvent();
public void RaiseJobDelayedEvent(); public void RaiseLocalJobDelayedEvent();
} }
public class PostgresJobQueue<T>( public class PostgresJobQueue<T>(
@ -287,8 +290,8 @@ public class PostgresJobQueue<T>(
public string Name => name; public string Name => name;
public TimeSpan Timeout => timeout; public TimeSpan Timeout => timeout;
public void RaiseJobQueuedEvent() => QueuedChannelEvent?.Invoke(null, EventArgs.Empty); public void RaiseLocalJobQueuedEvent() => QueuedChannelEvent?.Invoke(null, EventArgs.Empty);
public void RaiseJobDelayedEvent() => DelayedChannelEvent?.Invoke(null, EventArgs.Empty); public void RaiseLocalJobDelayedEvent() => DelayedChannelEvent?.Invoke(null, EventArgs.Empty);
public async Task ExecuteAsync( public async Task ExecuteAsync(
IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken IServiceScopeFactory scopeFactory, CancellationToken token, CancellationToken queueToken
@ -370,7 +373,7 @@ public class PostgresJobQueue<T>(
private event EventHandler? DelayedChannelEvent; private event EventHandler? DelayedChannelEvent;
// ReSharper disable once SuggestBaseTypeForParameter // ReSharper disable once SuggestBaseTypeForParameter
private async Task RaiseJobQueuedEvent(DatabaseContext db) public async Task RaiseJobQueuedEvent(DatabaseContext db)
{ {
if (_workerId == null) if (_workerId == null)
QueuedChannelEvent?.Invoke(null, EventArgs.Empty); QueuedChannelEvent?.Invoke(null, EventArgs.Empty);
@ -379,7 +382,7 @@ public class PostgresJobQueue<T>(
} }
// ReSharper disable once SuggestBaseTypeForParameter // ReSharper disable once SuggestBaseTypeForParameter
private async Task RaiseJobDelayedEvent(DatabaseContext db) public async Task RaiseJobDelayedEvent(DatabaseContext db)
{ {
if (_workerId == null) if (_workerId == null)
DelayedChannelEvent?.Invoke(null, EventArgs.Empty); DelayedChannelEvent?.Invoke(null, EventArgs.Empty);