diff --git a/Iceshrimp.Backend/Controllers/Mastodon/Schemas/InstanceInfoV2Response.cs b/Iceshrimp.Backend/Controllers/Mastodon/Schemas/InstanceInfoV2Response.cs index 5d8a227c..798b1e55 100644 --- a/Iceshrimp.Backend/Controllers/Mastodon/Schemas/InstanceInfoV2Response.cs +++ b/Iceshrimp.Backend/Controllers/Mastodon/Schemas/InstanceInfoV2Response.cs @@ -24,7 +24,7 @@ public class InstanceInfoV2Response( [J("registrations")] public InstanceRegistrations Registrations => new(config.Security); [J("configuration")] public InstanceConfigurationV2 Configuration => new(config.Instance); - [J("usage")] public required InstanceUsage Usage { get; set; } + [J("usage")] public required InstanceUsage Usage { get; set; } //TODO: add the rest } diff --git a/Iceshrimp.Backend/Controllers/Mastodon/Streaming/WebSocketConnection.cs b/Iceshrimp.Backend/Controllers/Mastodon/Streaming/WebSocketConnection.cs index 60bd51a1..ddba7356 100644 --- a/Iceshrimp.Backend/Controllers/Mastodon/Streaming/WebSocketConnection.cs +++ b/Iceshrimp.Backend/Controllers/Mastodon/Streaming/WebSocketConnection.cs @@ -16,17 +16,17 @@ public sealed class WebSocketConnection( CancellationToken ct ) : IDisposable { - private readonly SemaphoreSlim _lock = new(1); - public readonly List Channels = []; - public readonly EventService EventService = eventSvc; - public readonly OauthToken Token = token; - public readonly IServiceScope Scope = scopeFactory.CreateScope(); + private readonly SemaphoreSlim _lock = new(1); + public readonly List Channels = []; + public readonly EventService EventService = eventSvc; + public readonly IServiceScope Scope = scopeFactory.CreateScope(); + public readonly OauthToken Token = token; public void Dispose() { foreach (var channel in Channels) channel.Dispose(); - + Scope.Dispose(); } diff --git a/Iceshrimp.Backend/Core/Database/DatabaseContext.cs b/Iceshrimp.Backend/Core/Database/DatabaseContext.cs index 90cadfc4..63611e3b 100644 --- a/Iceshrimp.Backend/Core/Database/DatabaseContext.cs +++ b/Iceshrimp.Backend/Core/Database/DatabaseContext.cs @@ -85,9 +85,9 @@ public class DatabaseContext(DbContextOptions options) public virtual DbSet AllowedInstances { get; init; } = null!; public virtual DbSet BlockedInstances { get; init; } = null!; public virtual DbSet MetaStore { get; init; } = null!; - public virtual DbSet DataProtectionKeys { get; init; } = null!; public virtual DbSet CacheStore { get; init; } = null!; - public virtual DbSet Jobs { get; init; } = null!; + public virtual DbSet Jobs { get; init; } = null!; + public virtual DbSet DataProtectionKeys { get; init; } = null!; public static NpgsqlDataSource GetDataSource(Config.DatabaseSection? config) { @@ -1248,7 +1248,7 @@ public class DatabaseContext(DbContextOptions options) .WithMany(p => p.Webhooks) .OnDelete(DeleteBehavior.Cascade); }); - + modelBuilder.Entity(entity => { entity.Property(e => e.Status).HasDefaultValue(Job.JobStatus.Queued); diff --git a/Iceshrimp.Backend/Core/Database/Tables/Job.cs b/Iceshrimp.Backend/Core/Database/Tables/Job.cs index c2a69a28..87000c52 100644 --- a/Iceshrimp.Backend/Core/Database/Tables/Job.cs +++ b/Iceshrimp.Backend/Core/Database/Tables/Job.cs @@ -11,6 +11,15 @@ namespace Iceshrimp.Backend.Core.Database.Tables; [Index("DelayedUntil")] public class Job { + public enum JobStatus + { + Queued, + Delayed, + Running, + Completed, + Failed + } + [Key] [Column("id")] public Guid Id { get; set; } [Column("queue")] public string Queue { get; set; } = null!; @@ -28,13 +37,4 @@ public class Job public long Duration => (long)((FinishedAt ?? DateTime.Now) - (StartedAt ?? QueuedAt)).TotalMilliseconds; [NotMapped] public long QueueDuration => (long)((StartedAt ?? DateTime.Now) - QueuedAt).TotalMilliseconds; - - public enum JobStatus - { - Queued, - Delayed, - Running, - Completed, - Failed - } } \ No newline at end of file diff --git a/Iceshrimp.Backend/Core/Extensions/ArrayDestructuringExtensions.cs b/Iceshrimp.Backend/Core/Extensions/ArrayDestructuringExtensions.cs index a9e22444..8ddb0207 100644 --- a/Iceshrimp.Backend/Core/Extensions/ArrayDestructuringExtensions.cs +++ b/Iceshrimp.Backend/Core/Extensions/ArrayDestructuringExtensions.cs @@ -39,7 +39,7 @@ public static class ArrayDestructuringExtensions item3 = array[2]; item4 = array[3]; } - + public static void Deconstruct(this T[] array, out T item1, out T item2, out T item3, out T item4, out T item5) { if (array.Length != 5) diff --git a/Iceshrimp.Backend/Core/Extensions/ListDestructuringExtensions.cs b/Iceshrimp.Backend/Core/Extensions/ListDestructuringExtensions.cs index 47910ae3..3c55389a 100644 --- a/Iceshrimp.Backend/Core/Extensions/ListDestructuringExtensions.cs +++ b/Iceshrimp.Backend/Core/Extensions/ListDestructuringExtensions.cs @@ -39,8 +39,10 @@ public static class ListDestructuringExtensions item3 = list[2]; item4 = list[3]; } - - public static void Deconstruct(this IList list, out T item1, out T item2, out T item3, out T item4, out T item5) + + public static void Deconstruct( + this IList list, out T item1, out T item2, out T item3, out T item4, out T item5 + ) { if (list.Count != 5) throw new Exception("This deconstructor only takes lists of length 5"); diff --git a/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs b/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs index fff824c6..70c27c17 100644 --- a/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs +++ b/Iceshrimp.Backend/Core/Helpers/EventHelpers.cs @@ -19,7 +19,7 @@ public sealed class AsyncAutoResetEvent(bool signaled) { // If the token is cancelled, cancel the waiter. var registration = - cancellationToken.Register(() => tcs.TrySetCanceled(), useSynchronizationContext: false); + cancellationToken.Register(() => tcs.TrySetCanceled(), false); // If the waiter completes or faults, unregister our interest in cancellation. tcs.Task.ContinueWith( diff --git a/Iceshrimp.Backend/Core/Queues/BackgroundTaskQueue.cs b/Iceshrimp.Backend/Core/Queues/BackgroundTaskQueue.cs index 108e3826..ac4655bc 100644 --- a/Iceshrimp.Backend/Core/Queues/BackgroundTaskQueue.cs +++ b/Iceshrimp.Backend/Core/Queues/BackgroundTaskQueue.cs @@ -189,10 +189,10 @@ public class BackgroundTaskQueue() } } -[JsonDerivedType(typeof(BackgroundTaskJobData), typeDiscriminator: "base")] -[JsonDerivedType(typeof(DriveFileDeleteJobData), typeDiscriminator: "driveFileDelete")] -[JsonDerivedType(typeof(PollExpiryJobData), typeDiscriminator: "pollExpiry")] -[JsonDerivedType(typeof(MuteExpiryJobData), typeDiscriminator: "muteExpiry")] +[JsonDerivedType(typeof(BackgroundTaskJobData), "base")] +[JsonDerivedType(typeof(DriveFileDeleteJobData), "driveFileDelete")] +[JsonDerivedType(typeof(PollExpiryJobData), "pollExpiry")] +[JsonDerivedType(typeof(MuteExpiryJobData), "muteExpiry")] public class BackgroundTaskJobData : Job; public class DriveFileDeleteJobData : BackgroundTaskJobData diff --git a/Iceshrimp.Backend/Core/Services/CronService.cs b/Iceshrimp.Backend/Core/Services/CronService.cs index d2860b41..a1e69010 100644 --- a/Iceshrimp.Backend/Core/Services/CronService.cs +++ b/Iceshrimp.Backend/Core/Services/CronService.cs @@ -24,7 +24,7 @@ public class CronService(IServiceScopeFactory serviceScopeFactory) : BackgroundS { try { - await using var scope = serviceScopeFactory.CreateAsyncScope(); + await using var scope = serviceScopeFactory.CreateAsyncScope(); await task.Invoke(scope.ServiceProvider); } catch diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index 6e8e659a..b3ad76fa 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -14,12 +14,12 @@ namespace Iceshrimp.Backend.Core.Services; public class QueueService(IServiceScopeFactory scopeFactory) : BackgroundService { - private readonly List _queues = []; + private readonly List _queues = []; + public readonly BackgroundTaskQueue BackgroundTaskQueue = new(); + public readonly DeliverQueue DeliverQueue = new(); - public readonly InboxQueue InboxQueue = new(); - public readonly DeliverQueue DeliverQueue = new(); - public readonly PreDeliverQueue PreDeliverQueue = new(); - public readonly BackgroundTaskQueue BackgroundTaskQueue = new(); + public readonly InboxQueue InboxQueue = new(); + public readonly PreDeliverQueue PreDeliverQueue = new(); private async Task GetNpgsqlConnection(IServiceScope scope) { @@ -124,32 +124,16 @@ public class PostgresJobQueue( int parallelism ) : IPostgresJobQueue where T : class { - public string Name => name; - - private event EventHandler? QueuedChannelEvent; - private event EventHandler? DelayedChannelEvent; - - private readonly AsyncAutoResetEvent _queuedChannel = new(false); private readonly AsyncAutoResetEvent _delayedChannel = new(false); + private readonly AsyncAutoResetEvent _queuedChannel = new(false); + + private IServiceScopeFactory _scopeFactory = null!; + public string Name => name; + public void RaiseJobQueuedEvent() => QueuedChannelEvent?.Invoke(null, EventArgs.Empty); public void RaiseJobDelayedEvent() => DelayedChannelEvent?.Invoke(null, EventArgs.Empty); - // ReSharper disable once SuggestBaseTypeForParameter - private async Task RaiseJobQueuedEvent(DatabaseContext db) => - await db.Database.ExecuteSqlAsync($"SELECT pg_notify('queued', {name});"); - - // ReSharper disable once SuggestBaseTypeForParameter - private async Task RaiseJobDelayedEvent(DatabaseContext db) => - await db.Database.ExecuteSqlAsync($"SELECT pg_notify('delayed', {name});"); - - private AsyncServiceScope GetScope() => _scopeFactory.CreateAsyncScope(); - - private static DatabaseContext GetDbContext(IServiceScope scope) => - scope.ServiceProvider.GetRequiredService(); - - private IServiceScopeFactory _scopeFactory = null!; - public async Task ExecuteAsync(IServiceScopeFactory scopeFactory, CancellationToken token) { _scopeFactory = scopeFactory; @@ -170,10 +154,10 @@ public class PostgresJobQueue( var runningCount = await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Running, - cancellationToken: token); + token); var queuedCount = await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Queued, - cancellationToken: token); + token); var actualParallelism = Math.Min(parallelism - runningCount, queuedCount); if (actualParallelism == 0) @@ -198,6 +182,31 @@ public class PostgresJobQueue( } } + public async Task RecoverOrPrepareForExitAsync() + { + //TODO: Make this support clustering + await using var scope = GetScope(); + await using var db = GetDbContext(scope); + await db.Jobs.Where(p => p.Status == Job.JobStatus.Running) + .ExecuteUpdateAsync(p => p.SetProperty(i => i.Status, i => Job.JobStatus.Queued)); + } + + private event EventHandler? QueuedChannelEvent; + private event EventHandler? DelayedChannelEvent; + + // ReSharper disable once SuggestBaseTypeForParameter + private async Task RaiseJobQueuedEvent(DatabaseContext db) => + await db.Database.ExecuteSqlAsync($"SELECT pg_notify('queued', {name});"); + + // ReSharper disable once SuggestBaseTypeForParameter + private async Task RaiseJobDelayedEvent(DatabaseContext db) => + await db.Database.ExecuteSqlAsync($"SELECT pg_notify('delayed', {name});"); + + private AsyncServiceScope GetScope() => _scopeFactory.CreateAsyncScope(); + + private static DatabaseContext GetDbContext(IServiceScope scope) => + scope.ServiceProvider.GetRequiredService(); + private async Task DelayedJobHandlerAsync(CancellationToken token) { using var loggerScope = _scopeFactory.CreateScope(); @@ -214,7 +223,7 @@ public class PostgresJobQueue( p.Status == Job.JobStatus.Delayed && (p.DelayedUntil == null || p.DelayedUntil < DateTime.UtcNow)) .ExecuteUpdateAsync(p => p.SetProperty(i => i.Status, i => Job.JobStatus.Queued), - cancellationToken: token); + token); if (count > 0) continue; @@ -403,13 +412,4 @@ public class PostgresJobQueue( await RaiseJobDelayedEvent(db); } - - public async Task RecoverOrPrepareForExitAsync() - { - //TODO: Make this support clustering - await using var scope = GetScope(); - await using var db = GetDbContext(scope); - await db.Jobs.Where(p => p.Status == Job.JobStatus.Running) - .ExecuteUpdateAsync(p => p.SetProperty(i => i.Status, i => Job.JobStatus.Queued)); - } } \ No newline at end of file diff --git a/Iceshrimp.Backend/Iceshrimp.Backend.csproj b/Iceshrimp.Backend/Iceshrimp.Backend.csproj index 27168487..e9ac3041 100644 --- a/Iceshrimp.Backend/Iceshrimp.Backend.csproj +++ b/Iceshrimp.Backend/Iceshrimp.Backend.csproj @@ -25,7 +25,7 @@ - +