Iceshrimp.NET/Iceshrimp.Backend/Core/Queues/BackfillQueue.cs
Kopper 94328a3eef [backend] Use a stack instead of queue when backfilling
This makes each reply chain load completely before loading the next
chain, instead of the current behavior that loads all replies of one
depth before loading the next "layer".

This won't make much of a difference *now*, but should result in more
intuitive behavior when live updating of newly loaded replies gets
implemented.
2024-11-06 20:02:29 +01:00

131 lines
5 KiB
C#

using AsyncKeyedLock;
using Iceshrimp.Backend.Core.Configuration;
using Iceshrimp.Backend.Core.Database;
using Iceshrimp.Backend.Core.Database.Tables;
using Iceshrimp.Backend.Core.Federation.ActivityStreams.Types;
using Iceshrimp.Backend.Core.Services;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using J = System.Text.Json.Serialization.JsonPropertyNameAttribute;
using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues;
file record struct BackfillData(string Id, string RepliesCollection);
public class BackfillQueue(int parallelism)
: PostgresJobQueue<BackfillJobData>("backfill", BackfillQueueProcessorDelegateAsync,
parallelism, TimeSpan.FromMinutes(10))
{
public const int MaxRepliesPerThread = 1000;
public const int MaxRepliesPerNote = 500;
public static readonly AsyncKeyedLocker<string> KeyedLocker = new(o =>
{
o.PoolSize = 100;
o.PoolInitialFill = 5;
});
private static async Task BackfillQueueProcessorDelegateAsync(
Job job,
BackfillJobData jobData,
IServiceProvider scope,
CancellationToken token
)
{
if (KeyedLocker.IsInUse(jobData.ThreadId)) return;
using var _ = await KeyedLocker.LockAsync(jobData.ThreadId, token);
var logger = scope.GetRequiredService<ILogger<BackfillQueue>>();
var backfillConfig = scope.GetRequiredService<IOptionsSnapshot<Config.BackfillSection>>();
var db = scope.GetRequiredService<DatabaseContext>();
var noteSvc = scope.GetRequiredService<NoteService>();
var objectResolver = scope.GetRequiredService<ActivityPub.ObjectResolver>();
var user = jobData.AuthenticatedUserId == null
? null
: await db.Users.Where(u => u.Id == jobData.AuthenticatedUserId).FirstOrDefaultAsync(token);
logger.LogDebug("Backfilling replies for thread {id} as user {userId}", jobData.ThreadId, user?.Username);
var cfg = backfillConfig.Value.Replies;
var backfillLimit = MaxRepliesPerThread;
var history = new HashSet<string>();
var toBackfillArray = await db.Notes
.Where(n => n.ThreadId == jobData.ThreadId
&& n.RepliesCount < MaxRepliesPerNote
&& n.UserHost != null
&& n.RepliesCollection != null
&& n.CreatedAt <= DateTime.UtcNow - cfg.NewNoteDelayTimeSpan
&& (n.RepliesFetchedAt == null || n.RepliesFetchedAt <= DateTime.UtcNow - cfg.RefreshAfterTimeSpan))
.Select(n => new BackfillData(n.Id, n.RepliesCollection!))
.ToArrayAsync(token);
var toBackfill = new Stack<BackfillData>(toBackfillArray);
while (toBackfill.TryPop(out var currentItem))
{
var current = currentItem;
if (!history.Add(current.RepliesCollection))
{
logger.LogDebug("Skipping {collection} as it was already backfilled in this run", current.RepliesCollection);
continue;
}
if (--backfillLimit <= 0)
{
logger.LogDebug("Reached backfill limit");
break;
}
logger.LogTrace("Backfilling collection {collection} (remaining limit {limit})", current.RepliesCollection, backfillLimit);
await db.Notes
.Where(n => n.Id == current.Id)
.ExecuteUpdateAsync(p => p.SetProperty(n => n.RepliesFetchedAt, DateTime.UtcNow), token);
await foreach (var asNote in objectResolver.IterateCollection(new ASCollection(current.RepliesCollection), user: user)
.Take(MaxRepliesPerNote)
.Where(p => p.Id != null)
.WithCancellation(token))
{
logger.LogTrace("Backfilling note {note} (remaining limit {limit})", asNote.Id, backfillLimit);
try
{
var note = await noteSvc.ResolveNoteAsync(asNote.Id!, asNote as ASNote, user, clearHistory: true,
forceRefresh: false);
backfillLimit -= Math.Max(noteSvc.NotesFetched, 1);
if (backfillLimit <= 0)
{
logger.LogDebug("Reached backfill limit");
toBackfill.Clear();
break;
}
if (note is { UserHost: not null, RepliesCollection: not null, RepliesCount: < MaxRepliesPerNote } &&
note.CreatedAt <= DateTime.UtcNow - cfg.NewNoteDelayTimeSpan &&
(note.RepliesFetchedAt == null ||
note.RepliesFetchedAt <= DateTime.UtcNow - cfg.RefreshAfterTimeSpan))
{
toBackfill.Push(new BackfillData(note.Id, note.RepliesCollection!));
}
}
catch (Exception e)
{
logger.LogWarning(e, "Failed to backfill {note}", asNote.Id);
}
}
}
await db.NoteThreads
.Where(t => t.Id == jobData.ThreadId)
.ExecuteUpdateAsync(p => p.SetProperty(t => t.BackfilledAt, DateTime.UtcNow), cancellationToken: default);
}
}
public class BackfillJobData
{
[JR] [J("threadId")] public required string ThreadId { get; set; }
[JR] [J("authenticatedUserId")] public required string? AuthenticatedUserId { get; set; }
}