[backend/masto-client] Add list streaming channel (ISH-332)
This commit is contained in:
parent
a4087a4c81
commit
dc09a4ada2
5 changed files with 243 additions and 9 deletions
|
@ -10,6 +10,7 @@ using Iceshrimp.Backend.Core.Database.Tables;
|
|||
using Iceshrimp.Backend.Core.Extensions;
|
||||
using Iceshrimp.Backend.Core.Helpers;
|
||||
using Iceshrimp.Backend.Core.Middleware;
|
||||
using Iceshrimp.Backend.Core.Services;
|
||||
using Microsoft.AspNetCore.Cors;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.AspNetCore.RateLimiting;
|
||||
|
@ -23,7 +24,7 @@ namespace Iceshrimp.Backend.Controllers.Mastodon;
|
|||
[EnableRateLimiting("sliding")]
|
||||
[EnableCors("mastodon")]
|
||||
[Produces(MediaTypeNames.Application.Json)]
|
||||
public class ListController(DatabaseContext db, UserRenderer userRenderer) : ControllerBase
|
||||
public class ListController(DatabaseContext db, UserRenderer userRenderer, EventService eventSvc) : ControllerBase
|
||||
{
|
||||
[HttpGet]
|
||||
[Authorize("read:lists")]
|
||||
|
@ -143,6 +144,7 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer) : Con
|
|||
|
||||
db.Remove(list);
|
||||
await db.SaveChangesAsync();
|
||||
eventSvc.RaiseListMembersUpdated(this, list);
|
||||
return Ok(new object());
|
||||
}
|
||||
|
||||
|
@ -210,6 +212,8 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer) : Con
|
|||
await db.AddRangeAsync(memberships);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
eventSvc.RaiseListMembersUpdated(this, list);
|
||||
|
||||
return Ok(new object());
|
||||
}
|
||||
|
||||
|
@ -231,6 +235,8 @@ public class ListController(DatabaseContext db, UserRenderer userRenderer) : Con
|
|||
.Where(p => p.UserList == list && request.AccountIds.Contains(p.UserId))
|
||||
.ExecuteDeleteAsync();
|
||||
|
||||
eventSvc.RaiseListMembersUpdated(this, list);
|
||||
|
||||
return Ok(new object());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,222 @@
|
|||
using System.Collections.Concurrent;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text.Json;
|
||||
using Iceshrimp.Backend.Controllers.Mastodon.Renderers;
|
||||
using Iceshrimp.Backend.Controllers.Mastodon.Schemas.Entities;
|
||||
using Iceshrimp.Backend.Core.Database;
|
||||
using Iceshrimp.Backend.Core.Database.Tables;
|
||||
using Iceshrimp.Backend.Core.Extensions;
|
||||
using Iceshrimp.Backend.Core.Helpers;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace Iceshrimp.Backend.Controllers.Mastodon.Streaming.Channels;
|
||||
|
||||
public class ListChannel(WebSocketConnection connection) : IChannel
|
||||
{
|
||||
private readonly ILogger<ListChannel> _logger =
|
||||
connection.Scope.ServiceProvider.GetRequiredService<ILogger<ListChannel>>();
|
||||
|
||||
public string Name => "list";
|
||||
public List<string> Scopes => ["read:statuses"];
|
||||
public bool IsSubscribed => _lists.Count != 0;
|
||||
public bool IsAggregate => true;
|
||||
|
||||
private readonly WriteLockingList<string> _lists = [];
|
||||
private readonly ConcurrentDictionary<string, WriteLockingList<string>> _members = [];
|
||||
private IEnumerable<string> _applicableUserIds = [];
|
||||
|
||||
public async Task Subscribe(StreamingRequestMessage msg)
|
||||
{
|
||||
if (msg.List == null)
|
||||
{
|
||||
await connection.CloseAsync(WebSocketCloseStatus.InvalidPayloadData);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!IsSubscribed)
|
||||
{
|
||||
connection.EventService.NotePublished += OnNotePublished;
|
||||
connection.EventService.NoteUpdated += OnNoteUpdated;
|
||||
connection.EventService.NoteDeleted += OnNoteDeleted;
|
||||
connection.EventService.ListMembersUpdated += OnListMembersUpdated;
|
||||
}
|
||||
|
||||
if (_lists.AddIfMissing(msg.List))
|
||||
{
|
||||
await using var scope = connection.ScopeFactory.CreateAsyncScope();
|
||||
|
||||
var db = scope.ServiceProvider.GetRequiredService<DatabaseContext>();
|
||||
var list = await db.UserLists.FirstOrDefaultAsync(p => p.UserId == connection.Token.User.Id &&
|
||||
p.Id == msg.List);
|
||||
if (list == null)
|
||||
{
|
||||
await connection.CloseAsync(WebSocketCloseStatus.InvalidPayloadData);
|
||||
return;
|
||||
}
|
||||
|
||||
var members = await db.UserListMembers.Where(p => p.UserList == list).Select(p => p.UserId).ToListAsync();
|
||||
_members.AddOrUpdate(list.Id, _ => new WriteLockingList<string>(members), (_, existing) => existing);
|
||||
_applicableUserIds = _members.Values.SelectMany(p => p).Distinct();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task Unsubscribe(StreamingRequestMessage msg)
|
||||
{
|
||||
if (msg.List == null)
|
||||
{
|
||||
await connection.CloseAsync(WebSocketCloseStatus.InvalidPayloadData);
|
||||
return;
|
||||
}
|
||||
|
||||
_lists.RemoveAll(p => p == msg.List);
|
||||
_members.TryRemove(msg.List, out _);
|
||||
_applicableUserIds = _members.Values.SelectMany(p => p).Distinct();
|
||||
|
||||
if (!IsSubscribed) Dispose();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
connection.EventService.NotePublished -= OnNotePublished;
|
||||
connection.EventService.NoteUpdated -= OnNoteUpdated;
|
||||
connection.EventService.NoteDeleted -= OnNoteDeleted;
|
||||
connection.EventService.ListMembersUpdated -= OnListMembersUpdated;
|
||||
}
|
||||
|
||||
private NoteWithVisibilities? IsApplicable(Note note)
|
||||
{
|
||||
if (!IsApplicableBool(note)) return null;
|
||||
var res = EnforceRenoteReplyVisibility(note);
|
||||
return res is not { Note.IsPureRenote: true, Renote: null } ? res : null;
|
||||
}
|
||||
|
||||
private bool IsApplicableBool(Note note) =>
|
||||
_applicableUserIds.Contains(note.UserId) && note.IsVisibleFor(connection.Token.User, connection.Following);
|
||||
|
||||
private NoteWithVisibilities EnforceRenoteReplyVisibility(Note note)
|
||||
{
|
||||
var wrapped = new NoteWithVisibilities(note);
|
||||
if (!wrapped.Renote?.IsVisibleFor(connection.Token.User, connection.Following) ?? false)
|
||||
wrapped.Renote = null;
|
||||
|
||||
return wrapped;
|
||||
}
|
||||
|
||||
private class NoteWithVisibilities(Note note)
|
||||
{
|
||||
public readonly Note Note = note;
|
||||
public Note? Renote = note.Renote;
|
||||
}
|
||||
|
||||
private static StatusEntity EnforceRenoteReplyVisibility(StatusEntity rendered, NoteWithVisibilities note)
|
||||
{
|
||||
var renote = note.Renote == null && rendered.Renote != null;
|
||||
if (!renote) return rendered;
|
||||
|
||||
rendered = (StatusEntity)rendered.Clone();
|
||||
if (renote) rendered.Renote = null;
|
||||
return rendered;
|
||||
}
|
||||
|
||||
private IEnumerable<StreamingUpdateMessage> RenderMessage(
|
||||
IEnumerable<string> lists, string eventType, string payload
|
||||
) => lists.Select(list => new StreamingUpdateMessage
|
||||
{
|
||||
Stream = [Name, list],
|
||||
Event = eventType,
|
||||
Payload = payload
|
||||
});
|
||||
|
||||
private async void OnNotePublished(object? _, Note note)
|
||||
{
|
||||
try
|
||||
{
|
||||
var wrapped = IsApplicable(note);
|
||||
if (wrapped == null) return;
|
||||
if (connection.IsFiltered(note)) return;
|
||||
await using var scope = connection.ScopeFactory.CreateAsyncScope();
|
||||
|
||||
var renderer = scope.ServiceProvider.GetRequiredService<NoteRenderer>();
|
||||
var data = new NoteRenderer.NoteRendererDto { Filters = connection.Filters.ToList() };
|
||||
var intermediate = await renderer.RenderAsync(note, connection.Token.User, data: data);
|
||||
var rendered = EnforceRenoteReplyVisibility(intermediate, wrapped);
|
||||
|
||||
var lists = _members.Where(p => p.Value.Contains(note.UserId)).Select(p => p.Key);
|
||||
var messages = RenderMessage(lists, "update", JsonSerializer.Serialize(rendered));
|
||||
foreach (var message in messages)
|
||||
await connection.SendMessageAsync(JsonSerializer.Serialize(message));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError("Event handler OnNotePublished threw exception: {e}", e);
|
||||
}
|
||||
}
|
||||
|
||||
private async void OnNoteUpdated(object? _, Note note)
|
||||
{
|
||||
try
|
||||
{
|
||||
var wrapped = IsApplicable(note);
|
||||
if (wrapped == null) return;
|
||||
if (connection.IsFiltered(note)) return;
|
||||
await using var scope = connection.ScopeFactory.CreateAsyncScope();
|
||||
|
||||
var renderer = scope.ServiceProvider.GetRequiredService<NoteRenderer>();
|
||||
var data = new NoteRenderer.NoteRendererDto { Filters = connection.Filters.ToList() };
|
||||
var intermediate = await renderer.RenderAsync(note, connection.Token.User, data: data);
|
||||
var rendered = EnforceRenoteReplyVisibility(intermediate, wrapped);
|
||||
|
||||
var lists = _members.Where(p => p.Value.Contains(note.UserId)).Select(p => p.Key);
|
||||
var messages = RenderMessage(lists, "status.update", JsonSerializer.Serialize(rendered));
|
||||
foreach (var message in messages)
|
||||
await connection.SendMessageAsync(JsonSerializer.Serialize(message));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError("Event handler OnNoteUpdated threw exception: {e}", e);
|
||||
}
|
||||
}
|
||||
|
||||
private async void OnNoteDeleted(object? _, Note note)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!IsApplicableBool(note)) return;
|
||||
if (connection.IsFiltered(note)) return;
|
||||
|
||||
var lists = _members.Where(p => p.Value.Contains(note.UserId)).Select(p => p.Key);
|
||||
var messages = RenderMessage(lists, "status.update", note.Id);
|
||||
foreach (var message in messages)
|
||||
await connection.SendMessageAsync(JsonSerializer.Serialize(message));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError("Event handler OnNoteDeleted threw exception: {e}", e);
|
||||
}
|
||||
}
|
||||
|
||||
private async void OnListMembersUpdated(object? _, UserList list)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (list.UserId != connection.Token.User.Id) return;
|
||||
if (!_lists.Contains(list.Id)) return;
|
||||
|
||||
await using var scope = connection.ScopeFactory.CreateAsyncScope();
|
||||
|
||||
var db = scope.ServiceProvider.GetRequiredService<DatabaseContext>();
|
||||
var members = await db.UserListMembers.Where(p => p.UserListId == list.Id)
|
||||
.Select(p => p.UserId)
|
||||
.ToListAsync();
|
||||
|
||||
var wlMembers = new WriteLockingList<string>(members);
|
||||
|
||||
_members.AddOrUpdate(list.Id, _ => wlMembers, (_, _) => wlMembers);
|
||||
_applicableUserIds = _members.Values.SelectMany(p => p).Distinct();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError("Event handler OnListUpdated threw exception: {e}", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -67,6 +67,7 @@ public sealed class WebSocketConnection(
|
|||
Channels.Add(new PublicChannel(this, "public:remote:media", false, true, true));
|
||||
Channels.Add(new HashtagChannel(this, true));
|
||||
Channels.Add(new HashtagChannel(this, false));
|
||||
Channels.Add(new ListChannel(this));
|
||||
|
||||
EventService.UserBlocked += OnUserUnblock;
|
||||
EventService.UserUnblocked += OnUserBlock;
|
||||
|
|
|
@ -5,9 +5,9 @@ namespace Iceshrimp.Backend.Core.Helpers;
|
|||
|
||||
[SuppressMessage("ReSharper", "InconsistentlySynchronizedField",
|
||||
Justification = "This is intentional (it's a *write* locking list, after all)")]
|
||||
public class WriteLockingList<T> : ICollection<T>
|
||||
public class WriteLockingList<T>(IEnumerable<T>? sourceCollection = null) : ICollection<T>
|
||||
{
|
||||
private readonly List<T> _list = [];
|
||||
private readonly List<T> _list = sourceCollection?.ToList() ?? [];
|
||||
|
||||
public IEnumerator<T> GetEnumerator() => _list.GetEnumerator();
|
||||
IEnumerator IEnumerable.GetEnumerator() => _list.GetEnumerator();
|
||||
|
@ -17,11 +17,14 @@ public class WriteLockingList<T> : ICollection<T>
|
|||
lock (_list) _list.Add(item);
|
||||
}
|
||||
|
||||
public void AddIfMissing(T item)
|
||||
public bool AddIfMissing(T item)
|
||||
{
|
||||
lock (_list)
|
||||
if (!_list.Contains(item))
|
||||
{
|
||||
if (_list.Contains(item)) return false;
|
||||
_list.Add(item);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void AddRange(IEnumerable<T> item)
|
||||
|
|
|
@ -22,6 +22,7 @@ public class EventService
|
|||
public event EventHandler<Filter>? FilterAdded;
|
||||
public event EventHandler<Filter>? FilterRemoved;
|
||||
public event EventHandler<Filter>? FilterUpdated;
|
||||
public event EventHandler<UserList>? ListMembersUpdated;
|
||||
|
||||
public void RaiseNotePublished(object? sender, Note note) => NotePublished?.Invoke(sender, note);
|
||||
public void RaiseNoteUpdated(object? sender, Note note) => NoteUpdated?.Invoke(sender, note);
|
||||
|
@ -68,4 +69,5 @@ public class EventService
|
|||
public void RaiseFilterAdded(object? sender, Filter filter) => FilterAdded?.Invoke(sender, filter);
|
||||
public void RaiseFilterRemoved(object? sender, Filter filter) => FilterRemoved?.Invoke(sender, filter);
|
||||
public void RaiseFilterUpdated(object? sender, Filter filter) => FilterUpdated?.Invoke(sender, filter);
|
||||
public void RaiseListMembersUpdated(object? sender, UserList list) => ListMembersUpdated?.Invoke(sender, list);
|
||||
}
|
Loading…
Add table
Reference in a new issue