From 71733733304ced9b0014fb19c19ff90f606a403f Mon Sep 17 00:00:00 2001 From: kopper Date: Thu, 24 Oct 2024 21:03:43 +0200 Subject: [PATCH] [backend/federation] Improved reply backfill Instead of backfilling every note we come across that has a reply collection, only schedule a backfill job if someone wants to see the replies (on GET MastoAPI /context, or Iceshrimp API /descendants) Reply backfilling is also done on a ThreadIdOrId basis as opposed to the previous way of backfilling individual notes. This allows us finer grained control over the recursion and frees up the job queue, alongside allowing for easier implementation of context collection backfill in the future (by mapping each context collection to a thread) --- Currently, note threads are implicit based on the "threadId" column of a note, which can be null (where it's the same as the note's "id") This commit turns note threads into an actual entity, and as a part of that, makes "threadId" non-nullable (by specifically setting it to "id" for those cases) This is done to attach extra metadata to the entire thread, currently just the time of when it was last backfilled, but more may be added in the future (the context collection associated with this thread, for example) --- The data format for backfill jobs have backwards-incompatibly changed since the introduction of the feature. We can drop all old jobs without causing too much trouble as they will be re-scheduled on demand --- Signed-off-by: Laura Hausmann --- .../Mastodon/ConversationsController.cs | 8 +- .../Mastodon/Renderers/NoteRenderer.cs | 8 +- .../Controllers/Mastodon/StatusController.cs | 6 +- .../Streaming/Channels/DirectChannel.cs | 2 +- .../Controllers/Web/MiscController.cs | 2 +- .../Controllers/Web/NoteController.cs | 6 +- .../Core/Configuration/Config.cs | 25 ++--- .../Core/Database/DatabaseContext.cs | 1 + .../DatabaseContextModelSnapshot.cs | 40 +++++++ .../20241022090702_AddNoteThread.cs | 77 +++++++++++++ .../20241023025700_RemoveOldBackfillJobs.cs | 27 +++++ ...0241024092102_AddThreadMutingForeignKey.cs | 36 ++++++ .../Core/Database/Tables/Note.cs | 10 +- .../Core/Database/Tables/NoteThread.cs | 24 ++++ .../Core/Database/Tables/NoteThreadMuting.cs | 4 + .../Core/Extensions/QueryableExtensions.cs | 4 +- .../Federation/ActivityPub/ObjectResolver.cs | 6 +- .../Core/Queues/BackfillQueue.cs | 105 ++++++++++++++---- .../Core/Services/NoteService.cs | 96 ++++++---------- .../Core/Services/QueueService.cs | 15 ++- .../Helpers/StreamingConnectionAggregate.cs | 2 +- Iceshrimp.Backend/configuration.ini | 15 +-- 22 files changed, 376 insertions(+), 143 deletions(-) create mode 100644 Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241022090702_AddNoteThread.cs create mode 100644 Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241023025700_RemoveOldBackfillJobs.cs create mode 100644 Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241024092102_AddThreadMutingForeignKey.cs create mode 100644 Iceshrimp.Backend/Core/Database/Tables/NoteThread.cs diff --git a/Iceshrimp.Backend/Controllers/Mastodon/ConversationsController.cs b/Iceshrimp.Backend/Controllers/Mastodon/ConversationsController.cs index 02ce63dc..1333307f 100644 --- a/Iceshrimp.Backend/Controllers/Mastodon/ConversationsController.cs +++ b/Iceshrimp.Backend/Controllers/Mastodon/ConversationsController.cs @@ -42,10 +42,10 @@ public class ConversationsController( .IncludeCommonProperties() .FilterHiddenConversations(user, db) .FilterMutedThreads(user, db) - .Paginate(p => p.ThreadIdOrId, pq, ControllerContext) + .Paginate(p => p.ThreadId, pq, ControllerContext) .Select(p => new Conversation { - Id = p.ThreadIdOrId, + Id = p.ThreadId, LastNote = p, UserIds = p.VisibleUserIds, Unread = db.Notifications.Any(n => n.Note == p && @@ -96,10 +96,10 @@ public class ConversationsController( var user = HttpContext.GetUserOrFail(); var conversation = await db.Conversations(user) .IncludeCommonProperties() - .Where(p => p.ThreadIdOrId == id) + .Where(p => p.ThreadId == id) .Select(p => new Conversation { - Id = p.ThreadIdOrId, + Id = p.ThreadId, LastNote = p, UserIds = p.VisibleUserIds, Unread = db.Notifications.Any(n => n.Note == p && diff --git a/Iceshrimp.Backend/Controllers/Mastodon/Renderers/NoteRenderer.cs b/Iceshrimp.Backend/Controllers/Mastodon/Renderers/NoteRenderer.cs index 10e2569e..102dbc03 100644 --- a/Iceshrimp.Backend/Controllers/Mastodon/Renderers/NoteRenderer.cs +++ b/Iceshrimp.Backend/Controllers/Mastodon/Renderers/NoteRenderer.cs @@ -64,8 +64,8 @@ public class NoteRenderer( await db.NoteLikes.AnyAsync(p => p.Note == note && p.User == user); var bookmarked = data?.BookmarkedNotes?.Contains(note.Id) ?? await db.NoteBookmarks.AnyAsync(p => p.Note == note && p.User == user); - var muted = data?.MutedNotes?.Contains(note.ThreadIdOrId) ?? - await db.NoteThreadMutings.AnyAsync(p => p.ThreadId == note.ThreadIdOrId && p.User == user); + var muted = data?.MutedNotes?.Contains(note.ThreadId) ?? + await db.NoteThreadMutings.AnyAsync(p => p.ThreadId == note.ThreadId && p.User == user); var pinned = data?.PinnedNotes?.Contains(note.Id) ?? await db.UserNotePins.AnyAsync(p => p.Note == note && p.User == user); var renoted = data?.Renotes?.Contains(note.Id) ?? @@ -167,7 +167,7 @@ public class NoteRenderer( Poll = poll, Reactions = reactions, Filtered = filterResult, - Pleroma = new PleromaStatusExtensions { Reactions = reactions, ConversationId = note.ThreadIdOrId } + Pleroma = new PleromaStatusExtensions { Reactions = reactions, ConversationId = note.ThreadId } }; return res; @@ -356,7 +356,7 @@ public class NoteRenderer( { if (user == null) return []; if (notes.Count == 0) return []; - var ids = notes.Select(p => p.ThreadIdOrId).Distinct(); + var ids = notes.Select(p => p.ThreadId).Distinct(); return await db.NoteThreadMutings.Where(p => p.User == user && ids.Contains(p.ThreadId)) .Select(p => p.ThreadId) .ToListAsync(); diff --git a/Iceshrimp.Backend/Controllers/Mastodon/StatusController.cs b/Iceshrimp.Backend/Controllers/Mastodon/StatusController.cs index b56f4543..1190ab37 100644 --- a/Iceshrimp.Backend/Controllers/Mastodon/StatusController.cs +++ b/Iceshrimp.Backend/Controllers/Mastodon/StatusController.cs @@ -123,6 +123,8 @@ public class StatusController( .PrecomputeVisibilities(user) .RenderAllForMastodonAsync(noteRenderer, user, Filter.FilterContext.Threads); + if (user != null) await noteSvc.EnqueueBackfillTaskAsync(note); + return new StatusContext { Ancestors = ancestors.OrderAncestors(), Descendants = descendants.OrderDescendants() @@ -652,7 +654,7 @@ public class StatusController( var user = HttpContext.GetUserOrFail(); var target = await db.Notes.Where(p => p.Id == id) .EnsureVisibleFor(user) - .Select(p => p.ThreadIdOrId) + .Select(p => p.ThreadId) .FirstOrDefaultAsync() ?? throw GracefulException.RecordNotFound(); @@ -677,7 +679,7 @@ public class StatusController( var user = HttpContext.GetUserOrFail(); var target = await db.Notes.Where(p => p.Id == id) .EnsureVisibleFor(user) - .Select(p => p.ThreadIdOrId) + .Select(p => p.ThreadId) .FirstOrDefaultAsync() ?? throw GracefulException.RecordNotFound(); diff --git a/Iceshrimp.Backend/Controllers/Mastodon/Streaming/Channels/DirectChannel.cs b/Iceshrimp.Backend/Controllers/Mastodon/Streaming/Channels/DirectChannel.cs index 57bb7920..cd448f17 100644 --- a/Iceshrimp.Backend/Controllers/Mastodon/Streaming/Channels/DirectChannel.cs +++ b/Iceshrimp.Backend/Controllers/Mastodon/Streaming/Channels/DirectChannel.cs @@ -91,7 +91,7 @@ public class DirectChannel(WebSocketConnection connection) : IChannel return new ConversationEntity { Accounts = accounts.ToList(), - Id = note.ThreadIdOrId, + Id = note.ThreadId, LastStatus = rendered, Unread = true }; diff --git a/Iceshrimp.Backend/Controllers/Web/MiscController.cs b/Iceshrimp.Backend/Controllers/Web/MiscController.cs index b79afda7..add16198 100644 --- a/Iceshrimp.Backend/Controllers/Web/MiscController.cs +++ b/Iceshrimp.Backend/Controllers/Web/MiscController.cs @@ -54,7 +54,7 @@ public class MiscController(DatabaseContext db, NoteRenderer noteRenderer, BiteS { var user = HttpContext.GetUserOrFail(); var notes = await db.Notes.IncludeCommonProperties() - .Where(p => db.NoteThreadMutings.Any(m => m.ThreadId == p.ThreadIdOrId)) + .Where(p => db.NoteThreadMutings.Any(m => m.ThreadId == p.ThreadId)) .EnsureVisibleFor(user) .FilterHidden(user, db, false, false) .Paginate(pq, ControllerContext) diff --git a/Iceshrimp.Backend/Controllers/Web/NoteController.cs b/Iceshrimp.Backend/Controllers/Web/NoteController.cs index 85267a6c..e2009dd8 100644 --- a/Iceshrimp.Backend/Controllers/Web/NoteController.cs +++ b/Iceshrimp.Backend/Controllers/Web/NoteController.cs @@ -137,6 +137,8 @@ public class NoteController( foreach (var item in res.Where(p => p.Reply != null && res.Any(i => i.Id == p.Reply.Id))) item.Reply = null; + if (user != null) await noteSvc.EnqueueBackfillTaskAsync(note); + return res.OrderDescendants(); } @@ -465,7 +467,7 @@ public class NoteController( var user = HttpContext.GetUserOrFail(); var target = await db.Notes.Where(p => p.Id == id) .EnsureVisibleFor(user) - .Select(p => p.ThreadIdOrId) + .Select(p => p.ThreadId) .FirstOrDefaultAsync() ?? throw GracefulException.NotFound("Note not found"); @@ -491,7 +493,7 @@ public class NoteController( var user = HttpContext.GetUserOrFail(); var target = await db.Notes.Where(p => p.Id == id) .EnsureVisibleFor(user) - .Select(p => p.ThreadIdOrId) + .Select(p => p.ThreadId) .FirstOrDefaultAsync() ?? throw GracefulException.NotFound("Note not found"); diff --git a/Iceshrimp.Backend/Core/Configuration/Config.cs b/Iceshrimp.Backend/Core/Configuration/Config.cs index 4e1406ce..7aaea27e 100644 --- a/Iceshrimp.Backend/Core/Configuration/Config.cs +++ b/Iceshrimp.Backend/Core/Configuration/Config.cs @@ -331,7 +331,7 @@ public sealed class Config [Range(1, int.MaxValue)] public int Deliver { get; init; } = 20; [Range(1, int.MaxValue)] public int PreDeliver { get; init; } = 4; [Range(1, int.MaxValue)] public int BackgroundTask { get; init; } = 4; - [Range(1, int.MaxValue)] public int Backfill { get; init; } = 4; + [Range(1, int.MaxValue)] public int Backfill { get; init; } = 10; } public sealed class QueueSection @@ -353,32 +353,23 @@ public sealed class Config public sealed class BackfillRepliesSection { public bool Enabled { get; init; } = false; - public bool BackfillEverything { get; init; } = false; - - public string? NewNoteThreshold - { - get => NewNoteThresholdTimeSpan.ToString(); - init => NewNoteThresholdTimeSpan = - ParseNaturalDuration(value, "new note threshold") ?? TimeSpan.FromHours(3); - } public string? NewNoteDelay { get => NewNoteDelayTimeSpan.ToString(); init => NewNoteDelayTimeSpan = - ParseNaturalDuration(value, "new note delay") ?? TimeSpan.FromHours(3); + ParseNaturalDuration(value, "new note delay") ?? TimeSpan.FromMinutes(5); } - public string? RefreshOnRenoteAfter + public string? RefreshAfter { - get => RefreshOnRenoteAfterTimeSpan.ToString(); - init => RefreshOnRenoteAfterTimeSpan = - ParseNaturalDuration(value, "refresh renote after duration") ?? TimeSpan.FromDays(7); + get => RefreshAfterTimeSpan.ToString(); + init => RefreshAfterTimeSpan = + ParseNaturalDuration(value, "refresh renote after duration") ?? TimeSpan.FromMinutes(15); } - public TimeSpan NewNoteThresholdTimeSpan = TimeSpan.FromHours(3); - public TimeSpan NewNoteDelayTimeSpan = TimeSpan.FromHours(3); - public TimeSpan RefreshOnRenoteAfterTimeSpan = TimeSpan.FromDays(7); + public TimeSpan NewNoteDelayTimeSpan = TimeSpan.FromMinutes(5); + public TimeSpan RefreshAfterTimeSpan = TimeSpan.FromMinutes(15); } private static readonly char[] Digits = [..Enumerable.Range(0, 10).Select(p => p.ToString()[0])]; diff --git a/Iceshrimp.Backend/Core/Database/DatabaseContext.cs b/Iceshrimp.Backend/Core/Database/DatabaseContext.cs index 7586cc30..526b3855 100644 --- a/Iceshrimp.Backend/Core/Database/DatabaseContext.cs +++ b/Iceshrimp.Backend/Core/Database/DatabaseContext.cs @@ -43,6 +43,7 @@ public class DatabaseContext(DbContextOptions options) public virtual DbSet ModerationLogs { get; init; } = null!; public virtual DbSet Mutings { get; init; } = null!; public virtual DbSet Notes { get; init; } = null!; + public virtual DbSet NoteThreads { get; init; } = null!; public virtual DbSet NoteBookmarks { get; init; } = null!; public virtual DbSet NoteEdits { get; init; } = null!; public virtual DbSet NoteLikes { get; init; } = null!; diff --git a/Iceshrimp.Backend/Core/Database/Migrations/DatabaseContextModelSnapshot.cs b/Iceshrimp.Backend/Core/Database/Migrations/DatabaseContextModelSnapshot.cs index 32970fb7..a48e4ec7 100644 --- a/Iceshrimp.Backend/Core/Database/Migrations/DatabaseContextModelSnapshot.cs +++ b/Iceshrimp.Backend/Core/Database/Migrations/DatabaseContextModelSnapshot.cs @@ -2520,6 +2520,7 @@ namespace Iceshrimp.Backend.Core.Database.Migrations .HasColumnName("text"); b.Property("ThreadId") + .IsRequired() .HasMaxLength(256) .HasColumnType("character varying(256)") .HasColumnName("threadId"); @@ -2795,6 +2796,22 @@ namespace Iceshrimp.Backend.Core.Database.Migrations b.ToTable("note_reaction"); }); + modelBuilder.Entity("Iceshrimp.Backend.Core.Database.Tables.NoteThread", b => + { + b.Property("Id") + .HasMaxLength(256) + .HasColumnType("character varying(256)") + .HasColumnName("id"); + + b.Property("BackfilledAt") + .HasColumnType("timestamp with time zone") + .HasColumnName("backfilledAt"); + + b.HasKey("Id"); + + b.ToTable("note_thread"); + }); + modelBuilder.Entity("Iceshrimp.Backend.Core.Database.Tables.NoteThreadMuting", b => { b.Property("Id") @@ -5266,6 +5283,12 @@ namespace Iceshrimp.Backend.Core.Database.Migrations .HasForeignKey("ReplyId") .OnDelete(DeleteBehavior.Cascade); + b.HasOne("Iceshrimp.Backend.Core.Database.Tables.NoteThread", "Thread") + .WithMany("Notes") + .HasForeignKey("ThreadId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + b.HasOne("Iceshrimp.Backend.Core.Database.Tables.User", "User") .WithMany("Notes") .HasForeignKey("UserId") @@ -5278,6 +5301,8 @@ namespace Iceshrimp.Backend.Core.Database.Migrations b.Navigation("Reply"); + b.Navigation("Thread"); + b.Navigation("User"); }); @@ -5351,12 +5376,20 @@ namespace Iceshrimp.Backend.Core.Database.Migrations modelBuilder.Entity("Iceshrimp.Backend.Core.Database.Tables.NoteThreadMuting", b => { + b.HasOne("Iceshrimp.Backend.Core.Database.Tables.NoteThread", "Thread") + .WithMany("NoteThreadMutings") + .HasForeignKey("ThreadId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired(); + b.HasOne("Iceshrimp.Backend.Core.Database.Tables.User", "User") .WithMany("NoteThreadMutings") .HasForeignKey("UserId") .OnDelete(DeleteBehavior.Cascade) .IsRequired(); + b.Navigation("Thread"); + b.Navigation("User"); }); @@ -5913,6 +5946,13 @@ namespace Iceshrimp.Backend.Core.Database.Migrations b.Navigation("UserNotePins"); }); + modelBuilder.Entity("Iceshrimp.Backend.Core.Database.Tables.NoteThread", b => + { + b.Navigation("NoteThreadMutings"); + + b.Navigation("Notes"); + }); + modelBuilder.Entity("Iceshrimp.Backend.Core.Database.Tables.OauthApp", b => { b.Navigation("OauthTokens"); diff --git a/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241022090702_AddNoteThread.cs b/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241022090702_AddNoteThread.cs new file mode 100644 index 00000000..7e310ebb --- /dev/null +++ b/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241022090702_AddNoteThread.cs @@ -0,0 +1,77 @@ +using System; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace Iceshrimp.Backend.Core.Database.Migrations +{ + /// + [DbContext(typeof(DatabaseContext))] + [Migration("20241022090702_AddNoteThread")] + public partial class AddNoteThread : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.CreateTable( + name: "note_thread", + columns: table => new + { + id = table.Column(type: "character varying(256)", maxLength: 256, nullable: false), + backfilledAt = table.Column(type: "timestamp with time zone", nullable: true) + }, + constraints: table => + { + table.PrimaryKey("PK_note_thread", x => x.id); + }); + + migrationBuilder.Sql(""" + UPDATE "note" SET "threadId"="id" WHERE "threadId" is NULL; + INSERT INTO "note_thread"("id") SELECT DISTINCT "threadId" FROM "note"; + """); + + + migrationBuilder.AlterColumn( + name: "threadId", + table: "note", + type: "character varying(256)", + maxLength: 256, + nullable: false, + defaultValue: "", + oldClrType: typeof(string), + oldType: "character varying(256)", + oldMaxLength: 256, + oldNullable: true); + + migrationBuilder.AddForeignKey( + name: "FK_note_note_thread_threadId", + table: "note", + column: "threadId", + principalTable: "note_thread", + principalColumn: "id", + onDelete: ReferentialAction.Cascade); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropForeignKey( + name: "FK_note_note_thread_threadId", + table: "note"); + + migrationBuilder.DropTable( + name: "note_thread"); + + migrationBuilder.AlterColumn( + name: "threadId", + table: "note", + type: "character varying(256)", + maxLength: 256, + nullable: true, + oldClrType: typeof(string), + oldType: "character varying(256)", + oldMaxLength: 256); + } + } +} diff --git a/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241023025700_RemoveOldBackfillJobs.cs b/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241023025700_RemoveOldBackfillJobs.cs new file mode 100644 index 00000000..efdd98ee --- /dev/null +++ b/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241023025700_RemoveOldBackfillJobs.cs @@ -0,0 +1,27 @@ +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Infrastructure; + +#nullable disable + +namespace Iceshrimp.Backend.Core.Database.Migrations +{ + /// + [DbContext(typeof(DatabaseContext))] + [Migration("20241023025700_RemoveOldBackfillJobs")] + public partial class RemoveOldBackfillJobs : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + // the data format for backfill jobs have backwards-incompatibly changed since the introduction of the feature + // we can drop all old jobs without causing too much trouble as they will be re-scheduled on demand + migrationBuilder.Sql("""DELETE FROM "jobs" WHERE "queue" = 'backfill';"""); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + // nothing to do! + } + } +} diff --git a/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241024092102_AddThreadMutingForeignKey.cs b/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241024092102_AddThreadMutingForeignKey.cs new file mode 100644 index 00000000..ea829606 --- /dev/null +++ b/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta4/20241024092102_AddThreadMutingForeignKey.cs @@ -0,0 +1,36 @@ +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Infrastructure; + +#nullable disable + +namespace Iceshrimp.Backend.Core.Database.Migrations +{ + /// + [DbContext(typeof(DatabaseContext))] + [Migration("20241024092102_AddThreadMutingForeignKey")] + public partial class AddThreadMutingForeignKey : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder + .Sql("""DELETE FROM "note_thread_muting" WHERE "threadId" NOT IN (SELECT "id" FROM "note_thread");"""); + + migrationBuilder.AddForeignKey( + name: "FK_note_thread_muting_note_thread_threadId", + table: "note_thread_muting", + column: "threadId", + principalTable: "note_thread", + principalColumn: "id", + onDelete: ReferentialAction.Cascade); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropForeignKey( + name: "FK_note_thread_muting_note_thread_threadId", + table: "note_thread_muting"); + } + } +} diff --git a/Iceshrimp.Backend/Core/Database/Tables/Note.cs b/Iceshrimp.Backend/Core/Database/Tables/Note.cs index 63a292c4..eed747bb 100644 --- a/Iceshrimp.Backend/Core/Database/Tables/Note.cs +++ b/Iceshrimp.Backend/Core/Database/Tables/Note.cs @@ -199,9 +199,7 @@ public class Note : IEntity [Column("threadId")] [StringLength(256)] - public string? ThreadId { get; set; } - - [Projectable] [NotMapped] public string ThreadIdOrId => ThreadId ?? Id; + public string ThreadId { get; set; } = null!; /// /// The updated date of the Note. @@ -220,7 +218,7 @@ public class Note : IEntity public DateTime? RepliesFetchedAt { get;set; } [Column("combinedAltText")] - public string? CombinedAltText { get; set; } + public string? CombinedAltText { get; set; } [ForeignKey(nameof(ChannelId))] [InverseProperty(nameof(Tables.Channel.Notes))] @@ -276,6 +274,10 @@ public class Note : IEntity [ForeignKey(nameof(ReplyId))] [InverseProperty(nameof(InverseReply))] public virtual Note? Reply { get; set; } + + [ForeignKey(nameof(ThreadId))] + [InverseProperty(nameof(NoteThread.Notes))] + public virtual NoteThread Thread { get; set; } = null!; [Projectable] public string RawAttachments diff --git a/Iceshrimp.Backend/Core/Database/Tables/NoteThread.cs b/Iceshrimp.Backend/Core/Database/Tables/NoteThread.cs new file mode 100644 index 00000000..42e079db --- /dev/null +++ b/Iceshrimp.Backend/Core/Database/Tables/NoteThread.cs @@ -0,0 +1,24 @@ +using System.ComponentModel.DataAnnotations; +using System.ComponentModel.DataAnnotations.Schema; + +namespace Iceshrimp.Backend.Core.Database.Tables; + +[Table("note_thread")] +public class NoteThread : IEntity +{ + [Column("id")] + [StringLength(256)] + public required string Id { get; set; } + + /// + /// The last time this thread has been backfilled. + /// + [Column("backfilledAt")] + public DateTime? BackfilledAt { get; set; } + + [InverseProperty(nameof(Note.Thread))] + public virtual ICollection Notes { get; set; } = new List(); + + [InverseProperty(nameof(NoteThreadMuting.Thread))] + public virtual ICollection NoteThreadMutings { get; set; } = new List(); +} diff --git a/Iceshrimp.Backend/Core/Database/Tables/NoteThreadMuting.cs b/Iceshrimp.Backend/Core/Database/Tables/NoteThreadMuting.cs index cf6c5dff..9fffdf74 100644 --- a/Iceshrimp.Backend/Core/Database/Tables/NoteThreadMuting.cs +++ b/Iceshrimp.Backend/Core/Database/Tables/NoteThreadMuting.cs @@ -26,4 +26,8 @@ public class NoteThreadMuting [ForeignKey(nameof(UserId))] [InverseProperty(nameof(Tables.User.NoteThreadMutings))] public virtual User User { get; set; } = null!; + + [ForeignKey(nameof(ThreadId))] + [InverseProperty(nameof(NoteThread.NoteThreadMutings))] + public virtual NoteThread Thread { get; set; } = null!; } \ No newline at end of file diff --git a/Iceshrimp.Backend/Core/Extensions/QueryableExtensions.cs b/Iceshrimp.Backend/Core/Extensions/QueryableExtensions.cs index bb684953..5d106ccd 100644 --- a/Iceshrimp.Backend/Core/Extensions/QueryableExtensions.cs +++ b/Iceshrimp.Backend/Core/Extensions/QueryableExtensions.cs @@ -452,7 +452,7 @@ public static class QueryableExtensions public static IQueryable FilterMutedThreads(this IQueryable query, User user, DatabaseContext db) { return query.Where(p => p.User == user || - !db.NoteThreadMutings.Any(m => m.User == user && m.ThreadId == p.ThreadIdOrId)); + !db.NoteThreadMutings.Any(m => m.User == user && m.ThreadId == p.ThreadId)); } public static IQueryable FilterMutedThreads( @@ -460,7 +460,7 @@ public static class QueryableExtensions ) { return query.Where(p => p.Note == null || - !db.NoteThreadMutings.Any(m => m.User == user && m.ThreadId == p.Note.ThreadIdOrId)); + !db.NoteThreadMutings.Any(m => m.User == user && m.ThreadId == p.Note.ThreadId)); } private static (IQueryable hidden, IQueryable? mentionsHidden) FilterHiddenInternal( diff --git a/Iceshrimp.Backend/Core/Federation/ActivityPub/ObjectResolver.cs b/Iceshrimp.Backend/Core/Federation/ActivityPub/ObjectResolver.cs index a52c994b..8651ca01 100644 --- a/Iceshrimp.Backend/Core/Federation/ActivityPub/ObjectResolver.cs +++ b/Iceshrimp.Backend/Core/Federation/ActivityPub/ObjectResolver.cs @@ -107,13 +107,9 @@ public class ObjectResolver( yield return item; if (page.Next?.Id != null) - { - if (visitedPages.Contains(page.Next.Id)) + if (!visitedPages.Add(page.Next.Id)) break; - visitedPages.Add(page.Next.Id); - } - // we only limit based on pages here. the consumer of this iterator may // additionally limit per-item via System.Linq.Async Take() if (--pageLimit <= 0) diff --git a/Iceshrimp.Backend/Core/Queues/BackfillQueue.cs b/Iceshrimp.Backend/Core/Queues/BackfillQueue.cs index b2c1336c..fe7da6e2 100644 --- a/Iceshrimp.Backend/Core/Queues/BackfillQueue.cs +++ b/Iceshrimp.Backend/Core/Queues/BackfillQueue.cs @@ -1,18 +1,31 @@ +using AsyncKeyedLock; +using Iceshrimp.Backend.Core.Configuration; using Iceshrimp.Backend.Core.Database; using Iceshrimp.Backend.Core.Database.Tables; using Iceshrimp.Backend.Core.Federation.ActivityStreams.Types; using Iceshrimp.Backend.Core.Services; using Microsoft.EntityFrameworkCore; -using Newtonsoft.Json.Linq; +using Microsoft.Extensions.Options; using J = System.Text.Json.Serialization.JsonPropertyNameAttribute; using JR = System.Text.Json.Serialization.JsonRequiredAttribute; namespace Iceshrimp.Backend.Core.Queues; +file record struct BackfillData(string Id, string RepliesCollection); + public class BackfillQueue(int parallelism) : PostgresJobQueue("backfill", BackfillQueueProcessorDelegateAsync, - parallelism, TimeSpan.FromMinutes(5)) + parallelism, TimeSpan.FromMinutes(10)) { + public const int MaxRepliesPerThread = 1000; + public const int MaxRepliesPerNote = 500; + + public static readonly AsyncKeyedLocker KeyedLocker = new(o => + { + o.PoolSize = 100; + o.PoolInitialFill = 5; + }); + private static async Task BackfillQueueProcessorDelegateAsync( Job job, BackfillJobData jobData, @@ -20,38 +33,82 @@ public class BackfillQueue(int parallelism) CancellationToken token ) { - // Exit early if a job that ran out of it's recursion limit has been queued. - // This should usually not happen, but just in case. - if (jobData.RecursionLimit <= 0) return; - - var logger = scope.GetRequiredService>(); - logger.LogDebug("Backfilling replies for note {id} as user {userId}", jobData.NoteId, - jobData.AuthenticatedUserId); - - var db = scope.GetRequiredService(); - - var note = await db.Notes.Where(n => n.Id == jobData.NoteId).FirstOrDefaultAsync(token); - if (note == null) - return; - + if (KeyedLocker.IsInUse(jobData.ThreadId)) return; + using var _ = await KeyedLocker.LockAsync(jobData.ThreadId, token); + + var logger = scope.GetRequiredService>(); + var backfillConfig = scope.GetRequiredService>(); + var db = scope.GetRequiredService(); + var noteSvc = scope.GetRequiredService(); + var objectResolver = scope.GetRequiredService(); + var user = jobData.AuthenticatedUserId == null ? null : await db.Users.Where(u => u.Id == jobData.AuthenticatedUserId).FirstOrDefaultAsync(token); + + logger.LogDebug("Backfilling replies for thread {id} as user {userId}", jobData.ThreadId, user?.Username); - var noteSvc = scope.GetRequiredService(); + var cfg = backfillConfig.Value.Replies; + var backfillLimit = MaxRepliesPerThread; + var history = new HashSet(); + + var toBackfillArray = await db.Notes + .Where(n => n.ThreadId == jobData.ThreadId + && n.RepliesCount < MaxRepliesPerNote + && n.UserHost != null + && n.RepliesCollection != null + && n.CreatedAt <= DateTime.UtcNow - cfg.NewNoteDelayTimeSpan + && (n.RepliesFetchedAt == null || n.RepliesFetchedAt <= DateTime.UtcNow - cfg.RefreshAfterTimeSpan)) + .Select(n => new BackfillData(n.Id, n.RepliesCollection!)) + .ToArrayAsync(token); + + var toBackfill = new Queue(toBackfillArray); + while (toBackfill.TryDequeue(out var _current)) + { + var current = _current; + if (!history.Add(current.RepliesCollection)) + { + logger.LogDebug("Skipping {collection} as it was already backfilled in this run", current.RepliesCollection); + continue; + } - ASCollection? collection = null; - if (jobData.Collection != null) - collection = ASObject.Deserialize(JToken.Parse(jobData.Collection)) as ASCollection; + logger.LogTrace("Backfilling {collection} (remaining limit {limit})", current.RepliesCollection, backfillLimit); + + await db.Notes + .Where(n => n.Id == current.Id) + .ExecuteUpdateAsync(p => p.SetProperty(n => n.RepliesFetchedAt, DateTime.UtcNow), token); - await noteSvc.BackfillRepliesAsync(note, user, collection, jobData.RecursionLimit); + await foreach (var asNote in objectResolver.IterateCollection(new ASCollection(current.RepliesCollection)) + .Take(MaxRepliesPerNote) + .Where(p => p.Id != null) + .WithCancellation(token)) + { + if (--backfillLimit <= 0) + { + logger.LogDebug("Reached backfill limit"); + toBackfill.Clear(); + break; + } + + var note = await noteSvc.ResolveNoteAsync(asNote.Id!, asNote as ASNote, user, clearHistory: true, forceRefresh: false); + + if (note is { UserHost: not null, RepliesCollection: not null, RepliesCount: < MaxRepliesPerNote } + && note.CreatedAt <= DateTime.UtcNow - cfg.NewNoteDelayTimeSpan + && (note.RepliesFetchedAt == null || note.RepliesFetchedAt <= DateTime.UtcNow - cfg.RefreshAfterTimeSpan)) + { + toBackfill.Enqueue(new BackfillData(note.Id, note.RepliesCollection!)); + } + } + } + + await db.NoteThreads + .Where(t => t.Id == jobData.ThreadId) + .ExecuteUpdateAsync(p => p.SetProperty(t => t.BackfilledAt, DateTime.UtcNow), cancellationToken: default); } } public class BackfillJobData { - [JR] [J("noteId")] public required string NoteId { get; set; } - [JR] [J("recursionLimit")] public required int RecursionLimit { get; set; } + [JR] [J("threadId")] public required string ThreadId { get; set; } [JR] [J("authenticatedUserId")] public required string? AuthenticatedUserId { get; set; } - [JR] [J("collection")] public string? Collection { get; set; } } \ No newline at end of file diff --git a/Iceshrimp.Backend/Core/Services/NoteService.cs b/Iceshrimp.Backend/Core/Services/NoteService.cs index 27d3821a..c670e3f8 100644 --- a/Iceshrimp.Backend/Core/Services/NoteService.cs +++ b/Iceshrimp.Backend/Core/Services/NoteService.cs @@ -246,9 +246,15 @@ public class NoteService( var combinedAltText = data.Attachments?.Select(p => p.Comment).Where(c => c != null); policySvc.CallRewriteHooks(data, IRewritePolicy.HookLocationEnum.PostLogic); + var noteId = IdHelpers.GenerateSlowflakeId(data.CreatedAt); + var threadId = data.Reply?.ThreadId ?? noteId; + + var thread = await db.NoteThreads.Where(t => t.Id == threadId).FirstOrDefaultAsync() ?? + new NoteThread { Id = threadId }; + var note = new Note { - Id = IdHelpers.GenerateSlowflakeId(data.CreatedAt), + Id = noteId, Uri = data.Uri, Url = data.Url, Text = data.Text?.Trim(), @@ -269,7 +275,7 @@ public class NoteService( Mentions = mentionedUserIds, VisibleUserIds = visibleUserIds, MentionedRemoteUsers = remoteMentions, - ThreadId = data.Reply?.ThreadIdOrId, + Thread = thread, Tags = tags, LocalOnly = data.LocalOnly, Emojis = data.Emoji ?? [], @@ -367,23 +373,6 @@ public class NoteService( }); } - var replyBackfillConfig = backfillConfig.Value.Replies; - - // If we're renoting a note we backfilled replies to some time ago (and know how to backfill), enqueue a backfill. - if (replyBackfillConfig.Enabled && - data.Renote?.RepliesCollection != null && - data.Renote.RepliesFetchedAt?.Add(replyBackfillConfig.RefreshOnRenoteAfterTimeSpan) <= DateTime.UtcNow && - _recursionLimit > 0) - { - logger.LogDebug("Enqueueing reply collection fetch for renote {renoteId}", data.Renote.Id); - await queueSvc.BackfillQueue.EnqueueAsync(new BackfillJobData - { - NoteId = data.Renote.Id, - RecursionLimit = _recursionLimit, - AuthenticatedUserId = null, // TODO: for private replies - }); - } - if (data.User.IsRemoteUser) { _ = followupTaskSvc.ExecuteTask("UpdateInstanceNoteCounter", async provider => @@ -395,29 +384,6 @@ public class NoteService( .ExecuteUpdateAsync(p => p.SetProperty(i => i.NotesCount, i => i.NotesCount + 1)); }); - if (replyBackfillConfig.Enabled && note.RepliesCollection != null && _recursionLimit > 0) - { - var jobData = new BackfillJobData - { - NoteId = note.Id, - RecursionLimit = _recursionLimit, - AuthenticatedUserId = null, // TODO: for private replies - Collection = data.ASNote?.Replies?.IsUnresolved == false - ? JsonConvert.SerializeObject(data.ASNote.Replies, LdHelpers.JsonSerializerSettings) - : null - }; - - logger.LogDebug("Enqueueing reply collection fetch for note {noteId}", note.Id); - - // Delay reply backfilling for brand new notes to allow them time to collect replies. - if (note.CreatedAt + replyBackfillConfig.NewNoteThresholdTimeSpan <= DateTime.UtcNow) - await queueSvc.BackfillQueue.EnqueueAsync(jobData); - else - await queueSvc.BackfillQueue.ScheduleAsync(jobData, - DateTime.UtcNow + - replyBackfillConfig.NewNoteDelayTimeSpan); - } - return note; } @@ -1258,27 +1224,39 @@ public class NoteService( return await ResolveNoteAsync(note.Id, note); } - public async Task BackfillRepliesAsync( - Note note, User? fetchUser, ASCollection? repliesCollection, int recursionLimit - ) + public async Task EnqueueBackfillTaskAsync(Note note) { - if (note.RepliesCollection == null) return; - await db.Notes.Where(p => p == note) - .ExecuteUpdateAsync(p => p.SetProperty(i => i.RepliesFetchedAt, _ => DateTime.UtcNow)); + var cfg = backfillConfig.Value.Replies; - repliesCollection ??= new ASCollection(note.RepliesCollection); + // return immediately if backfilling is not enabled + if (!cfg.Enabled) return; - var collectionId = new Uri(note.RepliesCollection); - var replyBackfillConfig = backfillConfig.Value.Replies; + // don't try to schedule a backfill for local notes + if (note.UserHost == null) return; - _recursionLimit = recursionLimit; - await objectResolver.IterateCollection(repliesCollection) - .Take(50) - .Where(p => p.Id != null && - (replyBackfillConfig.BackfillEverything || - new Uri(p.Id).Authority == collectionId.Authority)) - .Select(p => ResolveNoteAsync(p.Id!, null, fetchUser, forceRefresh: false)) - .AwaitAllNoConcurrencyAsync(); + // don't try to schedule a backfill when we're actively backfilling the thread + if (BackfillQueue.KeyedLocker.IsInUse(note.ThreadId)) return; + + var updatedRows = await db.NoteThreads + .Where(t => t.Id == note.ThreadId && + t.Notes.Count < BackfillQueue.MaxRepliesPerThread && + (t.BackfilledAt == null || + t.BackfilledAt <= DateTime.UtcNow - cfg.RefreshAfterTimeSpan)) + .ExecuteUpdateAsync(p => p.SetProperty(t => t.BackfilledAt, DateTime.UtcNow)); + + // only queue if the thread's backfill timestamp got updated. if it didn't, it means the cooldown is still in effect + // (or the thread doesn't exist, which shouldn't be possible) + if (updatedRows <= 0) return; + + await queueSvc.BackfillQueue.EnqueueAsync(new BackfillJobData + { + ThreadId = note.ThreadId, + + // TODO: should this ever be set? + // this can be used as a "read receipt" and may potentially be a privacy problem. but it also allows for + // fetching private replies if the remote provides those + AuthenticatedUserId = null + }, mutex: $"backfill:{note.ThreadId}"); } public async Task LikeNoteAsync(Note note, User user) diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index a8f98c8a..2b9669e8 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using EntityFramework.Exceptions.Common; using Iceshrimp.Backend.Core.Configuration; using Iceshrimp.Backend.Core.Database; using Iceshrimp.Backend.Core.Database.Tables; @@ -521,7 +522,7 @@ public abstract class PostgresJobQueue( await db.SaveChangesAsync(token); } - public async Task EnqueueAsync(T jobData) + public async Task EnqueueAsync(T jobData, string? mutex = null) { await using var scope = GetScope(); await using var db = GetDbContext(scope); @@ -529,15 +530,16 @@ public abstract class PostgresJobQueue( var job = new Job { Id = Ulid.NewUlid().ToGuid(), + Mutex = mutex, Queue = name, Data = JsonSerializer.Serialize(jobData) }; - db.Add(job); - await db.SaveChangesAsync(); + + await db.Jobs.Upsert(job).On(j => j.Mutex!).NoUpdate().RunAsync(); RaiseJobQueuedEvent(); } - public async Task ScheduleAsync(T jobData, DateTime triggerAt) + public async Task ScheduleAsync(T jobData, DateTime triggerAt, string? mutex = null) { await using var scope = GetScope(); await using var db = GetDbContext(scope); @@ -545,13 +547,14 @@ public abstract class PostgresJobQueue( var job = new Job { Id = Ulid.NewUlid().ToGuid(), + Mutex = mutex, Queue = name, Data = JsonSerializer.Serialize(jobData), Status = Job.JobStatus.Delayed, DelayedUntil = triggerAt.ToUniversalTime() }; - db.Add(job); - await db.SaveChangesAsync(); + + await db.Jobs.Upsert(job).On(j => j.Mutex!).NoUpdate().RunAsync(); RaiseJobDelayedEvent(); } } \ No newline at end of file diff --git a/Iceshrimp.Backend/SignalR/Helpers/StreamingConnectionAggregate.cs b/Iceshrimp.Backend/SignalR/Helpers/StreamingConnectionAggregate.cs index c3bd4553..669cd64e 100644 --- a/Iceshrimp.Backend/SignalR/Helpers/StreamingConnectionAggregate.cs +++ b/Iceshrimp.Backend/SignalR/Helpers/StreamingConnectionAggregate.cs @@ -175,7 +175,7 @@ public sealed class StreamingConnectionAggregate : IDisposable if (!isNotification && note.Reply == null) return false; if (!isNotification && note.User.Id == _userId) return false; var db = scope.ServiceProvider.GetRequiredService(); - return await db.NoteThreadMutings.AnyAsync(p => p.UserId == _userId && p.ThreadId == note.ThreadIdOrId); + return await db.NoteThreadMutings.AnyAsync(p => p.UserId == _userId && p.ThreadId == note.ThreadId); } [SuppressMessage("ReSharper", "SuggestBaseTypeForParameter")] diff --git a/Iceshrimp.Backend/configuration.ini b/Iceshrimp.Backend/configuration.ini index fe346f9f..e2aeb545 100644 --- a/Iceshrimp.Backend/configuration.ini +++ b/Iceshrimp.Backend/configuration.ini @@ -85,7 +85,7 @@ Inbox = 4 Deliver = 20 PreDeliver = 4 BackgroundTask = 4 -Backfill = 4 +Backfill = 10 [Backfill:Replies] ;; Enables backfilling of replies. This is disabled by default as it may have a significant performance impact. @@ -93,18 +93,11 @@ Backfill = 4 ;; Note that replies can only be fetched from remote instances that expose a replies collection. Enabled = false -;; By default, reply backfill is limited to replies from the same instance as the parent post -;; to lower the performance impact of backfilling in exchange for worse UX. This emulates Mastodon's backfilling behavior. -BackfillEverything = false - ;; Notes newer than this threshold will have reply backfilling delayed, to allow them time to accumulate replies. -NewNoteThreshold = 3h +NewNoteDelay = 1h -;; The duration backfilling of new notes will be delayed by. -NewNoteDelay = 3h - -;; Renoting a note that was backfilled before this threshold will attempt to fetch any new replies that may have been created since the last backfill. -RefreshOnRenoteAfter = 7d +;; The cooldown between multiple backfill attempts. +RefreshAfter = 1h ;; How many completed & failed jobs to keep around, per queue. ;; Excess is trimmed every 15 minutes, oldest jobs first.