From dc09a4ada28b5882d7fec9d819ab0635b76a0ff0 Mon Sep 17 00:00:00 2001 From: Laura Hausmann Date: Thu, 23 May 2024 15:55:33 +0200 Subject: [PATCH] [backend/masto-client] Add list streaming channel (ISH-332) --- .../Controllers/Mastodon/ListController.cs | 8 +- .../Streaming/Channels/ListChannel.cs | 222 ++++++++++++++++++ .../Mastodon/Streaming/WebSocketConnection.cs | 1 + .../Core/Helpers/WriteLockingList.cs | 13 +- .../Core/Services/EventService.cs | 8 +- 5 files changed, 243 insertions(+), 9 deletions(-) create mode 100644 Iceshrimp.Backend/Controllers/Mastodon/Streaming/Channels/ListChannel.cs diff --git a/Iceshrimp.Backend/Controllers/Mastodon/ListController.cs b/Iceshrimp.Backend/Controllers/Mastodon/ListController.cs index d320f5b9..7639f17a 100644 --- a/Iceshrimp.Backend/Controllers/Mastodon/ListController.cs +++ b/Iceshrimp.Backend/Controllers/Mastodon/ListController.cs @@ -10,6 +10,7 @@ using Iceshrimp.Backend.Core.Database.Tables; using Iceshrimp.Backend.Core.Extensions; using Iceshrimp.Backend.Core.Helpers; using Iceshrimp.Backend.Core.Middleware; +using Iceshrimp.Backend.Core.Services; using Microsoft.AspNetCore.Cors; using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.RateLimiting; @@ -23,7 +24,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon; [EnableRateLimiting("sliding")] [EnableCors("mastodon")] [Produces(MediaTypeNames.Application.Json)] -public class ListController(DatabaseContext db, UserRenderer userRenderer) : ControllerBase +public class ListController(DatabaseContext db, UserRenderer userRenderer, EventService eventSvc) : ControllerBase { [HttpGet] [Authorize("read:lists")] @@ -143,6 +144,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer) : Con db.Remove(list); await db.SaveChangesAsync(); + eventSvc.RaiseListMembersUpdated(this, list); return Ok(new object()); } @@ -210,6 +212,8 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer) : Con await db.AddRangeAsync(memberships); await db.SaveChangesAsync(); + eventSvc.RaiseListMembersUpdated(this, list); + return Ok(new object()); } @@ -231,6 +235,8 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer) : Con .Where(p => p.UserList == list && request.AccountIds.Contains(p.UserId)) .ExecuteDeleteAsync(); + eventSvc.RaiseListMembersUpdated(this, list); + return Ok(new object()); } } \ No newline at end of file diff --git a/Iceshrimp.Backend/Controllers/Mastodon/Streaming/Channels/ListChannel.cs b/Iceshrimp.Backend/Controllers/Mastodon/Streaming/Channels/ListChannel.cs new file mode 100644 index 00000000..44dc2c60 --- /dev/null +++ b/Iceshrimp.Backend/Controllers/Mastodon/Streaming/Channels/ListChannel.cs @@ -0,0 +1,222 @@ +using System.Collections.Concurrent; +using System.Net.WebSockets; +using System.Text.Json; +using Iceshrimp.Backend.Controllers.Mastodon.Renderers; +using Iceshrimp.Backend.Controllers.Mastodon.Schemas.Entities; +using Iceshrimp.Backend.Core.Database; +using Iceshrimp.Backend.Core.Database.Tables; +using Iceshrimp.Backend.Core.Extensions; +using Iceshrimp.Backend.Core.Helpers; +using Microsoft.EntityFrameworkCore; + +namespace Iceshrimp.Backend.Controllers.Mastodon.Streaming.Channels; + +public class ListChannel(WebSocketConnection connection) : IChannel +{ + private readonly ILogger _logger = + connection.Scope.ServiceProvider.GetRequiredService>(); + + public string Name => "list"; + public List Scopes => ["read:statuses"]; + public bool IsSubscribed => _lists.Count != 0; + public bool IsAggregate => true; + + private readonly WriteLockingList _lists = []; + private readonly ConcurrentDictionary> _members = []; + private IEnumerable _applicableUserIds = []; + + public async Task Subscribe(StreamingRequestMessage msg) + { + if (msg.List == null) + { + await connection.CloseAsync(WebSocketCloseStatus.InvalidPayloadData); + return; + } + + if (!IsSubscribed) + { + connection.EventService.NotePublished += OnNotePublished; + connection.EventService.NoteUpdated += OnNoteUpdated; + connection.EventService.NoteDeleted += OnNoteDeleted; + connection.EventService.ListMembersUpdated += OnListMembersUpdated; + } + + if (_lists.AddIfMissing(msg.List)) + { + await using var scope = connection.ScopeFactory.CreateAsyncScope(); + + var db = scope.ServiceProvider.GetRequiredService(); + var list = await db.UserLists.FirstOrDefaultAsync(p => p.UserId == connection.Token.User.Id && + p.Id == msg.List); + if (list == null) + { + await connection.CloseAsync(WebSocketCloseStatus.InvalidPayloadData); + return; + } + + var members = await db.UserListMembers.Where(p => p.UserList == list).Select(p => p.UserId).ToListAsync(); + _members.AddOrUpdate(list.Id, _ => new WriteLockingList(members), (_, existing) => existing); + _applicableUserIds = _members.Values.SelectMany(p => p).Distinct(); + } + } + + public async Task Unsubscribe(StreamingRequestMessage msg) + { + if (msg.List == null) + { + await connection.CloseAsync(WebSocketCloseStatus.InvalidPayloadData); + return; + } + + _lists.RemoveAll(p => p == msg.List); + _members.TryRemove(msg.List, out _); + _applicableUserIds = _members.Values.SelectMany(p => p).Distinct(); + + if (!IsSubscribed) Dispose(); + } + + public void Dispose() + { + connection.EventService.NotePublished -= OnNotePublished; + connection.EventService.NoteUpdated -= OnNoteUpdated; + connection.EventService.NoteDeleted -= OnNoteDeleted; + connection.EventService.ListMembersUpdated -= OnListMembersUpdated; + } + + private NoteWithVisibilities? IsApplicable(Note note) + { + if (!IsApplicableBool(note)) return null; + var res = EnforceRenoteReplyVisibility(note); + return res is not { Note.IsPureRenote: true, Renote: null } ? res : null; + } + + private bool IsApplicableBool(Note note) => + _applicableUserIds.Contains(note.UserId) && note.IsVisibleFor(connection.Token.User, connection.Following); + + private NoteWithVisibilities EnforceRenoteReplyVisibility(Note note) + { + var wrapped = new NoteWithVisibilities(note); + if (!wrapped.Renote?.IsVisibleFor(connection.Token.User, connection.Following) ?? false) + wrapped.Renote = null; + + return wrapped; + } + + private class NoteWithVisibilities(Note note) + { + public readonly Note Note = note; + public Note? Renote = note.Renote; + } + + private static StatusEntity EnforceRenoteReplyVisibility(StatusEntity rendered, NoteWithVisibilities note) + { + var renote = note.Renote == null && rendered.Renote != null; + if (!renote) return rendered; + + rendered = (StatusEntity)rendered.Clone(); + if (renote) rendered.Renote = null; + return rendered; + } + + private IEnumerable RenderMessage( + IEnumerable lists, string eventType, string payload + ) => lists.Select(list => new StreamingUpdateMessage + { + Stream = [Name, list], + Event = eventType, + Payload = payload + }); + + private async void OnNotePublished(object? _, Note note) + { + try + { + var wrapped = IsApplicable(note); + if (wrapped == null) return; + if (connection.IsFiltered(note)) return; + await using var scope = connection.ScopeFactory.CreateAsyncScope(); + + var renderer = scope.ServiceProvider.GetRequiredService(); + var data = new NoteRenderer.NoteRendererDto { Filters = connection.Filters.ToList() }; + var intermediate = await renderer.RenderAsync(note, connection.Token.User, data: data); + var rendered = EnforceRenoteReplyVisibility(intermediate, wrapped); + + var lists = _members.Where(p => p.Value.Contains(note.UserId)).Select(p => p.Key); + var messages = RenderMessage(lists, "update", JsonSerializer.Serialize(rendered)); + foreach (var message in messages) + await connection.SendMessageAsync(JsonSerializer.Serialize(message)); + } + catch (Exception e) + { + _logger.LogError("Event handler OnNotePublished threw exception: {e}", e); + } + } + + private async void OnNoteUpdated(object? _, Note note) + { + try + { + var wrapped = IsApplicable(note); + if (wrapped == null) return; + if (connection.IsFiltered(note)) return; + await using var scope = connection.ScopeFactory.CreateAsyncScope(); + + var renderer = scope.ServiceProvider.GetRequiredService(); + var data = new NoteRenderer.NoteRendererDto { Filters = connection.Filters.ToList() }; + var intermediate = await renderer.RenderAsync(note, connection.Token.User, data: data); + var rendered = EnforceRenoteReplyVisibility(intermediate, wrapped); + + var lists = _members.Where(p => p.Value.Contains(note.UserId)).Select(p => p.Key); + var messages = RenderMessage(lists, "status.update", JsonSerializer.Serialize(rendered)); + foreach (var message in messages) + await connection.SendMessageAsync(JsonSerializer.Serialize(message)); + } + catch (Exception e) + { + _logger.LogError("Event handler OnNoteUpdated threw exception: {e}", e); + } + } + + private async void OnNoteDeleted(object? _, Note note) + { + try + { + if (!IsApplicableBool(note)) return; + if (connection.IsFiltered(note)) return; + + var lists = _members.Where(p => p.Value.Contains(note.UserId)).Select(p => p.Key); + var messages = RenderMessage(lists, "status.update", note.Id); + foreach (var message in messages) + await connection.SendMessageAsync(JsonSerializer.Serialize(message)); + } + catch (Exception e) + { + _logger.LogError("Event handler OnNoteDeleted threw exception: {e}", e); + } + } + + private async void OnListMembersUpdated(object? _, UserList list) + { + try + { + if (list.UserId != connection.Token.User.Id) return; + if (!_lists.Contains(list.Id)) return; + + await using var scope = connection.ScopeFactory.CreateAsyncScope(); + + var db = scope.ServiceProvider.GetRequiredService(); + var members = await db.UserListMembers.Where(p => p.UserListId == list.Id) + .Select(p => p.UserId) + .ToListAsync(); + + var wlMembers = new WriteLockingList(members); + + _members.AddOrUpdate(list.Id, _ => wlMembers, (_, _) => wlMembers); + _applicableUserIds = _members.Values.SelectMany(p => p).Distinct(); + } + catch (Exception e) + { + _logger.LogError("Event handler OnListUpdated threw exception: {e}", e); + } + } +} \ No newline at end of file diff --git a/Iceshrimp.Backend/Controllers/Mastodon/Streaming/WebSocketConnection.cs b/Iceshrimp.Backend/Controllers/Mastodon/Streaming/WebSocketConnection.cs index fdd1a40e..d346a911 100644 --- a/Iceshrimp.Backend/Controllers/Mastodon/Streaming/WebSocketConnection.cs +++ b/Iceshrimp.Backend/Controllers/Mastodon/Streaming/WebSocketConnection.cs @@ -67,6 +67,7 @@ public sealed class WebSocketConnection( Channels.Add(new PublicChannel(this, "public:remote:media", false, true, true)); Channels.Add(new HashtagChannel(this, true)); Channels.Add(new HashtagChannel(this, false)); + Channels.Add(new ListChannel(this)); EventService.UserBlocked += OnUserUnblock; EventService.UserUnblocked += OnUserBlock; diff --git a/Iceshrimp.Backend/Core/Helpers/WriteLockingList.cs b/Iceshrimp.Backend/Core/Helpers/WriteLockingList.cs index 6bc9d564..a75f42b0 100644 --- a/Iceshrimp.Backend/Core/Helpers/WriteLockingList.cs +++ b/Iceshrimp.Backend/Core/Helpers/WriteLockingList.cs @@ -5,9 +5,9 @@ namespace Iceshrimp.Backend.Core.Helpers; [SuppressMessage("ReSharper", "InconsistentlySynchronizedField", Justification = "This is intentional (it's a *write* locking list, after all)")] -public class WriteLockingList : ICollection +public class WriteLockingList(IEnumerable? sourceCollection = null) : ICollection { - private readonly List _list = []; + private readonly List _list = sourceCollection?.ToList() ?? []; public IEnumerator GetEnumerator() => _list.GetEnumerator(); IEnumerator IEnumerable.GetEnumerator() => _list.GetEnumerator(); @@ -17,11 +17,14 @@ public class WriteLockingList : ICollection lock (_list) _list.Add(item); } - public void AddIfMissing(T item) + public bool AddIfMissing(T item) { lock (_list) - if (!_list.Contains(item)) - _list.Add(item); + { + if (_list.Contains(item)) return false; + _list.Add(item); + return true; + } } public void AddRange(IEnumerable item) diff --git a/Iceshrimp.Backend/Core/Services/EventService.cs b/Iceshrimp.Backend/Core/Services/EventService.cs index 12c726ae..0d9341a9 100644 --- a/Iceshrimp.Backend/Core/Services/EventService.cs +++ b/Iceshrimp.Backend/Core/Services/EventService.cs @@ -22,6 +22,7 @@ public class EventService public event EventHandler? FilterAdded; public event EventHandler? FilterRemoved; public event EventHandler? FilterUpdated; + public event EventHandler? ListMembersUpdated; public void RaiseNotePublished(object? sender, Note note) => NotePublished?.Invoke(sender, note); public void RaiseNoteUpdated(object? sender, Note note) => NoteUpdated?.Invoke(sender, note); @@ -65,7 +66,8 @@ public class EventService public void RaiseUserUnmuted(object? sender, User actor, User obj) => UserUnmuted?.Invoke(sender, new UserInteraction { Actor = actor, Object = obj }); - public void RaiseFilterAdded(object? sender, Filter filter) => FilterAdded?.Invoke(sender, filter); - public void RaiseFilterRemoved(object? sender, Filter filter) => FilterRemoved?.Invoke(sender, filter); - public void RaiseFilterUpdated(object? sender, Filter filter) => FilterUpdated?.Invoke(sender, filter); + public void RaiseFilterAdded(object? sender, Filter filter) => FilterAdded?.Invoke(sender, filter); + public void RaiseFilterRemoved(object? sender, Filter filter) => FilterRemoved?.Invoke(sender, filter); + public void RaiseFilterUpdated(object? sender, Filter filter) => FilterUpdated?.Invoke(sender, filter); + public void RaiseListMembersUpdated(object? sender, UserList list) => ListMembersUpdated?.Invoke(sender, list); } \ No newline at end of file