[backend/configuration] Add configuration for backfill of note replies
This commit is contained in:
parent
4081aeb036
commit
83e830b5df
5 changed files with 112 additions and 44 deletions
|
@ -20,6 +20,7 @@ public sealed class Config
|
||||||
public required StorageSection Storage { get; init; } = new();
|
public required StorageSection Storage { get; init; } = new();
|
||||||
public required PerformanceSection Performance { get; init; } = new();
|
public required PerformanceSection Performance { get; init; } = new();
|
||||||
public required QueueSection Queue { get; init; } = new();
|
public required QueueSection Queue { get; init; } = new();
|
||||||
|
public required BackfillSection Backfill { get; init; } = new();
|
||||||
|
|
||||||
public sealed class InstanceSection
|
public sealed class InstanceSection
|
||||||
{
|
{
|
||||||
|
@ -97,34 +98,7 @@ public sealed class Config
|
||||||
public string? MediaRetention
|
public string? MediaRetention
|
||||||
{
|
{
|
||||||
get => MediaRetentionTimeSpan?.ToString();
|
get => MediaRetentionTimeSpan?.ToString();
|
||||||
init
|
init => MediaRetentionTimeSpan = ParseNaturalDuration(value, "media retention time");
|
||||||
{
|
|
||||||
if (value == null || string.IsNullOrWhiteSpace(value) || value.Trim() == "0")
|
|
||||||
{
|
|
||||||
MediaRetentionTimeSpan = null;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (value.Trim() == "-1")
|
|
||||||
{
|
|
||||||
MediaRetentionTimeSpan = TimeSpan.MaxValue;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (value.Length < 2 || !int.TryParse(value[..^1].Trim(), out var num))
|
|
||||||
throw new Exception("Invalid media retention time");
|
|
||||||
|
|
||||||
var suffix = value[^1];
|
|
||||||
|
|
||||||
MediaRetentionTimeSpan = suffix switch
|
|
||||||
{
|
|
||||||
'd' => TimeSpan.FromDays(num),
|
|
||||||
'w' => TimeSpan.FromDays(num * 7),
|
|
||||||
'm' => TimeSpan.FromDays(num * 30),
|
|
||||||
'y' => TimeSpan.FromDays(num * 365),
|
|
||||||
_ => throw new Exception("Unsupported suffix, use one of: [d]ays, [w]eeks, [m]onths, [y]ears")
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public string? MaxUploadSize
|
public string? MaxUploadSize
|
||||||
|
@ -369,4 +343,63 @@ public sealed class Config
|
||||||
[Range(0, int.MaxValue)] public int Completed { get; init; } = 100;
|
[Range(0, int.MaxValue)] public int Completed { get; init; } = 100;
|
||||||
[Range(0, int.MaxValue)] public int Failed { get; init; } = 10;
|
[Range(0, int.MaxValue)] public int Failed { get; init; } = 10;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public sealed class BackfillSection
|
||||||
|
{
|
||||||
|
public BackfillRepliesSection Replies { get; init; } = new();
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class BackfillRepliesSection
|
||||||
|
{
|
||||||
|
public bool Enabled { get; init; } = false;
|
||||||
|
|
||||||
|
public string? NewNoteThreshold
|
||||||
|
{
|
||||||
|
get => NewNoteThresholdTimeSpan.ToString();
|
||||||
|
init => NewNoteThresholdTimeSpan =
|
||||||
|
ParseNaturalDuration(value, "new note threshold") ?? TimeSpan.FromHours(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
public string? NewNoteDelay
|
||||||
|
{
|
||||||
|
get => NewNoteDelayTimeSpan.ToString();
|
||||||
|
init => NewNoteDelayTimeSpan =
|
||||||
|
ParseNaturalDuration(value, "new note delay") ?? TimeSpan.FromHours(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
public string? RefreshOnRenoteAfter
|
||||||
|
{
|
||||||
|
get => RefreshOnRenoteAfterTimeSpan.ToString();
|
||||||
|
init => RefreshOnRenoteAfterTimeSpan =
|
||||||
|
ParseNaturalDuration(value, "refresh renote after duration") ?? TimeSpan.FromDays(7);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TimeSpan NewNoteThresholdTimeSpan = TimeSpan.FromHours(3);
|
||||||
|
public TimeSpan NewNoteDelayTimeSpan = TimeSpan.FromHours(3);
|
||||||
|
public TimeSpan RefreshOnRenoteAfterTimeSpan = TimeSpan.FromDays(7);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static TimeSpan? ParseNaturalDuration(string? value, string name)
|
||||||
|
{
|
||||||
|
if (value == null || string.IsNullOrWhiteSpace(value) || value.Trim() == "0")
|
||||||
|
return null;
|
||||||
|
|
||||||
|
if (value.Trim() == "-1")
|
||||||
|
return TimeSpan.MaxValue;
|
||||||
|
|
||||||
|
if (value.Length < 2 || !int.TryParse(value[..^1].Trim(), out var num))
|
||||||
|
throw new Exception($"Invalid {name}");
|
||||||
|
|
||||||
|
var suffix = value[^1];
|
||||||
|
|
||||||
|
return suffix switch
|
||||||
|
{
|
||||||
|
'h' => TimeSpan.FromHours(num),
|
||||||
|
'd' => TimeSpan.FromDays(num),
|
||||||
|
'w' => TimeSpan.FromDays(num * 7),
|
||||||
|
'm' => TimeSpan.FromDays(num * 30),
|
||||||
|
'y' => TimeSpan.FromDays(num * 365),
|
||||||
|
_ => throw new Exception("Unsupported suffix, use one of: [h]ours, [d]ays, [w]eeks, [m]onths, [y]ears")
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -135,6 +135,8 @@ public static class ServiceExtensions
|
||||||
.ConfigureWithValidation<Config.SecuritySection>(configuration, "Security")
|
.ConfigureWithValidation<Config.SecuritySection>(configuration, "Security")
|
||||||
.ConfigureWithValidation<Config.PerformanceSection>(configuration, "Performance")
|
.ConfigureWithValidation<Config.PerformanceSection>(configuration, "Performance")
|
||||||
.ConfigureWithValidation<Config.QueueConcurrencySection>(configuration, "Performance:QueueConcurrency")
|
.ConfigureWithValidation<Config.QueueConcurrencySection>(configuration, "Performance:QueueConcurrency")
|
||||||
|
.ConfigureWithValidation<Config.BackfillSection>(configuration, "Backfill")
|
||||||
|
.ConfigureWithValidation<Config.BackfillRepliesSection>(configuration, "Backfill:Replies")
|
||||||
.ConfigureWithValidation<Config.QueueSection>(configuration, "Queue")
|
.ConfigureWithValidation<Config.QueueSection>(configuration, "Queue")
|
||||||
.ConfigureWithValidation<Config.JobRetentionSection>(configuration, "Queue:JobRetention")
|
.ConfigureWithValidation<Config.JobRetentionSection>(configuration, "Queue:JobRetention")
|
||||||
.ConfigureWithValidation<Config.DatabaseSection>(configuration, "Database")
|
.ConfigureWithValidation<Config.DatabaseSection>(configuration, "Database")
|
||||||
|
|
|
@ -33,6 +33,7 @@ public class NoteService(
|
||||||
DatabaseContext db,
|
DatabaseContext db,
|
||||||
ActivityPub.UserResolver userResolver,
|
ActivityPub.UserResolver userResolver,
|
||||||
IOptionsSnapshot<Config.InstanceSection> config,
|
IOptionsSnapshot<Config.InstanceSection> config,
|
||||||
|
IOptionsSnapshot<Config.BackfillSection> backfillConfig,
|
||||||
ActivityPub.ActivityFetcherService fetchSvc,
|
ActivityPub.ActivityFetcherService fetchSvc,
|
||||||
ActivityPub.ActivityDeliverService deliverSvc,
|
ActivityPub.ActivityDeliverService deliverSvc,
|
||||||
ActivityPub.NoteRenderer noteRenderer,
|
ActivityPub.NoteRenderer noteRenderer,
|
||||||
|
@ -314,14 +315,18 @@ public class NoteService(
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var replyBackfillConfig = backfillConfig.Value.Replies;
|
||||||
|
|
||||||
// If we're renoting a note we backfilled replies to some time ago (and know how to backfill), enqueue a backfill.
|
// If we're renoting a note we backfilled replies to some time ago (and know how to backfill), enqueue a backfill.
|
||||||
if (renote != null && renote.RepliesCollection != null && renote.RepliesFetchedAt != null && renote.RepliesFetchedAt?.AddDays(7) <= DateTime.UtcNow)
|
if (replyBackfillConfig.Enabled &&
|
||||||
|
renote?.RepliesCollection != null &&
|
||||||
|
renote.RepliesFetchedAt?.Add(replyBackfillConfig.RefreshOnRenoteAfterTimeSpan) <= DateTime.UtcNow)
|
||||||
{
|
{
|
||||||
logger.LogDebug("Enqueueing reply collection fetch for renote {renoteId}", renote.Id);
|
logger.LogDebug("Enqueueing reply collection fetch for renote {renoteId}", renote.Id);
|
||||||
await queueSvc.BackfillQueue.EnqueueAsync(new BackfillJobData
|
await queueSvc.BackfillQueue.EnqueueAsync(new BackfillJobData
|
||||||
{
|
{
|
||||||
NoteId = renote.Id,
|
NoteId = renote.Id,
|
||||||
RecursionLimit = _recursionLimit,
|
RecursionLimit = _recursionLimit,
|
||||||
AuthenticatedUserId = null, // TODO: for private replies
|
AuthenticatedUserId = null, // TODO: for private replies
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -337,23 +342,27 @@ public class NoteService(
|
||||||
.ExecuteUpdateAsync(p => p.SetProperty(i => i.NotesCount, i => i.NotesCount + 1));
|
.ExecuteUpdateAsync(p => p.SetProperty(i => i.NotesCount, i => i.NotesCount + 1));
|
||||||
});
|
});
|
||||||
|
|
||||||
if (note.RepliesCollection != null)
|
if (replyBackfillConfig.Enabled && note.RepliesCollection != null)
|
||||||
{
|
{
|
||||||
var jobData = new BackfillJobData
|
var jobData = new BackfillJobData
|
||||||
{
|
{
|
||||||
NoteId = note.Id,
|
NoteId = note.Id,
|
||||||
RecursionLimit = _recursionLimit,
|
RecursionLimit = _recursionLimit,
|
||||||
AuthenticatedUserId = null, // TODO: for private replies
|
AuthenticatedUserId = null, // TODO: for private replies
|
||||||
Collection = JsonConvert.SerializeObject(asNote?.Replies as ASObject, LdHelpers.JsonSerializerSettings)
|
Collection = asNote?.Replies?.IsUnresolved == false
|
||||||
|
? JsonConvert.SerializeObject(asNote.Replies, LdHelpers.JsonSerializerSettings)
|
||||||
|
: null
|
||||||
};
|
};
|
||||||
|
|
||||||
logger.LogDebug("Enqueueing reply collection fetch for note {noteId}", note.Id);
|
logger.LogDebug("Enqueueing reply collection fetch for note {noteId}", note.Id);
|
||||||
|
|
||||||
// Delay reply backfilling for brand new notes to allow them time to collect replies.
|
// Delay reply backfilling for brand new notes to allow them time to collect replies.
|
||||||
if (note.CreatedAt.AddHours(3) <= DateTime.UtcNow)
|
if (note.CreatedAt + replyBackfillConfig.NewNoteThresholdTimeSpan <= DateTime.UtcNow)
|
||||||
await queueSvc.BackfillQueue.EnqueueAsync(jobData);
|
await queueSvc.BackfillQueue.EnqueueAsync(jobData);
|
||||||
else
|
else
|
||||||
await queueSvc.BackfillQueue.ScheduleAsync(jobData, DateTime.UtcNow.AddHours(3));
|
await queueSvc.BackfillQueue.ScheduleAsync(jobData,
|
||||||
|
DateTime.UtcNow +
|
||||||
|
replyBackfillConfig.NewNoteDelayTimeSpan);
|
||||||
}
|
}
|
||||||
|
|
||||||
return note;
|
return note;
|
||||||
|
@ -1170,10 +1179,13 @@ public class NoteService(
|
||||||
return await ResolveNoteAsync(note.Id, note);
|
return await ResolveNoteAsync(note.Id, note);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task BackfillRepliesAsync(Note note, User? fetchUser, ASCollection? repliesCollection, int recursionLimit)
|
public async Task BackfillRepliesAsync(
|
||||||
|
Note note, User? fetchUser, ASCollection? repliesCollection, int recursionLimit
|
||||||
|
)
|
||||||
{
|
{
|
||||||
if (note.RepliesCollection == null) return;
|
if (note.RepliesCollection == null) return;
|
||||||
note.RepliesFetchedAt = DateTime.UtcNow; // should get committed alongside the resolved reply objects
|
await db.Notes.Where(p => p == note)
|
||||||
|
.ExecuteUpdateAsync(p => p.SetProperty(i => i.RepliesFetchedAt, _ => DateTime.UtcNow));
|
||||||
|
|
||||||
repliesCollection ??= new ASCollection(note.RepliesCollection);
|
repliesCollection ??= new ASCollection(note.RepliesCollection);
|
||||||
|
|
||||||
|
@ -1367,10 +1379,11 @@ public class NoteService(
|
||||||
|
|
||||||
// ReSharper disable once EntityFramework.UnsupportedServerSideFunctionCall
|
// ReSharper disable once EntityFramework.UnsupportedServerSideFunctionCall
|
||||||
var followingUser = await db.Users.FirstOrDefaultAsync(p => p.IsFollowing(user));
|
var followingUser = await db.Users.FirstOrDefaultAsync(p => p.IsFollowing(user));
|
||||||
var notes = await objectResolver.IterateCollection(collection).Take(10)
|
var notes = await objectResolver.IterateCollection(collection)
|
||||||
.Where(p => p.Id != null)
|
.Take(10)
|
||||||
.Select(p => ResolveNoteAsync(p.Id!, null, followingUser, true))
|
.Where(p => p.Id != null)
|
||||||
.AwaitAllNoConcurrencyAsync();
|
.Select(p => ResolveNoteAsync(p.Id!, null, followingUser, true))
|
||||||
|
.AwaitAllNoConcurrencyAsync();
|
||||||
|
|
||||||
var previousPins = await db.Users.Where(p => p.Id == user.Id)
|
var previousPins = await db.Users.Where(p => p.Id == user.Id)
|
||||||
.Select(p => p.PinnedNotes.Select(i => i.Id))
|
.Select(p => p.PinnedNotes.Select(i => i.Id))
|
||||||
|
|
|
@ -16,6 +16,7 @@ public class QueueService(
|
||||||
IServiceScopeFactory scopeFactory,
|
IServiceScopeFactory scopeFactory,
|
||||||
ILogger<QueueService> logger,
|
ILogger<QueueService> logger,
|
||||||
IOptions<Config.QueueConcurrencySection> queueConcurrency,
|
IOptions<Config.QueueConcurrencySection> queueConcurrency,
|
||||||
|
IOptions<Config.BackfillSection> backfill,
|
||||||
IHostApplicationLifetime lifetime
|
IHostApplicationLifetime lifetime
|
||||||
) : BackgroundService
|
) : BackgroundService
|
||||||
{
|
{
|
||||||
|
@ -30,7 +31,10 @@ public class QueueService(
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken token)
|
protected override async Task ExecuteAsync(CancellationToken token)
|
||||||
{
|
{
|
||||||
_queues.AddRange([InboxQueue, PreDeliverQueue, DeliverQueue, BackgroundTaskQueue, BackfillQueue]);
|
_queues.AddRange([InboxQueue, PreDeliverQueue, DeliverQueue, BackgroundTaskQueue]);
|
||||||
|
|
||||||
|
if (backfill.Value.Replies.Enabled)
|
||||||
|
_queues.Add(BackfillQueue);
|
||||||
|
|
||||||
var tokenSource = new CancellationTokenSource();
|
var tokenSource = new CancellationTokenSource();
|
||||||
var queueTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, lifetime.ApplicationStopping);
|
var queueTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, lifetime.ApplicationStopping);
|
||||||
|
|
|
@ -81,6 +81,22 @@ Inbox = 4
|
||||||
Deliver = 20
|
Deliver = 20
|
||||||
PreDeliver = 4
|
PreDeliver = 4
|
||||||
BackgroundTask = 4
|
BackgroundTask = 4
|
||||||
|
Backfill = 4
|
||||||
|
|
||||||
|
[Backfill:Replies]
|
||||||
|
;; Enables backfilling of replies. This is disabled by default as it may have a significant performance impact.
|
||||||
|
;; This is an experimental feature that hasn't had too much time to bake, so only enable if you're open for instability.
|
||||||
|
;; Note that replies can only be fetched from remote instances that expose a replies collection.
|
||||||
|
Enabled = false
|
||||||
|
|
||||||
|
;; Notes newer than this threshold will have reply backfilling delayed, to allow them time to accumulate replies.
|
||||||
|
NewNoteThreshold = 3h
|
||||||
|
|
||||||
|
;; The duration backfilling of new notes will be delayed by.
|
||||||
|
NewNoteDelay = 3h
|
||||||
|
|
||||||
|
;; Renoting a note that was backfilled before this threshold will attempt to fetch any new replies that may have been created since the last backfill.
|
||||||
|
RefreshOnRenoteAfter = 7d
|
||||||
|
|
||||||
;; How many completed & failed jobs to keep around, per queue.
|
;; How many completed & failed jobs to keep around, per queue.
|
||||||
;; Excess is trimmed every 15 minutes, oldest jobs first.
|
;; Excess is trimmed every 15 minutes, oldest jobs first.
|
||||||
|
|
Loading…
Add table
Reference in a new issue