From 024de937d092c1f34fe294204266d16f6bd2c96a Mon Sep 17 00:00:00 2001 From: Kopper Date: Fri, 13 Sep 2024 15:50:09 +0300 Subject: [PATCH] [backend/federation] Backfill replies --- .../Core/Configuration/Config.cs | 1 + .../DatabaseContextModelSnapshot.cs | 9 ++++ .../20240605203303_AddReplyCollection.cs | 43 +++++++++++++++++++ .../Core/Database/Tables/Note.cs | 10 +++++ .../Core/Queues/BackfillQueue.cs | 41 ++++++++++++++++++ .../Core/Services/NoteService.cs | 41 +++++++++++++++++- .../Core/Services/QueueService.cs | 3 +- 7 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta3/20240605203303_AddReplyCollection.cs create mode 100644 Iceshrimp.Backend/Core/Queues/BackfillQueue.cs diff --git a/Iceshrimp.Backend/Core/Configuration/Config.cs b/Iceshrimp.Backend/Core/Configuration/Config.cs index 264ff5e1..f53b2eac 100644 --- a/Iceshrimp.Backend/Core/Configuration/Config.cs +++ b/Iceshrimp.Backend/Core/Configuration/Config.cs @@ -356,6 +356,7 @@ public sealed class Config [Range(1, int.MaxValue)] public int Deliver { get; init; } = 20; [Range(1, int.MaxValue)] public int PreDeliver { get; init; } = 4; [Range(1, int.MaxValue)] public int BackgroundTask { get; init; } = 4; + [Range(1, int.MaxValue)] public int Backfill { get; init; } = 4; } public sealed class QueueSection diff --git a/Iceshrimp.Backend/Core/Database/Migrations/DatabaseContextModelSnapshot.cs b/Iceshrimp.Backend/Core/Database/Migrations/DatabaseContextModelSnapshot.cs index 89a0c97b..bc36b8b4 100644 --- a/Iceshrimp.Backend/Core/Database/Migrations/DatabaseContextModelSnapshot.cs +++ b/Iceshrimp.Backend/Core/Database/Migrations/DatabaseContextModelSnapshot.cs @@ -2448,12 +2448,21 @@ namespace Iceshrimp.Backend.Core.Database.Migrations .HasColumnName("renoteUserId") .HasComment("[Denormalized]"); + b.Property("RepliesCollection") + .HasMaxLength(512) + .HasColumnType("character varying(512)") + .HasColumnName("repliesCollection"); + b.Property("RepliesCount") .ValueGeneratedOnAdd() .HasColumnType("smallint") .HasDefaultValue((short)0) .HasColumnName("repliesCount"); + b.Property("RepliesFetchedAt") + .HasColumnType("timestamp with time zone") + .HasColumnName("repliesFetchedAt"); + b.Property("ReplyId") .HasMaxLength(32) .HasColumnType("character varying(32)") diff --git a/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta3/20240605203303_AddReplyCollection.cs b/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta3/20240605203303_AddReplyCollection.cs new file mode 100644 index 00000000..fb814d47 --- /dev/null +++ b/Iceshrimp.Backend/Core/Database/Migrations/v2024.1-beta3/20240605203303_AddReplyCollection.cs @@ -0,0 +1,43 @@ +using System; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace Iceshrimp.Backend.Core.Database.Migrations +{ + /// + [DbContext(typeof(DatabaseContext))] + [Migration("20240605203303_AddReplyCollection")] + public partial class AddReplyCollection : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AddColumn( + name: "repliesCollection", + table: "note", + type: "character varying(512)", + maxLength: 512, + nullable: true); + + migrationBuilder.AddColumn( + name: "repliesFetchedAt", + table: "note", + type: "timestamp with time zone", + nullable: true); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropColumn( + name: "repliesCollection", + table: "note"); + + migrationBuilder.DropColumn( + name: "repliesFetchedAt", + table: "note"); + } + } +} diff --git a/Iceshrimp.Backend/Core/Database/Tables/Note.cs b/Iceshrimp.Backend/Core/Database/Tables/Note.cs index d218b896..3b73744f 100644 --- a/Iceshrimp.Backend/Core/Database/Tables/Note.cs +++ b/Iceshrimp.Backend/Core/Database/Tables/Note.cs @@ -209,6 +209,16 @@ public class Note : IEntity [Column("updatedAt")] public DateTime? UpdatedAt { get; set; } + /// + /// ID of the ActivityStreams replies collection for this note, used to re-fetch replies. + /// + [Column("repliesCollection")] + [StringLength(512)] + public string? RepliesCollection { get; set; } + + [Column("repliesFetchedAt")] + public DateTime? RepliesFetchedAt { get;set; } + [ForeignKey(nameof(ChannelId))] [InverseProperty(nameof(Tables.Channel.Notes))] public virtual Channel? Channel { get; set; } diff --git a/Iceshrimp.Backend/Core/Queues/BackfillQueue.cs b/Iceshrimp.Backend/Core/Queues/BackfillQueue.cs new file mode 100644 index 00000000..97610c39 --- /dev/null +++ b/Iceshrimp.Backend/Core/Queues/BackfillQueue.cs @@ -0,0 +1,41 @@ +using Iceshrimp.Backend.Core.Database; +using Iceshrimp.Backend.Core.Database.Tables; +using Iceshrimp.Backend.Core.Services; +using Microsoft.EntityFrameworkCore; +using J = System.Text.Json.Serialization.JsonPropertyNameAttribute; +using JR = System.Text.Json.Serialization.JsonRequiredAttribute; + +namespace Iceshrimp.Backend.Core.Queues; + +public class BackfillQueue(int parallelism) + : PostgresJobQueue("backfill", BackfillQueueProcessorDelegateAsync, parallelism, TimeSpan.FromMinutes(5)) +{ + private static async Task BackfillQueueProcessorDelegateAsync( + Job job, + BackfillJobData jobData, + IServiceProvider scope, + CancellationToken token + ) + { + var logger = scope.GetRequiredService>(); + logger.LogDebug("Backfilling replies for note {id} as user {userId}", jobData.NoteId, jobData.AuthenticatedUserId); + + var db = scope.GetRequiredService(); + + var note = await db.Notes.Where(n => n.Id == jobData.NoteId).FirstOrDefaultAsync(token); + if (note == null) + return; + + var user = jobData.AuthenticatedUserId == null ? null : await db.Users.Where(u => u.Id == jobData.AuthenticatedUserId).FirstOrDefaultAsync(token); + + var noteSvc = scope.GetRequiredService(); + await noteSvc.BackfillRepliesAsync(note, user, jobData.RecursionLimit); + } +} + +public class BackfillJobData +{ + [JR] [J("noteId")] public required string NoteId { get; set; } + [JR] [J("recursionLimit")] public required int RecursionLimit { get; set; } + [JR] [J("authenticatedUserId")] public required string? AuthenticatedUserId { get; set; } +} \ No newline at end of file diff --git a/Iceshrimp.Backend/Core/Services/NoteService.cs b/Iceshrimp.Backend/Core/Services/NoteService.cs index 64f31bc4..caa48c31 100644 --- a/Iceshrimp.Backend/Core/Services/NoteService.cs +++ b/Iceshrimp.Backend/Core/Services/NoteService.cs @@ -220,7 +220,8 @@ public class NoteService( LocalOnly = localOnly, Emojis = emoji ?? [], ReplyUri = replyUri, - RenoteUri = renoteUri + RenoteUri = renoteUri, + RepliesCollection = asNote?.Replies?.Id }; if (poll != null) @@ -311,6 +312,18 @@ public class NoteService( }); } + // If we're renoting a note we backfilled replies to some time ago (and know how to backfill), enqueue a backfill. + if (renote != null && renote.RepliesCollection != null && renote.RepliesFetchedAt != null && renote.RepliesFetchedAt?.AddDays(7) <= DateTime.UtcNow) + { + logger.LogDebug("Enqueueing reply collection fetch for renote {renoteId}", renote.Id); + await queueSvc.BackfillQueue.EnqueueAsync(new BackfillJobData + { + NoteId = renote.Id, + RecursionLimit = _recursionLimit, + AuthenticatedUserId = null, // FIXME: for private replies + }); + } + if (user.IsRemoteUser) { _ = followupTaskSvc.ExecuteTask("UpdateInstanceNoteCounter", async provider => @@ -322,6 +335,19 @@ public class NoteService( .ExecuteUpdateAsync(p => p.SetProperty(i => i.NotesCount, i => i.NotesCount + 1)); }); + if (note.RepliesCollection != null) + { + var jobData = new BackfillJobData + { + NoteId = note.Id, + RecursionLimit = _recursionLimit, + AuthenticatedUserId = null, // FIXME: for private replies + }; + + logger.LogDebug("Enqueueing reply collection fetch for note {noteId}", note.Id); + await queueSvc.BackfillQueue.EnqueueAsync(jobData); + } + return note; } @@ -1133,6 +1159,19 @@ public class NoteService( return await ResolveNoteAsync(note.Id, note); } + public async Task BackfillRepliesAsync(Note note, User? fetchUser, int recursionLimit) + { + if (note.RepliesCollection == null) return; + note.RepliesFetchedAt = DateTime.UtcNow; // should get committed alongside the resolved reply objects + + _recursionLimit = recursionLimit; + await objectResolver.IterateCollection(new ASCollection(note.RepliesCollection, withType: true)) + .Take(100) // does this limit make sense? + .Where(p => p.Id != null) + .Select(p => ResolveNoteAsync(p.Id!, null, fetchUser, forceRefresh: false)) + .AwaitAllNoConcurrencyAsync(); + } + public async Task LikeNoteAsync(Note note, User user) { if (note.IsPureRenote) diff --git a/Iceshrimp.Backend/Core/Services/QueueService.cs b/Iceshrimp.Backend/Core/Services/QueueService.cs index 7d943b6c..1a59bd27 100644 --- a/Iceshrimp.Backend/Core/Services/QueueService.cs +++ b/Iceshrimp.Backend/Core/Services/QueueService.cs @@ -24,12 +24,13 @@ public class QueueService( public readonly DeliverQueue DeliverQueue = new(queueConcurrency.Value.Deliver); public readonly InboxQueue InboxQueue = new(queueConcurrency.Value.Inbox); public readonly PreDeliverQueue PreDeliverQueue = new(queueConcurrency.Value.PreDeliver); + public readonly BackfillQueue BackfillQueue = new(queueConcurrency.Value.Backfill); public IEnumerable QueueNames => _queues.Select(p => p.Name); protected override async Task ExecuteAsync(CancellationToken token) { - _queues.AddRange([InboxQueue, PreDeliverQueue, DeliverQueue, BackgroundTaskQueue]); + _queues.AddRange([InboxQueue, PreDeliverQueue, DeliverQueue, BackgroundTaskQueue, BackfillQueue]); var tokenSource = new CancellationTokenSource(); var queueTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, lifetime.ApplicationStopping);