[backend/federation] Filter skipped & blocked instances in PreDeliverQueue (ISH-366)

This:
- improves performance (because we can check them in batches)
- improves efficiency (because we don't create deliver jobs that get discarded immediately)
- allows us to check whether the activity is an ASFollow & let it through if it is (for skipped instances filter only)

We still check for blocked instances in the deliver queue in case a deliver job is created via another code path.
This commit is contained in:
Laura Hausmann 2024-06-19 17:32:35 +02:00
parent aed143cc59
commit 9dbc7cfc14
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
3 changed files with 63 additions and 43 deletions

View file

@ -37,12 +37,4 @@ public class FederationControlService(
return await db.BlockedInstances.AnyAsync(p => finalHosts.Any(host => host == p.Host || return await db.BlockedInstances.AnyAsync(p => finalHosts.Any(host => host == p.Host ||
host.EndsWith("." + p.Host))); host.EndsWith("." + p.Host)));
} }
public async Task<bool> ShouldSkipAsync(string host)
{
return await db.Instances.AnyAsync(p => p.Host == host.ToLowerInvariant() &&
((p.IsNotResponding &&
p.LastCommunicatedAt < DateTime.UtcNow - TimeSpan.FromDays(7)) ||
p.IsSuspended));
}
} }

View file

@ -29,12 +29,6 @@ public class DeliverQueue(int parallelism)
return; return;
} }
if (await fedCtrl.ShouldSkipAsync(jobData.RecipientHost))
{
logger.LogDebug("fedCtrl.ShouldSkipAsync returned true, skipping");
return;
}
logger.LogDebug("Delivering activity to: {uri}", jobData.InboxUrl); logger.LogDebug("Delivering activity to: {uri}", jobData.InboxUrl);
var key = await cache.FetchAsync($"userPrivateKey:{jobData.UserId}", TimeSpan.FromMinutes(60), async () => var key = await cache.FetchAsync($"userPrivateKey:{jobData.UserId}", TimeSpan.FromMinutes(60), async () =>

View file

@ -1,4 +1,5 @@
using System.Diagnostics.CodeAnalysis; using System.Diagnostics.CodeAnalysis;
using System.Linq.Expressions;
using Iceshrimp.Backend.Core.Configuration; using Iceshrimp.Backend.Core.Configuration;
using Iceshrimp.Backend.Core.Database; using Iceshrimp.Backend.Core.Database;
using Iceshrimp.Backend.Core.Database.Tables; using Iceshrimp.Backend.Core.Database.Tables;
@ -61,6 +62,8 @@ public class PreDeliverQueue(int parallelism)
var inboxQueryResults = await query.Where(p => p.InboxUrl != null && p.Host != null) var inboxQueryResults = await query.Where(p => p.InboxUrl != null && p.Host != null)
.Distinct() .Distinct()
.SkipDeadInstances(activity, db)
.SkipBlockedInstances(config.Value.FederationMode, db)
.ToListAsync(token); .ToListAsync(token);
if (inboxQueryResults.Count == 0) return; if (inboxQueryResults.Count == 0) return;
@ -91,43 +94,74 @@ public class PreDeliverQueue(int parallelism)
UserId = jobData.ActorId UserId = jobData.ActorId
}); });
} }
}
private class InboxQueryResult : IEquatable<InboxQueryResult> file class InboxQueryResult : IEquatable<InboxQueryResult>
{
public required string? Host;
public required string? InboxUrl;
public bool Equals(InboxQueryResult? other)
{ {
public required string? Host; if (ReferenceEquals(null, other)) return false;
public required string? InboxUrl; if (ReferenceEquals(this, other)) return true;
return InboxUrl == other.InboxUrl && Host == other.Host;
}
public bool Equals(InboxQueryResult? other) public override bool Equals(object? obj)
{ {
if (ReferenceEquals(null, other)) return false; if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, other)) return true; if (ReferenceEquals(this, obj)) return true;
return InboxUrl == other.InboxUrl && Host == other.Host; if (obj.GetType() != GetType()) return false;
} return Equals((InboxQueryResult)obj);
}
public override bool Equals(object? obj) [SuppressMessage("ReSharper", "NonReadonlyMemberInGetHashCode", Justification =
{ "We are using this as a Tuple that works with LINQ on our IQueryable iterator. This is therefore intended behavior.")]
if (ReferenceEquals(null, obj)) return false; public override int GetHashCode()
if (ReferenceEquals(this, obj)) return true; {
if (obj.GetType() != GetType()) return false; return HashCode.Combine(InboxUrl, Host);
return Equals((InboxQueryResult)obj); }
}
[SuppressMessage("ReSharper", "NonReadonlyMemberInGetHashCode", Justification = public static bool operator ==(InboxQueryResult? left, InboxQueryResult? right)
"We are using this as a Tuple that works with LINQ on our IQueryable iterator. This is therefore intended behavior.")] {
public override int GetHashCode() return Equals(left, right);
{ }
return HashCode.Combine(InboxUrl, Host);
}
public static bool operator ==(InboxQueryResult? left, InboxQueryResult? right) public static bool operator !=(InboxQueryResult? left, InboxQueryResult? right)
{ {
return Equals(left, right); return !Equals(left, right);
} }
}
public static bool operator !=(InboxQueryResult? left, InboxQueryResult? right) file static class QueryableExtensions
{
public static IQueryable<InboxQueryResult> SkipDeadInstances(
this IQueryable<InboxQueryResult> query, ASActivity activity, DatabaseContext db
)
{
if (activity is ASFollow) return query;
return query.Where(user => !db.Instances.Any(p => p.Host == user.Host &&
((p.IsNotResponding &&
p.LastCommunicatedAt <
DateTime.UtcNow - TimeSpan.FromDays(7)) ||
p.IsSuspended)));
}
public static IQueryable<InboxQueryResult> SkipBlockedInstances(
this IQueryable<InboxQueryResult> query, Enums.FederationMode mode, DatabaseContext db
)
{
Expression<Func<InboxQueryResult, bool>> expr = mode switch
{ {
return !Equals(left, right); Enums.FederationMode.BlockList => u =>
} u.Host == null || !db.BlockedInstances.Any(p => u.Host == p.Host || u.Host.EndsWith("." + p.Host)),
Enums.FederationMode.AllowList => u =>
u.Host == null || db.AllowedInstances.Any(p => u.Host == p.Host || u.Host.EndsWith("." + p.Host)),
_ => throw new ArgumentOutOfRangeException(nameof(mode), mode, null)
};
return query.Where(expr);
} }
} }