[backend/core] Revert IAsyncEnumerable naming
This commit is contained in:
parent
9f4c989ca8
commit
a762a9201e
11 changed files with 30 additions and 23 deletions
|
@ -227,7 +227,7 @@ public class AdminController(
|
||||||
{
|
{
|
||||||
var jobs = db.Jobs
|
var jobs = db.Jobs
|
||||||
.Where(p => p.Queue == queue && p.Status == Job.JobStatus.Failed)
|
.Where(p => p.Queue == queue && p.Status == Job.JobStatus.Failed)
|
||||||
.AsChunkedAsyncEnumerableAsync(10, p => p.Id);
|
.AsChunkedAsyncEnumerable(10, p => p.Id);
|
||||||
|
|
||||||
await foreach (var job in jobs)
|
await foreach (var job in jobs)
|
||||||
await queueSvc.RetryJobAsync(job);
|
await queueSvc.RetryJobAsync(job);
|
||||||
|
@ -240,7 +240,7 @@ public class AdminController(
|
||||||
var jobs = db.Jobs
|
var jobs = db.Jobs
|
||||||
.Where(p => p.Queue == queue && p.Status == Job.JobStatus.Failed)
|
.Where(p => p.Queue == queue && p.Status == Job.JobStatus.Failed)
|
||||||
.Where(p => p.Id >= from && p.Id <= to)
|
.Where(p => p.Id >= from && p.Id <= to)
|
||||||
.AsChunkedAsyncEnumerableAsync(10, p => p.Id);
|
.AsChunkedAsyncEnumerable(10, p => p.Id);
|
||||||
|
|
||||||
await foreach (var job in jobs)
|
await foreach (var job in jobs)
|
||||||
await queueSvc.RetryJobAsync(job);
|
await queueSvc.RetryJobAsync(job);
|
||||||
|
|
|
@ -28,7 +28,8 @@ public static class QueryableExtensions
|
||||||
/// The result set as an IAsyncEnumerable. Makes one DB roundtrip at the start of each chunk.
|
/// The result set as an IAsyncEnumerable. Makes one DB roundtrip at the start of each chunk.
|
||||||
/// Successive items in the chunk are yielded instantaneously.
|
/// Successive items in the chunk are yielded instantaneously.
|
||||||
/// </returns>
|
/// </returns>
|
||||||
public static async IAsyncEnumerable<T> AsChunkedAsyncEnumerableAsync<T>(this IQueryable<T> query, int chunkSize)
|
[SuppressMessage("ReSharper", "InconsistentNaming")]
|
||||||
|
public static async IAsyncEnumerable<T> AsChunkedAsyncEnumerable<T>(this IQueryable<T> query, int chunkSize)
|
||||||
{
|
{
|
||||||
var offset = 0;
|
var offset = 0;
|
||||||
while (true)
|
while (true)
|
||||||
|
@ -41,13 +42,14 @@ public static class QueryableExtensions
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc cref="AsChunkedAsyncEnumerableAsync{T}(System.Linq.IQueryable{T},int)" select="summary|returns"/>
|
/// <inheritdoc cref="AsChunkedAsyncEnumerable{T}(System.Linq.IQueryable{T},int)" select="summary|returns"/>
|
||||||
/// <remarks>
|
/// <remarks>
|
||||||
/// This overload requires you to pass a predicate to the identifier.
|
/// This overload requires you to pass a predicate to the identifier.
|
||||||
/// .OrderBy(<paramref name="idPredicate"/>) is appended to the query.
|
/// .OrderBy(<paramref name="idPredicate"/>) is appended to the query.
|
||||||
/// Set the <paramref name="hook"/> parameter to append things to the query after pagination, for cases where query translation would fail otherwise.
|
/// Set the <paramref name="hook"/> parameter to append things to the query after pagination, for cases where query translation would fail otherwise.
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
public static async IAsyncEnumerable<TResult> AsChunkedAsyncEnumerableAsync<TResult>(
|
[SuppressMessage("ReSharper", "InconsistentNaming")]
|
||||||
|
public static async IAsyncEnumerable<TResult> AsChunkedAsyncEnumerable<TResult>(
|
||||||
this IQueryable<TResult> query, int chunkSize, Expression<Func<TResult, string>> idPredicate,
|
this IQueryable<TResult> query, int chunkSize, Expression<Func<TResult, string>> idPredicate,
|
||||||
Func<IQueryable<TResult>, IQueryable<TResult>>? hook = null
|
Func<IQueryable<TResult>, IQueryable<TResult>>? hook = null
|
||||||
)
|
)
|
||||||
|
@ -70,8 +72,9 @@ public static class QueryableExtensions
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc cref="AsChunkedAsyncEnumerableAsync{TResult}(System.Linq.IQueryable{TResult},int,System.Linq.Expressions.Expression{System.Func{TResult,string}},System.Func{System.Linq.IQueryable{TResult},System.Linq.IQueryable{TResult}}?)"/>
|
/// <inheritdoc cref="AsChunkedAsyncEnumerable{TResult}(System.Linq.IQueryable{TResult},int,System.Linq.Expressions.Expression{System.Func{TResult,string}},System.Func{System.Linq.IQueryable{TResult},System.Linq.IQueryable{TResult}}?)"/>
|
||||||
public static async IAsyncEnumerable<TResult> AsChunkedAsyncEnumerableAsync<TResult>(
|
[SuppressMessage("ReSharper", "InconsistentNaming")]
|
||||||
|
public static async IAsyncEnumerable<TResult> AsChunkedAsyncEnumerable<TResult>(
|
||||||
this IQueryable<TResult> query, int chunkSize, Expression<Func<TResult, Guid>> idPredicate,
|
this IQueryable<TResult> query, int chunkSize, Expression<Func<TResult, Guid>> idPredicate,
|
||||||
Func<IQueryable<TResult>, IQueryable<TResult>>? hook = null
|
Func<IQueryable<TResult>, IQueryable<TResult>>? hook = null
|
||||||
)
|
)
|
||||||
|
@ -94,8 +97,9 @@ public static class QueryableExtensions
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc cref="AsChunkedAsyncEnumerableAsync{TResult}(System.Linq.IQueryable{TResult},int,System.Linq.Expressions.Expression{System.Func{TResult,string}},System.Func{System.Linq.IQueryable{TResult},System.Linq.IQueryable{TResult}}?)"/>
|
/// <inheritdoc cref="AsChunkedAsyncEnumerable{TResult}(System.Linq.IQueryable{TResult},int,System.Linq.Expressions.Expression{System.Func{TResult,string}},System.Func{System.Linq.IQueryable{TResult},System.Linq.IQueryable{TResult}}?)"/>
|
||||||
public static async IAsyncEnumerable<TResult> AsChunkedAsyncEnumerableAsync<TResult>(
|
[SuppressMessage("ReSharper", "InconsistentNaming")]
|
||||||
|
public static async IAsyncEnumerable<TResult> AsChunkedAsyncEnumerable<TResult>(
|
||||||
this IQueryable<TResult> query, int chunkSize, Expression<Func<TResult, int>> idPredicate,
|
this IQueryable<TResult> query, int chunkSize, Expression<Func<TResult, int>> idPredicate,
|
||||||
Func<IQueryable<TResult>, IQueryable<TResult>>? hook = null
|
Func<IQueryable<TResult>, IQueryable<TResult>>? hook = null
|
||||||
)
|
)
|
||||||
|
|
|
@ -432,9 +432,10 @@ file sealed class EntityFrameworkCoreXmlRepositoryAsync<TContext> : IXmlReposito
|
||||||
|
|
||||||
public IReadOnlyCollection<XElement> GetAllElements()
|
public IReadOnlyCollection<XElement> GetAllElements()
|
||||||
{
|
{
|
||||||
return GetAllElementsCoreAsync().ToBlockingEnumerable().ToList().AsReadOnly();
|
return GetAllElementsCore().ToBlockingEnumerable().ToList().AsReadOnly();
|
||||||
|
|
||||||
async IAsyncEnumerable<XElement> GetAllElementsCoreAsync()
|
[SuppressMessage("ReSharper", "InconsistentNaming")]
|
||||||
|
async IAsyncEnumerable<XElement> GetAllElementsCore()
|
||||||
{
|
{
|
||||||
using var scope = _services.CreateScope();
|
using var scope = _services.CreateScope();
|
||||||
var @enum = scope.ServiceProvider.GetRequiredService<TContext>()
|
var @enum = scope.ServiceProvider.GetRequiredService<TContext>()
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
using System.Diagnostics.CodeAnalysis;
|
||||||
using Iceshrimp.Backend.Core.Configuration;
|
using Iceshrimp.Backend.Core.Configuration;
|
||||||
using Iceshrimp.Backend.Core.Database;
|
using Iceshrimp.Backend.Core.Database;
|
||||||
using Iceshrimp.Backend.Core.Database.Tables;
|
using Iceshrimp.Backend.Core.Database.Tables;
|
||||||
|
@ -92,7 +93,8 @@ public class ObjectResolver(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async IAsyncEnumerable<ASObject> IterateCollectionAsync(
|
[SuppressMessage("ReSharper", "InconsistentNaming")]
|
||||||
|
public async IAsyncEnumerable<ASObject> IterateCollection(
|
||||||
ASCollection? collection, User? user = null, int pageLimit = 10
|
ASCollection? collection, User? user = null, int pageLimit = 10
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
|
|
@ -83,7 +83,7 @@ public class BackfillQueue(int parallelism)
|
||||||
.Where(n => n.Id == current.Id)
|
.Where(n => n.Id == current.Id)
|
||||||
.ExecuteUpdateAsync(p => p.SetProperty(n => n.RepliesFetchedAt, DateTime.UtcNow), token);
|
.ExecuteUpdateAsync(p => p.SetProperty(n => n.RepliesFetchedAt, DateTime.UtcNow), token);
|
||||||
|
|
||||||
await foreach (var asNote in objectResolver.IterateCollectionAsync(new ASCollection(current.RepliesCollection), user: user)
|
await foreach (var asNote in objectResolver.IterateCollection(new ASCollection(current.RepliesCollection), user: user)
|
||||||
.Take(MaxRepliesPerNote)
|
.Take(MaxRepliesPerNote)
|
||||||
.Where(p => p.Id != null)
|
.Where(p => p.Id != null)
|
||||||
.WithCancellation(token))
|
.WithCancellation(token))
|
||||||
|
|
|
@ -272,7 +272,7 @@ public class BackgroundTaskQueue(int parallelism)
|
||||||
|
|
||||||
var fileIdQ = db.DriveFiles.Where(p => p.User == user).Select(p => p.Id);
|
var fileIdQ = db.DriveFiles.Where(p => p.User == user).Select(p => p.Id);
|
||||||
var fileIdCnt = await fileIdQ.CountAsync(token);
|
var fileIdCnt = await fileIdQ.CountAsync(token);
|
||||||
var fileIds = fileIdQ.AsChunkedAsyncEnumerableAsync(50, p => p);
|
var fileIds = fileIdQ.AsChunkedAsyncEnumerable(50, p => p);
|
||||||
logger.LogDebug("Removing {count} files for user {id}", fileIdCnt, user.Id);
|
logger.LogDebug("Removing {count} files for user {id}", fileIdCnt, user.Id);
|
||||||
await foreach (var id in fileIds)
|
await foreach (var id in fileIds)
|
||||||
{
|
{
|
||||||
|
@ -284,7 +284,7 @@ public class BackgroundTaskQueue(int parallelism)
|
||||||
|
|
||||||
var noteQ = db.Notes.Where(p => p.User == user).Select(p => p.Id);
|
var noteQ = db.Notes.Where(p => p.User == user).Select(p => p.Id);
|
||||||
var noteCnt = await noteQ.CountAsync(token);
|
var noteCnt = await noteQ.CountAsync(token);
|
||||||
var noteIds = noteQ.AsChunkedAsyncEnumerableAsync(50, p => p);
|
var noteIds = noteQ.AsChunkedAsyncEnumerable(50, p => p);
|
||||||
logger.LogDebug("Removing {count} notes for user {id}", noteCnt, user.Id);
|
logger.LogDebug("Removing {count} notes for user {id}", noteCnt, user.Id);
|
||||||
await foreach (var id in noteIds)
|
await foreach (var id in noteIds)
|
||||||
{
|
{
|
||||||
|
|
|
@ -1450,7 +1450,7 @@ public class NoteService(
|
||||||
|
|
||||||
// ReSharper disable once EntityFramework.UnsupportedServerSideFunctionCall
|
// ReSharper disable once EntityFramework.UnsupportedServerSideFunctionCall
|
||||||
var followingUser = await db.Users.FirstOrDefaultAsync(p => p.IsFollowing(user));
|
var followingUser = await db.Users.FirstOrDefaultAsync(p => p.IsFollowing(user));
|
||||||
var notes = await objectResolver.IterateCollectionAsync(collection)
|
var notes = await objectResolver.IterateCollection(collection)
|
||||||
.Take(10)
|
.Take(10)
|
||||||
.Where(p => p.Id != null)
|
.Where(p => p.Id != null)
|
||||||
.Select(p => ResolveNoteAsync(p.Id!, null, followingUser, true))
|
.Select(p => ResolveNoteAsync(p.Id!, null, followingUser, true))
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
using System.Collections.Immutable;
|
using System.Collections.Immutable;
|
||||||
|
using System.Diagnostics.CodeAnalysis;
|
||||||
using System.Net.Http.Headers;
|
using System.Net.Http.Headers;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using Carbon.Storage;
|
using Carbon.Storage;
|
||||||
|
@ -128,7 +129,8 @@ public class ObjectStorageService(IOptions<Config.StorageSection> config, HttpCl
|
||||||
await _bucket.DeleteAsync(filenames.Select(GetKeyWithPrefix).ToImmutableList());
|
await _bucket.DeleteAsync(filenames.Select(GetKeyWithPrefix).ToImmutableList());
|
||||||
}
|
}
|
||||||
|
|
||||||
public async IAsyncEnumerable<string> EnumerateFilesAsync()
|
[SuppressMessage("ReSharper", "InconsistentNaming")]
|
||||||
|
public async IAsyncEnumerable<string> EnumerateFiles()
|
||||||
{
|
{
|
||||||
if (_bucket == null)
|
if (_bucket == null)
|
||||||
throw new Exception("Refusing to enumerate files from object storage with invalid configuration");
|
throw new Exception("Refusing to enumerate files from object storage with invalid configuration");
|
||||||
|
|
|
@ -155,7 +155,7 @@ public class StorageMaintenanceService(
|
||||||
var localFiles = driveSvc.GetAllFileNamesFromLocalStorage();
|
var localFiles = driveSvc.GetAllFileNamesFromLocalStorage();
|
||||||
var objStorageFiles = await driveSvc.GetAllFileNamesFromObjectStorageAsync();
|
var objStorageFiles = await driveSvc.GetAllFileNamesFromObjectStorageAsync();
|
||||||
|
|
||||||
await foreach (var file in query.AsChunkedAsyncEnumerableAsync(50, p => p.Id))
|
await foreach (var file in query.AsChunkedAsyncEnumerable(50, p => p.Id))
|
||||||
{
|
{
|
||||||
if (++progress % 500 == 0)
|
if (++progress % 500 == 0)
|
||||||
logger.LogInformation("Validating files... ({idx}/{total})", progress, total);
|
logger.LogInformation("Validating files... ({idx}/{total})", progress, total);
|
||||||
|
|
|
@ -1384,8 +1384,7 @@ public class UserService(
|
||||||
var followers = db.Followings
|
var followers = db.Followings
|
||||||
.Where(p => p.Followee == source && p.Follower.IsLocalUser)
|
.Where(p => p.Followee == source && p.Follower.IsLocalUser)
|
||||||
.Select(p => p.Follower)
|
.Select(p => p.Follower)
|
||||||
.AsChunkedAsyncEnumerableAsync(50, p => p.Id,
|
.AsChunkedAsyncEnumerable(50, p => p.Id, hook: p => p.PrecomputeRelationshipData(source));
|
||||||
hook: p => p.PrecomputeRelationshipData(source));
|
|
||||||
|
|
||||||
await foreach (var follower in followers)
|
await foreach (var follower in followers)
|
||||||
{
|
{
|
||||||
|
@ -1413,8 +1412,7 @@ public class UserService(
|
||||||
var following = db.Followings
|
var following = db.Followings
|
||||||
.Where(p => p.Follower == source)
|
.Where(p => p.Follower == source)
|
||||||
.Select(p => p.Follower)
|
.Select(p => p.Follower)
|
||||||
.AsChunkedAsyncEnumerableAsync(50, p => p.Id,
|
.AsChunkedAsyncEnumerable(50, p => p.Id, hook: p => p.PrecomputeRelationshipData(source));
|
||||||
hook: p => p.PrecomputeRelationshipData(source));
|
|
||||||
|
|
||||||
await foreach (var followee in following)
|
await foreach (var followee in following)
|
||||||
{
|
{
|
||||||
|
|
|
@ -34,7 +34,7 @@ public class MediaCleanupTask : ICronTask
|
||||||
var cnt = await fileIds.CountAsync();
|
var cnt = await fileIds.CountAsync();
|
||||||
|
|
||||||
logger.LogInformation("Expiring {count} files...", cnt);
|
logger.LogInformation("Expiring {count} files...", cnt);
|
||||||
await foreach (var fileId in fileIds.AsChunkedAsyncEnumerableAsync(50, p => p))
|
await foreach (var fileId in fileIds.AsChunkedAsyncEnumerable(50, p => p))
|
||||||
{
|
{
|
||||||
await queueService.BackgroundTaskQueue.EnqueueAsync(new DriveFileDeleteJobData
|
await queueService.BackgroundTaskQueue.EnqueueAsync(new DriveFileDeleteJobData
|
||||||
{
|
{
|
||||||
|
|
Loading…
Add table
Reference in a new issue