[backend/signalr] Switch to Lazy<T> for NoteResponses in StreamingService
This commit is contained in:
parent
c9b9a8e45b
commit
8340952653
2 changed files with 12 additions and 28 deletions
|
@ -1,5 +1,4 @@
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using AsyncKeyedLock;
|
|
||||||
using Iceshrimp.Backend.Controllers.Web.Renderers;
|
using Iceshrimp.Backend.Controllers.Web.Renderers;
|
||||||
using Iceshrimp.Backend.Core.Database.Tables;
|
using Iceshrimp.Backend.Core.Database.Tables;
|
||||||
using Iceshrimp.Backend.SignalR;
|
using Iceshrimp.Backend.SignalR;
|
||||||
|
@ -12,12 +11,6 @@ namespace Iceshrimp.Backend.Core.Services;
|
||||||
|
|
||||||
public sealed class StreamingService
|
public sealed class StreamingService
|
||||||
{
|
{
|
||||||
private static readonly AsyncKeyedLocker<string> Locker = new(o =>
|
|
||||||
{
|
|
||||||
o.PoolSize = 100;
|
|
||||||
o.PoolInitialFill = 5;
|
|
||||||
});
|
|
||||||
|
|
||||||
private readonly ConcurrentDictionary<string, StreamingConnectionAggregate> _connections = [];
|
private readonly ConcurrentDictionary<string, StreamingConnectionAggregate> _connections = [];
|
||||||
private readonly EventService _eventSvc;
|
private readonly EventService _eventSvc;
|
||||||
private readonly IHubContext<StreamingHub, IStreamingHubClient> _hub;
|
private readonly IHubContext<StreamingHub, IStreamingHubClient> _hub;
|
||||||
|
@ -40,8 +33,8 @@ public sealed class StreamingService
|
||||||
eventSvc.NoteUpdated += OnNoteUpdated;
|
eventSvc.NoteUpdated += OnNoteUpdated;
|
||||||
}
|
}
|
||||||
|
|
||||||
public event EventHandler<(Note note, Func<Task<NoteResponse>> rendered)>? NotePublished;
|
public event EventHandler<(Note note, Lazy<Task<NoteResponse>> rendered)>? NotePublished;
|
||||||
public event EventHandler<(Note note, Func<Task<NoteResponse>> rendered)>? NoteUpdated;
|
public event EventHandler<(Note note, Lazy<Task<NoteResponse>> rendered)>? NoteUpdated;
|
||||||
|
|
||||||
public void Connect(string userId, User user, string connectionId)
|
public void Connect(string userId, User user, string connectionId)
|
||||||
{
|
{
|
||||||
|
@ -80,23 +73,14 @@ public sealed class StreamingService
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Func<Task<NoteResponse>> Render(Note note)
|
private Lazy<Task<NoteResponse>> Render(Note note)
|
||||||
{
|
{
|
||||||
NoteResponse? res = null;
|
return new Lazy<Task<NoteResponse>>(async () =>
|
||||||
return async () =>
|
|
||||||
{
|
{
|
||||||
// Skip the mutex if res is already set
|
await using var scope = _scopeFactory.CreateAsyncScope();
|
||||||
if (res != null) return res;
|
var renderer = scope.ServiceProvider.GetRequiredService<NoteRenderer>();
|
||||||
|
return await renderer.RenderOne(note, null);
|
||||||
using (await Locker.LockAsync(note.Id))
|
});
|
||||||
{
|
|
||||||
if (res != null) return res;
|
|
||||||
await using var scope = _scopeFactory.CreateAsyncScope();
|
|
||||||
var renderer = scope.ServiceProvider.GetRequiredService<NoteRenderer>();
|
|
||||||
res = await renderer.RenderOne(note, null);
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void OnNotePublished(object? _, Note note)
|
private void OnNotePublished(object? _, Note note)
|
||||||
|
|
|
@ -84,7 +84,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async void OnNotePublished(object? _, (Note note, Func<Task<NoteResponse>> rendered) data)
|
private async void OnNotePublished(object? _, (Note note, Lazy<Task<NoteResponse>> rendered) data)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -100,7 +100,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
var rendered = EnforceRenoteReplyVisibility(await data.rendered(), wrapped);
|
var rendered = EnforceRenoteReplyVisibility(await data.rendered.Value, wrapped);
|
||||||
await _hub.Clients.Clients(recipients.connectionIds).NotePublished(recipients.timelines, rendered);
|
await _hub.Clients.Clients(recipients.connectionIds).NotePublished(recipients.timelines, rendered);
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
|
@ -109,7 +109,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async void OnNoteUpdated(object? _, (Note note, Func<Task<NoteResponse>> rendered) data)
|
private async void OnNoteUpdated(object? _, (Note note, Lazy<Task<NoteResponse>> rendered) data)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -118,7 +118,7 @@ public sealed class StreamingConnectionAggregate : IDisposable
|
||||||
var (connectionIds, _) = FindRecipients(data.note);
|
var (connectionIds, _) = FindRecipients(data.note);
|
||||||
if (connectionIds.Count == 0) return;
|
if (connectionIds.Count == 0) return;
|
||||||
|
|
||||||
var rendered = EnforceRenoteReplyVisibility(await data.rendered(), wrapped);
|
var rendered = EnforceRenoteReplyVisibility(await data.rendered.Value, wrapped);
|
||||||
await _hub.Clients.Clients(connectionIds).NoteUpdated(rendered);
|
await _hub.Clients.Clients(connectionIds).NoteUpdated(rendered);
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
|
|
Loading…
Add table
Reference in a new issue