using System.Text.Json;
using Iceshrimp.Backend.Controllers.Mastodon.Renderers;
using Iceshrimp.Backend.Controllers.Mastodon.Schemas.Entities;
using Iceshrimp.Backend.Core.Database;
using Iceshrimp.Backend.Core.Database.Tables;
using Iceshrimp.Backend.Core.Middleware;

namespace Iceshrimp.Backend.Controllers.Mastodon.Streaming.Channels;

/// <summary>
///     Streaming channel that forwards note and notification events raised by the connection's
///     event service to the WebSocket connection as serialized <see cref="StreamingUpdateMessage" />s.
/// </summary>
/// <param name="connection">The WebSocket connection this channel writes to and reads user state from.</param>
/// <param name="notificationsOnly">
///     When <c>true</c>, only notification events are subscribed and the channel is named
///     <c>user:notification</c>; otherwise note events are subscribed as well and it is named <c>user</c>.
/// </param>
public class UserChannel(WebSocketConnection connection, bool notificationsOnly) : IChannel
{
    private readonly ILogger<UserChannel> _logger =
        connection.Scope.ServiceProvider.GetRequiredService<ILogger<UserChannel>>();

    /// <summary>The stream name reported in every outgoing message.</summary>
    public string Name => notificationsOnly ? "user:notification" : "user";

    /// <summary>The scope strings associated with this channel.</summary>
    public List<string> Scopes => ["read:statuses", "read:notifications"];

    /// <summary>Whether the event handlers are currently attached.</summary>
    public bool IsSubscribed { get; private set; }

    public bool IsAggregate => false;

    /// <summary>
    ///     Attaches the event handlers. Calling this while already subscribed is a no-op, so handlers
    ///     are never attached twice.
    /// </summary>
    public Task SubscribeAsync(StreamingRequestMessage _)
    {
        if (IsSubscribed) return Task.CompletedTask;
        IsSubscribed = true;

        if (!notificationsOnly)
        {
            connection.EventService.NotePublished += OnNotePublished;
            connection.EventService.NoteUpdated   += OnNoteUpdated;
            connection.EventService.NoteDeleted   += OnNoteDeleted;
        }

        connection.EventService.Notification += OnNotification;
        return Task.CompletedTask;
    }

    /// <summary>Detaches the event handlers. Calling this while not subscribed is a no-op.</summary>
    public Task UnsubscribeAsync(StreamingRequestMessage _)
    {
        if (!IsSubscribed) return Task.CompletedTask;
        IsSubscribed = false;
        Dispose();
        return Task.CompletedTask;
    }

    /// <summary>Detaches every event handler that <see cref="SubscribeAsync" /> may have attached.</summary>
    public void Dispose()
    {
        if (!notificationsOnly)
        {
            connection.EventService.NotePublished -= OnNotePublished;
            connection.EventService.NoteUpdated   -= OnNoteUpdated;
            connection.EventService.NoteDeleted   -= OnNoteDeleted;
        }

        connection.EventService.Notification -= OnNotification;
    }

    /// <summary>
    ///     Checks whether <paramref name="note" /> should be delivered on this channel.
    /// </summary>
    /// <returns>
    ///     The note wrapped together with its (possibly hidden) renote, or <c>null</c> when the note is
    ///     not applicable or is a pure renote with no renote left to show.
    /// </returns>
    private NoteWithVisibilities? IsApplicable(Note note)
    {
        if (!IsApplicableBool(note)) return null;

        var res = EnforceRenoteReplyVisibility(note);

        // A pure renote without a renote target has nothing to display, so drop it.
        return res is not { Note.IsPureRenote: true, Renote: null } ? res : null;
    }

    /// <summary>
    ///     Returns <c>true</c> when the note's author is the token user or someone they follow, the
    ///     author is not hidden from home, and the note's visibility permits delivery.
    /// </summary>
    private bool IsApplicableBool(Note note) =>
        connection.Following.Prepend(connection.Token.User.Id).Contains(note.UserId) &&
        !connection.HiddenFromHome.Contains(note.UserId) &&
        (note.Visibility <= Note.NoteVisibility.Followers ||
         note.IsVisibleFor(connection.Token.User, connection.Following));

    /// <summary>Returns <c>true</c> when the notification is addressed to the token user.</summary>
    private bool IsApplicable(Notification notification) =>
        notification.NotifieeId == connection.Token.User.Id;

    /// <summary>
    ///     Returns <c>true</c> when the connection filters the notification's notifier or its note.
    /// </summary>
    private bool IsFiltered(Notification notification) =>
        (notification.Notifier != null && connection.IsFiltered(notification.Notifier)) ||
        (notification.Note != null && connection.IsFiltered(notification.Note));

    /// <summary>
    ///     Wraps <paramref name="note" /> and clears the wrapper's renote when that renote is not
    ///     visible to the token user. The <see cref="Note" /> instance itself is not modified.
    /// </summary>
    private NoteWithVisibilities EnforceRenoteReplyVisibility(Note note)
    {
        var wrapped = new NoteWithVisibilities(note);

        // Evaluates to false when there is no renote, so only an existing, non-visible renote is cleared.
        if (!wrapped.Renote?.IsVisibleFor(connection.Token.User, connection.Following) ?? false)
            wrapped.Renote = null;

        return wrapped;
    }

    /// <summary>
    ///     Removes the renote from the rendered status when it was hidden on <paramref name="note" />.
    /// </summary>
    /// <returns>
    ///     <paramref name="rendered" /> unchanged when nothing has to be hidden; otherwise a clone with
    ///     <c>Renote</c> set to <c>null</c>, leaving the passed-in instance untouched.
    /// </returns>
    private static StatusEntity EnforceRenoteReplyVisibility(StatusEntity rendered, NoteWithVisibilities note)
    {
        var hideRenote = note.Renote == null && rendered.Renote != null;
        if (!hideRenote) return rendered;

        rendered        = (StatusEntity)rendered.Clone();
        rendered.Renote = null;
        return rendered;
    }

    /// <summary>
    ///     Event handler for published notes: renders applicable notes and sends them as an
    ///     <c>update</c> event. Exceptions are logged instead of propagating out of the async void handler.
    /// </summary>
    private async void OnNotePublished(object? _, Note note)
    {
        try
        {
            var wrapped = IsApplicable(note);
            if (wrapped == null) return;
            if (connection.IsFiltered(note)) return;

            // Notes created more than five minutes ago are not streamed.
            // NOTE(review): presumably this keeps late-arriving/backfilled notes out of the live feed - confirm.
            if (note.CreatedAt < DateTime.UtcNow - TimeSpan.FromMinutes(5)) return;

            await using var scope = connection.GetAsyncServiceScope();
            if (await connection.IsMutedThreadAsync(note, scope)) return;

            var renderer     = scope.ServiceProvider.GetRequiredService<NoteRenderer>();
            var intermediate = await renderer.RenderAsync(note, connection.Token.User);
            var rendered     = EnforceRenoteReplyVisibility(intermediate, wrapped);
            var message = new StreamingUpdateMessage
            {
                Stream  = [Name],
                Event   = "update",
                Payload = JsonSerializer.Serialize(rendered)
            };

            await connection.SendMessageAsync(JsonSerializer.Serialize(message));
        }
        catch (Exception e)
        {
            _logger.LogError("Event handler OnNotePublished threw exception: {e}", e);
        }
    }

    /// <summary>
    ///     Event handler for updated notes: renders applicable notes and sends them as a
    ///     <c>status.update</c> event. Exceptions are logged instead of propagating.
    /// </summary>
    private async void OnNoteUpdated(object? _, Note note)
    {
        try
        {
            var wrapped = IsApplicable(note);
            if (wrapped == null) return;
            if (connection.IsFiltered(note)) return;

            await using var scope = connection.GetAsyncServiceScope();

            var renderer     = scope.ServiceProvider.GetRequiredService<NoteRenderer>();
            var intermediate = await renderer.RenderAsync(note, connection.Token.User);
            var rendered     = EnforceRenoteReplyVisibility(intermediate, wrapped);
            var message = new StreamingUpdateMessage
            {
                Stream  = [Name],
                Event   = "status.update",
                Payload = JsonSerializer.Serialize(rendered)
            };

            await connection.SendMessageAsync(JsonSerializer.Serialize(message));
        }
        catch (Exception e)
        {
            _logger.LogError("Event handler OnNoteUpdated threw exception: {e}", e);
        }
    }

    /// <summary>
    ///     Event handler for deleted notes: sends a <c>delete</c> event carrying the note id for
    ///     applicable notes. Exceptions are logged instead of propagating.
    /// </summary>
    private async void OnNoteDeleted(object? _, Note note)
    {
        try
        {
            if (!IsApplicableBool(note)) return;
            if (connection.IsFiltered(note)) return;

            var message = new StreamingUpdateMessage
            {
                Stream  = [Name],
                Event   = "delete",
                Payload = note.Id
            };

            await connection.SendMessageAsync(JsonSerializer.Serialize(message));
        }
        catch (Exception e)
        {
            _logger.LogError("Event handler OnNoteDeleted threw exception: {e}", e);
        }
    }

    /// <summary>
    ///     Event handler for notifications: renders notifications addressed to the token user and sends
    ///     them as a <c>notification</c> event. Exceptions are logged instead of propagating.
    /// </summary>
    private async void OnNotification(object? _, Notification notification)
    {
        try
        {
            if (!IsApplicable(notification)) return;
            if (IsFiltered(notification)) return;

            await using var scope = connection.GetAsyncServiceScope();
            if (notification.Note != null && await connection.IsMutedThreadAsync(notification.Note, scope, true))
                return;

            var renderer = scope.ServiceProvider.GetRequiredService<NotificationRenderer>();

            NotificationEntity rendered;
            try
            {
                rendered = await renderer.RenderAsync(notification, connection.Token.User, connection.Token.IsPleroma);
            }
            catch (GracefulException)
            {
                // Unsupported notification type
                return;
            }

            var message = new StreamingUpdateMessage
            {
                Stream  = [Name],
                Event   = "notification",
                Payload = JsonSerializer.Serialize(rendered)
            };

            await connection.SendMessageAsync(JsonSerializer.Serialize(message));
        }
        catch (Exception e)
        {
            _logger.LogError("Event handler OnNotification threw exception: {e}", e);
        }
    }

    /// <summary>
    ///     Pairs a note with a separately tracked renote reference, so the renote can be hidden for
    ///     this connection without modifying the <see cref="Note" /> instance.
    /// </summary>
    private class NoteWithVisibilities(Note note)
    {
        public readonly Note  Note   = note;
        public          Note? Renote = note.Renote;
    }
}