[backend/core] Delayed queue handling (ISH-86)
This commit is contained in:
parent
14ef53f577
commit
d976f82636
2 changed files with 134 additions and 35 deletions
|
@ -12,7 +12,7 @@ public class DeliverQueue
|
||||||
{
|
{
|
||||||
public static JobQueue<DeliverJob> Create(IConnectionMultiplexer redis, string prefix)
|
public static JobQueue<DeliverJob> Create(IConnectionMultiplexer redis, string prefix)
|
||||||
{
|
{
|
||||||
return new JobQueue<DeliverJob>("deliver", DeliverQueueProcessorDelegateAsync, 4, redis, prefix);
|
return new JobQueue<DeliverJob>("deliver", DeliverQueueProcessorDelegateAsync, 20, redis, prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static async Task DeliverQueueProcessorDelegateAsync(
|
private static async Task DeliverQueueProcessorDelegateAsync(
|
||||||
|
@ -44,7 +44,36 @@ public class DeliverQueue
|
||||||
|
|
||||||
var request =
|
var request =
|
||||||
await httpRqSvc.PostSignedAsync(job.InboxUrl, job.Payload, job.ContentType, job.UserId, key);
|
await httpRqSvc.PostSignedAsync(job.InboxUrl, job.Payload, job.ContentType, job.UserId, key);
|
||||||
await httpClient.SendAsync(request, token);
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var response = await httpClient.SendAsync(request, token).WaitAsync(TimeSpan.FromSeconds(10), token);
|
||||||
|
response.EnsureSuccessStatusCode();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
//TODO: prune dead instances after a while (and only resume sending activities after they come back)
|
||||||
|
|
||||||
|
if (job.RetryCount++ < 10)
|
||||||
|
{
|
||||||
|
var jitter = TimeSpan.FromSeconds(new Random().Next(0, 60));
|
||||||
|
var baseDelay = TimeSpan.FromMinutes(1);
|
||||||
|
var maxBackoff = TimeSpan.FromHours(8);
|
||||||
|
var backoff = (Math.Pow(2, job.RetryCount) - 1) * baseDelay;
|
||||||
|
if (backoff > maxBackoff)
|
||||||
|
backoff = maxBackoff;
|
||||||
|
backoff += jitter;
|
||||||
|
|
||||||
|
job.ExceptionMessage = e.Message;
|
||||||
|
job.ExceptionSource = e.Source;
|
||||||
|
job.DelayedUntil = DateTime.Now + backoff;
|
||||||
|
job.Status = Job.JobStatus.Delayed;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
using Iceshrimp.Backend.Core.Configuration;
|
using Iceshrimp.Backend.Core.Configuration;
|
||||||
|
using Iceshrimp.Backend.Core.Extensions;
|
||||||
using Iceshrimp.Backend.Core.Helpers;
|
using Iceshrimp.Backend.Core.Helpers;
|
||||||
using Iceshrimp.Backend.Core.Middleware;
|
using Iceshrimp.Backend.Core.Middleware;
|
||||||
using Iceshrimp.Backend.Core.Queues;
|
using Iceshrimp.Backend.Core.Queues;
|
||||||
|
@ -40,11 +41,13 @@ public class QueueService : BackgroundService
|
||||||
await RecoverOrPrepareForExitAsync();
|
await RecoverOrPrepareForExitAsync();
|
||||||
token.Register(RecoverOrPrepareForExit);
|
token.Register(RecoverOrPrepareForExit);
|
||||||
|
|
||||||
while (!token.IsCancellationRequested)
|
await Task.Run(ExecuteBackgroundWorkers, token);
|
||||||
{
|
return;
|
||||||
foreach (var _ in _queues.Select(queue => queue.TickAsync(_serviceScopeFactory, token))) { }
|
|
||||||
|
|
||||||
await Task.Delay(100, token);
|
async Task? ExecuteBackgroundWorkers()
|
||||||
|
{
|
||||||
|
var tasks = _queues.Select(queue => queue.ExecuteAsync(_serviceScopeFactory, token));
|
||||||
|
await Task.WhenAll(tasks);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,7 +65,7 @@ public class QueueService : BackgroundService
|
||||||
|
|
||||||
public interface IJobQueue
|
public interface IJobQueue
|
||||||
{
|
{
|
||||||
public Task TickAsync(IServiceScopeFactory scopeFactory, CancellationToken token);
|
public Task ExecuteAsync(IServiceScopeFactory scopeFactory, CancellationToken token);
|
||||||
public Task RecoverOrPrepareForExitAsync();
|
public Task RecoverOrPrepareForExitAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,31 +77,84 @@ public class JobQueue<T>(
|
||||||
string prefix
|
string prefix
|
||||||
) : IJobQueue where T : Job
|
) : IJobQueue where T : Job
|
||||||
{
|
{
|
||||||
//TODO: "Why is that best practice" - does this need to be called on every access? does not doing this cause a memory leak or something?
|
private readonly IDatabase _redisDb = redis.GetDatabase().WithKeyPrefix(prefix + name + ":");
|
||||||
// If this is /not/ required, we could call .WithKeyPrefix twice, once in the main method, (adding prefix) and once here, adding name to the then-passed IDatabase
|
private readonly ISubscriber _subscriber = redis.GetSubscriber();
|
||||||
private IDatabase Db => redis.GetDatabase().WithKeyPrefix(prefix + name + ":");
|
private readonly RedisChannel _queuedChannel = new(prefix + "channel:queued", RedisChannel.PatternMode.Literal);
|
||||||
|
private readonly RedisChannel _delayedChannel = new(prefix + "channel:delayed", RedisChannel.PatternMode.Literal);
|
||||||
|
|
||||||
public async Task TickAsync(IServiceScopeFactory scopeFactory, CancellationToken token)
|
private async Task DelayedJobHandlerAsync(IServiceScopeFactory scopeFactory, CancellationToken token)
|
||||||
{
|
{
|
||||||
var actualParallelism = Math.Min(parallelism - await Db.ListLengthAsync("running"),
|
var channel = await _subscriber.SubscribeAsync(_queuedChannel);
|
||||||
await Db.ListLengthAsync("queued"));
|
var logger = scopeFactory.CreateScope().ServiceProvider.GetRequiredService<ILogger<QueueService>>();
|
||||||
if (actualParallelism == 0) return;
|
while (!token.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var timestamp = (long)DateTime.Now.Subtract(DateTime.UnixEpoch).TotalSeconds;
|
||||||
|
var res = await _redisDb.SortedSetRangeByScoreAsync("delayed", 0, timestamp, take: 10);
|
||||||
|
|
||||||
var tasks = new List<Task>();
|
if (res.Length == 0)
|
||||||
for (var i = 0; i < actualParallelism; i++) tasks.Add(ProcessJobAsync(scopeFactory, token));
|
{
|
||||||
|
await channel.ReadAsync(token);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
await Task.WhenAll(tasks);
|
foreach (var item in res)
|
||||||
|
{
|
||||||
|
var transaction = _redisDb.CreateTransaction();
|
||||||
|
_ = transaction.ListRightPushAsync("queued", item);
|
||||||
|
_ = transaction.SortedSetRemoveAsync("delayed", item);
|
||||||
|
await transaction.ExecuteAsync();
|
||||||
|
await _subscriber.PublishAsync(_queuedChannel, "");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
logger.LogError("DelayedJobHandlerAsync in queue {queue} failed with: {error}", name, e.Message);
|
||||||
|
await Task.Delay(1000, token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task ExecuteAsync(IServiceScopeFactory scopeFactory, CancellationToken token)
|
||||||
|
{
|
||||||
|
var logger = scopeFactory.CreateScope().ServiceProvider.GetRequiredService<ILogger<QueueService>>();
|
||||||
|
_ = Task.Run(() => DelayedJobHandlerAsync(scopeFactory, token), token);
|
||||||
|
var channel = await _subscriber.SubscribeAsync(_queuedChannel);
|
||||||
|
while (!token.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var actualParallelism = Math.Min(parallelism - await _redisDb.ListLengthAsync("running"),
|
||||||
|
await _redisDb.ListLengthAsync("queued"));
|
||||||
|
if (actualParallelism == 0)
|
||||||
|
{
|
||||||
|
await channel.ReadAsync(token);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var tasks = new List<Task>();
|
||||||
|
for (var i = 0; i < actualParallelism; i++) tasks.Add(ProcessJobAsync(scopeFactory, token));
|
||||||
|
|
||||||
|
await Task.WhenAny(tasks);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
logger.LogError("ExecuteAsync in queue {queue} failed with: {error}", name, e.Message);
|
||||||
|
await Task.Delay(1000, token);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task RecoverOrPrepareForExitAsync()
|
public async Task RecoverOrPrepareForExitAsync()
|
||||||
{
|
{
|
||||||
while (await Db.ListLengthAsync("running") > 0)
|
while (await _redisDb.ListLengthAsync("running") > 0)
|
||||||
await Db.ListMoveAsync("running", "queued", ListSide.Right, ListSide.Left);
|
await _redisDb.ListMoveAsync("running", "queued", ListSide.Right, ListSide.Left);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task ProcessJobAsync(IServiceScopeFactory scopeFactory, CancellationToken token)
|
private async Task ProcessJobAsync(IServiceScopeFactory scopeFactory, CancellationToken token)
|
||||||
{
|
{
|
||||||
var res = await Db.ListMoveAsync("queued", "running", ListSide.Left, ListSide.Right);
|
var res = await _redisDb.ListMoveAsync("queued", "running", ListSide.Left, ListSide.Right);
|
||||||
if (res.IsNull || res.Box() is not byte[] buffer) return;
|
if (res.IsNull || res.Box() is not byte[] buffer) return;
|
||||||
var job = RedisHelpers.Deserialize<T>(buffer);
|
var job = RedisHelpers.Deserialize<T>(buffer);
|
||||||
if (job == null) return;
|
if (job == null) return;
|
||||||
|
@ -130,8 +186,8 @@ public class JobQueue<T>(
|
||||||
if (job.Status is Job.JobStatus.Failed)
|
if (job.Status is Job.JobStatus.Failed)
|
||||||
{
|
{
|
||||||
job.FinishedAt = DateTime.Now;
|
job.FinishedAt = DateTime.Now;
|
||||||
await Db.ListRemoveAsync("running", res, 1);
|
await _redisDb.ListRemoveAsync("running", res, 1);
|
||||||
await Db.ListRightPushAsync("failed", RedisValue.Unbox(RedisHelpers.Serialize(job)));
|
await _redisDb.ListRightPushAsync("failed", RedisValue.Unbox(RedisHelpers.Serialize(job)));
|
||||||
}
|
}
|
||||||
else if (job.Status is Job.JobStatus.Delayed)
|
else if (job.Status is Job.JobStatus.Delayed)
|
||||||
{
|
{
|
||||||
|
@ -149,8 +205,16 @@ public class JobQueue<T>(
|
||||||
job.FinishedAt = DateTime.Now;
|
job.FinishedAt = DateTime.Now;
|
||||||
|
|
||||||
var logger = scope.ServiceProvider.GetRequiredService<ILogger<QueueService>>();
|
var logger = scope.ServiceProvider.GetRequiredService<ILogger<QueueService>>();
|
||||||
logger.LogTrace("Job in queue {queue} completed after {duration} ms, was queued for {queueDuration} ms",
|
if (job.RetryCount == 0)
|
||||||
name, job.Duration, job.QueueDuration);
|
{
|
||||||
|
logger.LogTrace("Job in queue {queue} completed after {duration} ms, was queued for {queueDuration} ms",
|
||||||
|
name, job.Duration, job.QueueDuration);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
logger.LogTrace("Job in queue {queue} completed after {duration} ms, has been queued since {time}",
|
||||||
|
name, job.Duration, job.QueuedAt.ToStringMastodon());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var targetQueue = job.Status switch
|
var targetQueue = job.Status switch
|
||||||
|
@ -160,15 +224,23 @@ public class JobQueue<T>(
|
||||||
Job.JobStatus.Delayed => "delayed",
|
Job.JobStatus.Delayed => "delayed",
|
||||||
_ => throw new Exception("ProcessJob: unknown job state on finish")
|
_ => throw new Exception("ProcessJob: unknown job state on finish")
|
||||||
};
|
};
|
||||||
await Db.ListRemoveAsync("running", res, 1);
|
await _redisDb.ListRemoveAsync("running", res, 1);
|
||||||
if (targetQueue == "delayed")
|
if (targetQueue == "delayed")
|
||||||
{
|
{
|
||||||
await Db.ListRightPushAsync(targetQueue, RedisValue.Unbox(RedisHelpers.Serialize(job)));
|
job.DelayedUntil ??= DateTime.Now;
|
||||||
|
var logger = scope.ServiceProvider.GetRequiredService<ILogger<QueueService>>();
|
||||||
|
logger.LogTrace("Job in queue {queue} was delayed to {time} after {duration} ms, has been queued since {time}",
|
||||||
|
name, job.DelayedUntil.Value.ToStringMastodon(), job.Duration,
|
||||||
|
job.QueuedAt.ToStringMastodon());
|
||||||
|
|
||||||
|
var timestamp = (long)job.DelayedUntil.Value.Subtract(DateTime.UnixEpoch).TotalSeconds;
|
||||||
|
await _redisDb.SortedSetAddAsync(targetQueue, RedisValue.Unbox(RedisHelpers.Serialize(job)), timestamp);
|
||||||
|
await _subscriber.PublishAsync(_delayedChannel, "");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
await Db.ListLeftPushAsync(targetQueue, RedisValue.Unbox(RedisHelpers.Serialize(job)));
|
await _redisDb.ListLeftPushAsync(targetQueue, RedisValue.Unbox(RedisHelpers.Serialize(job)));
|
||||||
await Db.ListTrimAsync(targetQueue, 0, 9);
|
await _redisDb.ListTrimAsync(targetQueue, 0, 9);
|
||||||
}
|
}
|
||||||
|
|
||||||
scope.Dispose();
|
scope.Dispose();
|
||||||
|
@ -176,7 +248,8 @@ public class JobQueue<T>(
|
||||||
|
|
||||||
public async Task EnqueueAsync(T job)
|
public async Task EnqueueAsync(T job)
|
||||||
{
|
{
|
||||||
await Db.ListRightPushAsync("queued", RedisValue.Unbox(RedisHelpers.Serialize(job)));
|
await _redisDb.ListRightPushAsync("queued", RedisValue.Unbox(RedisHelpers.Serialize(job)));
|
||||||
|
await _subscriber.PublishAsync(_queuedChannel, "");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,15 +274,12 @@ public abstract class Job
|
||||||
[ProtoMember(3)] public DateTime? FinishedAt;
|
[ProtoMember(3)] public DateTime? FinishedAt;
|
||||||
[ProtoMember(4)] public DateTime? DelayedUntil;
|
[ProtoMember(4)] public DateTime? DelayedUntil;
|
||||||
|
|
||||||
|
[ProtoMember(5)] public int RetryCount;
|
||||||
|
|
||||||
[ProtoMember(10)] public string? ExceptionMessage;
|
[ProtoMember(10)] public string? ExceptionMessage;
|
||||||
[ProtoMember(11)] public string? ExceptionSource;
|
[ProtoMember(11)] public string? ExceptionSource;
|
||||||
|
|
||||||
public JobStatus Status = JobStatus.Queued;
|
public JobStatus Status = JobStatus.Queued;
|
||||||
public long Duration => (long)((FinishedAt ?? DateTime.Now) - (StartedAt ?? QueuedAt)).TotalMilliseconds;
|
public long Duration => (long)((FinishedAt ?? DateTime.Now) - (StartedAt ?? QueuedAt)).TotalMilliseconds;
|
||||||
public long QueueDuration => (long)((StartedAt ?? DateTime.Now) - QueuedAt).TotalMilliseconds;
|
public long QueueDuration => (long)((StartedAt ?? DateTime.Now) - QueuedAt).TotalMilliseconds;
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: handle delayed jobs
|
|
||||||
//TODO: retries
|
|
||||||
//TODO: exponential backoff with fail after certain point
|
|
||||||
//TODO: prune dead instances after a while (and only resume sending activities after they come back)
|
|
Loading…
Add table
Reference in a new issue