[backend/core] Fix AsChunkedAsyncEnumerable pagination

This commit is contained in:
Laura Hausmann 2024-09-26 01:50:20 +02:00
parent c02af726e1
commit c68e0bbd94
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
4 changed files with 75 additions and 4 deletions

View file

@ -114,7 +114,7 @@ public class AdminController(
{
var jobs = db.Jobs
.Where(p => p.Queue == queue && p.Status == Job.JobStatus.Failed)
.AsChunkedAsyncEnumerable(10);
.AsChunkedAsyncEnumerable(10, p => p.Id);
await foreach (var job in jobs)
await queueSvc.RetryJobAsync(job);
@ -127,7 +127,7 @@ public class AdminController(
var jobs = db.Jobs
.Where(p => p.Queue == queue && p.Status == Job.JobStatus.Failed)
.Where(p => p.Id >= from && p.Id <= to)
.AsChunkedAsyncEnumerable(10);
.AsChunkedAsyncEnumerable(10, p => p.Id);
await foreach (var job in jobs)
await queueSvc.RetryJobAsync(job);

View file

@ -22,6 +22,7 @@ public static class QueryableExtensions
/// </summary>
/// <remarks>
/// Make sure to call .OrderBy() on the query, otherwise the results will be unpredictable.
/// Furthermore, this method is unsuitable for cases where the consumer removes elements from the original collection.
/// </remarks>
/// <returns>
/// The result set as an IAsyncEnumerable. Makes one DB roundtrip at the start of each chunk.
@ -40,6 +41,76 @@ public static class QueryableExtensions
}
}
/// <inheritdoc cref="AsChunkedAsyncEnumerable{T}(System.Linq.IQueryable{T},int)" select="summary|returns"/>
/// <remarks>
/// This overload requires you to pass a predicate to the identifier.
/// When <paramref name="isOrdered"/> is set to false, .OrderBy(<paramref name="idPredicate"/>) is appended to the query.
/// </remarks>
public static async IAsyncEnumerable<TResult> AsChunkedAsyncEnumerable<TResult>(
this IQueryable<TResult> query, int chunkSize, Expression<Func<TResult, string>> idPredicate,
bool isOrdered = false
)
{
var pred = idPredicate.Compile();
query = isOrdered ? query : query.OrderBy(idPredicate);
string? last = null;
while (true)
{
// ReSharper disable once AccessToModifiedClosure
var final = last is not null ? query.Where(idPredicate.Compose(p => p.IsGreaterThan(last))) : query;
var res = await final.Take(chunkSize).ToArrayAsync();
if (res.Length == 0) break;
foreach (var item in res) yield return item;
if (res.Length < chunkSize) break;
last = pred.Invoke(res.Last());
}
}
/// <inheritdoc cref="AsChunkedAsyncEnumerable{T}(System.Linq.IQueryable{T},int,Expression{Func{T,string}},bool)"/>
public static async IAsyncEnumerable<TResult> AsChunkedAsyncEnumerable<TResult>(
this IQueryable<TResult> query, int chunkSize, Expression<Func<TResult, Guid>> idPredicate,
bool isOrdered = false
)
{
var pred = idPredicate.Compile();
query = isOrdered ? query : query.OrderBy(idPredicate);
Guid? last = null;
while (true)
{
// ReSharper disable once AccessToModifiedClosure
var final = last is not null ? query.Where(idPredicate.Compose(p => p > last)) : query;
var res = await final.Take(chunkSize).ToArrayAsync();
if (res.Length == 0) break;
foreach (var item in res) yield return item;
if (res.Length < chunkSize) break;
last = pred.Invoke(res.Last());
}
}
/// <inheritdoc cref="AsChunkedAsyncEnumerable{T}(System.Linq.IQueryable{T},int,Expression{Func{T,string}},bool)"/>
public static async IAsyncEnumerable<TResult> AsChunkedAsyncEnumerable<TResult>(
this IQueryable<TResult> query, int chunkSize, Expression<Func<TResult, int>> idPredicate,
bool isOrdered = false
)
{
var pred = idPredicate.Compile();
query = isOrdered ? query : query.OrderBy(idPredicate);
int? last = null;
while (true)
{
// ReSharper disable once AccessToModifiedClosure
var final = last is not null ? query.Where(idPredicate.Compose(p => p > last)) : query;
var res = await final.Take(chunkSize).ToArrayAsync();
if (res.Length == 0) break;
foreach (var item in res) yield return item;
if (res.Length < chunkSize) break;
last = pred.Invoke(res.Last());
}
}
public static IQueryable<T> Paginate<T>(
this IQueryable<T> query,
MastodonPaginationQuery pq,

View file

@ -507,7 +507,7 @@ public class ActivityHandlerService(
.OrderBy(p => p.Id)
.Select(p => p.Follower)
.PrecomputeRelationshipData(source)
.AsChunkedAsyncEnumerable(50);
.AsChunkedAsyncEnumerable(50, p => p.Id, isOrdered: true);
await foreach (var follower in followers)
{

View file

@ -34,7 +34,7 @@ public class MediaCleanupTask : ICronTask
var cnt = await fileIds.CountAsync();
logger.LogInformation("Expiring {count} files...", cnt);
await foreach (var fileId in fileIds.OrderBy(p => p).AsChunkedAsyncEnumerable(50))
await foreach (var fileId in fileIds.AsChunkedAsyncEnumerable(50, p => p))
{
await queueService.BackgroundTaskQueue.EnqueueAsync(new DriveFileDeleteJobData
{