Compare commits
1 commit
dev
...
wip/cluste
Author | SHA1 | Date | |
---|---|---|---|
![]() |
b148c8a0ba |
22 changed files with 525 additions and 96 deletions
|
@ -24,7 +24,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
|
|||
[EnableRateLimiting("sliding")]
|
||||
[EnableCors("mastodon")]
|
||||
[Produces(MediaTypeNames.Application.Json)]
|
||||
public class FilterController(DatabaseContext db, QueueService queueSvc, EventService eventSvc) : ControllerBase
|
||||
public class FilterController(DatabaseContext db, QueueService queueSvc, IEventService eventSvc) : ControllerBase
|
||||
{
|
||||
[HttpGet]
|
||||
[Authorize("read:filters")]
|
||||
|
@ -95,7 +95,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
|
|||
|
||||
db.Add(filter);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseFilterAdded(this, filter);
|
||||
await eventSvc.RaiseFilterAdded(this, filter);
|
||||
|
||||
if (expiry.HasValue)
|
||||
{
|
||||
|
@ -159,7 +159,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
|
|||
|
||||
db.Update(filter);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseFilterUpdated(this, filter);
|
||||
await eventSvc.RaiseFilterUpdated(this, filter);
|
||||
|
||||
if (expiry.HasValue)
|
||||
{
|
||||
|
@ -183,7 +183,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
|
|||
|
||||
db.Remove(filter);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseFilterRemoved(this, filter);
|
||||
await eventSvc.RaiseFilterRemoved(this, filter);
|
||||
|
||||
return new object();
|
||||
}
|
||||
|
@ -218,7 +218,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
|
|||
|
||||
db.Update(keyword);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseFilterUpdated(this, filter);
|
||||
await eventSvc.RaiseFilterUpdated(this, filter);
|
||||
|
||||
return new FilterKeyword(keyword, filter.Id, filter.Keywords.Count - 1);
|
||||
}
|
||||
|
@ -257,7 +257,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
|
|||
filter.Keywords[keywordId] = request.WholeWord ? $"\"{request.Keyword}\"" : request.Keyword;
|
||||
db.Update(filter);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseFilterUpdated(this, filter);
|
||||
await eventSvc.RaiseFilterUpdated(this, filter);
|
||||
|
||||
return new FilterKeyword(filter.Keywords[keywordId], filter.Id, keywordId);
|
||||
}
|
||||
|
@ -279,7 +279,7 @@ public class FilterController(DatabaseContext db, QueueService queueSvc, EventSe
|
|||
filter.Keywords.RemoveAt(keywordId);
|
||||
db.Update(filter);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseFilterUpdated(this, filter);
|
||||
await eventSvc.RaiseFilterUpdated(this, filter);
|
||||
|
||||
return new object();
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
|
|||
[EnableRateLimiting("sliding")]
|
||||
[EnableCors("mastodon")]
|
||||
[Produces(MediaTypeNames.Application.Json)]
|
||||
public class ListController(DatabaseContext db, UserRenderer userRenderer, EventService eventSvc) : ControllerBase
|
||||
public class ListController(DatabaseContext db, UserRenderer userRenderer, IEventService eventSvc) : ControllerBase
|
||||
{
|
||||
[HttpGet]
|
||||
[Authorize("read:lists")]
|
||||
|
@ -139,7 +139,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer, Event
|
|||
|
||||
db.Remove(list);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseListMembersUpdated(this, list);
|
||||
await eventSvc.RaiseListMembersUpdated(this, list);
|
||||
return new object();
|
||||
}
|
||||
|
||||
|
@ -204,8 +204,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer, Event
|
|||
|
||||
await db.AddRangeAsync(memberships);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
eventSvc.RaiseListMembersUpdated(this, list);
|
||||
await eventSvc.RaiseListMembersUpdated(this, list);
|
||||
|
||||
return new object();
|
||||
}
|
||||
|
@ -229,7 +228,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer, Event
|
|||
.Where(p => p.UserList == list && request.AccountIds.Contains(p.UserId))
|
||||
.ExecuteDeleteAsync();
|
||||
|
||||
eventSvc.RaiseListMembersUpdated(this, list);
|
||||
await eventSvc.RaiseListMembersUpdated(this, list);
|
||||
|
||||
return new object();
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon.Streaming;
|
|||
public sealed class WebSocketConnection(
|
||||
WebSocket socket,
|
||||
OauthToken token,
|
||||
EventService eventSvc,
|
||||
IEventService eventSvc,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
CancellationToken ct
|
||||
) : IDisposable
|
||||
|
@ -28,7 +28,7 @@ public sealed class WebSocketConnection(
|
|||
private readonly SemaphorePlus _lock = new(1);
|
||||
private readonly WriteLockingList<string> _muting = [];
|
||||
public readonly List<IChannel> Channels = [];
|
||||
public readonly EventService EventService = eventSvc;
|
||||
public readonly IEventService EventService = eventSvc;
|
||||
public readonly WriteLockingList<Filter> Filters = [];
|
||||
public readonly WriteLockingList<string> Following = [];
|
||||
public readonly IServiceScope Scope = scopeFactory.CreateScope();
|
||||
|
|
|
@ -8,7 +8,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon.Streaming;
|
|||
public static class WebSocketHandler
|
||||
{
|
||||
public static async Task HandleConnectionAsync(
|
||||
WebSocket socket, OauthToken token, EventService eventSvc, IServiceScopeFactory scopeFactory,
|
||||
WebSocket socket, OauthToken token, IEventService eventSvc, IServiceScopeFactory scopeFactory,
|
||||
string? stream, string? list, string? tag, CancellationToken ct
|
||||
)
|
||||
{
|
||||
|
|
|
@ -14,7 +14,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
|
|||
public class WebSocketController(
|
||||
IHostApplicationLifetime appLifetime,
|
||||
DatabaseContext db,
|
||||
EventService eventSvc,
|
||||
IEventService eventSvc,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
ILogger<WebSocketController> logger
|
||||
) : ControllerBase
|
||||
|
|
|
@ -19,7 +19,7 @@ namespace Iceshrimp.Backend.Controllers.Web;
|
|||
[EnableRateLimiting("sliding")]
|
||||
[Route("/api/iceshrimp/filters")]
|
||||
[Produces(MediaTypeNames.Application.Json)]
|
||||
public class FilterController(DatabaseContext db, EventService eventSvc) : ControllerBase
|
||||
public class FilterController(DatabaseContext db, IEventService eventSvc) : ControllerBase
|
||||
{
|
||||
[HttpGet]
|
||||
[ProducesResults(HttpStatusCode.OK)]
|
||||
|
@ -49,7 +49,7 @@ public class FilterController(DatabaseContext db, EventService eventSvc) : Contr
|
|||
|
||||
db.Add(filter);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseFilterAdded(this, filter);
|
||||
await eventSvc.RaiseFilterAdded(this, filter);
|
||||
return FilterRenderer.RenderOne(filter);
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ public class FilterController(DatabaseContext db, EventService eventSvc) : Contr
|
|||
filter.Contexts = request.Contexts.Cast<Filter.FilterContext>().ToList();
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseFilterUpdated(this, filter);
|
||||
await eventSvc.RaiseFilterUpdated(this, filter);
|
||||
}
|
||||
|
||||
[HttpDelete("{id:long}")]
|
||||
|
@ -83,6 +83,6 @@ public class FilterController(DatabaseContext db, EventService eventSvc) : Contr
|
|||
|
||||
db.Remove(filter);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseFilterRemoved(this, filter);
|
||||
await eventSvc.RaiseFilterRemoved(this, filter);
|
||||
}
|
||||
}
|
|
@ -4,6 +4,6 @@ namespace Iceshrimp.Backend.Core.Events;
|
|||
|
||||
public class NoteInteraction
|
||||
{
|
||||
public required Note Note;
|
||||
public required User User;
|
||||
public required Note Note { get; init; }
|
||||
public required User User { get; init; }
|
||||
}
|
|
@ -4,6 +4,6 @@ namespace Iceshrimp.Backend.Core.Events;
|
|||
|
||||
public class UserInteraction
|
||||
{
|
||||
public required User Actor;
|
||||
public required User Object;
|
||||
public required User Actor { get; init; }
|
||||
public required User Object { get; init; }
|
||||
}
|
|
@ -36,8 +36,10 @@ namespace Iceshrimp.Backend.Core.Extensions;
|
|||
|
||||
public static class ServiceExtensions
|
||||
{
|
||||
public static void AddServices(this IServiceCollection services)
|
||||
public static void AddServices(this IServiceCollection services, IConfigurationManager configuration)
|
||||
{
|
||||
var config = configuration.GetSection("Worker").Get<Config.WorkerSection>();
|
||||
|
||||
// Transient = instantiated per request and class
|
||||
|
||||
// Scoped = instantiated per request
|
||||
|
@ -90,7 +92,6 @@ public static class ServiceExtensions
|
|||
.AddSingleton<CronService>()
|
||||
.AddSingleton<QueueService>()
|
||||
.AddSingleton<ObjectStorageService>()
|
||||
.AddSingleton<EventService>()
|
||||
.AddSingleton<RequestBufferingMiddleware>()
|
||||
.AddSingleton<AuthorizationMiddleware>()
|
||||
.AddSingleton<RequestVerificationMiddleware>()
|
||||
|
@ -105,6 +106,17 @@ public static class ServiceExtensions
|
|||
services.AddHostedService<CronService>(provider => provider.GetRequiredService<CronService>());
|
||||
services.AddHostedService<QueueService>(provider => provider.GetRequiredService<QueueService>());
|
||||
services.AddHostedService<PushService>(provider => provider.GetRequiredService<PushService>());
|
||||
|
||||
// Add service implementations for clustered and non-clustered configurations
|
||||
if (config?.WorkerId != null)
|
||||
{
|
||||
services.AddSingleton<IEventService, ClusteredEventService>()
|
||||
.AddHostedService(p => (ClusteredEventService)p.GetRequiredService<IEventService>());
|
||||
}
|
||||
else
|
||||
{
|
||||
services.AddSingleton<IEventService, EventService>();
|
||||
}
|
||||
}
|
||||
|
||||
public static void ConfigureServices(this IServiceCollection services, IConfiguration configuration)
|
||||
|
|
|
@ -26,7 +26,7 @@ public class ActivityHandlerService(
|
|||
ObjectResolver objectResolver,
|
||||
FollowupTaskService followupTaskSvc,
|
||||
EmojiService emojiSvc,
|
||||
EventService eventSvc
|
||||
IEventService eventSvc
|
||||
)
|
||||
{
|
||||
public async Task PerformActivityAsync(ASActivity activity, string? inboxUserId, string? authenticatedUserId)
|
||||
|
@ -508,7 +508,7 @@ public class ActivityHandlerService(
|
|||
p.Notifier == follower)
|
||||
.ExecuteDeleteAsync();
|
||||
|
||||
eventSvc.RaiseUserUnfollowed(this, follower, followee);
|
||||
await eventSvc.RaiseUserUnfollowed(this, follower, followee);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -207,8 +207,8 @@ public class BackgroundTaskQueue(int parallelism)
|
|||
|
||||
db.Remove(muting);
|
||||
await db.SaveChangesAsync(token);
|
||||
var eventSvc = scope.GetRequiredService<EventService>();
|
||||
eventSvc.RaiseUserUnmuted(null, muting.Muter, muting.Mutee);
|
||||
var eventSvc = scope.GetRequiredService<IEventService>();
|
||||
await eventSvc.RaiseUserUnmuted(null, muting.Muter, muting.Mutee);
|
||||
}
|
||||
|
||||
private static async Task ProcessFilterExpiry(
|
||||
|
|
307
Iceshrimp.Backend/Core/Services/ClusteredEventService.cs
Normal file
307
Iceshrimp.Backend/Core/Services/ClusteredEventService.cs
Normal file
|
@ -0,0 +1,307 @@
|
|||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
using Iceshrimp.Backend.Core.Configuration;
|
||||
using Iceshrimp.Backend.Core.Database;
|
||||
using Iceshrimp.Backend.Core.Database.Tables;
|
||||
using Iceshrimp.Backend.Core.Events;
|
||||
using Iceshrimp.Backend.Core.Extensions;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Npgsql;
|
||||
|
||||
namespace Iceshrimp.Backend.Core.Services;
|
||||
|
||||
public class ClusteredEventService(
|
||||
IServiceScopeFactory scopeFactory,
|
||||
IOptions<Config.DatabaseSection> dbConfig,
|
||||
IOptions<Config.WorkerSection> workerConfig,
|
||||
ILogger<ClusteredEventService> logger
|
||||
) : BackgroundService, IEventService
|
||||
{
|
||||
private readonly NpgsqlDataSource _dataSource = DatabaseContext.GetDataSource(dbConfig.Value);
|
||||
|
||||
public event EventHandler<Note>? NotePublished;
|
||||
public event EventHandler<Note>? NoteUpdated;
|
||||
public event EventHandler<Note>? NoteDeleted;
|
||||
public event EventHandler<NoteInteraction>? NoteLiked;
|
||||
public event EventHandler<NoteInteraction>? NoteUnliked;
|
||||
public event EventHandler<NoteReaction>? NoteReacted;
|
||||
public event EventHandler<NoteReaction>? NoteUnreacted;
|
||||
public event EventHandler<UserInteraction>? UserFollowed;
|
||||
public event EventHandler<UserInteraction>? UserUnfollowed;
|
||||
public event EventHandler<UserInteraction>? UserBlocked;
|
||||
public event EventHandler<UserInteraction>? UserUnblocked;
|
||||
public event EventHandler<UserInteraction>? UserMuted;
|
||||
public event EventHandler<UserInteraction>? UserUnmuted;
|
||||
public event EventHandler<Notification>? Notification;
|
||||
public event EventHandler<Filter>? FilterAdded;
|
||||
public event EventHandler<Filter>? FilterRemoved;
|
||||
public event EventHandler<Filter>? FilterUpdated;
|
||||
public event EventHandler<UserList>? ListMembersUpdated;
|
||||
|
||||
public Task RaiseNotePublished(object? sender, Note note) =>
|
||||
EmitEvent(new NotePublishedEvent { Args = note });
|
||||
|
||||
public Task RaiseNoteUpdated(object? sender, Note note) =>
|
||||
EmitEvent(new NoteUpdatedEvent { Args = note });
|
||||
|
||||
public Task RaiseNoteDeleted(object? sender, Note note) =>
|
||||
EmitEvent(new NoteDeletedEvent { Args = note });
|
||||
|
||||
public Task RaiseNotification(object? sender, Notification notification) =>
|
||||
EmitEvent(new NotificationEvent { Args = notification });
|
||||
|
||||
public Task RaiseNotifications(object? sender, IEnumerable<Notification> notifications) =>
|
||||
notifications.Select(p => RaiseNotification(sender, p)).AwaitAllAsync();
|
||||
|
||||
public Task RaiseNoteLiked(object? sender, Note note, User user) =>
|
||||
EmitEvent(new NoteLikedEvent { Args = new NoteInteraction { Note = note, User = user } });
|
||||
|
||||
public Task RaiseNoteUnliked(object? sender, Note note, User user) =>
|
||||
EmitEvent(new NoteUnlikedEvent { Args = new NoteInteraction { Note = note, User = user } });
|
||||
|
||||
public Task RaiseNoteReacted(object? sender, NoteReaction reaction) =>
|
||||
EmitEvent(new NoteReactedEvent { Args = reaction });
|
||||
|
||||
public Task RaiseNoteUnreacted(object? sender, NoteReaction reaction) =>
|
||||
EmitEvent(new NoteUnreactedEvent { Args = reaction });
|
||||
|
||||
public Task RaiseUserFollowed(object? sender, User actor, User obj) =>
|
||||
EmitEvent(new UserFollowedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
|
||||
|
||||
public Task RaiseUserUnfollowed(object? sender, User actor, User obj) =>
|
||||
EmitEvent(new UserUnfollowedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
|
||||
|
||||
public Task RaiseUserBlocked(object? sender, User actor, User obj) =>
|
||||
EmitEvent(new UserBlockedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
|
||||
|
||||
public Task RaiseUserUnblocked(object? sender, User actor, User obj) =>
|
||||
EmitEvent(new UserUnblockedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
|
||||
|
||||
public Task RaiseUserMuted(object? sender, User actor, User obj) =>
|
||||
EmitEvent(new UserMutedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
|
||||
|
||||
public Task RaiseUserUnmuted(object? sender, User actor, User obj) =>
|
||||
EmitEvent(new UserUnmutedEvent { Args = new UserInteraction { Actor = actor, Object = obj } });
|
||||
|
||||
public Task RaiseFilterAdded(object? sender, Filter filter) =>
|
||||
EmitEvent(new FilterAddedEvent { Args = filter });
|
||||
|
||||
public Task RaiseFilterRemoved(object? sender, Filter filter) =>
|
||||
EmitEvent(new FilterRemovedEvent { Args = filter });
|
||||
|
||||
public Task RaiseFilterUpdated(object? sender, Filter filter) =>
|
||||
EmitEvent(new FilterUpdatedEvent { Args = filter });
|
||||
|
||||
public Task RaiseListMembersUpdated(object? sender, UserList list) =>
|
||||
EmitEvent(new ListMembersUpdatedEvent { Args = list });
|
||||
|
||||
[JsonDerivedType(typeof(NotePublishedEvent), "notePublished")]
|
||||
[JsonDerivedType(typeof(NoteUpdatedEvent), "noteUpdated")]
|
||||
[JsonDerivedType(typeof(NoteDeletedEvent), "noteDeleted")]
|
||||
[JsonDerivedType(typeof(NoteLikedEvent), "noteLiked")]
|
||||
[JsonDerivedType(typeof(NoteUnlikedEvent), "noteUnliked")]
|
||||
[JsonDerivedType(typeof(NotificationEvent), "notification")]
|
||||
private interface IClusterEvent
|
||||
{
|
||||
public string Payload { get; }
|
||||
}
|
||||
|
||||
private class NotePublishedEvent : ClusterEvent<Note>;
|
||||
|
||||
private class NoteUpdatedEvent : ClusterEvent<Note>;
|
||||
|
||||
private class NoteDeletedEvent : ClusterEvent<Note>;
|
||||
|
||||
private class NoteLikedEvent : ClusterEvent<NoteInteraction>;
|
||||
|
||||
private class NoteUnlikedEvent : ClusterEvent<NoteInteraction>;
|
||||
|
||||
private class NoteReactedEvent : ClusterEvent<NoteReaction>;
|
||||
|
||||
private class NoteUnreactedEvent : ClusterEvent<NoteReaction>;
|
||||
|
||||
private class UserFollowedEvent : ClusterEvent<UserInteraction>;
|
||||
|
||||
private class UserUnfollowedEvent : ClusterEvent<UserInteraction>;
|
||||
|
||||
private class UserBlockedEvent : ClusterEvent<UserInteraction>;
|
||||
|
||||
private class UserUnblockedEvent : ClusterEvent<UserInteraction>;
|
||||
|
||||
private class UserMutedEvent : ClusterEvent<UserInteraction>;
|
||||
|
||||
private class UserUnmutedEvent : ClusterEvent<UserInteraction>;
|
||||
|
||||
private class NotificationEvent : ClusterEvent<Notification>;
|
||||
|
||||
private class FilterAddedEvent : ClusterEvent<Filter>;
|
||||
|
||||
private class FilterRemovedEvent : ClusterEvent<Filter>;
|
||||
|
||||
private class FilterUpdatedEvent : ClusterEvent<Filter>;
|
||||
|
||||
private class ListMembersUpdatedEvent : ClusterEvent<UserList>;
|
||||
|
||||
private void HandleEvent(string payload)
|
||||
{
|
||||
logger.LogInformation("Handling event: {payload}", payload);
|
||||
|
||||
var deserialized = JsonSerializer.Deserialize<IClusterEvent>(payload) ??
|
||||
throw new Exception("Failed to deserialize cluster event");
|
||||
|
||||
switch (deserialized)
|
||||
{
|
||||
case NotePublishedEvent e:
|
||||
NotePublished?.Invoke(this, e.Args);
|
||||
break;
|
||||
case NoteUpdatedEvent e:
|
||||
NoteUpdated?.Invoke(this, e.Args);
|
||||
break;
|
||||
case NoteDeletedEvent e:
|
||||
NoteDeleted?.Invoke(this, e.Args);
|
||||
break;
|
||||
case NoteLikedEvent e:
|
||||
NoteLiked?.Invoke(this, e.Args);
|
||||
break;
|
||||
case NoteUnlikedEvent e:
|
||||
NoteUnliked?.Invoke(this, e.Args);
|
||||
break;
|
||||
case NoteReactedEvent e:
|
||||
NoteReacted?.Invoke(this, e.Args);
|
||||
break;
|
||||
case NoteUnreactedEvent e:
|
||||
NoteUnreacted?.Invoke(this, e.Args);
|
||||
break;
|
||||
case UserFollowedEvent e:
|
||||
UserFollowed?.Invoke(this, e.Args);
|
||||
break;
|
||||
case UserUnfollowedEvent e:
|
||||
UserUnfollowed?.Invoke(this, e.Args);
|
||||
break;
|
||||
case UserBlockedEvent e:
|
||||
UserBlocked?.Invoke(this, e.Args);
|
||||
break;
|
||||
case UserUnblockedEvent e:
|
||||
UserUnblocked?.Invoke(this, e.Args);
|
||||
break;
|
||||
case UserMutedEvent e:
|
||||
UserMuted?.Invoke(this, e.Args);
|
||||
break;
|
||||
case UserUnmutedEvent e:
|
||||
UserUnmuted?.Invoke(this, e.Args);
|
||||
break;
|
||||
case NotificationEvent e:
|
||||
Notification?.Invoke(this, e.Args);
|
||||
break;
|
||||
case FilterAddedEvent e:
|
||||
FilterAdded?.Invoke(this, e.Args);
|
||||
break;
|
||||
case FilterRemovedEvent e:
|
||||
FilterRemoved?.Invoke(this, e.Args);
|
||||
break;
|
||||
case FilterUpdatedEvent e:
|
||||
FilterUpdated?.Invoke(this, e.Args);
|
||||
break;
|
||||
case ListMembersUpdatedEvent e:
|
||||
ListMembersUpdated?.Invoke(this, e.Args);
|
||||
break;
|
||||
default:
|
||||
throw new ArgumentOutOfRangeException(nameof(payload), @"Unknown event type");
|
||||
}
|
||||
}
|
||||
|
||||
private static readonly JsonSerializerOptions Options =
|
||||
new(JsonSerializerOptions.Default)
|
||||
{
|
||||
ReferenceHandler = ReferenceHandler.Preserve, IgnoreReadOnlyProperties = true
|
||||
};
|
||||
|
||||
[SuppressMessage("ReSharper", "UnusedMember.Local")]
|
||||
private abstract class ClusterEvent<TEventArgs> : IClusterEvent
|
||||
{
|
||||
private readonly string? _payload;
|
||||
private readonly TEventArgs? _args;
|
||||
|
||||
public string Payload
|
||||
{
|
||||
get => _payload ?? throw new Exception("_payload was null");
|
||||
init
|
||||
{
|
||||
_payload = value;
|
||||
_args = JsonSerializer.Deserialize<TEventArgs>(value, Options) ??
|
||||
throw new Exception("Failed to deserialize cluster event payload");
|
||||
}
|
||||
}
|
||||
|
||||
public TEventArgs Args
|
||||
{
|
||||
get => _args ?? throw new Exception("_args was null");
|
||||
init
|
||||
{
|
||||
_args = value;
|
||||
_payload = JsonSerializer.Serialize(value, Options);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken token)
|
||||
{
|
||||
if (workerConfig.Value.WorkerType is not Enums.WorkerType.QueueOnly)
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await using var conn = await GetNpgsqlConnection();
|
||||
|
||||
conn.Notification += (_, args) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
if (args.Channel is not "event") return;
|
||||
HandleEvent(args.Payload);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
logger.LogError("Failed to handle event: {error}", e);
|
||||
}
|
||||
};
|
||||
|
||||
await using (var cmd = new NpgsqlCommand("LISTEN event", conn))
|
||||
{
|
||||
await cmd.ExecuteNonQueryAsync(token);
|
||||
}
|
||||
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
await conn.WaitAsync(token);
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignored (logging this would spam logs on postgres restart)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EmitEvent(IClusterEvent ev)
|
||||
{
|
||||
await using var scope = GetScope();
|
||||
await using var db = GetDbContext(scope);
|
||||
|
||||
var serialized = JsonSerializer.Serialize(ev, Options);
|
||||
logger.LogInformation("Emitting event: {serialized}", serialized);
|
||||
await db.Database.ExecuteSqlAsync($"SELECT pg_notify('event', {serialized});");
|
||||
}
|
||||
|
||||
private async Task<NpgsqlConnection> GetNpgsqlConnection() =>
|
||||
await _dataSource.OpenConnectionAsync();
|
||||
|
||||
private AsyncServiceScope GetScope() => scopeFactory.CreateAsyncScope();
|
||||
|
||||
private static DatabaseContext GetDbContext(IServiceScope scope) =>
|
||||
scope.ServiceProvider.GetRequiredService<DatabaseContext>();
|
||||
}
|
|
@ -3,7 +3,7 @@ using Iceshrimp.Backend.Core.Events;
|
|||
|
||||
namespace Iceshrimp.Backend.Core.Services;
|
||||
|
||||
public class EventService
|
||||
public class EventService : IEventService
|
||||
{
|
||||
public event EventHandler<Note>? NotePublished;
|
||||
public event EventHandler<Note>? NoteUpdated;
|
||||
|
@ -24,50 +24,117 @@ public class EventService
|
|||
public event EventHandler<Filter>? FilterUpdated;
|
||||
public event EventHandler<UserList>? ListMembersUpdated;
|
||||
|
||||
public void RaiseNotePublished(object? sender, Note note) => NotePublished?.Invoke(sender, note);
|
||||
public void RaiseNoteUpdated(object? sender, Note note) => NoteUpdated?.Invoke(sender, note);
|
||||
public void RaiseNoteDeleted(object? sender, Note note) => NoteDeleted?.Invoke(sender, note);
|
||||
public Task RaiseNotePublished(object? sender, Note note)
|
||||
{
|
||||
NotePublished?.Invoke(sender, note);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void RaiseNotification(object? sender, Notification notification) =>
|
||||
public Task RaiseNoteUpdated(object? sender, Note note)
|
||||
{
|
||||
NoteUpdated?.Invoke(sender, note);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseNoteDeleted(object? sender, Note note)
|
||||
{
|
||||
NoteDeleted?.Invoke(sender, note);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseNotification(object? sender, Notification notification)
|
||||
{
|
||||
Notification?.Invoke(sender, notification);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void RaiseNotifications(object? sender, IEnumerable<Notification> notifications)
|
||||
public Task RaiseNotifications(object? sender, IEnumerable<Notification> notifications)
|
||||
{
|
||||
foreach (var notification in notifications) Notification?.Invoke(sender, notification);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void RaiseNoteLiked(object? sender, Note note, User user) =>
|
||||
public Task RaiseNoteLiked(object? sender, Note note, User user)
|
||||
{
|
||||
NoteLiked?.Invoke(sender, new NoteInteraction { Note = note, User = user });
|
||||
|
||||
public void RaiseNoteUnliked(object? sender, Note note, User user) =>
|
||||
NoteUnliked?.Invoke(sender, new NoteInteraction { Note = note, User = user });
|
||||
|
||||
public void RaiseNoteReacted(object? sender, NoteReaction reaction) =>
|
||||
NoteReacted?.Invoke(sender, reaction);
|
||||
|
||||
public void RaiseNoteUnreacted(object? sender, NoteReaction reaction) =>
|
||||
NoteUnreacted?.Invoke(sender, reaction);
|
||||
|
||||
public void RaiseUserFollowed(object? sender, User actor, User obj) =>
|
||||
UserFollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
|
||||
public void RaiseUserUnfollowed(object? sender, User actor, User obj) =>
|
||||
UserUnfollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
|
||||
public void RaiseUserBlocked(object? sender, User actor, User obj) =>
|
||||
UserBlocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
|
||||
public void RaiseUserUnblocked(object? sender, User actor, User obj) =>
|
||||
UserUnblocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
|
||||
public void RaiseUserMuted(object? sender, User actor, User obj) =>
|
||||
UserMuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
|
||||
public void RaiseUserUnmuted(object? sender, User actor, User obj) =>
|
||||
UserUnmuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
|
||||
public void RaiseFilterAdded(object? sender, Filter filter) => FilterAdded?.Invoke(sender, filter);
|
||||
public void RaiseFilterRemoved(object? sender, Filter filter) => FilterRemoved?.Invoke(sender, filter);
|
||||
public void RaiseFilterUpdated(object? sender, Filter filter) => FilterUpdated?.Invoke(sender, filter);
|
||||
public void RaiseListMembersUpdated(object? sender, UserList list) => ListMembersUpdated?.Invoke(sender, list);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseNoteUnliked(object? sender, Note note, User user)
|
||||
{
|
||||
NoteUnliked?.Invoke(sender, new NoteInteraction { Note = note, User = user });
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseNoteReacted(object? sender, NoteReaction reaction)
|
||||
{
|
||||
NoteReacted?.Invoke(sender, reaction);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseNoteUnreacted(object? sender, NoteReaction reaction)
|
||||
{
|
||||
NoteUnreacted?.Invoke(sender, reaction);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseUserFollowed(object? sender, User actor, User obj)
|
||||
{
|
||||
UserFollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseUserUnfollowed(object? sender, User actor, User obj)
|
||||
{
|
||||
UserUnfollowed?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseUserBlocked(object? sender, User actor, User obj)
|
||||
{
|
||||
UserBlocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseUserUnblocked(object? sender, User actor, User obj)
|
||||
{
|
||||
UserUnblocked?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseUserMuted(object? sender, User actor, User obj)
|
||||
{
|
||||
UserMuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseUserUnmuted(object? sender, User actor, User obj)
|
||||
{
|
||||
UserUnmuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj });
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseFilterAdded(object? sender, Filter filter)
|
||||
{
|
||||
FilterAdded?.Invoke(sender, filter);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseFilterRemoved(object? sender, Filter filter)
|
||||
{
|
||||
FilterRemoved?.Invoke(sender, filter);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseFilterUpdated(object? sender, Filter filter)
|
||||
{
|
||||
FilterUpdated?.Invoke(sender, filter);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RaiseListMembersUpdated(object? sender, UserList list)
|
||||
{
|
||||
ListMembersUpdated?.Invoke(sender, list);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
46
Iceshrimp.Backend/Core/Services/IEventService.cs
Normal file
46
Iceshrimp.Backend/Core/Services/IEventService.cs
Normal file
|
@ -0,0 +1,46 @@
|
|||
using Iceshrimp.Backend.Core.Database.Tables;
|
||||
using Iceshrimp.Backend.Core.Events;
|
||||
|
||||
namespace Iceshrimp.Backend.Core.Services;
|
||||
|
||||
public interface IEventService
|
||||
{
|
||||
public event EventHandler<Note>? NotePublished;
|
||||
public event EventHandler<Note>? NoteUpdated;
|
||||
public event EventHandler<Note>? NoteDeleted;
|
||||
public event EventHandler<NoteInteraction>? NoteLiked;
|
||||
public event EventHandler<NoteInteraction>? NoteUnliked;
|
||||
public event EventHandler<NoteReaction>? NoteReacted;
|
||||
public event EventHandler<NoteReaction>? NoteUnreacted;
|
||||
public event EventHandler<UserInteraction>? UserFollowed;
|
||||
public event EventHandler<UserInteraction>? UserUnfollowed;
|
||||
public event EventHandler<UserInteraction>? UserBlocked;
|
||||
public event EventHandler<UserInteraction>? UserUnblocked;
|
||||
public event EventHandler<UserInteraction>? UserMuted;
|
||||
public event EventHandler<UserInteraction>? UserUnmuted;
|
||||
public event EventHandler<Notification>? Notification;
|
||||
public event EventHandler<Filter>? FilterAdded;
|
||||
public event EventHandler<Filter>? FilterRemoved;
|
||||
public event EventHandler<Filter>? FilterUpdated;
|
||||
public event EventHandler<UserList>? ListMembersUpdated;
|
||||
|
||||
public Task RaiseNotePublished(object? sender, Note note);
|
||||
public Task RaiseNoteUpdated(object? sender, Note note);
|
||||
public Task RaiseNoteDeleted(object? sender, Note note);
|
||||
public Task RaiseNotification(object? sender, Notification notification);
|
||||
public Task RaiseNotifications(object? sender, IEnumerable<Notification> notifications);
|
||||
public Task RaiseNoteLiked(object? sender, Note note, User user);
|
||||
public Task RaiseNoteUnliked(object? sender, Note note, User user);
|
||||
public Task RaiseNoteReacted(object? sender, NoteReaction reaction);
|
||||
public Task RaiseNoteUnreacted(object? sender, NoteReaction reaction);
|
||||
public Task RaiseUserFollowed(object? sender, User actor, User obj);
|
||||
public Task RaiseUserUnfollowed(object? sender, User actor, User obj);
|
||||
public Task RaiseUserBlocked(object? sender, User actor, User obj);
|
||||
public Task RaiseUserUnblocked(object? sender, User actor, User obj);
|
||||
public Task RaiseUserMuted(object? sender, User actor, User obj);
|
||||
public Task RaiseUserUnmuted(object? sender, User actor, User obj);
|
||||
public Task RaiseFilterAdded(object? sender, Filter filter);
|
||||
public Task RaiseFilterRemoved(object? sender, Filter filter);
|
||||
public Task RaiseFilterUpdated(object? sender, Filter filter);
|
||||
public Task RaiseListMembersUpdated(object? sender, UserList list);
|
||||
}
|
|
@ -38,7 +38,7 @@ public class NoteService(
|
|||
ActivityPub.MentionsResolver mentionsResolver,
|
||||
DriveService driveSvc,
|
||||
NotificationService notificationSvc,
|
||||
EventService eventSvc,
|
||||
IEventService eventSvc,
|
||||
ActivityPub.ActivityRenderer activityRenderer,
|
||||
EmojiService emojiSvc,
|
||||
FollowupTaskService followupTaskSvc,
|
||||
|
@ -243,7 +243,7 @@ public class NoteService(
|
|||
await UpdateNoteCountersAsync(note, true);
|
||||
await db.AddAsync(note);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotePublished(this, note);
|
||||
await eventSvc.RaiseNotePublished(this, note);
|
||||
await notificationSvc.GenerateMentionNotifications(note, mentionedLocalUserIds);
|
||||
await notificationSvc.GenerateReplyNotifications(note, mentionedLocalUserIds);
|
||||
await notificationSvc.GenerateRenoteNotification(note);
|
||||
|
@ -581,7 +581,7 @@ public class NoteService(
|
|||
}
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNoteUpdated(this, note);
|
||||
await eventSvc.RaiseNoteUpdated(this, note);
|
||||
|
||||
if (!isEdit) return note;
|
||||
|
||||
|
@ -611,7 +611,7 @@ public class NoteService(
|
|||
logger.LogDebug("Deleting note '{id}' owned by {userId}", note.Id, note.User.Id);
|
||||
|
||||
db.Remove(note);
|
||||
eventSvc.RaiseNoteDeleted(this, note);
|
||||
await eventSvc.RaiseNoteDeleted(this, note);
|
||||
await db.SaveChangesAsync();
|
||||
await UpdateNoteCountersAsync(note, false);
|
||||
|
||||
|
@ -710,8 +710,7 @@ public class NoteService(
|
|||
await db.Notes.Where(p => p.Id == note.Id)
|
||||
.ExecuteUpdateAsync(p => p.SetProperty(n => n.RenoteCount, n => n.RenoteCount - 1));
|
||||
|
||||
foreach (var hit in notes)
|
||||
eventSvc.RaiseNoteDeleted(this, hit);
|
||||
await notes.Select(hit => eventSvc.RaiseNoteDeleted(this, hit)).AwaitAllAsync();
|
||||
}
|
||||
|
||||
public async Task<Note?> ProcessNoteAsync(ASNote note, User actor, User? user = null)
|
||||
|
@ -1147,7 +1146,7 @@ public class NoteService(
|
|||
await deliverSvc.DeliverToConditionalAsync(activity, user, note);
|
||||
}
|
||||
|
||||
eventSvc.RaiseNoteLiked(this, note, user);
|
||||
await eventSvc.RaiseNoteLiked(this, note, user);
|
||||
await notificationSvc.GenerateLikeNotification(note, user);
|
||||
return true;
|
||||
}
|
||||
|
@ -1172,7 +1171,7 @@ public class NoteService(
|
|||
await deliverSvc.DeliverToConditionalAsync(activity, user, note);
|
||||
}
|
||||
|
||||
eventSvc.RaiseNoteUnliked(this, note, user);
|
||||
await eventSvc.RaiseNoteUnliked(this, note, user);
|
||||
await db.Notifications
|
||||
.Where(p => p.Type == Notification.NotificationType.Like &&
|
||||
p.Notifiee == note.User &&
|
||||
|
@ -1362,7 +1361,7 @@ public class NoteService(
|
|||
|
||||
await db.AddAsync(reaction);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNoteReacted(this, reaction);
|
||||
await eventSvc.RaiseNoteReacted(this, reaction);
|
||||
await notificationSvc.GenerateReactionNotification(reaction);
|
||||
|
||||
// @formatter:off
|
||||
|
@ -1398,7 +1397,7 @@ public class NoteService(
|
|||
if (reaction == null) return (name, false);
|
||||
db.Remove(reaction);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNoteUnreacted(this, reaction);
|
||||
await eventSvc.RaiseNoteUnreacted(this, reaction);
|
||||
|
||||
await db.Database
|
||||
.ExecuteSqlAsync($"""UPDATE "note" SET "reactions" = jsonb_set("reactions", ARRAY[{name}], (COALESCE("reactions"->>{name}, '1')::int - 1)::text::jsonb) WHERE "id" = {note.Id}""");
|
||||
|
|
|
@ -10,7 +10,7 @@ namespace Iceshrimp.Backend.Core.Services;
|
|||
public class NotificationService(
|
||||
[SuppressMessage("ReSharper", "SuggestBaseTypeForParameterInConstructor")]
|
||||
DatabaseContext db,
|
||||
EventService eventSvc
|
||||
IEventService eventSvc
|
||||
)
|
||||
{
|
||||
public async Task GenerateMentionNotifications(Note note, IReadOnlyCollection<string> mentionedLocalUserIds)
|
||||
|
@ -38,7 +38,7 @@ public class NotificationService(
|
|||
|
||||
await db.AddRangeAsync(notifications);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotifications(this, notifications);
|
||||
await eventSvc.RaiseNotifications(this, notifications);
|
||||
}
|
||||
|
||||
public async Task GenerateReplyNotifications(Note note, IReadOnlyCollection<string> mentionedLocalUserIds)
|
||||
|
@ -74,7 +74,7 @@ public class NotificationService(
|
|||
|
||||
await db.AddRangeAsync(notifications);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotifications(this, notifications);
|
||||
await eventSvc.RaiseNotifications(this, notifications);
|
||||
}
|
||||
|
||||
[SuppressMessage("ReSharper", "EntityFramework.UnsupportedServerSideFunctionCall",
|
||||
|
@ -99,7 +99,7 @@ public class NotificationService(
|
|||
|
||||
await db.AddRangeAsync(notifications);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotifications(this, notifications);
|
||||
await eventSvc.RaiseNotifications(this, notifications);
|
||||
}
|
||||
|
||||
public async Task GenerateLikeNotification(Note note, User user)
|
||||
|
@ -119,7 +119,7 @@ public class NotificationService(
|
|||
|
||||
await db.AddAsync(notification);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotification(this, notification);
|
||||
await eventSvc.RaiseNotification(this, notification);
|
||||
}
|
||||
|
||||
public async Task GenerateReactionNotification(NoteReaction reaction)
|
||||
|
@ -139,7 +139,7 @@ public class NotificationService(
|
|||
|
||||
await db.AddAsync(notification);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotification(this, notification);
|
||||
await eventSvc.RaiseNotification(this, notification);
|
||||
}
|
||||
|
||||
public async Task GenerateFollowNotification(User follower, User followee)
|
||||
|
@ -157,8 +157,8 @@ public class NotificationService(
|
|||
|
||||
await db.AddAsync(notification);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotification(this, notification);
|
||||
eventSvc.RaiseUserFollowed(this, follower, followee);
|
||||
await eventSvc.RaiseNotification(this, notification);
|
||||
await eventSvc.RaiseUserFollowed(this, follower, followee);
|
||||
}
|
||||
|
||||
public async Task GenerateFollowRequestReceivedNotification(FollowRequest followRequest)
|
||||
|
@ -177,7 +177,7 @@ public class NotificationService(
|
|||
|
||||
await db.AddAsync(notification);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotification(this, notification);
|
||||
await eventSvc.RaiseNotification(this, notification);
|
||||
}
|
||||
|
||||
public async Task GenerateFollowRequestAcceptedNotification(FollowRequest followRequest)
|
||||
|
@ -196,8 +196,8 @@ public class NotificationService(
|
|||
|
||||
await db.AddAsync(notification);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotification(this, notification);
|
||||
eventSvc.RaiseUserFollowed(this, followRequest.Follower, followRequest.Followee);
|
||||
await eventSvc.RaiseNotification(this, notification);
|
||||
await eventSvc.RaiseUserFollowed(this, followRequest.Follower, followRequest.Followee);
|
||||
}
|
||||
|
||||
public async Task GenerateBiteNotification(Bite bite)
|
||||
|
@ -215,7 +215,7 @@ public class NotificationService(
|
|||
|
||||
await db.AddAsync(notification);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotification(this, notification);
|
||||
await eventSvc.RaiseNotification(this, notification);
|
||||
}
|
||||
|
||||
public async Task GeneratePollEndedNotifications(Note note)
|
||||
|
@ -252,8 +252,10 @@ public class NotificationService(
|
|||
await db.AddRangeAsync(notifications);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
foreach (var notification in notifications)
|
||||
eventSvc.RaiseNotification(this, notification);
|
||||
await notifications.Select(notification => eventSvc.RaiseNotification(this, notification))
|
||||
.Chunk(20)
|
||||
.Select(async chunk => await chunk.AwaitAllAsync())
|
||||
.AwaitAllNoConcurrencyAsync();
|
||||
}
|
||||
|
||||
[SuppressMessage("ReSharper", "EntityFramework.UnsupportedServerSideFunctionCall", Justification = "Projectables")]
|
||||
|
@ -277,6 +279,6 @@ public class NotificationService(
|
|||
|
||||
await db.AddAsync(notification);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseNotification(this, notification);
|
||||
await eventSvc.RaiseNotification(this, notification);
|
||||
}
|
||||
}
|
|
@ -17,7 +17,7 @@ using WebPushSubscription = Iceshrimp.WebPush.PushSubscription;
|
|||
namespace Iceshrimp.Backend.Core.Services;
|
||||
|
||||
public class PushService(
|
||||
EventService eventSvc,
|
||||
IEventService eventSvc,
|
||||
ILogger<PushService> logger,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
HttpClient httpClient,
|
||||
|
|
|
@ -19,14 +19,14 @@ public sealed class StreamingService
|
|||
});
|
||||
|
||||
private readonly ConcurrentDictionary<string, StreamingConnectionAggregate> _connections = [];
|
||||
private readonly EventService _eventSvc;
|
||||
private readonly IEventService _eventSvc;
|
||||
private readonly IHubContext<StreamingHub, IStreamingHubClient> _hub;
|
||||
private readonly ILogger<StreamingService> _logger;
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
|
||||
public StreamingService(
|
||||
IHubContext<StreamingHub, IStreamingHubClient> hub,
|
||||
EventService eventSvc,
|
||||
IEventService eventSvc,
|
||||
IServiceScopeFactory scopeFactory,
|
||||
ILogger<StreamingService> logger
|
||||
)
|
||||
|
|
|
@ -36,7 +36,7 @@ public class UserService(
|
|||
ActivityPub.MentionsResolver mentionsResolver,
|
||||
ActivityPub.UserRenderer userRenderer,
|
||||
QueueService queueSvc,
|
||||
EventService eventSvc,
|
||||
IEventService eventSvc,
|
||||
WebFingerService webFingerSvc,
|
||||
ActivityPub.FederationControlService fedCtrlSvc
|
||||
)
|
||||
|
@ -884,7 +884,7 @@ public class UserService(
|
|||
}
|
||||
|
||||
followee.PrecomputedIsFollowedBy = false;
|
||||
eventSvc.RaiseUserUnfollowed(this, user, followee);
|
||||
await eventSvc.RaiseUserUnfollowed(this, user, followee);
|
||||
}
|
||||
|
||||
if (followee.PrecomputedIsRequestedBy ?? false)
|
||||
|
@ -1080,8 +1080,7 @@ public class UserService(
|
|||
};
|
||||
await db.AddAsync(muting);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
eventSvc.RaiseUserMuted(this, muter, mutee);
|
||||
await eventSvc.RaiseUserMuted(this, muter, mutee);
|
||||
|
||||
if (expiration != null)
|
||||
{
|
||||
|
@ -1096,7 +1095,7 @@ public class UserService(
|
|||
return;
|
||||
|
||||
await db.Mutings.Where(p => p.Muter == muter && p.Mutee == mutee).ExecuteDeleteAsync();
|
||||
eventSvc.RaiseUserUnmuted(this, muter, mutee);
|
||||
await eventSvc.RaiseUserUnmuted(this, muter, mutee);
|
||||
|
||||
mutee.PrecomputedIsMutedBy = false;
|
||||
}
|
||||
|
@ -1139,8 +1138,7 @@ public class UserService(
|
|||
|
||||
await db.AddAsync(blocking);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
eventSvc.RaiseUserBlocked(this, blocker, blockee);
|
||||
await eventSvc.RaiseUserBlocked(this, blocker, blockee);
|
||||
|
||||
if (blocker.IsLocalUser && blockee.IsRemoteUser)
|
||||
{
|
||||
|
@ -1163,8 +1161,7 @@ public class UserService(
|
|||
|
||||
db.Remove(blocking);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
eventSvc.RaiseUserUnblocked(this, blocker, blockee);
|
||||
await eventSvc.RaiseUserUnblocked(this, blocker, blockee);
|
||||
|
||||
if (blocker.IsLocalUser && blockee.IsRemoteUser)
|
||||
{
|
||||
|
|
|
@ -22,7 +22,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
|
|||
private readonly WriteLockingList<string> _blocking = [];
|
||||
private readonly WriteLockingList<string> _connectionIds = [];
|
||||
|
||||
private readonly EventService _eventService;
|
||||
private readonly IEventService _eventService;
|
||||
|
||||
private readonly WriteLockingList<string> _following = [];
|
||||
|
||||
|
@ -239,7 +239,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
|
|||
string userId,
|
||||
User user,
|
||||
IHubContext<StreamingHub, IStreamingHubClient> hub,
|
||||
EventService eventSvc,
|
||||
IEventService eventSvc,
|
||||
IServiceScopeFactory scopeFactory, StreamingService streamingService
|
||||
)
|
||||
{
|
||||
|
|
|
@ -34,7 +34,7 @@ builder.Services.AddSignalR().AddMessagePackProtocol();
|
|||
builder.Services.AddResponseCompression();
|
||||
builder.Services.AddRazorPages();
|
||||
|
||||
builder.Services.AddServices();
|
||||
builder.Services.AddServices(builder.Configuration);
|
||||
builder.Services.ConfigureServices(builder.Configuration);
|
||||
builder.WebHost.ConfigureKestrel(builder.Configuration);
|
||||
builder.WebHost.UseStaticWebAssets();
|
||||
|
|
|
@ -39,10 +39,10 @@ public static class MockObjects
|
|||
private static ServiceProvider GetServiceProvider()
|
||||
{
|
||||
var config = new ConfigurationManager();
|
||||
config.AddIniFile("configuration.ini", false);
|
||||
config.AddIniStream(AssemblyHelpers.GetEmbeddedResourceStream("configuration.ini"));
|
||||
|
||||
var collection = new ServiceCollection();
|
||||
collection.AddServices();
|
||||
collection.AddServices(config);
|
||||
collection.ConfigureServices(config);
|
||||
|
||||
return collection.BuildServiceProvider();
|
||||
|
|
Loading…
Add table
Reference in a new issue