Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Laura Hausmann
b148c8a0ba
wip: clustered events (ISH-141) 2024-07-24 23:06:47 +02:00
22 changed files with 525 additions and 96 deletions

View file

@ -24,7 +24,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
[EnableRateLimiting("sliding")] [EnableRateLimiting("sliding")]
[EnableCors("mastodon")] [EnableCors("mastodon")]
[Produces(MediaTypeNames.Application.Json)] [Produces(MediaTypeNames.Application.Json)]
public class FilterController(DatabaseContext db, QueueService queueSvc, EventService eventSvc) : ControllerBase public class FilterController(DatabaseContext db, QueueService queueSvc, IEventService eventSvc) : ControllerBase
{ {
[HttpGet] [HttpGet]
[Authorize("read:filters")] [Authorize("read:filters")]
@ -95,7 +95,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
db.Add(filter); db.Add(filter);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseFilterAdded(this, filter); await eventSvc.RaiseFilterAdded(this, filter);
if (expiry.HasValue) if (expiry.HasValue)
{ {
@ -159,7 +159,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
db.Update(filter); db.Update(filter);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter); await eventSvc.RaiseFilterUpdated(this, filter);
if (expiry.HasValue) if (expiry.HasValue)
{ {
@ -183,7 +183,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
db.Remove(filter); db.Remove(filter);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseFilterRemoved(this, filter); await eventSvc.RaiseFilterRemoved(this, filter);
return new object(); return new object();
} }
@ -218,7 +218,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
db.Update(keyword); db.Update(keyword);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter); await eventSvc.RaiseFilterUpdated(this, filter);
return new FilterKeyword(keyword, filter.Id, filter.Keywords.Count - 1); return new FilterKeyword(keyword, filter.Id, filter.Keywords.Count - 1);
} }
@ -257,7 +257,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
filter.Keywords[keywordId] = request.WholeWord ? $"\"{request.Keyword}\"" : request.Keyword; filter.Keywords[keywordId] = request.WholeWord ? $"\"{request.Keyword}\"" : request.Keyword;
db.Update(filter); db.Update(filter);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter); await eventSvc.RaiseFilterUpdated(this, filter);
return new FilterKeyword(filter.Keywords[keywordId], filter.Id, keywordId); return new FilterKeyword(filter.Keywords[keywordId], filter.Id, keywordId);
} }
@ -279,7 +279,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
filter.Keywords.RemoveAt(keywordId); filter.Keywords.RemoveAt(keywordId);
db.Update(filter); db.Update(filter);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter); await eventSvc.RaiseFilterUpdated(this, filter);
return new object(); return new object();
} }

View file

@ -25,7 +25,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
[EnableRateLimiting("sliding")] [EnableRateLimiting("sliding")]
[EnableCors("mastodon")] [EnableCors("mastodon")]
[Produces(MediaTypeNames.Application.Json)] [Produces(MediaTypeNames.Application.Json)]
public class ListController(DatabaseContext db, UserRenderer userRenderer, EventService eventSvc) : ControllerBase public class ListController(DatabaseContext db, UserRenderer userRenderer, IEventService eventSvc) : ControllerBase
{ {
[HttpGet] [HttpGet]
[Authorize("read:lists")] [Authorize("read:lists")]
@ -139,7 +139,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer, Event
db.Remove(list); db.Remove(list);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseListMembersUpdated(this, list); await eventSvc.RaiseListMembersUpdated(this, list);
return new object(); return new object();
} }
@ -204,8 +204,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer, Event
await db.AddRangeAsync(memberships); await db.AddRangeAsync(memberships);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
await eventSvc.RaiseListMembersUpdated(this, list);
eventSvc.RaiseListMembersUpdated(this, list);
return new object(); return new object();
} }
@ -229,7 +228,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer, Event
.Where(p => p.UserList == list && request.AccountIds.Contains(p.UserId)) .Where(p => p.UserList == list && request.AccountIds.Contains(p.UserId))
.ExecuteDeleteAsync(); .ExecuteDeleteAsync();
eventSvc.RaiseListMembersUpdated(this, list); await eventSvc.RaiseListMembersUpdated(this, list);
return new object(); return new object();
} }

View file

@ -18,7 +18,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon.Streaming;
public sealed class WebSocketConnection( public sealed class WebSocketConnection(
WebSocket socket, WebSocket socket,
OauthToken token, OauthToken token,
EventService eventSvc, IEventService eventSvc,
IServiceScopeFactory scopeFactory, IServiceScopeFactory scopeFactory,
CancellationToken ct CancellationToken ct
) : IDisposable ) : IDisposable
@ -28,7 +28,7 @@ public sealed class WebSocketConnection(
private readonly SemaphorePlus _lock = new(1); private readonly SemaphorePlus _lock = new(1);
private readonly WriteLockingList<string> _muting = []; private readonly WriteLockingList<string> _muting = [];
public readonly List<IChannel> Channels = []; public readonly List<IChannel> Channels = [];
public readonly EventService EventService = eventSvc; public readonly IEventService EventService = eventSvc;
public readonly WriteLockingList<Filter> Filters = []; public readonly WriteLockingList<Filter> Filters = [];
public readonly WriteLockingList<string> Following = []; public readonly WriteLockingList<string> Following = [];
public readonly IServiceScope Scope = scopeFactory.CreateScope(); public readonly IServiceScope Scope = scopeFactory.CreateScope();

View file

@ -8,7 +8,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon.Streaming;
public static class WebSocketHandler public static class WebSocketHandler
{ {
public static async Task HandleConnectionAsync( public static async Task HandleConnectionAsync(
WebSocket socket, OauthToken token, EventService eventSvc, IServiceScopeFactory scopeFactory, WebSocket socket, OauthToken token, IEventService eventSvc, IServiceScopeFactory scopeFactory,
string? stream, string? list, string? tag, CancellationToken ct string? stream, string? list, string? tag, CancellationToken ct
) )
{ {

View file

@ -14,7 +14,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
public class WebSocketController( public class WebSocketController(
IHostApplicationLifetime appLifetime, IHostApplicationLifetime appLifetime,
DatabaseContext db, DatabaseContext db,
EventService eventSvc, IEventService eventSvc,
IServiceScopeFactory scopeFactory, IServiceScopeFactory scopeFactory,
ILogger<WebSocketController> logger ILogger<WebSocketController> logger
) : ControllerBase ) : ControllerBase

View file

@ -19,7 +19,7 @@ namespace Iceshrimp.Backend.Controllers.Web;
[EnableRateLimiting("sliding")] [EnableRateLimiting("sliding")]
[Route("/api/iceshrimp/filters")] [Route("/api/iceshrimp/filters")]
[Produces(MediaTypeNames.Application.Json)] [Produces(MediaTypeNames.Application.Json)]
public class FilterController(DatabaseContext db, EventService eventSvc) : ControllerBase public class FilterController(DatabaseContext db, IEventService eventSvc) : ControllerBase
{ {
[HttpGet] [HttpGet]
[ProducesResults(HttpStatusCode.OK)] [ProducesResults(HttpStatusCode.OK)]
@ -49,7 +49,7 @@ public class FilterController(DatabaseContext db, EventService eventSvc) : Contr
db.Add(filter); db.Add(filter);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseFilterAdded(this, filter); await eventSvc.RaiseFilterAdded(this, filter);
return FilterRenderer.RenderOne(filter); return FilterRenderer.RenderOne(filter);
} }
@ -69,7 +69,7 @@ public class FilterController(DatabaseContext db, EventService eventSvc) : Contr
filter.Contexts = request.Contexts.Cast<Filter.FilterContext>().ToList(); filter.Contexts = request.Contexts.Cast<Filter.FilterContext>().ToList();
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter); await eventSvc.RaiseFilterUpdated(this, filter);
} }
[HttpDelete("{id:long}")] [HttpDelete("{id:long}")]
@ -83,6 +83,6 @@ public class FilterController(DatabaseContext db, EventService eventSvc) : Contr
db.Remove(filter); db.Remove(filter);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseFilterRemoved(this, filter); await eventSvc.RaiseFilterRemoved(this, filter);
} }
} }

View file

@ -4,6 +4,6 @@ namespace Iceshrimp.Backend.Core.Events;
public class NoteInteraction public class NoteInteraction
{ {
public required Note Note; public required Note Note { get; init; }
public required User User; public required User User { get; init; }
} }

View file

@ -4,6 +4,6 @@ namespace Iceshrimp.Backend.Core.Events;
public class UserInteraction public class UserInteraction
{ {
public required User Actor; public required User Actor { get; init; }
public required User Object; public required User Object { get; init; }
} }

View file

@ -36,8 +36,10 @@ namespace Iceshrimp.Backend.Core.Extensions;
public static class ServiceExtensions public static class ServiceExtensions
{ {
public static void AddServices(this IServiceCollection services) public static void AddServices(this IServiceCollection services, IConfigurationManager configuration)
{ {
var config = configuration.GetSection("Worker").Get<Config.WorkerSection>();
// Transient = instantiated per request and class // Transient = instantiated per request and class
// Scoped = instantiated per request // Scoped = instantiated per request
@ -90,7 +92,6 @@ public static class ServiceExtensions
.AddSingleton<CronService>() .AddSingleton<CronService>()
.AddSingleton<QueueService>() .AddSingleton<QueueService>()
.AddSingleton<ObjectStorageService>() .AddSingleton<ObjectStorageService>()
.AddSingleton<EventService>()
.AddSingleton<RequestBufferingMiddleware>() .AddSingleton<RequestBufferingMiddleware>()
.AddSingleton<AuthorizationMiddleware>() .AddSingleton<AuthorizationMiddleware>()
.AddSingleton<RequestVerificationMiddleware>() .AddSingleton<RequestVerificationMiddleware>()
@ -105,6 +106,17 @@ public static class ServiceExtensions
services.AddHostedService<CronService>(provider => provider.GetRequiredService<CronService>()); services.AddHostedService<CronService>(provider => provider.GetRequiredService<CronService>());
services.AddHostedService<QueueService>(provider => provider.GetRequiredService<QueueService>()); services.AddHostedService<QueueService>(provider => provider.GetRequiredService<QueueService>());
services.AddHostedService<PushService>(provider => provider.GetRequiredService<PushService>()); services.AddHostedService<PushService>(provider => provider.GetRequiredService<PushService>());
// Add service implementations for clustered and non-clustered configurations
if (config?.WorkerId != null)
{
services.AddSingleton<IEventService, ClusteredEventService>()
.AddHostedService(p => (ClusteredEventService)p.GetRequiredService<IEventService>());
}
else
{
services.AddSingleton<IEventService, EventService>();
}
} }
public static void ConfigureServices(this IServiceCollection services, IConfiguration configuration) public static void ConfigureServices(this IServiceCollection services, IConfiguration configuration)

View file

@ -26,7 +26,7 @@ public class ActivityHandlerService(
ObjectResolver objectResolver, ObjectResolver objectResolver,
FollowupTaskService followupTaskSvc, FollowupTaskService followupTaskSvc,
EmojiService emojiSvc, EmojiService emojiSvc,
EventService eventSvc IEventService eventSvc
) )
{ {
public async Task PerformActivityAsync(ASActivity activity, string? inboxUserId, string? authenticatedUserId) public async Task PerformActivityAsync(ASActivity activity, string? inboxUserId, string? authenticatedUserId)
@ -508,7 +508,7 @@ public class ActivityHandlerService(
p.Notifier == follower) p.Notifier == follower)
.ExecuteDeleteAsync(); .ExecuteDeleteAsync();
eventSvc.RaiseUserUnfollowed(this, follower, followee); await eventSvc.RaiseUserUnfollowed(this, follower, followee);
} }
} }

View file

@ -207,8 +207,8 @@ public class BackgroundTaskQueue(int parallelism)
db.Remove(muting); db.Remove(muting);
await db.SaveChangesAsync(token); await db.SaveChangesAsync(token);
var eventSvc = scope.GetRequiredService<EventService>(); var eventSvc = scope.GetRequiredService<IEventService>();
eventSvc.RaiseUserUnmuted(null, muting.Muter, muting.Mutee); await eventSvc.RaiseUserUnmuted(null, muting.Muter, muting.Mutee);
} }
private static async Task ProcessFilterExpiry( private static async Task ProcessFilterExpiry(

View file

@ -0,0 +1,307 @@
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using System.Text.Json.Serialization;
using Iceshrimp.Backend.Core.Configuration;
using Iceshrimp.Backend.Core.Database;
using Iceshrimp.Backend.Core.Database.Tables;
using Iceshrimp.Backend.Core.Events;
using Iceshrimp.Backend.Core.Extensions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Npgsql;
namespace Iceshrimp.Backend.Core.Services;
public class ClusteredEventService(
IServiceScopeFactory scopeFactory,
IOptions<Config.DatabaseSection> dbConfig,
IOptions<Config.WorkerSection> workerConfig,
ILogger<ClusteredEventService> logger
) : BackgroundService, IEventService
{
private readonly NpgsqlDataSource _dataSource = DatabaseContext.GetDataSource(dbConfig.Value);
public event EventHandler<Note>? NotePublished;
public event EventHandler<Note>? NoteUpdated;
public event EventHandler<Note>? NoteDeleted;
public event EventHandler<NoteInteraction>? NoteLiked;
public event EventHandler<NoteInteraction>? NoteUnliked;
public event EventHandler<NoteReaction>? NoteReacted;
public event EventHandler<NoteReaction>? NoteUnreacted;
public event EventHandler<UserInteraction>? UserFollowed;
public event EventHandler<UserInteraction>? UserUnfollowed;
public event EventHandler<UserInteraction>? UserBlocked;
public event EventHandler<UserInteraction>? UserUnblocked;
public event EventHandler<UserInteraction>? UserMuted;
public event EventHandler<UserInteraction>? UserUnmuted;
public event EventHandler<Notification>? Notification;
public event EventHandler<Filter>? FilterAdded;
public event EventHandler<Filter>? FilterRemoved;
public event EventHandler<Filter>? FilterUpdated;
public event EventHandler<UserList>? ListMembersUpdated;
public Task RaiseNotePublished(object? sender, Note note) =>
EmitEvent(new NotePublishedEvent { Args = note });
public Task RaiseNoteUpdated(object? sender, Note note) =>
EmitEvent(new NoteUpdatedEvent { Args = note });
public Task RaiseNoteDeleted(object? sender, Note note) =>
EmitEvent(new NoteDeletedEvent { Args = note });
public Task RaiseNotification(object? sender, Notification notification) =>
EmitEvent(new NotificationEvent { Args = notification });
public Task RaiseNotifications(object? sender, IEnumerable<Notification> notifications) =>
notifications.Select(p => RaiseNotification(sender, p)).AwaitAllAsync();
public Task RaiseNoteLiked(object? sender, Note note, User user) =>
EmitEvent(new NoteLikedEvent { Args = new NoteInteraction { Note = note, User = user } });
public Task RaiseNoteUnliked(object? sender, Note note, User user) =>
EmitEvent(new NoteUnlikedEvent { Args = new NoteInteraction { Note = note, User = user } });
public Task RaiseNoteReacted(object? sender, NoteReaction reaction) =>
EmitEvent(new NoteReactedEvent { Args = reaction });
public Task RaiseNoteUnreacted(object? sender, NoteReaction reaction) =>
EmitEvent(new NoteUnreactedEvent { Args = reaction });
public Task RaiseUserFollowed(object? sender, User actor, User obj) =>
EmitEvent(new UserFollowedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserUnfollowed(object? sender, User actor, User obj) =>
EmitEvent(new UserUnfollowedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserBlocked(object? sender, User actor, User obj) =>
EmitEvent(new UserBlockedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserUnblocked(object? sender, User actor, User obj) =>
EmitEvent(new UserUnblockedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserMuted(object? sender, User actor, User obj) =>
EmitEvent(new UserMutedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserUnmuted(object? sender, User actor, User obj) =>
EmitEvent(new UserUnmutedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseFilterAdded(object? sender, Filter filter) =>
EmitEvent(new FilterAddedEvent { Args = filter });
public Task RaiseFilterRemoved(object? sender, Filter filter) =>
EmitEvent(new FilterRemovedEvent { Args = filter });
public Task RaiseFilterUpdated(object? sender, Filter filter) =>
EmitEvent(new FilterUpdatedEvent { Args = filter });
public Task RaiseListMembersUpdated(object? sender, UserList list) =>
EmitEvent(new ListMembersUpdatedEvent { Args = list });
[JsonDerivedType(typeof(NotePublishedEvent), "notePublished")]
[JsonDerivedType(typeof(NoteUpdatedEvent), "noteUpdated")]
[JsonDerivedType(typeof(NoteDeletedEvent), "noteDeleted")]
[JsonDerivedType(typeof(NoteLikedEvent), "noteLiked")]
[JsonDerivedType(typeof(NoteUnlikedEvent), "noteUnliked")]
[JsonDerivedType(typeof(NotificationEvent), "notification")]
private interface IClusterEvent
{
public string Payload { get; }
}
private class NotePublishedEvent : ClusterEvent<Note>;
private class NoteUpdatedEvent : ClusterEvent<Note>;
private class NoteDeletedEvent : ClusterEvent<Note>;
private class NoteLikedEvent : ClusterEvent<NoteInteraction>;
private class NoteUnlikedEvent : ClusterEvent<NoteInteraction>;
private class NoteReactedEvent : ClusterEvent<NoteReaction>;
private class NoteUnreactedEvent : ClusterEvent<NoteReaction>;
private class UserFollowedEvent : ClusterEvent<UserInteraction>;
private class UserUnfollowedEvent : ClusterEvent<UserInteraction>;
private class UserBlockedEvent : ClusterEvent<UserInteraction>;
private class UserUnblockedEvent : ClusterEvent<UserInteraction>;
private class UserMutedEvent : ClusterEvent<UserInteraction>;
private class UserUnmutedEvent : ClusterEvent<UserInteraction>;
private class NotificationEvent : ClusterEvent<Notification>;
private class FilterAddedEvent : ClusterEvent<Filter>;
private class FilterRemovedEvent : ClusterEvent<Filter>;
private class FilterUpdatedEvent : ClusterEvent<Filter>;
private class ListMembersUpdatedEvent : ClusterEvent<UserList>;
private void HandleEvent(string payload)
{
logger.LogInformation("Handling event: {payload}", payload);
var deserialized = JsonSerializer.Deserialize<IClusterEvent>(payload) ??
throw new Exception("Failed to deserialize cluster event");
switch (deserialized)
{
case NotePublishedEvent e:
NotePublished?.Invoke(this, e.Args);
break;
case NoteUpdatedEvent e:
NoteUpdated?.Invoke(this, e.Args);
break;
case NoteDeletedEvent e:
NoteDeleted?.Invoke(this, e.Args);
break;
case NoteLikedEvent e:
NoteLiked?.Invoke(this, e.Args);
break;
case NoteUnlikedEvent e:
NoteUnliked?.Invoke(this, e.Args);
break;
case NoteReactedEvent e:
NoteReacted?.Invoke(this, e.Args);
break;
case NoteUnreactedEvent e:
NoteUnreacted?.Invoke(this, e.Args);
break;
case UserFollowedEvent e:
UserFollowed?.Invoke(this, e.Args);
break;
case UserUnfollowedEvent e:
UserUnfollowed?.Invoke(this, e.Args);
break;
case UserBlockedEvent e:
UserBlocked?.Invoke(this, e.Args);
break;
case UserUnblockedEvent e:
UserUnblocked?.Invoke(this, e.Args);
break;
case UserMutedEvent e:
UserMuted?.Invoke(this, e.Args);
break;
case UserUnmutedEvent e:
UserUnmuted?.Invoke(this, e.Args);
break;
case NotificationEvent e:
Notification?.Invoke(this, e.Args);
break;
case FilterAddedEvent e:
FilterAdded?.Invoke(this, e.Args);
break;
case FilterRemovedEvent e:
FilterRemoved?.Invoke(this, e.Args);
break;
case FilterUpdatedEvent e:
FilterUpdated?.Invoke(this, e.Args);
break;
case ListMembersUpdatedEvent e:
ListMembersUpdated?.Invoke(this, e.Args);
break;
default:
throw new ArgumentOutOfRangeException(nameof(payload), @"Unknown event type");
}
}
private static readonly JsonSerializerOptions Options =
new(JsonSerializerOptions.Default)
{
ReferenceHandler = ReferenceHandler.Preserve, IgnoreReadOnlyProperties = true
};
[SuppressMessage("ReSharper", "UnusedMember.Local")]
private abstract class ClusterEvent<TEventArgs> : IClusterEvent
{
private readonly string? _payload;
private readonly TEventArgs? _args;
public string Payload
{
get => _payload ?? throw new Exception("_payload was null");
init
{
_payload = value;
_args = JsonSerializer.Deserialize<TEventArgs>(value, Options) ??
throw new Exception("Failed to deserialize cluster event payload");
}
}
public TEventArgs Args
{
get => _args ?? throw new Exception("_args was null");
init
{
_args = value;
_payload = JsonSerializer.Serialize(value, Options);
}
}
}
protected override async Task ExecuteAsync(CancellationToken token)
{
if (workerConfig.Value.WorkerType is not Enums.WorkerType.QueueOnly)
{
while (!token.IsCancellationRequested)
{
try
{
await using var conn = await GetNpgsqlConnection();
conn.Notification += (_, args) =>
{
try
{
if (args.Channel is not "event") return;
HandleEvent(args.Payload);
}
catch (Exception e)
{
logger.LogError("Failed to handle event: {error}", e);
}
};
await using (var cmd = new NpgsqlCommand("LISTEN event", conn))
{
await cmd.ExecuteNonQueryAsync(token);
}
while (!token.IsCancellationRequested)
{
await conn.WaitAsync(token);
}
}
catch
{
// ignored (logging this would spam logs on postgres restart)
}
}
}
}
private async Task EmitEvent(IClusterEvent ev)
{
await using var scope = GetScope();
await using var db = GetDbContext(scope);
var serialized = JsonSerializer.Serialize(ev, Options);
logger.LogInformation("Emitting event: {serialized}", serialized);
await db.Database.ExecuteSqlAsync($"SELECT pg_notify('event', {serialized});");
}
private async Task<NpgsqlConnection> GetNpgsqlConnection() =>
await _dataSource.OpenConnectionAsync();
private AsyncServiceScope GetScope() => scopeFactory.CreateAsyncScope();
private static DatabaseContext GetDbContext(IServiceScope scope) =>
scope.ServiceProvider.GetRequiredService<DatabaseContext>();
}

View file

@ -3,7 +3,7 @@ using Iceshrimp.Backend.Core.Events;
namespace Iceshrimp.Backend.Core.Services; namespace Iceshrimp.Backend.Core.Services;
public class EventService public class EventService : IEventService
{ {
public event EventHandler<Note>? NotePublished; public event EventHandler<Note>? NotePublished;
public event EventHandler<Note>? NoteUpdated; public event EventHandler<Note>? NoteUpdated;
@ -24,50 +24,117 @@ public class EventService
public event EventHandler<Filter>? FilterUpdated; public event EventHandler<Filter>? FilterUpdated;
public event EventHandler<UserList>? ListMembersUpdated; public event EventHandler<UserList>? ListMembersUpdated;
public void RaiseNotePublished(object? sender, Note note) => NotePublished?.Invoke(sender, note); public Task RaiseNotePublished(object? sender, Note note)
public void RaiseNoteUpdated(object? sender, Note note) => NoteUpdated?.Invoke(sender, note);
public void RaiseNoteDeleted(object? sender, Note note) => NoteDeleted?.Invoke(sender, note);
public void RaiseNotification(object? sender, Notification notification) =>
Notification?.Invoke(sender, notification);
public void RaiseNotifications(object? sender, IEnumerable<Notification> notifications)
{ {
foreach (var notification in notifications) Notification?.Invoke(sender, notification); NotePublished?.Invoke(sender, note);
return Task.CompletedTask;
} }
public void RaiseNoteLiked(object? sender, Note note, User user) => public Task RaiseNoteUpdated(object? sender, Note note)
{
NoteUpdated?.Invoke(sender, note);
return Task.CompletedTask;
}
public Task RaiseNoteDeleted(object? sender, Note note)
{
NoteDeleted?.Invoke(sender, note);
return Task.CompletedTask;
}
public Task RaiseNotification(object? sender, Notification notification)
{
Notification?.Invoke(sender, notification);
return Task.CompletedTask;
}
public Task RaiseNotifications(object? sender, IEnumerable<Notification> notifications)
{
foreach (var notification in notifications) Notification?.Invoke(sender, notification);
return Task.CompletedTask;
}
public Task RaiseNoteLiked(object? sender, Note note, User user)
{
NoteLiked?.Invoke(sender, new NoteInteraction { Note = note, User = user }); NoteLiked?.Invoke(sender, new NoteInteraction { Note = note, User = user });
return Task.CompletedTask;
}
public void RaiseNoteUnliked(object? sender, Note note, User user) => public Task RaiseNoteUnliked(object? sender, Note note, User user)
{
NoteUnliked?.Invoke(sender, new NoteInteraction { Note = note, User = user }); NoteUnliked?.Invoke(sender, new NoteInteraction { Note = note, User = user });
return Task.CompletedTask;
}
public void RaiseNoteReacted(object? sender, NoteReaction reaction) => public Task RaiseNoteReacted(object? sender, NoteReaction reaction)
{
NoteReacted?.Invoke(sender, reaction); NoteReacted?.Invoke(sender, reaction);
return Task.CompletedTask;
}
public void RaiseNoteUnreacted(object? sender, NoteReaction reaction) => public Task RaiseNoteUnreacted(object? sender, NoteReaction reaction)
{
NoteUnreacted?.Invoke(sender, reaction); NoteUnreacted?.Invoke(sender, reaction);
return Task.CompletedTask;
}
public void RaiseUserFollowed(object? sender, User actor, User obj) => public Task RaiseUserFollowed(object? sender, User actor, User obj)
{
UserFollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj }); UserFollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public void RaiseUserUnfollowed(object? sender, User actor, User obj) => public Task RaiseUserUnfollowed(object? sender, User actor, User obj)
{
UserUnfollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj }); UserUnfollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public void RaiseUserBlocked(object? sender, User actor, User obj) => public Task RaiseUserBlocked(object? sender, User actor, User obj)
{
UserBlocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj }); UserBlocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public void RaiseUserUnblocked(object? sender, User actor, User obj) => public Task RaiseUserUnblocked(object? sender, User actor, User obj)
{
UserUnblocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj }); UserUnblocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public void RaiseUserMuted(object? sender, User actor, User obj) => public Task RaiseUserMuted(object? sender, User actor, User obj)
{
UserMuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj }); UserMuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public void RaiseUserUnmuted(object? sender, User actor, User obj) => public Task RaiseUserUnmuted(object? sender, User actor, User obj)
{
UserUnmuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj }); UserUnmuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public void RaiseFilterAdded(object? sender, Filter filter) => FilterAdded?.Invoke(sender, filter); public Task RaiseFilterAdded(object? sender, Filter filter)
public void RaiseFilterRemoved(object? sender, Filter filter) => FilterRemoved?.Invoke(sender, filter); {
public void RaiseFilterUpdated(object? sender, Filter filter) => FilterUpdated?.Invoke(sender, filter); FilterAdded?.Invoke(sender, filter);
public void RaiseListMembersUpdated(object? sender, UserList list) => ListMembersUpdated?.Invoke(sender, list); return Task.CompletedTask;
}
public Task RaiseFilterRemoved(object? sender, Filter filter)
{
FilterRemoved?.Invoke(sender, filter);
return Task.CompletedTask;
}
public Task RaiseFilterUpdated(object? sender, Filter filter)
{
FilterUpdated?.Invoke(sender, filter);
return Task.CompletedTask;
}
public Task RaiseListMembersUpdated(object? sender, UserList list)
{
ListMembersUpdated?.Invoke(sender, list);
return Task.CompletedTask;
}
} }

View file

@ -0,0 +1,46 @@
using Iceshrimp.Backend.Core.Database.Tables;
using Iceshrimp.Backend.Core.Events;
namespace Iceshrimp.Backend.Core.Services;
public interface IEventService
{
public event EventHandler<Note>? NotePublished;
public event EventHandler<Note>? NoteUpdated;
public event EventHandler<Note>? NoteDeleted;
public event EventHandler<NoteInteraction>? NoteLiked;
public event EventHandler<NoteInteraction>? NoteUnliked;
public event EventHandler<NoteReaction>? NoteReacted;
public event EventHandler<NoteReaction>? NoteUnreacted;
public event EventHandler<UserInteraction>? UserFollowed;
public event EventHandler<UserInteraction>? UserUnfollowed;
public event EventHandler<UserInteraction>? UserBlocked;
public event EventHandler<UserInteraction>? UserUnblocked;
public event EventHandler<UserInteraction>? UserMuted;
public event EventHandler<UserInteraction>? UserUnmuted;
public event EventHandler<Notification>? Notification;
public event EventHandler<Filter>? FilterAdded;
public event EventHandler<Filter>? FilterRemoved;
public event EventHandler<Filter>? FilterUpdated;
public event EventHandler<UserList>? ListMembersUpdated;
public Task RaiseNotePublished(object? sender, Note note);
public Task RaiseNoteUpdated(object? sender, Note note);
public Task RaiseNoteDeleted(object? sender, Note note);
public Task RaiseNotification(object? sender, Notification notification);
public Task RaiseNotifications(object? sender, IEnumerable<Notification> notifications);
public Task RaiseNoteLiked(object? sender, Note note, User user);
public Task RaiseNoteUnliked(object? sender, Note note, User user);
public Task RaiseNoteReacted(object? sender, NoteReaction reaction);
public Task RaiseNoteUnreacted(object? sender, NoteReaction reaction);
public Task RaiseUserFollowed(object? sender, User actor, User obj);
public Task RaiseUserUnfollowed(object? sender, User actor, User obj);
public Task RaiseUserBlocked(object? sender, User actor, User obj);
public Task RaiseUserUnblocked(object? sender, User actor, User obj);
public Task RaiseUserMuted(object? sender, User actor, User obj);
public Task RaiseUserUnmuted(object? sender, User actor, User obj);
public Task RaiseFilterAdded(object? sender, Filter filter);
public Task RaiseFilterRemoved(object? sender, Filter filter);
public Task RaiseFilterUpdated(object? sender, Filter filter);
public Task RaiseListMembersUpdated(object? sender, UserList list);
}

View file

@ -38,7 +38,7 @@ public class NoteService(
ActivityPub.MentionsResolver mentionsResolver, ActivityPub.MentionsResolver mentionsResolver,
DriveService driveSvc, DriveService driveSvc,
NotificationService notificationSvc, NotificationService notificationSvc,
EventService eventSvc, IEventService eventSvc,
ActivityPub.ActivityRenderer activityRenderer, ActivityPub.ActivityRenderer activityRenderer,
EmojiService emojiSvc, EmojiService emojiSvc,
FollowupTaskService followupTaskSvc, FollowupTaskService followupTaskSvc,
@ -243,7 +243,7 @@ public class NoteService(
await UpdateNoteCountersAsync(note, true); await UpdateNoteCountersAsync(note, true);
await db.AddAsync(note); await db.AddAsync(note);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotePublished(this, note); await eventSvc.RaiseNotePublished(this, note);
await notificationSvc.GenerateMentionNotifications(note, mentionedLocalUserIds); await notificationSvc.GenerateMentionNotifications(note, mentionedLocalUserIds);
await notificationSvc.GenerateReplyNotifications(note, mentionedLocalUserIds); await notificationSvc.GenerateReplyNotifications(note, mentionedLocalUserIds);
await notificationSvc.GenerateRenoteNotification(note); await notificationSvc.GenerateRenoteNotification(note);
@ -581,7 +581,7 @@ public class NoteService(
} }
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNoteUpdated(this, note); await eventSvc.RaiseNoteUpdated(this, note);
if (!isEdit) return note; if (!isEdit) return note;
@ -611,7 +611,7 @@ public class NoteService(
logger.LogDebug("Deleting note '{id}' owned by {userId}", note.Id, note.User.Id); logger.LogDebug("Deleting note '{id}' owned by {userId}", note.Id, note.User.Id);
db.Remove(note); db.Remove(note);
eventSvc.RaiseNoteDeleted(this, note); await eventSvc.RaiseNoteDeleted(this, note);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
await UpdateNoteCountersAsync(note, false); await UpdateNoteCountersAsync(note, false);
@ -710,8 +710,7 @@ public class NoteService(
await db.Notes.Where(p => p.Id == note.Id) await db.Notes.Where(p => p.Id == note.Id)
.ExecuteUpdateAsync(p => p.SetProperty(n => n.RenoteCount, n => n.RenoteCount - 1)); .ExecuteUpdateAsync(p => p.SetProperty(n => n.RenoteCount, n => n.RenoteCount - 1));
foreach (var hit in notes) await notes.Select(hit => eventSvc.RaiseNoteDeleted(this, hit)).AwaitAllAsync();
eventSvc.RaiseNoteDeleted(this, hit);
} }
public async Task<Note?> ProcessNoteAsync(ASNote note, User actor, User? user = null) public async Task<Note?> ProcessNoteAsync(ASNote note, User actor, User? user = null)
@ -1147,7 +1146,7 @@ public class NoteService(
await deliverSvc.DeliverToConditionalAsync(activity, user, note); await deliverSvc.DeliverToConditionalAsync(activity, user, note);
} }
eventSvc.RaiseNoteLiked(this, note, user); await eventSvc.RaiseNoteLiked(this, note, user);
await notificationSvc.GenerateLikeNotification(note, user); await notificationSvc.GenerateLikeNotification(note, user);
return true; return true;
} }
@ -1172,7 +1171,7 @@ public class NoteService(
await deliverSvc.DeliverToConditionalAsync(activity, user, note); await deliverSvc.DeliverToConditionalAsync(activity, user, note);
} }
eventSvc.RaiseNoteUnliked(this, note, user); await eventSvc.RaiseNoteUnliked(this, note, user);
await db.Notifications await db.Notifications
.Where(p => p.Type == Notification.NotificationType.Like && .Where(p => p.Type == Notification.NotificationType.Like &&
p.Notifiee == note.User && p.Notifiee == note.User &&
@ -1362,7 +1361,7 @@ public class NoteService(
await db.AddAsync(reaction); await db.AddAsync(reaction);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNoteReacted(this, reaction); await eventSvc.RaiseNoteReacted(this, reaction);
await notificationSvc.GenerateReactionNotification(reaction); await notificationSvc.GenerateReactionNotification(reaction);
// @formatter:off // @formatter:off
@ -1398,7 +1397,7 @@ public class NoteService(
if (reaction == null) return (name, false); if (reaction == null) return (name, false);
db.Remove(reaction); db.Remove(reaction);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNoteUnreacted(this, reaction); await eventSvc.RaiseNoteUnreacted(this, reaction);
await db.Database await db.Database
.ExecuteSqlAsync($"""UPDATE "note" SET "reactions" = jsonb_set("reactions", ARRAY[{name}], (COALESCE("reactions"->>{name}, '1')::int - 1)::text::jsonb) WHERE "id" = {note.Id}"""); .ExecuteSqlAsync($"""UPDATE "note" SET "reactions" = jsonb_set("reactions", ARRAY[{name}], (COALESCE("reactions"->>{name}, '1')::int - 1)::text::jsonb) WHERE "id" = {note.Id}""");

View file

@ -10,7 +10,7 @@ namespace Iceshrimp.Backend.Core.Services;
public class NotificationService( public class NotificationService(
[SuppressMessage("ReSharper", "SuggestBaseTypeForParameterInConstructor")] [SuppressMessage("ReSharper", "SuggestBaseTypeForParameterInConstructor")]
DatabaseContext db, DatabaseContext db,
EventService eventSvc IEventService eventSvc
) )
{ {
public async Task GenerateMentionNotifications(Note note, IReadOnlyCollection<string> mentionedLocalUserIds) public async Task GenerateMentionNotifications(Note note, IReadOnlyCollection<string> mentionedLocalUserIds)
@ -38,7 +38,7 @@ public class NotificationService(
await db.AddRangeAsync(notifications); await db.AddRangeAsync(notifications);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotifications(this, notifications); await eventSvc.RaiseNotifications(this, notifications);
} }
public async Task GenerateReplyNotifications(Note note, IReadOnlyCollection<string> mentionedLocalUserIds) public async Task GenerateReplyNotifications(Note note, IReadOnlyCollection<string> mentionedLocalUserIds)
@ -74,7 +74,7 @@ public class NotificationService(
await db.AddRangeAsync(notifications); await db.AddRangeAsync(notifications);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotifications(this, notifications); await eventSvc.RaiseNotifications(this, notifications);
} }
[SuppressMessage("ReSharper", "EntityFramework.UnsupportedServerSideFunctionCall", [SuppressMessage("ReSharper", "EntityFramework.UnsupportedServerSideFunctionCall",
@ -99,7 +99,7 @@ public class NotificationService(
await db.AddRangeAsync(notifications); await db.AddRangeAsync(notifications);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotifications(this, notifications); await eventSvc.RaiseNotifications(this, notifications);
} }
public async Task GenerateLikeNotification(Note note, User user) public async Task GenerateLikeNotification(Note note, User user)
@ -119,7 +119,7 @@ public class NotificationService(
await db.AddAsync(notification); await db.AddAsync(notification);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification); await eventSvc.RaiseNotification(this, notification);
} }
public async Task GenerateReactionNotification(NoteReaction reaction) public async Task GenerateReactionNotification(NoteReaction reaction)
@ -139,7 +139,7 @@ public class NotificationService(
await db.AddAsync(notification); await db.AddAsync(notification);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification); await eventSvc.RaiseNotification(this, notification);
} }
public async Task GenerateFollowNotification(User follower, User followee) public async Task GenerateFollowNotification(User follower, User followee)
@ -157,8 +157,8 @@ public class NotificationService(
await db.AddAsync(notification); await db.AddAsync(notification);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification); await eventSvc.RaiseNotification(this, notification);
eventSvc.RaiseUserFollowed(this, follower, followee); await eventSvc.RaiseUserFollowed(this, follower, followee);
} }
public async Task GenerateFollowRequestReceivedNotification(FollowRequest followRequest) public async Task GenerateFollowRequestReceivedNotification(FollowRequest followRequest)
@ -177,7 +177,7 @@ public class NotificationService(
await db.AddAsync(notification); await db.AddAsync(notification);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification); await eventSvc.RaiseNotification(this, notification);
} }
public async Task GenerateFollowRequestAcceptedNotification(FollowRequest followRequest) public async Task GenerateFollowRequestAcceptedNotification(FollowRequest followRequest)
@ -196,8 +196,8 @@ public class NotificationService(
await db.AddAsync(notification); await db.AddAsync(notification);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification); await eventSvc.RaiseNotification(this, notification);
eventSvc.RaiseUserFollowed(this, followRequest.Follower, followRequest.Followee); await eventSvc.RaiseUserFollowed(this, followRequest.Follower, followRequest.Followee);
} }
public async Task GenerateBiteNotification(Bite bite) public async Task GenerateBiteNotification(Bite bite)
@ -215,7 +215,7 @@ public class NotificationService(
await db.AddAsync(notification); await db.AddAsync(notification);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification); await eventSvc.RaiseNotification(this, notification);
} }
public async Task GeneratePollEndedNotifications(Note note) public async Task GeneratePollEndedNotifications(Note note)
@ -252,8 +252,10 @@ public class NotificationService(
await db.AddRangeAsync(notifications); await db.AddRangeAsync(notifications);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
foreach (var notification in notifications) await notifications.Select(notification => eventSvc.RaiseNotification(this, notification))
eventSvc.RaiseNotification(this, notification); .Chunk(20)
.Select(async chunk => await chunk.AwaitAllAsync())
.AwaitAllNoConcurrencyAsync();
} }
[SuppressMessage("ReSharper", "EntityFramework.UnsupportedServerSideFunctionCall", Justification = "Projectables")] [SuppressMessage("ReSharper", "EntityFramework.UnsupportedServerSideFunctionCall", Justification = "Projectables")]
@ -277,6 +279,6 @@ public class NotificationService(
await db.AddAsync(notification); await db.AddAsync(notification);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification); await eventSvc.RaiseNotification(this, notification);
} }
} }

View file

@ -17,7 +17,7 @@ using WebPushSubscription = Iceshrimp.WebPush.PushSubscription;
namespace Iceshrimp.Backend.Core.Services; namespace Iceshrimp.Backend.Core.Services;
public class PushService( public class PushService(
EventService eventSvc, IEventService eventSvc,
ILogger<PushService> logger, ILogger<PushService> logger,
IServiceScopeFactory scopeFactory, IServiceScopeFactory scopeFactory,
HttpClient httpClient, HttpClient httpClient,

View file

@ -19,14 +19,14 @@ public sealed class StreamingService
}); });
private readonly ConcurrentDictionary<string, StreamingConnectionAggregate> _connections = []; private readonly ConcurrentDictionary<string, StreamingConnectionAggregate> _connections = [];
private readonly EventService _eventSvc; private readonly IEventService _eventSvc;
private readonly IHubContext<StreamingHub, IStreamingHubClient> _hub; private readonly IHubContext<StreamingHub, IStreamingHubClient> _hub;
private readonly ILogger<StreamingService> _logger; private readonly ILogger<StreamingService> _logger;
private readonly IServiceScopeFactory _scopeFactory; private readonly IServiceScopeFactory _scopeFactory;
public StreamingService( public StreamingService(
IHubContext<StreamingHub, IStreamingHubClient> hub, IHubContext<StreamingHub, IStreamingHubClient> hub,
EventService eventSvc, IEventService eventSvc,
IServiceScopeFactory scopeFactory, IServiceScopeFactory scopeFactory,
ILogger<StreamingService> logger ILogger<StreamingService> logger
) )

View file

@ -36,7 +36,7 @@ public class UserService(
ActivityPub.MentionsResolver mentionsResolver, ActivityPub.MentionsResolver mentionsResolver,
ActivityPub.UserRenderer userRenderer, ActivityPub.UserRenderer userRenderer,
QueueService queueSvc, QueueService queueSvc,
EventService eventSvc, IEventService eventSvc,
WebFingerService webFingerSvc, WebFingerService webFingerSvc,
ActivityPub.FederationControlService fedCtrlSvc ActivityPub.FederationControlService fedCtrlSvc
) )
@ -884,7 +884,7 @@ public class UserService(
} }
followee.PrecomputedIsFollowedBy = false; followee.PrecomputedIsFollowedBy = false;
eventSvc.RaiseUserUnfollowed(this, user, followee); await eventSvc.RaiseUserUnfollowed(this, user, followee);
} }
if (followee.PrecomputedIsRequestedBy ?? false) if (followee.PrecomputedIsRequestedBy ?? false)
@ -1080,8 +1080,7 @@ public class UserService(
}; };
await db.AddAsync(muting); await db.AddAsync(muting);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
await eventSvc.RaiseUserMuted(this, muter, mutee);
eventSvc.RaiseUserMuted(this, muter, mutee);
if (expiration != null) if (expiration != null)
{ {
@ -1096,7 +1095,7 @@ public class UserService(
return; return;
await db.Mutings.Where(p => p.Muter == muter && p.Mutee == mutee).ExecuteDeleteAsync(); await db.Mutings.Where(p => p.Muter == muter && p.Mutee == mutee).ExecuteDeleteAsync();
eventSvc.RaiseUserUnmuted(this, muter, mutee); await eventSvc.RaiseUserUnmuted(this, muter, mutee);
mutee.PrecomputedIsMutedBy = false; mutee.PrecomputedIsMutedBy = false;
} }
@ -1139,8 +1138,7 @@ public class UserService(
await db.AddAsync(blocking); await db.AddAsync(blocking);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
await eventSvc.RaiseUserBlocked(this, blocker, blockee);
eventSvc.RaiseUserBlocked(this, blocker, blockee);
if (blocker.IsLocalUser && blockee.IsRemoteUser) if (blocker.IsLocalUser && blockee.IsRemoteUser)
{ {
@ -1163,8 +1161,7 @@ public class UserService(
db.Remove(blocking); db.Remove(blocking);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
await eventSvc.RaiseUserUnblocked(this, blocker, blockee);
eventSvc.RaiseUserUnblocked(this, blocker, blockee);
if (blocker.IsLocalUser && blockee.IsRemoteUser) if (blocker.IsLocalUser && blockee.IsRemoteUser)
{ {

View file

@ -22,7 +22,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
private readonly WriteLockingList<string> _blocking = []; private readonly WriteLockingList<string> _blocking = [];
private readonly WriteLockingList<string> _connectionIds = []; private readonly WriteLockingList<string> _connectionIds = [];
private readonly EventService _eventService; private readonly IEventService _eventService;
private readonly WriteLockingList<string> _following = []; private readonly WriteLockingList<string> _following = [];
@ -239,7 +239,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
string userId, string userId,
User user, User user,
IHubContext<StreamingHub, IStreamingHubClient> hub, IHubContext<StreamingHub, IStreamingHubClient> hub,
EventService eventSvc, IEventService eventSvc,
IServiceScopeFactory scopeFactory, StreamingService streamingService IServiceScopeFactory scopeFactory, StreamingService streamingService
) )
{ {

View file

@ -34,7 +34,7 @@ builder.Services.AddSignalR().AddMessagePackProtocol();
builder.Services.AddResponseCompression(); builder.Services.AddResponseCompression();
builder.Services.AddRazorPages(); builder.Services.AddRazorPages();
builder.Services.AddServices(); builder.Services.AddServices(builder.Configuration);
builder.Services.ConfigureServices(builder.Configuration); builder.Services.ConfigureServices(builder.Configuration);
builder.WebHost.ConfigureKestrel(builder.Configuration); builder.WebHost.ConfigureKestrel(builder.Configuration);
builder.WebHost.UseStaticWebAssets(); builder.WebHost.UseStaticWebAssets();

View file

@ -39,10 +39,10 @@ public static class MockObjects
private static ServiceProvider GetServiceProvider() private static ServiceProvider GetServiceProvider()
{ {
var config = new ConfigurationManager(); var config = new ConfigurationManager();
config.AddIniFile("configuration.ini", false); config.AddIniStream(AssemblyHelpers.GetEmbeddedResourceStream("configuration.ini"));
var collection = new ServiceCollection(); var collection = new ServiceCollection();
collection.AddServices(); collection.AddServices(config);
collection.ConfigureServices(config); collection.ConfigureServices(config);
return collection.BuildServiceProvider(); return collection.BuildServiceProvider();