[frontend] Move streaming connection into service
This commit is contained in:
parent
bc14abbacc
commit
a2774bef53
4 changed files with 176 additions and 105 deletions
130
Iceshrimp.Frontend/Core/Services/StreamingService.cs
Normal file
130
Iceshrimp.Frontend/Core/Services/StreamingService.cs
Normal file
|
@ -0,0 +1,130 @@
|
|||
using Iceshrimp.Frontend.Core.Schemas;
|
||||
using Iceshrimp.Shared.HubSchemas;
|
||||
using Iceshrimp.Shared.Schemas;
|
||||
using Microsoft.AspNetCore.Components;
|
||||
using Microsoft.AspNetCore.Http.Connections.Client;
|
||||
using Microsoft.AspNetCore.SignalR.Client;
|
||||
using TypedSignalR.Client;
|
||||
|
||||
namespace Iceshrimp.Frontend.Core.Services;
|
||||
|
||||
using NoteEvent = (StreamingTimeline timeline, NoteResponse note);
|
||||
|
||||
internal class StreamingService(
|
||||
SessionService session,
|
||||
NavigationManager navigation,
|
||||
ILogger<StreamingService> logger
|
||||
) : IAsyncDisposable
|
||||
{
|
||||
private HubConnection? _hubConnection;
|
||||
private IStreamingHubServer? _hub;
|
||||
|
||||
public event EventHandler<string>? Message;
|
||||
public event EventHandler<NotificationResponse>? Notification;
|
||||
public event EventHandler<NoteEvent>? NotePublished;
|
||||
public event EventHandler<NoteEvent>? NoteUpdated;
|
||||
public event EventHandler? OnConnectionChange;
|
||||
|
||||
public async Task Connect(StoredUser? user = null)
|
||||
{
|
||||
if (_hubConnection != null)
|
||||
{
|
||||
await _hubConnection.DisposeAsync();
|
||||
_hubConnection = null;
|
||||
}
|
||||
|
||||
user ??= session.Current;
|
||||
|
||||
if (user == null)
|
||||
return;
|
||||
|
||||
_hubConnection = new HubConnectionBuilder()
|
||||
.WithUrl(navigation.ToAbsoluteUri("/hubs/streaming"), Auth)
|
||||
.AddMessagePackProtocol()
|
||||
.Build();
|
||||
|
||||
_hub = _hubConnection.CreateHubProxy<IStreamingHubServer>();
|
||||
_hubConnection.Register<IStreamingHubClient>(new StreamingHubClient(this));
|
||||
|
||||
try
|
||||
{
|
||||
await _hubConnection.StartAsync();
|
||||
await _hub.Subscribe(StreamingTimeline.Home);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Message?.Invoke(this, $"System: Connection failed - {e.Message}");
|
||||
logger.LogError("Connection failed: {error}", e.Message);
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
void Auth(HttpConnectionOptions options)
|
||||
{
|
||||
options.AccessTokenProvider = () => Task.FromResult<string?>(user.Token);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task Send(string userInput, string messageInput)
|
||||
{
|
||||
if (_hub is not null)
|
||||
await _hub.SendMessage(userInput, messageInput);
|
||||
}
|
||||
|
||||
public bool IsConnected => _hubConnection?.State == HubConnectionState.Connected;
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_hubConnection is not null)
|
||||
{
|
||||
await _hubConnection.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private class StreamingHubClient(StreamingService streaming) : IStreamingHubClient, IHubConnectionObserver
|
||||
{
|
||||
public Task ReceiveMessage(string user, string message)
|
||||
{
|
||||
var encodedMsg = $"{user}: {message}";
|
||||
streaming.Message?.Invoke(this, encodedMsg);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task Notification(NotificationResponse notification)
|
||||
{
|
||||
streaming.Notification?.Invoke(this, notification);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task NotePublished(List<StreamingTimeline> timelines, NoteResponse note)
|
||||
{
|
||||
foreach (var timeline in timelines)
|
||||
streaming.NotePublished?.Invoke(this, (timeline, note));
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task NoteUpdated(List<StreamingTimeline> timelines, NoteResponse note)
|
||||
{
|
||||
foreach (var timeline in timelines)
|
||||
streaming.NoteUpdated?.Invoke(this, (timeline, note));
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task OnClosed(Exception? exception)
|
||||
{
|
||||
streaming.OnConnectionChange?.Invoke(this, EventArgs.Empty);
|
||||
return ReceiveMessage("System", "Connection closed.");
|
||||
}
|
||||
|
||||
public Task OnReconnected(string? connectionId)
|
||||
{
|
||||
streaming.OnConnectionChange?.Invoke(this, EventArgs.Empty);
|
||||
return ReceiveMessage("System", "Reconnected.");
|
||||
}
|
||||
|
||||
public Task OnReconnecting(Exception? exception)
|
||||
{
|
||||
return ReceiveMessage("System", "Reconnecting...");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,8 @@
|
|||
@page "/streaming"
|
||||
@inject NavigationManager Navigation
|
||||
@implements IAsyncDisposable
|
||||
|
||||
@code {
|
||||
// See Streaming.razor.cs
|
||||
}
|
||||
|
||||
<PageTitle>Home</PageTitle>
|
||||
|
||||
|
@ -16,7 +18,7 @@
|
|||
<input class="form-control" @bind="_messageInput" size="50"/>
|
||||
</label>
|
||||
</div>
|
||||
<button class="btn btn-primary" @onclick="Send" disabled="@(!IsConnected)">Send</button>
|
||||
<button class="btn btn-primary" @onclick="Send" disabled="@(!StreamingService.IsConnected)">Send</button>
|
||||
|
||||
<hr>
|
||||
|
||||
|
@ -26,7 +28,3 @@
|
|||
<li>@message</li>
|
||||
}
|
||||
</ul>
|
||||
|
||||
@code {
|
||||
// See Streaming.razor.cs
|
||||
}
|
|
@ -2,119 +2,61 @@ using Iceshrimp.Frontend.Core.Services;
|
|||
using Iceshrimp.Shared.HubSchemas;
|
||||
using Iceshrimp.Shared.Schemas;
|
||||
using Microsoft.AspNetCore.Components;
|
||||
using Microsoft.AspNetCore.Http.Connections.Client;
|
||||
using Microsoft.AspNetCore.SignalR.Client;
|
||||
using TypedSignalR.Client;
|
||||
|
||||
namespace Iceshrimp.Frontend.Pages;
|
||||
|
||||
public partial class Streaming
|
||||
public partial class Streaming : IAsyncDisposable
|
||||
{
|
||||
[Inject] private SessionService? Session { get; set; }
|
||||
[Inject] private StreamingService StreamingService { get; init; } = null!;
|
||||
private readonly List<string> _messages = [];
|
||||
|
||||
private readonly List<string> _messages = [];
|
||||
|
||||
private HubConnection? _hubConnection;
|
||||
private IStreamingHubServer? _hub;
|
||||
private string _userInput = "";
|
||||
private string _messageInput = "";
|
||||
|
||||
private class StreamingHubClient(Streaming page) : IStreamingHubClient, IHubConnectionObserver
|
||||
{
|
||||
public Task ReceiveMessage(string user, string message)
|
||||
{
|
||||
var encodedMsg = $"{user}: {message}";
|
||||
page._messages.Add(encodedMsg);
|
||||
page.InvokeAsync(page.StateHasChanged);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task Notification(NotificationResponse notification)
|
||||
{
|
||||
var encodedMsg = $"Notification: {notification.Id} ({notification.Type})";
|
||||
page._messages.Add(encodedMsg);
|
||||
page.InvokeAsync(page.StateHasChanged);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task NotePublished(List<StreamingTimeline> timelines, NoteResponse note)
|
||||
{
|
||||
var encodedMsg = $"Note: {note.Id}";
|
||||
page._messages.Add(encodedMsg);
|
||||
page.InvokeAsync(page.StateHasChanged);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task NoteUpdated(List<StreamingTimeline> timelines, NoteResponse note)
|
||||
{
|
||||
var encodedMsg = $"Note update: {note.Id}";
|
||||
page._messages.Add(encodedMsg);
|
||||
page.InvokeAsync(page.StateHasChanged);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task OnClosed(Exception? exception)
|
||||
{
|
||||
return ReceiveMessage("System", "Connection closed.");
|
||||
}
|
||||
|
||||
public Task OnReconnected(string? connectionId)
|
||||
{
|
||||
return ReceiveMessage("System", "Reconnected.");
|
||||
}
|
||||
|
||||
public Task OnReconnecting(Exception? exception)
|
||||
{
|
||||
return ReceiveMessage("System", "Reconnecting...");
|
||||
}
|
||||
}
|
||||
private string _userInput = "";
|
||||
private string _messageInput = "";
|
||||
|
||||
protected override async Task OnInitializedAsync()
|
||||
{
|
||||
_hubConnection = new HubConnectionBuilder()
|
||||
.WithUrl(Navigation.ToAbsoluteUri("/hubs/streaming"), Auth)
|
||||
.AddMessagePackProtocol()
|
||||
.Build();
|
||||
StreamingService.Message += OnMessage;
|
||||
StreamingService.Notification += OnNotification;
|
||||
StreamingService.NotePublished += OnNotePublished;
|
||||
StreamingService.NoteUpdated += OnNoteUpdated;
|
||||
|
||||
// This must be in a .razor.cs file for the code generator to work correctly
|
||||
_hub = _hubConnection.CreateHubProxy<IStreamingHubServer>();
|
||||
|
||||
_hubConnection.Register<IStreamingHubClient>(new StreamingHubClient(this));
|
||||
|
||||
try
|
||||
{
|
||||
await _hubConnection.StartAsync();
|
||||
await _hub.Subscribe(StreamingTimeline.Home);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_messages.Add($"System: Connection failed - {e.Message}");
|
||||
await InvokeAsync(StateHasChanged);
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
void Auth(HttpConnectionOptions options)
|
||||
{
|
||||
options.AccessTokenProvider = () => Task.FromResult(Session?.Current?.Token);
|
||||
}
|
||||
await StreamingService.Connect();
|
||||
}
|
||||
|
||||
private async Task Send()
|
||||
private async Task Send() => await StreamingService.Send(_userInput, _messageInput);
|
||||
|
||||
private async void OnMessage(object? _, string message)
|
||||
{
|
||||
if (_hub is not null)
|
||||
{
|
||||
await _hub.SendMessage(_userInput, _messageInput);
|
||||
}
|
||||
_messages.Add(message);
|
||||
await InvokeAsync(StateHasChanged);
|
||||
}
|
||||
|
||||
private bool IsConnected => _hubConnection?.State == HubConnectionState.Connected;
|
||||
private async void OnNotification(object? _, NotificationResponse notification)
|
||||
{
|
||||
_messages.Add($"Notification: {notification.Id} ({notification.Type})");
|
||||
await InvokeAsync(StateHasChanged);
|
||||
}
|
||||
|
||||
private async void OnNotePublished(object? _, (StreamingTimeline timeline, NoteResponse note) data)
|
||||
{
|
||||
_messages.Add($"Note: {data.note.Id} ({data.timeline.ToString()})");
|
||||
await InvokeAsync(StateHasChanged);
|
||||
}
|
||||
|
||||
private async void OnNoteUpdated(object? _, (StreamingTimeline timeline, NoteResponse note) data)
|
||||
{
|
||||
_messages.Add($"Note updated: {data.note.Id} ({data.timeline.ToString()})");
|
||||
await InvokeAsync(StateHasChanged);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_hubConnection is not null)
|
||||
{
|
||||
await _hubConnection.DisposeAsync();
|
||||
}
|
||||
StreamingService.Message -= OnMessage;
|
||||
StreamingService.Notification -= OnNotification;
|
||||
StreamingService.NotePublished -= OnNotePublished;
|
||||
StreamingService.NoteUpdated -= OnNoteUpdated;
|
||||
|
||||
await StreamingService.DisposeAsync();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
|
@ -13,6 +13,7 @@ builder.Services.AddSingleton(_ => new HttpClient { BaseAddress = new Uri(builde
|
|||
builder.Services.AddSingleton<ApiClient>();
|
||||
builder.Services.AddSingleton<ApiService>();
|
||||
builder.Services.AddSingleton<SessionService>();
|
||||
builder.Services.AddSingleton<StreamingService>();
|
||||
builder.Services.AddScoped<AuthenticationStateProvider, CustomAuthStateProvider>();
|
||||
builder.Services.AddAuthorizationCore();
|
||||
builder.Services.AddCascadingAuthenticationState();
|
||||
|
|
Loading…
Add table
Reference in a new issue