[backend] Code cleanup

This commit is contained in:
Laura Hausmann 2024-03-24 14:17:02 +01:00
parent c02b2bd34c
commit a67fe3722d
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
11 changed files with 69 additions and 67 deletions

View file

@ -19,8 +19,8 @@ public sealed class WebSocketConnection(
private readonly SemaphoreSlim _lock = new(1); private readonly SemaphoreSlim _lock = new(1);
public readonly List<IChannel> Channels = []; public readonly List<IChannel> Channels = [];
public readonly EventService EventService = eventSvc; public readonly EventService EventService = eventSvc;
public readonly OauthToken Token = token;
public readonly IServiceScope Scope = scopeFactory.CreateScope(); public readonly IServiceScope Scope = scopeFactory.CreateScope();
public readonly OauthToken Token = token;
public void Dispose() public void Dispose()
{ {

View file

@ -85,9 +85,9 @@ public class DatabaseContext(DbContextOptions<DatabaseContext> options)
public virtual DbSet<AllowedInstance> AllowedInstances { get; init; } = null!; public virtual DbSet<AllowedInstance> AllowedInstances { get; init; } = null!;
public virtual DbSet<BlockedInstance> BlockedInstances { get; init; } = null!; public virtual DbSet<BlockedInstance> BlockedInstances { get; init; } = null!;
public virtual DbSet<MetaStoreEntry> MetaStore { get; init; } = null!; public virtual DbSet<MetaStoreEntry> MetaStore { get; init; } = null!;
public virtual DbSet<DataProtectionKey> DataProtectionKeys { get; init; } = null!;
public virtual DbSet<CacheEntry> CacheStore { get; init; } = null!; public virtual DbSet<CacheEntry> CacheStore { get; init; } = null!;
public virtual DbSet<Job> Jobs { get; init; } = null!; public virtual DbSet<Job> Jobs { get; init; } = null!;
public virtual DbSet<DataProtectionKey> DataProtectionKeys { get; init; } = null!;
public static NpgsqlDataSource GetDataSource(Config.DatabaseSection? config) public static NpgsqlDataSource GetDataSource(Config.DatabaseSection? config)
{ {

View file

@ -11,6 +11,15 @@ namespace Iceshrimp.Backend.Core.Database.Tables;
[Index("DelayedUntil")] [Index("DelayedUntil")]
public class Job public class Job
{ {
public enum JobStatus
{
Queued,
Delayed,
Running,
Completed,
Failed
}
[Key] [Column("id")] public Guid Id { get; set; } [Key] [Column("id")] public Guid Id { get; set; }
[Column("queue")] public string Queue { get; set; } = null!; [Column("queue")] public string Queue { get; set; } = null!;
@ -28,13 +37,4 @@ public class Job
public long Duration => (long)((FinishedAt ?? DateTime.Now) - (StartedAt ?? QueuedAt)).TotalMilliseconds; public long Duration => (long)((FinishedAt ?? DateTime.Now) - (StartedAt ?? QueuedAt)).TotalMilliseconds;
[NotMapped] public long QueueDuration => (long)((StartedAt ?? DateTime.Now) - QueuedAt).TotalMilliseconds; [NotMapped] public long QueueDuration => (long)((StartedAt ?? DateTime.Now) - QueuedAt).TotalMilliseconds;
public enum JobStatus
{
Queued,
Delayed,
Running,
Completed,
Failed
}
} }

View file

@ -40,7 +40,9 @@ public static class ListDestructuringExtensions
item4 = list[3]; item4 = list[3];
} }
public static void Deconstruct<T>(this IList<T> list, out T item1, out T item2, out T item3, out T item4, out T item5) public static void Deconstruct<T>(
this IList<T> list, out T item1, out T item2, out T item3, out T item4, out T item5
)
{ {
if (list.Count != 5) if (list.Count != 5)
throw new Exception("This deconstructor only takes lists of length 5"); throw new Exception("This deconstructor only takes lists of length 5");

View file

@ -19,7 +19,7 @@ public sealed class AsyncAutoResetEvent(bool signaled)
{ {
// If the token is cancelled, cancel the waiter. // If the token is cancelled, cancel the waiter.
var registration = var registration =
cancellationToken.Register(() => tcs.TrySetCanceled(), useSynchronizationContext: false); cancellationToken.Register(() => tcs.TrySetCanceled(), false);
// If the waiter completes or faults, unregister our interest in cancellation. // If the waiter completes or faults, unregister our interest in cancellation.
tcs.Task.ContinueWith( tcs.Task.ContinueWith(

View file

@ -189,10 +189,10 @@ public class BackgroundTaskQueue()
} }
} }
[JsonDerivedType(typeof(BackgroundTaskJobData), typeDiscriminator: "base")] [JsonDerivedType(typeof(BackgroundTaskJobData), "base")]
[JsonDerivedType(typeof(DriveFileDeleteJobData), typeDiscriminator: "driveFileDelete")] [JsonDerivedType(typeof(DriveFileDeleteJobData), "driveFileDelete")]
[JsonDerivedType(typeof(PollExpiryJobData), typeDiscriminator: "pollExpiry")] [JsonDerivedType(typeof(PollExpiryJobData), "pollExpiry")]
[JsonDerivedType(typeof(MuteExpiryJobData), typeDiscriminator: "muteExpiry")] [JsonDerivedType(typeof(MuteExpiryJobData), "muteExpiry")]
public class BackgroundTaskJobData : Job; public class BackgroundTaskJobData : Job;
public class DriveFileDeleteJobData : BackgroundTaskJobData public class DriveFileDeleteJobData : BackgroundTaskJobData

View file

@ -15,11 +15,11 @@ namespace Iceshrimp.Backend.Core.Services;
public class QueueService(IServiceScopeFactory scopeFactory) : BackgroundService public class QueueService(IServiceScopeFactory scopeFactory) : BackgroundService
{ {
private readonly List<IPostgresJobQueue> _queues = []; private readonly List<IPostgresJobQueue> _queues = [];
public readonly BackgroundTaskQueue BackgroundTaskQueue = new();
public readonly DeliverQueue DeliverQueue = new();
public readonly InboxQueue InboxQueue = new(); public readonly InboxQueue InboxQueue = new();
public readonly DeliverQueue DeliverQueue = new();
public readonly PreDeliverQueue PreDeliverQueue = new(); public readonly PreDeliverQueue PreDeliverQueue = new();
public readonly BackgroundTaskQueue BackgroundTaskQueue = new();
private async Task<NpgsqlConnection> GetNpgsqlConnection(IServiceScope scope) private async Task<NpgsqlConnection> GetNpgsqlConnection(IServiceScope scope)
{ {
@ -124,32 +124,16 @@ public class PostgresJobQueue<T>(
int parallelism int parallelism
) : IPostgresJobQueue where T : class ) : IPostgresJobQueue where T : class
{ {
public string Name => name; private readonly AsyncAutoResetEvent _delayedChannel = new(false);
private event EventHandler? QueuedChannelEvent;
private event EventHandler? DelayedChannelEvent;
private readonly AsyncAutoResetEvent _queuedChannel = new(false); private readonly AsyncAutoResetEvent _queuedChannel = new(false);
private readonly AsyncAutoResetEvent _delayedChannel = new(false);
private IServiceScopeFactory _scopeFactory = null!;
public string Name => name;
public void RaiseJobQueuedEvent() => QueuedChannelEvent?.Invoke(null, EventArgs.Empty); public void RaiseJobQueuedEvent() => QueuedChannelEvent?.Invoke(null, EventArgs.Empty);
public void RaiseJobDelayedEvent() => DelayedChannelEvent?.Invoke(null, EventArgs.Empty); public void RaiseJobDelayedEvent() => DelayedChannelEvent?.Invoke(null, EventArgs.Empty);
// ReSharper disable once SuggestBaseTypeForParameter
private async Task RaiseJobQueuedEvent(DatabaseContext db) =>
await db.Database.ExecuteSqlAsync($"SELECT pg_notify('queued', {name});");
// ReSharper disable once SuggestBaseTypeForParameter
private async Task RaiseJobDelayedEvent(DatabaseContext db) =>
await db.Database.ExecuteSqlAsync($"SELECT pg_notify('delayed', {name});");
private AsyncServiceScope GetScope() => _scopeFactory.CreateAsyncScope();
private static DatabaseContext GetDbContext(IServiceScope scope) =>
scope.ServiceProvider.GetRequiredService<DatabaseContext>();
private IServiceScopeFactory _scopeFactory = null!;
public async Task ExecuteAsync(IServiceScopeFactory scopeFactory, CancellationToken token) public async Task ExecuteAsync(IServiceScopeFactory scopeFactory, CancellationToken token)
{ {
_scopeFactory = scopeFactory; _scopeFactory = scopeFactory;
@ -170,10 +154,10 @@ public class PostgresJobQueue<T>(
var runningCount = var runningCount =
await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Running, await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Running,
cancellationToken: token); token);
var queuedCount = var queuedCount =
await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Queued, await db.Jobs.CountAsync(p => p.Queue == name && p.Status == Job.JobStatus.Queued,
cancellationToken: token); token);
var actualParallelism = Math.Min(parallelism - runningCount, queuedCount); var actualParallelism = Math.Min(parallelism - runningCount, queuedCount);
if (actualParallelism == 0) if (actualParallelism == 0)
@ -198,6 +182,31 @@ public class PostgresJobQueue<T>(
} }
} }
public async Task RecoverOrPrepareForExitAsync()
{
//TODO: Make this support clustering
await using var scope = GetScope();
await using var db = GetDbContext(scope);
await db.Jobs.Where(p => p.Status == Job.JobStatus.Running)
.ExecuteUpdateAsync(p => p.SetProperty(i => i.Status, i => Job.JobStatus.Queued));
}
private event EventHandler? QueuedChannelEvent;
private event EventHandler? DelayedChannelEvent;
// ReSharper disable once SuggestBaseTypeForParameter
private async Task RaiseJobQueuedEvent(DatabaseContext db) =>
await db.Database.ExecuteSqlAsync($"SELECT pg_notify('queued', {name});");
// ReSharper disable once SuggestBaseTypeForParameter
private async Task RaiseJobDelayedEvent(DatabaseContext db) =>
await db.Database.ExecuteSqlAsync($"SELECT pg_notify('delayed', {name});");
private AsyncServiceScope GetScope() => _scopeFactory.CreateAsyncScope();
private static DatabaseContext GetDbContext(IServiceScope scope) =>
scope.ServiceProvider.GetRequiredService<DatabaseContext>();
private async Task DelayedJobHandlerAsync(CancellationToken token) private async Task DelayedJobHandlerAsync(CancellationToken token)
{ {
using var loggerScope = _scopeFactory.CreateScope(); using var loggerScope = _scopeFactory.CreateScope();
@ -214,7 +223,7 @@ public class PostgresJobQueue<T>(
p.Status == Job.JobStatus.Delayed && p.Status == Job.JobStatus.Delayed &&
(p.DelayedUntil == null || p.DelayedUntil < DateTime.UtcNow)) (p.DelayedUntil == null || p.DelayedUntil < DateTime.UtcNow))
.ExecuteUpdateAsync(p => p.SetProperty(i => i.Status, i => Job.JobStatus.Queued), .ExecuteUpdateAsync(p => p.SetProperty(i => i.Status, i => Job.JobStatus.Queued),
cancellationToken: token); token);
if (count > 0) continue; if (count > 0) continue;
@ -403,13 +412,4 @@ public class PostgresJobQueue<T>(
await RaiseJobDelayedEvent(db); await RaiseJobDelayedEvent(db);
} }
public async Task RecoverOrPrepareForExitAsync()
{
//TODO: Make this support clustering
await using var scope = GetScope();
await using var db = GetDbContext(scope);
await db.Jobs.Where(p => p.Status == Job.JobStatus.Running)
.ExecuteUpdateAsync(p => p.SetProperty(i => i.Status, i => Job.JobStatus.Queued));
}
} }