Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Laura Hausmann
b148c8a0ba
wip: clustered events (ISH-141) 2024-07-24 23:06:47 +02:00
22 changed files with 525 additions and 96 deletions

View file

@ -24,7 +24,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
[EnableRateLimiting("sliding")]
[EnableCors("mastodon")]
[Produces(MediaTypeNames.Application.Json)]
public class FilterController(DatabaseContext db, QueueService queueSvc, EventService eventSvc) : ControllerBase
public class FilterController(DatabaseContext db, QueueService queueSvc, IEventService eventSvc) : ControllerBase
{
[HttpGet]
[Authorize("read:filters")]
@ -95,7 +95,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
db.Add(filter);
await db.SaveChangesAsync();
eventSvc.RaiseFilterAdded(this, filter);
await eventSvc.RaiseFilterAdded(this, filter);
if (expiry.HasValue)
{
@ -159,7 +159,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
db.Update(filter);
await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter);
await eventSvc.RaiseFilterUpdated(this, filter);
if (expiry.HasValue)
{
@ -183,7 +183,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
db.Remove(filter);
await db.SaveChangesAsync();
eventSvc.RaiseFilterRemoved(this, filter);
await eventSvc.RaiseFilterRemoved(this, filter);
return new object();
}
@ -218,7 +218,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
db.Update(keyword);
await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter);
await eventSvc.RaiseFilterUpdated(this, filter);
return new FilterKeyword(keyword, filter.Id, filter.Keywords.Count - 1);
}
@ -257,7 +257,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
filter.Keywords[keywordId] = request.WholeWord ? $"\"{request.Keyword}\"" : request.Keyword;
db.Update(filter);
await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter);
await eventSvc.RaiseFilterUpdated(this, filter);
return new FilterKeyword(filter.Keywords[keywordId], filter.Id, keywordId);
}
@ -279,7 +279,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
filter.Keywords.RemoveAt(keywordId);
db.Update(filter);
await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter);
await eventSvc.RaiseFilterUpdated(this, filter);
return new object();
}

View file

@ -25,7 +25,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
[EnableRateLimiting("sliding")]
[EnableCors("mastodon")]
[Produces(MediaTypeNames.Application.Json)]
public class ListController(DatabaseContext db, UserRenderer userRenderer, EventService eventSvc) : ControllerBase
public class ListController(DatabaseContext db, UserRenderer userRenderer, IEventService eventSvc) : ControllerBase
{
[HttpGet]
[Authorize("read:lists")]
@ -139,7 +139,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer, Event
db.Remove(list);
await db.SaveChangesAsync();
eventSvc.RaiseListMembersUpdated(this, list);
await eventSvc.RaiseListMembersUpdated(this, list);
return new object();
}
@ -204,8 +204,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer, Event
await db.AddRangeAsync(memberships);
await db.SaveChangesAsync();
eventSvc.RaiseListMembersUpdated(this, list);
await eventSvc.RaiseListMembersUpdated(this, list);
return new object();
}
@ -229,7 +228,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer, Event
.Where(p => p.UserList == list && request.AccountIds.Contains(p.UserId))
.ExecuteDeleteAsync();
eventSvc.RaiseListMembersUpdated(this, list);
await eventSvc.RaiseListMembersUpdated(this, list);
return new object();
}

View file

@ -18,7 +18,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon.Streaming;
public sealed class WebSocketConnection(
WebSocket socket,
OauthToken token,
EventService eventSvc,
IEventService eventSvc,
IServiceScopeFactory scopeFactory,
CancellationToken ct
) : IDisposable
@ -28,7 +28,7 @@ public sealed class WebSocketConnection(
private readonly SemaphorePlus _lock = new(1);
private readonly WriteLockingList<string> _muting = [];
public readonly List<IChannel> Channels = [];
public readonly EventService EventService = eventSvc;
public readonly IEventService EventService = eventSvc;
public readonly WriteLockingList<Filter> Filters = [];
public readonly WriteLockingList<string> Following = [];
public readonly IServiceScope Scope = scopeFactory.CreateScope();

View file

@ -8,7 +8,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon.Streaming;
public static class WebSocketHandler
{
public static async Task HandleConnectionAsync(
WebSocket socket, OauthToken token, EventService eventSvc, IServiceScopeFactory scopeFactory,
WebSocket socket, OauthToken token, IEventService eventSvc, IServiceScopeFactory scopeFactory,
string? stream, string? list, string? tag, CancellationToken ct
)
{

View file

@ -14,7 +14,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
public class WebSocketController(
IHostApplicationLifetime appLifetime,
DatabaseContext db,
EventService eventSvc,
IEventService eventSvc,
IServiceScopeFactory scopeFactory,
ILogger<WebSocketController> logger
) : ControllerBase

View file

@ -19,7 +19,7 @@ namespace Iceshrimp.Backend.Controllers.Web;
[EnableRateLimiting("sliding")]
[Route("/api/iceshrimp/filters")]
[Produces(MediaTypeNames.Application.Json)]
public class FilterController(DatabaseContext db, EventService eventSvc) : ControllerBase
public class FilterController(DatabaseContext db, IEventService eventSvc) : ControllerBase
{
[HttpGet]
[ProducesResults(HttpStatusCode.OK)]
@ -49,7 +49,7 @@ public class FilterController(DatabaseContext db, EventService eventSvc) : Contr
db.Add(filter);
await db.SaveChangesAsync();
eventSvc.RaiseFilterAdded(this, filter);
await eventSvc.RaiseFilterAdded(this, filter);
return FilterRenderer.RenderOne(filter);
}
@ -69,7 +69,7 @@ public class FilterController(DatabaseContext db, EventService eventSvc) : Contr
filter.Contexts = request.Contexts.Cast<Filter.FilterContext>().ToList();
await db.SaveChangesAsync();
eventSvc.RaiseFilterUpdated(this, filter);
await eventSvc.RaiseFilterUpdated(this, filter);
}
[HttpDelete("{id:long}")]
@ -83,6 +83,6 @@ public class FilterController(DatabaseContext db, EventService eventSvc) : Contr
db.Remove(filter);
await db.SaveChangesAsync();
eventSvc.RaiseFilterRemoved(this, filter);
await eventSvc.RaiseFilterRemoved(this, filter);
}
}

View file

@ -4,6 +4,6 @@ namespace Iceshrimp.Backend.Core.Events;
public class NoteInteraction
{
public required Note Note;
public required User User;
public required Note Note { get; init; }
public required User User { get; init; }
}

View file

@ -4,6 +4,6 @@ namespace Iceshrimp.Backend.Core.Events;
public class UserInteraction
{
public required User Actor;
public required User Object;
public required User Actor { get; init; }
public required User Object { get; init; }
}

View file

@ -36,8 +36,10 @@ namespace Iceshrimp.Backend.Core.Extensions;
public static class ServiceExtensions
{
public static void AddServices(this IServiceCollection services)
public static void AddServices(this IServiceCollection services, IConfigurationManager configuration)
{
var config = configuration.GetSection("Worker").Get<Config.WorkerSection>();
// Transient = instantiated per request and class
// Scoped = instantiated per request
@ -90,7 +92,6 @@ public static class ServiceExtensions
.AddSingleton<CronService>()
.AddSingleton<QueueService>()
.AddSingleton<ObjectStorageService>()
.AddSingleton<EventService>()
.AddSingleton<RequestBufferingMiddleware>()
.AddSingleton<AuthorizationMiddleware>()
.AddSingleton<RequestVerificationMiddleware>()
@ -105,6 +106,17 @@ public static class ServiceExtensions
services.AddHostedService<CronService>(provider => provider.GetRequiredService<CronService>());
services.AddHostedService<QueueService>(provider => provider.GetRequiredService<QueueService>());
services.AddHostedService<PushService>(provider => provider.GetRequiredService<PushService>());
// Add service implementations for clustered and non-clustered configurations
if (config?.WorkerId != null)
{
services.AddSingleton<IEventService, ClusteredEventService>()
.AddHostedService(p => (ClusteredEventService)p.GetRequiredService<IEventService>());
}
else
{
services.AddSingleton<IEventService, EventService>();
}
}
public static void ConfigureServices(this IServiceCollection services, IConfiguration configuration)

View file

@ -26,7 +26,7 @@ public class ActivityHandlerService(
ObjectResolver objectResolver,
FollowupTaskService followupTaskSvc,
EmojiService emojiSvc,
EventService eventSvc
IEventService eventSvc
)
{
public async Task PerformActivityAsync(ASActivity activity, string? inboxUserId, string? authenticatedUserId)
@ -508,7 +508,7 @@ public class ActivityHandlerService(
p.Notifier == follower)
.ExecuteDeleteAsync();
eventSvc.RaiseUserUnfollowed(this, follower, followee);
await eventSvc.RaiseUserUnfollowed(this, follower, followee);
}
}

View file

@ -207,8 +207,8 @@ public class BackgroundTaskQueue(int parallelism)
db.Remove(muting);
await db.SaveChangesAsync(token);
var eventSvc = scope.GetRequiredService<EventService>();
eventSvc.RaiseUserUnmuted(null, muting.Muter, muting.Mutee);
var eventSvc = scope.GetRequiredService<IEventService>();
await eventSvc.RaiseUserUnmuted(null, muting.Muter, muting.Mutee);
}
private static async Task ProcessFilterExpiry(

View file

@ -0,0 +1,307 @@
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using System.Text.Json.Serialization;
using Iceshrimp.Backend.Core.Configuration;
using Iceshrimp.Backend.Core.Database;
using Iceshrimp.Backend.Core.Database.Tables;
using Iceshrimp.Backend.Core.Events;
using Iceshrimp.Backend.Core.Extensions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Npgsql;
namespace Iceshrimp.Backend.Core.Services;
public class ClusteredEventService(
IServiceScopeFactory scopeFactory,
IOptions<Config.DatabaseSection> dbConfig,
IOptions<Config.WorkerSection> workerConfig,
ILogger<ClusteredEventService> logger
) : BackgroundService, IEventService
{
private readonly NpgsqlDataSource _dataSource = DatabaseContext.GetDataSource(dbConfig.Value);
public event EventHandler<Note>? NotePublished;
public event EventHandler<Note>? NoteUpdated;
public event EventHandler<Note>? NoteDeleted;
public event EventHandler<NoteInteraction>? NoteLiked;
public event EventHandler<NoteInteraction>? NoteUnliked;
public event EventHandler<NoteReaction>? NoteReacted;
public event EventHandler<NoteReaction>? NoteUnreacted;
public event EventHandler<UserInteraction>? UserFollowed;
public event EventHandler<UserInteraction>? UserUnfollowed;
public event EventHandler<UserInteraction>? UserBlocked;
public event EventHandler<UserInteraction>? UserUnblocked;
public event EventHandler<UserInteraction>? UserMuted;
public event EventHandler<UserInteraction>? UserUnmuted;
public event EventHandler<Notification>? Notification;
public event EventHandler<Filter>? FilterAdded;
public event EventHandler<Filter>? FilterRemoved;
public event EventHandler<Filter>? FilterUpdated;
public event EventHandler<UserList>? ListMembersUpdated;
public Task RaiseNotePublished(object? sender, Note note) =>
EmitEvent(new NotePublishedEvent { Args = note });
public Task RaiseNoteUpdated(object? sender, Note note) =>
EmitEvent(new NoteUpdatedEvent { Args = note });
public Task RaiseNoteDeleted(object? sender, Note note) =>
EmitEvent(new NoteDeletedEvent { Args = note });
public Task RaiseNotification(object? sender, Notification notification) =>
EmitEvent(new NotificationEvent { Args = notification });
public Task RaiseNotifications(object? sender, IEnumerable<Notification> notifications) =>
notifications.Select(p => RaiseNotification(sender, p)).AwaitAllAsync();
public Task RaiseNoteLiked(object? sender, Note note, User user) =>
EmitEvent(new NoteLikedEvent { Args = new NoteInteraction { Note = note, User = user } });
public Task RaiseNoteUnliked(object? sender, Note note, User user) =>
EmitEvent(new NoteUnlikedEvent { Args = new NoteInteraction { Note = note, User = user } });
public Task RaiseNoteReacted(object? sender, NoteReaction reaction) =>
EmitEvent(new NoteReactedEvent { Args = reaction });
public Task RaiseNoteUnreacted(object? sender, NoteReaction reaction) =>
EmitEvent(new NoteUnreactedEvent { Args = reaction });
public Task RaiseUserFollowed(object? sender, User actor, User obj) =>
EmitEvent(new UserFollowedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserUnfollowed(object? sender, User actor, User obj) =>
EmitEvent(new UserUnfollowedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserBlocked(object? sender, User actor, User obj) =>
EmitEvent(new UserBlockedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserUnblocked(object? sender, User actor, User obj) =>
EmitEvent(new UserUnblockedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserMuted(object? sender, User actor, User obj) =>
EmitEvent(new UserMutedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseUserUnmuted(object? sender, User actor, User obj) =>
EmitEvent(new UserUnmutedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
public Task RaiseFilterAdded(object? sender, Filter filter) =>
EmitEvent(new FilterAddedEvent { Args = filter });
public Task RaiseFilterRemoved(object? sender, Filter filter) =>
EmitEvent(new FilterRemovedEvent { Args = filter });
public Task RaiseFilterUpdated(object? sender, Filter filter) =>
EmitEvent(new FilterUpdatedEvent { Args = filter });
public Task RaiseListMembersUpdated(object? sender, UserList list) =>
EmitEvent(new ListMembersUpdatedEvent { Args = list });
[JsonDerivedType(typeof(NotePublishedEvent), "notePublished")]
[JsonDerivedType(typeof(NoteUpdatedEvent), "noteUpdated")]
[JsonDerivedType(typeof(NoteDeletedEvent), "noteDeleted")]
[JsonDerivedType(typeof(NoteLikedEvent), "noteLiked")]
[JsonDerivedType(typeof(NoteUnlikedEvent), "noteUnliked")]
[JsonDerivedType(typeof(NotificationEvent), "notification")]
private interface IClusterEvent
{
public string Payload { get; }
}
private class NotePublishedEvent : ClusterEvent<Note>;
private class NoteUpdatedEvent : ClusterEvent<Note>;
private class NoteDeletedEvent : ClusterEvent<Note>;
private class NoteLikedEvent : ClusterEvent<NoteInteraction>;
private class NoteUnlikedEvent : ClusterEvent<NoteInteraction>;
private class NoteReactedEvent : ClusterEvent<NoteReaction>;
private class NoteUnreactedEvent : ClusterEvent<NoteReaction>;
private class UserFollowedEvent : ClusterEvent<UserInteraction>;
private class UserUnfollowedEvent : ClusterEvent<UserInteraction>;
private class UserBlockedEvent : ClusterEvent<UserInteraction>;
private class UserUnblockedEvent : ClusterEvent<UserInteraction>;
private class UserMutedEvent : ClusterEvent<UserInteraction>;
private class UserUnmutedEvent : ClusterEvent<UserInteraction>;
private class NotificationEvent : ClusterEvent<Notification>;
private class FilterAddedEvent : ClusterEvent<Filter>;
private class FilterRemovedEvent : ClusterEvent<Filter>;
private class FilterUpdatedEvent : ClusterEvent<Filter>;
private class ListMembersUpdatedEvent : ClusterEvent<UserList>;
private void HandleEvent(string payload)
{
logger.LogInformation("Handling event: {payload}", payload);
var deserialized = JsonSerializer.Deserialize<IClusterEvent>(payload) ??
throw new Exception("Failed to deserialize cluster event");
switch (deserialized)
{
case NotePublishedEvent e:
NotePublished?.Invoke(this, e.Args);
break;
case NoteUpdatedEvent e:
NoteUpdated?.Invoke(this, e.Args);
break;
case NoteDeletedEvent e:
NoteDeleted?.Invoke(this, e.Args);
break;
case NoteLikedEvent e:
NoteLiked?.Invoke(this, e.Args);
break;
case NoteUnlikedEvent e:
NoteUnliked?.Invoke(this, e.Args);
break;
case NoteReactedEvent e:
NoteReacted?.Invoke(this, e.Args);
break;
case NoteUnreactedEvent e:
NoteUnreacted?.Invoke(this, e.Args);
break;
case UserFollowedEvent e:
UserFollowed?.Invoke(this, e.Args);
break;
case UserUnfollowedEvent e:
UserUnfollowed?.Invoke(this, e.Args);
break;
case UserBlockedEvent e:
UserBlocked?.Invoke(this, e.Args);
break;
case UserUnblockedEvent e:
UserUnblocked?.Invoke(this, e.Args);
break;
case UserMutedEvent e:
UserMuted?.Invoke(this, e.Args);
break;
case UserUnmutedEvent e:
UserUnmuted?.Invoke(this, e.Args);
break;
case NotificationEvent e:
Notification?.Invoke(this, e.Args);
break;
case FilterAddedEvent e:
FilterAdded?.Invoke(this, e.Args);
break;
case FilterRemovedEvent e:
FilterRemoved?.Invoke(this, e.Args);
break;
case FilterUpdatedEvent e:
FilterUpdated?.Invoke(this, e.Args);
break;
case ListMembersUpdatedEvent e:
ListMembersUpdated?.Invoke(this, e.Args);
break;
default:
throw new ArgumentOutOfRangeException(nameof(payload), @"Unknown event type");
}
}
private static readonly JsonSerializerOptions Options =
new(JsonSerializerOptions.Default)
{
ReferenceHandler = ReferenceHandler.Preserve, IgnoreReadOnlyProperties = true
};
[SuppressMessage("ReSharper", "UnusedMember.Local")]
private abstract class ClusterEvent<TEventArgs> : IClusterEvent
{
private readonly string? _payload;
private readonly TEventArgs? _args;
public string Payload
{
get => _payload ?? throw new Exception("_payload was null");
init
{
_payload = value;
_args = JsonSerializer.Deserialize<TEventArgs>(value, Options) ??
throw new Exception("Failed to deserialize cluster event payload");
}
}
public TEventArgs Args
{
get => _args ?? throw new Exception("_args was null");
init
{
_args = value;
_payload = JsonSerializer.Serialize(value, Options);
}
}
}
protected override async Task ExecuteAsync(CancellationToken token)
{
if (workerConfig.Value.WorkerType is not Enums.WorkerType.QueueOnly)
{
while (!token.IsCancellationRequested)
{
try
{
await using var conn = await GetNpgsqlConnection();
conn.Notification += (_, args) =>
{
try
{
if (args.Channel is not "event") return;
HandleEvent(args.Payload);
}
catch (Exception e)
{
logger.LogError("Failed to handle event: {error}", e);
}
};
await using (var cmd = new NpgsqlCommand("LISTEN event", conn))
{
await cmd.ExecuteNonQueryAsync(token);
}
while (!token.IsCancellationRequested)
{
await conn.WaitAsync(token);
}
}
catch
{
// ignored (logging this would spam logs on postgres restart)
}
}
}
}
private async Task EmitEvent(IClusterEvent ev)
{
await using var scope = GetScope();
await using var db = GetDbContext(scope);
var serialized = JsonSerializer.Serialize(ev, Options);
logger.LogInformation("Emitting event: {serialized}", serialized);
await db.Database.ExecuteSqlAsync($"SELECT pg_notify('event', {serialized});");
}
private async Task<NpgsqlConnection> GetNpgsqlConnection() =>
await _dataSource.OpenConnectionAsync();
private AsyncServiceScope GetScope() => scopeFactory.CreateAsyncScope();
private static DatabaseContext GetDbContext(IServiceScope scope) =>
scope.ServiceProvider.GetRequiredService<DatabaseContext>();
}

View file

@ -3,7 +3,7 @@ using Iceshrimp.Backend.Core.Events;
namespace Iceshrimp.Backend.Core.Services;
public class EventService
public class EventService : IEventService
{
public event EventHandler<Note>? NotePublished;
public event EventHandler<Note>? NoteUpdated;
@ -24,50 +24,117 @@ public class EventService
public event EventHandler<Filter>? FilterUpdated;
public event EventHandler<UserList>? ListMembersUpdated;
public void RaiseNotePublished(object? sender, Note note) => NotePublished?.Invoke(sender, note);
public void RaiseNoteUpdated(object? sender, Note note) => NoteUpdated?.Invoke(sender, note);
public void RaiseNoteDeleted(object? sender, Note note) => NoteDeleted?.Invoke(sender, note);
public Task RaiseNotePublished(object? sender, Note note)
{
NotePublished?.Invoke(sender, note);
return Task.CompletedTask;
}
public void RaiseNotification(object? sender, Notification notification) =>
public Task RaiseNoteUpdated(object? sender, Note note)
{
NoteUpdated?.Invoke(sender, note);
return Task.CompletedTask;
}
public Task RaiseNoteDeleted(object? sender, Note note)
{
NoteDeleted?.Invoke(sender, note);
return Task.CompletedTask;
}
public Task RaiseNotification(object? sender, Notification notification)
{
Notification?.Invoke(sender, notification);
return Task.CompletedTask;
}
public void RaiseNotifications(object? sender, IEnumerable<Notification> notifications)
public Task RaiseNotifications(object? sender, IEnumerable<Notification> notifications)
{
foreach (var notification in notifications) Notification?.Invoke(sender, notification);
return Task.CompletedTask;
}
public void RaiseNoteLiked(object? sender, Note note, User user) =>
public Task RaiseNoteLiked(object? sender, Note note, User user)
{
NoteLiked?.Invoke(sender, new NoteInteraction { Note = note, User = user });
public void RaiseNoteUnliked(object? sender, Note note, User user) =>
NoteUnliked?.Invoke(sender, new NoteInteraction { Note = note, User = user });
public void RaiseNoteReacted(object? sender, NoteReaction reaction) =>
NoteReacted?.Invoke(sender, reaction);
public void RaiseNoteUnreacted(object? sender, NoteReaction reaction) =>
NoteUnreacted?.Invoke(sender, reaction);
public void RaiseUserFollowed(object? sender, User actor, User obj) =>
UserFollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
public void RaiseUserUnfollowed(object? sender, User actor, User obj) =>
UserUnfollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
public void RaiseUserBlocked(object? sender, User actor, User obj) =>
UserBlocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
public void RaiseUserUnblocked(object? sender, User actor, User obj) =>
UserUnblocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
public void RaiseUserMuted(object? sender, User actor, User obj) =>
UserMuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
public void RaiseUserUnmuted(object? sender, User actor, User obj) =>
UserUnmuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
public void RaiseFilterAdded(object? sender, Filter filter) => FilterAdded?.Invoke(sender, filter);
public void RaiseFilterRemoved(object? sender, Filter filter) => FilterRemoved?.Invoke(sender, filter);
public void RaiseFilterUpdated(object? sender, Filter filter) => FilterUpdated?.Invoke(sender, filter);
public void RaiseListMembersUpdated(object? sender, UserList list) => ListMembersUpdated?.Invoke(sender, list);
return Task.CompletedTask;
}
public Task RaiseNoteUnliked(object? sender, Note note, User user)
{
NoteUnliked?.Invoke(sender, new NoteInteraction { Note = note, User = user });
return Task.CompletedTask;
}
public Task RaiseNoteReacted(object? sender, NoteReaction reaction)
{
NoteReacted?.Invoke(sender, reaction);
return Task.CompletedTask;
}
public Task RaiseNoteUnreacted(object? sender, NoteReaction reaction)
{
NoteUnreacted?.Invoke(sender, reaction);
return Task.CompletedTask;
}
public Task RaiseUserFollowed(object? sender, User actor, User obj)
{
UserFollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public Task RaiseUserUnfollowed(object? sender, User actor, User obj)
{
UserUnfollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public Task RaiseUserBlocked(object? sender, User actor, User obj)
{
UserBlocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public Task RaiseUserUnblocked(object? sender, User actor, User obj)
{
UserUnblocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public Task RaiseUserMuted(object? sender, User actor, User obj)
{
UserMuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public Task RaiseUserUnmuted(object? sender, User actor, User obj)
{
UserUnmuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
return Task.CompletedTask;
}
public Task RaiseFilterAdded(object? sender, Filter filter)
{
FilterAdded?.Invoke(sender, filter);
return Task.CompletedTask;
}
public Task RaiseFilterRemoved(object? sender, Filter filter)
{
FilterRemoved?.Invoke(sender, filter);
return Task.CompletedTask;
}
public Task RaiseFilterUpdated(object? sender, Filter filter)
{
FilterUpdated?.Invoke(sender, filter);
return Task.CompletedTask;
}
public Task RaiseListMembersUpdated(object? sender, UserList list)
{
ListMembersUpdated?.Invoke(sender, list);
return Task.CompletedTask;
}
}

View file

@ -0,0 +1,46 @@
using Iceshrimp.Backend.Core.Database.Tables;
using Iceshrimp.Backend.Core.Events;
namespace Iceshrimp.Backend.Core.Services;
public interface IEventService
{
public event EventHandler<Note>? NotePublished;
public event EventHandler<Note>? NoteUpdated;
public event EventHandler<Note>? NoteDeleted;
public event EventHandler<NoteInteraction>? NoteLiked;
public event EventHandler<NoteInteraction>? NoteUnliked;
public event EventHandler<NoteReaction>? NoteReacted;
public event EventHandler<NoteReaction>? NoteUnreacted;
public event EventHandler<UserInteraction>? UserFollowed;
public event EventHandler<UserInteraction>? UserUnfollowed;
public event EventHandler<UserInteraction>? UserBlocked;
public event EventHandler<UserInteraction>? UserUnblocked;
public event EventHandler<UserInteraction>? UserMuted;
public event EventHandler<UserInteraction>? UserUnmuted;
public event EventHandler<Notification>? Notification;
public event EventHandler<Filter>? FilterAdded;
public event EventHandler<Filter>? FilterRemoved;
public event EventHandler<Filter>? FilterUpdated;
public event EventHandler<UserList>? ListMembersUpdated;
public Task RaiseNotePublished(object? sender, Note note);
public Task RaiseNoteUpdated(object? sender, Note note);
public Task RaiseNoteDeleted(object? sender, Note note);
public Task RaiseNotification(object? sender, Notification notification);
public Task RaiseNotifications(object? sender, IEnumerable<Notification> notifications);
public Task RaiseNoteLiked(object? sender, Note note, User user);
public Task RaiseNoteUnliked(object? sender, Note note, User user);
public Task RaiseNoteReacted(object? sender, NoteReaction reaction);
public Task RaiseNoteUnreacted(object? sender, NoteReaction reaction);
public Task RaiseUserFollowed(object? sender, User actor, User obj);
public Task RaiseUserUnfollowed(object? sender, User actor, User obj);
public Task RaiseUserBlocked(object? sender, User actor, User obj);
public Task RaiseUserUnblocked(object? sender, User actor, User obj);
public Task RaiseUserMuted(object? sender, User actor, User obj);
public Task RaiseUserUnmuted(object? sender, User actor, User obj);
public Task RaiseFilterAdded(object? sender, Filter filter);
public Task RaiseFilterRemoved(object? sender, Filter filter);
public Task RaiseFilterUpdated(object? sender, Filter filter);
public Task RaiseListMembersUpdated(object? sender, UserList list);
}

View file

@ -38,7 +38,7 @@ public class NoteService(
ActivityPub.MentionsResolver mentionsResolver,
DriveService driveSvc,
NotificationService notificationSvc,
EventService eventSvc,
IEventService eventSvc,
ActivityPub.ActivityRenderer activityRenderer,
EmojiService emojiSvc,
FollowupTaskService followupTaskSvc,
@ -243,7 +243,7 @@ public class NoteService(
await UpdateNoteCountersAsync(note, true);
await db.AddAsync(note);
await db.SaveChangesAsync();
eventSvc.RaiseNotePublished(this, note);
await eventSvc.RaiseNotePublished(this, note);
await notificationSvc.GenerateMentionNotifications(note, mentionedLocalUserIds);
await notificationSvc.GenerateReplyNotifications(note, mentionedLocalUserIds);
await notificationSvc.GenerateRenoteNotification(note);
@ -581,7 +581,7 @@ public class NoteService(
}
await db.SaveChangesAsync();
eventSvc.RaiseNoteUpdated(this, note);
await eventSvc.RaiseNoteUpdated(this, note);
if (!isEdit) return note;
@ -611,7 +611,7 @@ public class NoteService(
logger.LogDebug("Deleting note '{id}' owned by {userId}", note.Id, note.User.Id);
db.Remove(note);
eventSvc.RaiseNoteDeleted(this, note);
await eventSvc.RaiseNoteDeleted(this, note);
await db.SaveChangesAsync();
await UpdateNoteCountersAsync(note, false);
@ -710,8 +710,7 @@ public class NoteService(
await db.Notes.Where(p => p.Id == note.Id)
.ExecuteUpdateAsync(p => p.SetProperty(n => n.RenoteCount, n => n.RenoteCount - 1));
foreach (var hit in notes)
eventSvc.RaiseNoteDeleted(this, hit);
await notes.Select(hit => eventSvc.RaiseNoteDeleted(this, hit)).AwaitAllAsync();
}
public async Task<Note?> ProcessNoteAsync(ASNote note, User actor, User? user = null)
@ -1147,7 +1146,7 @@ public class NoteService(
await deliverSvc.DeliverToConditionalAsync(activity, user, note);
}
eventSvc.RaiseNoteLiked(this, note, user);
await eventSvc.RaiseNoteLiked(this, note, user);
await notificationSvc.GenerateLikeNotification(note, user);
return true;
}
@ -1172,7 +1171,7 @@ public class NoteService(
await deliverSvc.DeliverToConditionalAsync(activity, user, note);
}
eventSvc.RaiseNoteUnliked(this, note, user);
await eventSvc.RaiseNoteUnliked(this, note, user);
await db.Notifications
.Where(p => p.Type == Notification.NotificationType.Like &&
p.Notifiee == note.User &&
@ -1362,7 +1361,7 @@ public class NoteService(
await db.AddAsync(reaction);
await db.SaveChangesAsync();
eventSvc.RaiseNoteReacted(this, reaction);
await eventSvc.RaiseNoteReacted(this, reaction);
await notificationSvc.GenerateReactionNotification(reaction);
// @formatter:off
@ -1398,7 +1397,7 @@ public class NoteService(
if (reaction == null) return (name, false);
db.Remove(reaction);
await db.SaveChangesAsync();
eventSvc.RaiseNoteUnreacted(this, reaction);
await eventSvc.RaiseNoteUnreacted(this, reaction);
await db.Database
.ExecuteSqlAsync($"""UPDATE "note" SET "reactions" = jsonb_set("reactions", ARRAY[{name}], (COALESCE("reactions"->>{name}, '1')::int - 1)::text::jsonb) WHERE "id" = {note.Id}""");

View file

@ -10,7 +10,7 @@ namespace Iceshrimp.Backend.Core.Services;
public class NotificationService(
[SuppressMessage("ReSharper", "SuggestBaseTypeForParameterInConstructor")]
DatabaseContext db,
EventService eventSvc
IEventService eventSvc
)
{
public async Task GenerateMentionNotifications(Note note, IReadOnlyCollection<string> mentionedLocalUserIds)
@ -38,7 +38,7 @@ public class NotificationService(
await db.AddRangeAsync(notifications);
await db.SaveChangesAsync();
eventSvc.RaiseNotifications(this, notifications);
await eventSvc.RaiseNotifications(this, notifications);
}
public async Task GenerateReplyNotifications(Note note, IReadOnlyCollection<string> mentionedLocalUserIds)
@ -74,7 +74,7 @@ public class NotificationService(
await db.AddRangeAsync(notifications);
await db.SaveChangesAsync();
eventSvc.RaiseNotifications(this, notifications);
await eventSvc.RaiseNotifications(this, notifications);
}
[SuppressMessage("ReSharper", "EntityFramework.UnsupportedServerSideFunctionCall",
@ -99,7 +99,7 @@ public class NotificationService(
await db.AddRangeAsync(notifications);
await db.SaveChangesAsync();
eventSvc.RaiseNotifications(this, notifications);
await eventSvc.RaiseNotifications(this, notifications);
}
public async Task GenerateLikeNotification(Note note, User user)
@ -119,7 +119,7 @@ public class NotificationService(
await db.AddAsync(notification);
await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification);
await eventSvc.RaiseNotification(this, notification);
}
public async Task GenerateReactionNotification(NoteReaction reaction)
@ -139,7 +139,7 @@ public class NotificationService(
await db.AddAsync(notification);
await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification);
await eventSvc.RaiseNotification(this, notification);
}
public async Task GenerateFollowNotification(User follower, User followee)
@ -157,8 +157,8 @@ public class NotificationService(
await db.AddAsync(notification);
await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification);
eventSvc.RaiseUserFollowed(this, follower, followee);
await eventSvc.RaiseNotification(this, notification);
await eventSvc.RaiseUserFollowed(this, follower, followee);
}
public async Task GenerateFollowRequestReceivedNotification(FollowRequest followRequest)
@ -177,7 +177,7 @@ public class NotificationService(
await db.AddAsync(notification);
await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification);
await eventSvc.RaiseNotification(this, notification);
}
public async Task GenerateFollowRequestAcceptedNotification(FollowRequest followRequest)
@ -196,8 +196,8 @@ public class NotificationService(
await db.AddAsync(notification);
await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification);
eventSvc.RaiseUserFollowed(this, followRequest.Follower, followRequest.Followee);
await eventSvc.RaiseNotification(this, notification);
await eventSvc.RaiseUserFollowed(this, followRequest.Follower, followRequest.Followee);
}
public async Task GenerateBiteNotification(Bite bite)
@ -215,7 +215,7 @@ public class NotificationService(
await db.AddAsync(notification);
await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification);
await eventSvc.RaiseNotification(this, notification);
}
public async Task GeneratePollEndedNotifications(Note note)
@ -252,8 +252,10 @@ public class NotificationService(
await db.AddRangeAsync(notifications);
await db.SaveChangesAsync();
foreach (var notification in notifications)
eventSvc.RaiseNotification(this, notification);
await notifications.Select(notification => eventSvc.RaiseNotification(this, notification))
.Chunk(20)
.Select(async chunk => await chunk.AwaitAllAsync())
.AwaitAllNoConcurrencyAsync();
}
[SuppressMessage("ReSharper", "EntityFramework.UnsupportedServerSideFunctionCall", Justification = "Projectables")]
@ -277,6 +279,6 @@ public class NotificationService(
await db.AddAsync(notification);
await db.SaveChangesAsync();
eventSvc.RaiseNotification(this, notification);
await eventSvc.RaiseNotification(this, notification);
}
}

View file

@ -17,7 +17,7 @@ using WebPushSubscription = Iceshrimp.WebPush.PushSubscription;
namespace Iceshrimp.Backend.Core.Services;
public class PushService(
EventService eventSvc,
IEventService eventSvc,
ILogger<PushService> logger,
IServiceScopeFactory scopeFactory,
HttpClient httpClient,

View file

@ -19,14 +19,14 @@ public sealed class StreamingService
});
private readonly ConcurrentDictionary<string, StreamingConnectionAggregate> _connections = [];
private readonly EventService _eventSvc;
private readonly IEventService _eventSvc;
private readonly IHubContext<StreamingHub, IStreamingHubClient> _hub;
private readonly ILogger<StreamingService> _logger;
private readonly IServiceScopeFactory _scopeFactory;
public StreamingService(
IHubContext<StreamingHub, IStreamingHubClient> hub,
EventService eventSvc,
IEventService eventSvc,
IServiceScopeFactory scopeFactory,
ILogger<StreamingService> logger
)

View file

@ -36,7 +36,7 @@ public class UserService(
ActivityPub.MentionsResolver mentionsResolver,
ActivityPub.UserRenderer userRenderer,
QueueService queueSvc,
EventService eventSvc,
IEventService eventSvc,
WebFingerService webFingerSvc,
ActivityPub.FederationControlService fedCtrlSvc
)
@ -884,7 +884,7 @@ public class UserService(
}
followee.PrecomputedIsFollowedBy = false;
eventSvc.RaiseUserUnfollowed(this, user, followee);
await eventSvc.RaiseUserUnfollowed(this, user, followee);
}
if (followee.PrecomputedIsRequestedBy ?? false)
@ -1080,8 +1080,7 @@ public class UserService(
};
await db.AddAsync(muting);
await db.SaveChangesAsync();
eventSvc.RaiseUserMuted(this, muter, mutee);
await eventSvc.RaiseUserMuted(this, muter, mutee);
if (expiration != null)
{
@ -1096,7 +1095,7 @@ public class UserService(
return;
await db.Mutings.Where(p => p.Muter == muter && p.Mutee == mutee).ExecuteDeleteAsync();
eventSvc.RaiseUserUnmuted(this, muter, mutee);
await eventSvc.RaiseUserUnmuted(this, muter, mutee);
mutee.PrecomputedIsMutedBy = false;
}
@ -1139,8 +1138,7 @@ public class UserService(
await db.AddAsync(blocking);
await db.SaveChangesAsync();
eventSvc.RaiseUserBlocked(this, blocker, blockee);
await eventSvc.RaiseUserBlocked(this, blocker, blockee);
if (blocker.IsLocalUser && blockee.IsRemoteUser)
{
@ -1163,8 +1161,7 @@ public class UserService(
db.Remove(blocking);
await db.SaveChangesAsync();
eventSvc.RaiseUserUnblocked(this, blocker, blockee);
await eventSvc.RaiseUserUnblocked(this, blocker, blockee);
if (blocker.IsLocalUser && blockee.IsRemoteUser)
{

View file

@ -22,7 +22,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
private readonly WriteLockingList<string> _blocking = [];
private readonly WriteLockingList<string> _connectionIds = [];
private readonly EventService _eventService;
private readonly IEventService _eventService;
private readonly WriteLockingList<string> _following = [];
@ -239,7 +239,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
string userId,
User user,
IHubContext<StreamingHub, IStreamingHubClient> hub,
EventService eventSvc,
IEventService eventSvc,
IServiceScopeFactory scopeFactory, StreamingService streamingService
)
{

View file

@ -34,7 +34,7 @@ builder.Services.AddSignalR().AddMessagePackProtocol();
builder.Services.AddResponseCompression();
builder.Services.AddRazorPages();
builder.Services.AddServices();
builder.Services.AddServices(builder.Configuration);
builder.Services.ConfigureServices(builder.Configuration);
builder.WebHost.ConfigureKestrel(builder.Configuration);
builder.WebHost.UseStaticWebAssets();

View file

@ -39,10 +39,10 @@ public static class MockObjects
private static ServiceProvider GetServiceProvider()
{
var config = new ConfigurationManager();
config.AddIniFile("configuration.ini", false);
config.AddIniStream(AssemblyHelpers.GetEmbeddedResourceStream("configuration.ini"));
var collection = new ServiceCollection();
collection.AddServices();
collection.AddServices(config);
collection.ConfigureServices(config);
return collection.BuildServiceProvider();