[backend/streaming] Add connection state handlers, enable stateful auto-reconnect, remove stub message handlers

This commit is contained in:
Laura Hausmann 2024-06-26 16:07:59 +02:00
parent 5fd34e690d
commit 8d158cc7b9
No known key found for this signature in database
GPG key ID: D044E84C5BE01605
5 changed files with 21 additions and 59 deletions

View file

@ -8,11 +8,6 @@ namespace Iceshrimp.Backend.Hubs;
[Microsoft.AspNetCore.Authorization.Authorize(Policy = "HubAuthorization")] [Microsoft.AspNetCore.Authorization.Authorize(Policy = "HubAuthorization")]
public class StreamingHub(StreamingService streamingService) : Hub<IStreamingHubClient>, IStreamingHubServer public class StreamingHub(StreamingService streamingService) : Hub<IStreamingHubClient>, IStreamingHubServer
{ {
public async Task SendMessage(string user, string message)
{
await Clients.All.ReceiveMessage("SignalR", "ping!");
}
public Task Subscribe(StreamingTimeline timeline) public Task Subscribe(StreamingTimeline timeline)
{ {
var userId = Context.UserIdentifier ?? throw new Exception("UserIdentifier must not be null at this stage"); var userId = Context.UserIdentifier ?? throw new Exception("UserIdentifier must not be null at this stage");
@ -32,7 +27,6 @@ public class StreamingHub(StreamingService streamingService) : Hub<IStreamingHub
var user = ctx.GetUserOrFail(); var user = ctx.GetUserOrFail();
var userId = Context.UserIdentifier ?? throw new Exception("UserIdentifier must not be null at this stage"); var userId = Context.UserIdentifier ?? throw new Exception("UserIdentifier must not be null at this stage");
streamingService.Connect(userId, user, Context.ConnectionId); streamingService.Connect(userId, user, Context.ConnectionId);
await Clients.User(userId).ReceiveMessage("SignalR", "Device connected");
} }
public override async Task OnDisconnectedAsync(Exception? exception) public override async Task OnDisconnectedAsync(Exception? exception)

View file

@ -19,11 +19,10 @@ internal class StreamingService(
private HubConnection? _hubConnection; private HubConnection? _hubConnection;
private IStreamingHubServer? _hub; private IStreamingHubServer? _hub;
public event EventHandler<string>? Message;
public event EventHandler<NotificationResponse>? Notification; public event EventHandler<NotificationResponse>? Notification;
public event EventHandler<NoteEvent>? NotePublished; public event EventHandler<NoteEvent>? NotePublished;
public event EventHandler<NoteEvent>? NoteUpdated; public event EventHandler<NoteEvent>? NoteUpdated;
public event EventHandler? OnConnectionChange; public event EventHandler<HubConnectionState>? OnConnectionChange;
public async Task Connect(StoredUser? user = null) public async Task Connect(StoredUser? user = null)
{ {
@ -40,6 +39,8 @@ internal class StreamingService(
_hubConnection = new HubConnectionBuilder() _hubConnection = new HubConnectionBuilder()
.WithUrl(navigation.ToAbsoluteUri("/hubs/streaming"), Auth) .WithUrl(navigation.ToAbsoluteUri("/hubs/streaming"), Auth)
.WithAutomaticReconnect()
.WithStatefulReconnect()
.AddMessagePackProtocol() .AddMessagePackProtocol()
.Build(); .Build();
@ -53,22 +54,26 @@ internal class StreamingService(
} }
catch (Exception e) catch (Exception e)
{ {
Message?.Invoke(this, $"System: Connection failed - {e.Message}"); OnConnectionChange?.Invoke(this, HubConnectionState.Disconnected);
logger.LogError("Connection failed: {error}", e.Message); logger.LogError("Connection failed: {error}", e.Message);
} }
return; return;
void Auth(HttpConnectionOptions options) void Auth(HttpConnectionOptions options) =>
{
options.AccessTokenProvider = () => Task.FromResult<string?>(user.Token); options.AccessTokenProvider = () => Task.FromResult<string?>(user.Token);
} }
public async Task Reconnect(StoredUser? user = null)
{
if (_hubConnection is null)
{
await Connect(user);
return;
} }
public async Task Send(string userInput, string messageInput) if (_hubConnection.State is not HubConnectionState.Disconnected) return;
{ await _hubConnection.StartAsync();
if (_hub is not null)
await _hub.SendMessage(userInput, messageInput);
} }
public bool IsConnected => _hubConnection?.State == HubConnectionState.Connected; public bool IsConnected => _hubConnection?.State == HubConnectionState.Connected;
@ -83,13 +88,6 @@ internal class StreamingService(
private class StreamingHubClient(StreamingService streaming) : IStreamingHubClient, IHubConnectionObserver private class StreamingHubClient(StreamingService streaming) : IStreamingHubClient, IHubConnectionObserver
{ {
public Task ReceiveMessage(string user, string message)
{
var encodedMsg = $"{user}: {message}";
streaming.Message?.Invoke(this, encodedMsg);
return Task.CompletedTask;
}
public Task Notification(NotificationResponse notification) public Task Notification(NotificationResponse notification)
{ {
streaming.Notification?.Invoke(this, notification); streaming.Notification?.Invoke(this, notification);
@ -112,19 +110,20 @@ internal class StreamingService(
public Task OnClosed(Exception? exception) public Task OnClosed(Exception? exception)
{ {
streaming.OnConnectionChange?.Invoke(this, EventArgs.Empty); streaming.OnConnectionChange?.Invoke(this, HubConnectionState.Disconnected);
return ReceiveMessage("System", "Connection closed."); return Task.CompletedTask;
} }
public Task OnReconnected(string? connectionId) public Task OnReconnected(string? connectionId)
{ {
streaming.OnConnectionChange?.Invoke(this, EventArgs.Empty); streaming.OnConnectionChange?.Invoke(this, HubConnectionState.Connected);
return ReceiveMessage("System", "Reconnected."); return Task.CompletedTask;
} }
public Task OnReconnecting(Exception? exception) public Task OnReconnecting(Exception? exception)
{ {
return ReceiveMessage("System", "Reconnecting..."); streaming.OnConnectionChange?.Invoke(this, HubConnectionState.Reconnecting);
return Task.CompletedTask;
} }
} }
} }

View file

@ -6,22 +6,6 @@
<PageTitle>Home</PageTitle> <PageTitle>Home</PageTitle>
<div class="form-group mb-3">
<label>
User:
<input class="form-control" @bind="_userInput"/>
</label>
</div>
<div class="form-group mb-3">
<label>
Message:
<input class="form-control" @bind="_messageInput" size="50"/>
</label>
</div>
<button class="btn btn-primary" @onclick="Send" disabled="@(!StreamingService.IsConnected)">Send</button>
<hr>
<ul id="messagesList"> <ul id="messagesList">
@foreach (var message in _messages) @foreach (var message in _messages)
{ {

View file

@ -10,12 +10,8 @@ public partial class Streaming : IAsyncDisposable
[Inject] private StreamingService StreamingService { get; init; } = null!; [Inject] private StreamingService StreamingService { get; init; } = null!;
private readonly List<string> _messages = []; private readonly List<string> _messages = [];
private string _userInput = "";
private string _messageInput = "";
protected override async Task OnInitializedAsync() protected override async Task OnInitializedAsync()
{ {
StreamingService.Message += OnMessage;
StreamingService.Notification += OnNotification; StreamingService.Notification += OnNotification;
StreamingService.NotePublished += OnNotePublished; StreamingService.NotePublished += OnNotePublished;
StreamingService.NoteUpdated += OnNoteUpdated; StreamingService.NoteUpdated += OnNoteUpdated;
@ -23,14 +19,6 @@ public partial class Streaming : IAsyncDisposable
await StreamingService.Connect(); await StreamingService.Connect();
} }
private async Task Send() => await StreamingService.Send(_userInput, _messageInput);
private async void OnMessage(object? _, string message)
{
_messages.Add(message);
await InvokeAsync(StateHasChanged);
}
private async void OnNotification(object? _, NotificationResponse notification) private async void OnNotification(object? _, NotificationResponse notification)
{ {
_messages.Add($"Notification: {notification.Id} ({notification.Type})"); _messages.Add($"Notification: {notification.Id} ({notification.Type})");
@ -51,7 +39,6 @@ public partial class Streaming : IAsyncDisposable
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
StreamingService.Message -= OnMessage;
StreamingService.Notification -= OnNotification; StreamingService.Notification -= OnNotification;
StreamingService.NotePublished -= OnNotePublished; StreamingService.NotePublished -= OnNotePublished;
StreamingService.NoteUpdated -= OnNoteUpdated; StreamingService.NoteUpdated -= OnNoteUpdated;

View file

@ -4,14 +4,12 @@ namespace Iceshrimp.Shared.HubSchemas;
public interface IStreamingHubServer public interface IStreamingHubServer
{ {
public Task SendMessage(string user, string message);
public Task Subscribe(StreamingTimeline timeline); public Task Subscribe(StreamingTimeline timeline);
public Task Unsubscribe(StreamingTimeline timeline); public Task Unsubscribe(StreamingTimeline timeline);
} }
public interface IStreamingHubClient public interface IStreamingHubClient
{ {
public Task ReceiveMessage(string user, string message);
public Task Notification(NotificationResponse notification); public Task Notification(NotificationResponse notification);
public Task NotePublished(List<StreamingTimeline> timelines, NoteResponse note); public Task NotePublished(List<StreamingTimeline> timelines, NoteResponse note);
public Task NoteUpdated(List<StreamingTimeline> timelines, NoteResponse note); public Task NoteUpdated(List<StreamingTimeline> timelines, NoteResponse note);