[backend/federation] Backfill replies

This commit is contained in:
Kopper 2024-09-13 15:50:09 +03:00 committed by Laura Hausmann
parent ade4481ae9
commit 024de937d0
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
7 changed files with 146 additions and 2 deletions

View file

@ -356,6 +356,7 @@ public sealed class Config
[Range(1, int.MaxValue)] public int Deliver { get; init; } = 20;
[Range(1, int.MaxValue)] public int PreDeliver { get; init; } = 4;
[Range(1, int.MaxValue)] public int BackgroundTask { get; init; } = 4;
[Range(1, int.MaxValue)] public int Backfill { get; init; } = 4;
}
public sealed class QueueSection

View file

@ -2448,12 +2448,21 @@ namespace Iceshrimp.Backend.Core.Database.Migrations
.HasColumnName("renoteUserId")
.HasComment("[Denormalized]");
b.Property<string>("RepliesCollection")
.HasMaxLength(512)
.HasColumnType("character varying(512)")
.HasColumnName("repliesCollection");
b.Property<short>("RepliesCount")
.ValueGeneratedOnAdd()
.HasColumnType("smallint")
.HasDefaultValue((short)0)
.HasColumnName("repliesCount");
b.Property<DateTime?>("RepliesFetchedAt")
.HasColumnType("timestamp with time zone")
.HasColumnName("repliesFetchedAt");
b.Property<string>("ReplyId")
.HasMaxLength(32)
.HasColumnType("character varying(32)")

View file

@ -0,0 +1,43 @@
using System;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace Iceshrimp.Backend.Core.Database.Migrations
{
    /// <inheritdoc />
    [DbContext(typeof(DatabaseContext))]
    [Migration("20240605203303_AddReplyCollection")]
    public partial class AddReplyCollection : Migration
    {
        /// <inheritdoc />
        protected override void Up(MigrationBuilder migrationBuilder)
        {
            // URI of the ActivityStreams `replies` collection, kept so replies can be re-fetched later.
            migrationBuilder.AddColumn<string>(
                name: "repliesCollection",
                table: "note",
                type: "character varying(512)",
                maxLength: 512,
                nullable: true);

            // Timestamp of the most recent reply backfill for the note.
            migrationBuilder.AddColumn<DateTime>(
                name: "repliesFetchedAt",
                table: "note",
                type: "timestamp with time zone",
                nullable: true);
        }

        /// <inheritdoc />
        protected override void Down(MigrationBuilder migrationBuilder)
        {
            // Revert Up by dropping both columns again.
            migrationBuilder.DropColumn(name: "repliesCollection", table: "note");
            migrationBuilder.DropColumn(name: "repliesFetchedAt", table: "note");
        }
    }
}

View file

@ -209,6 +209,16 @@ public class Note : IEntity
[Column("updatedAt")]
public DateTime? UpdatedAt { get; set; }
/// <summary>
/// ID of the ActivityStreams replies collection for this note, used to re-fetch replies.
/// </summary>
[Column("repliesCollection")]
[StringLength(512)]
public string? RepliesCollection { get; set; }
[Column("repliesFetchedAt")]
public DateTime? RepliesFetchedAt { get;set; }
[ForeignKey(nameof(ChannelId))]
[InverseProperty(nameof(Tables.Channel.Notes))]
public virtual Channel? Channel { get; set; }

View file

@ -0,0 +1,41 @@
using Iceshrimp.Backend.Core.Database;
using Iceshrimp.Backend.Core.Database.Tables;
using Iceshrimp.Backend.Core.Services;
using Microsoft.EntityFrameworkCore;
using J = System.Text.Json.Serialization.JsonPropertyNameAttribute;
using JR = System.Text.Json.Serialization.JsonRequiredAttribute;
namespace Iceshrimp.Backend.Core.Queues;
public class BackfillQueue(int parallelism)
    : PostgresJobQueue<BackfillJobData>("backfill", BackfillQueueProcessorDelegateAsync, parallelism, TimeSpan.FromMinutes(5))
{
    /// <summary>
    /// Processes one backfill job: loads the target note (and, if requested, the
    /// authenticated user) from the database and hands the actual reply fetching
    /// off to <see cref="NoteService.BackfillRepliesAsync"/>.
    /// </summary>
    private static async Task BackfillQueueProcessorDelegateAsync(
        Job job,
        BackfillJobData jobData,
        IServiceProvider scope,
        CancellationToken token
    )
    {
        var logger = scope.GetRequiredService<ILogger<BackfillQueue>>();
        logger.LogDebug("Backfilling replies for note {id} as user {userId}", jobData.NoteId, jobData.AuthenticatedUserId);

        var db   = scope.GetRequiredService<DatabaseContext>();
        var note = await db.Notes.FirstOrDefaultAsync(n => n.Id == jobData.NoteId, token);
        if (note is null)
            return; // Note no longer exists — nothing to backfill.

        // A null AuthenticatedUserId means the fetch runs unauthenticated.
        User? user = null;
        if (jobData.AuthenticatedUserId is not null)
            user = await db.Users.FirstOrDefaultAsync(u => u.Id == jobData.AuthenticatedUserId, token);

        var noteSvc = scope.GetRequiredService<NoteService>();
        await noteSvc.BackfillRepliesAsync(note, user, jobData.RecursionLimit);
    }
}
/// <summary>Serialized payload describing a single reply-backfill job.</summary>
public class BackfillJobData
{
    /// <summary>ID of the note whose replies collection should be fetched.</summary>
    [JR, J("noteId")] public required string NoteId { get; set; }

    /// <summary>Remaining recursion budget for resolving nested replies.</summary>
    [JR, J("recursionLimit")] public required int RecursionLimit { get; set; }

    /// <summary>User to fetch as, or null for an unauthenticated fetch.</summary>
    [JR, J("authenticatedUserId")] public required string? AuthenticatedUserId { get; set; }
}

View file

@ -220,7 +220,8 @@ public class NoteService(
LocalOnly = localOnly,
Emojis = emoji ?? [],
ReplyUri = replyUri,
RenoteUri = renoteUri
RenoteUri = renoteUri,
RepliesCollection = asNote?.Replies?.Id
};
if (poll != null)
@ -311,6 +312,18 @@ public class NoteService(
});
}
// If we're renoting a note we backfilled replies to some time ago (and know how to backfill), enqueue a backfill.
if (renote != null && renote.RepliesCollection != null && renote.RepliesFetchedAt != null && renote.RepliesFetchedAt?.AddDays(7) <= DateTime.UtcNow)
{
logger.LogDebug("Enqueueing reply collection fetch for renote {renoteId}", renote.Id);
await queueSvc.BackfillQueue.EnqueueAsync(new BackfillJobData
{
NoteId = renote.Id,
RecursionLimit = _recursionLimit,
AuthenticatedUserId = null, // FIXME: for private replies
});
}
if (user.IsRemoteUser)
{
_ = followupTaskSvc.ExecuteTask("UpdateInstanceNoteCounter", async provider =>
@ -322,6 +335,19 @@ public class NoteService(
.ExecuteUpdateAsync(p => p.SetProperty(i => i.NotesCount, i => i.NotesCount + 1));
});
if (note.RepliesCollection != null)
{
var jobData = new BackfillJobData
{
NoteId = note.Id,
RecursionLimit = _recursionLimit,
AuthenticatedUserId = null, // FIXME: for private replies
};
logger.LogDebug("Enqueueing reply collection fetch for note {noteId}", note.Id);
await queueSvc.BackfillQueue.EnqueueAsync(jobData);
}
return note;
}
@ -1133,6 +1159,19 @@ public class NoteService(
return await ResolveNoteAsync(note.Id, note);
}
/// <summary>
/// Fetches the note's ActivityStreams replies collection and resolves up to 100
/// of its entries into local notes. No-op when the note has no known replies
/// collection URI.
/// </summary>
/// <param name="note">Note whose replies collection should be fetched.</param>
/// <param name="fetchUser">User to resolve replies as, or null for an unauthenticated fetch.</param>
/// <param name="recursionLimit">Recursion budget applied to nested note resolution.</param>
public async Task BackfillRepliesAsync(Note note, User? fetchUser, int recursionLimit)
{
if (note.RepliesCollection == null) return;
// Stamped before fetching; presumably persisted by the caller's SaveChanges together
// with the resolved reply notes (see trailing comment) — TODO confirm commit path.
note.RepliesFetchedAt = DateTime.UtcNow; // should get committed alongside the resolved reply objects
// NOTE(review): mutates shared instance state on NoteService — assumes the service is
// scoped per job/request so concurrent resolutions don't race on this field; confirm.
_recursionLimit = recursionLimit;
// Iterate the remote collection, skipping entries without an ID, and resolve each
// reply sequentially (AwaitAllNoConcurrencyAsync) so fetches are not parallelized.
await objectResolver.IterateCollection(new ASCollection(note.RepliesCollection, withType: true))
.Take(100) // does this limit make sense?
.Where(p => p.Id != null)
.Select(p => ResolveNoteAsync(p.Id!, null, fetchUser, forceRefresh: false))
.AwaitAllNoConcurrencyAsync();
}
public async Task<bool> LikeNoteAsync(Note note, User user)
{
if (note.IsPureRenote)

View file

@ -24,12 +24,13 @@ public class QueueService(
public readonly DeliverQueue DeliverQueue = new(queueConcurrency.Value.Deliver);
public readonly InboxQueue InboxQueue = new(queueConcurrency.Value.Inbox);
public readonly PreDeliverQueue PreDeliverQueue = new(queueConcurrency.Value.PreDeliver);
public readonly BackfillQueue BackfillQueue = new(queueConcurrency.Value.Backfill);
public IEnumerable<string> QueueNames => _queues.Select(p => p.Name);
protected override async Task ExecuteAsync(CancellationToken token)
{
_queues.AddRange([InboxQueue, PreDeliverQueue, DeliverQueue, BackgroundTaskQueue]);
_queues.AddRange([InboxQueue, PreDeliverQueue, DeliverQueue, BackgroundTaskQueue, BackfillQueue]);
var tokenSource = new CancellationTokenSource();
var queueTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, lifetime.ApplicationStopping);