using Iceshrimp.Backend.Core.Database; using Iceshrimp.Backend.Core.Extensions; using Iceshrimp.Backend.Core.Services; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Caching.Distributed; using ProtoBuf; using StackExchange.Redis; namespace Iceshrimp.Backend.Core.Queues; public class DeliverQueue { public static JobQueue Create(IConnectionMultiplexer redis, string prefix) { return new JobQueue("deliver", DeliverQueueProcessorDelegateAsync, 20, redis, prefix); } private static async Task DeliverQueueProcessorDelegateAsync( DeliverJob job, IServiceProvider scope, CancellationToken token ) { var logger = scope.GetRequiredService>(); var httpClient = scope.GetRequiredService(); var httpRqSvc = scope.GetRequiredService(); var cache = scope.GetRequiredService(); var db = scope.GetRequiredService(); var fedCtrl = scope.GetRequiredService(); var followup = scope.GetRequiredService(); if (await fedCtrl.ShouldBlockAsync(job.InboxUrl, job.RecipientHost)) { logger.LogDebug("Refusing to deliver activity to blocked instance ({uri})", job.InboxUrl); return; } if (await fedCtrl.ShouldSkipAsync(job.RecipientHost)) { logger.LogDebug("fedCtrl.ShouldSkipAsync returned true, skipping"); return; } logger.LogDebug("Delivering activity to: {uri}", job.InboxUrl); var key = await cache.FetchAsync($"userPrivateKey:{job.UserId}", TimeSpan.FromMinutes(60), async () => { var keypair = await db.UserKeypairs.FirstOrDefaultAsync(p => p.UserId == job.UserId, token); return keypair?.PrivateKey ?? throw new Exception($"Failed to get keypair for user {job.UserId}"); }); var request = await httpRqSvc.PostSignedAsync(job.InboxUrl, job.Payload, job.ContentType, job.UserId, key); try { var response = await httpClient.SendAsync(request, token).WaitAsync(TimeSpan.FromSeconds(10), token); _ = followup.ExecuteTask("UpdateInstanceMetadata", async provider => { var instanceSvc = provider.GetRequiredService(); await instanceSvc.UpdateInstanceStatusAsync(job.RecipientHost, new Uri(job.InboxUrl).Host, (int)response.StatusCode, !response.IsSuccessStatusCode); }); response.EnsureSuccessStatusCode(); } catch (Exception e) { //TODO: prune dead instances after a while (and only resume sending activities after they come back) if (job.RetryCount++ < 10) { var jitter = TimeSpan.FromSeconds(new Random().Next(0, 60)); var baseDelay = TimeSpan.FromMinutes(1); var maxBackoff = TimeSpan.FromHours(8); var backoff = (Math.Pow(2, job.RetryCount) - 1) * baseDelay; if (backoff > maxBackoff) backoff = maxBackoff; backoff += jitter; job.ExceptionMessage = e.Message; job.ExceptionSource = e.Source; job.DelayedUntil = DateTime.Now + backoff; job.Status = Job.JobStatus.Delayed; } else { throw; } } } } [ProtoContract] public class DeliverJob : Job { [ProtoMember(1)] public required string InboxUrl; [ProtoMember(2)] public required string Payload; [ProtoMember(3)] public required string ContentType; [ProtoMember(10)] public required string UserId; [ProtoMember(11)] public required string RecipientHost; }