[backend/drive] Switch to stream processing for remote media
This makes sure that files larger than the configured maximum remote media cache size are not loaded into memory (if the size is known), or are only loaded into memory until the configured maximum size before getting discarded (if the size is not known)
This commit is contained in:
parent
0e44d9f752
commit
bd734275c7
5 changed files with 89 additions and 30 deletions
|
@ -100,8 +100,8 @@ public sealed class Config
|
|||
|
||||
public sealed class StorageSection
|
||||
{
|
||||
public readonly int? MaxCacheSizeBytes;
|
||||
public readonly int? MaxUploadSizeBytes;
|
||||
public readonly long? MaxCacheSizeBytes;
|
||||
public readonly long? MaxUploadSizeBytes;
|
||||
public readonly TimeSpan? MediaRetentionTimeSpan;
|
||||
|
||||
public bool CleanAvatars = false;
|
||||
|
@ -170,10 +170,11 @@ public sealed class Config
|
|||
|
||||
MaxUploadSizeBytes = suffix switch
|
||||
{
|
||||
null => num,
|
||||
'k' or 'K' => num * 1024,
|
||||
'm' or 'M' => num * 1024 * 1024,
|
||||
'g' or 'G' => num * 1024 * 1024 * 1024,
|
||||
null => num,
|
||||
'k' or 'K' => num * 1024L,
|
||||
'm' or 'M' => num * 1024L * 1024,
|
||||
'g' or 'G' => num * 1024L * 1024 * 1024,
|
||||
|
||||
_ => throw new Exception("Unsupported suffix, use one of: [K]ilobytes, [M]egabytes, [G]igabytes")
|
||||
};
|
||||
}
|
||||
|
@ -200,10 +201,11 @@ public sealed class Config
|
|||
|
||||
MaxCacheSizeBytes = suffix switch
|
||||
{
|
||||
null => num,
|
||||
'k' or 'K' => num * 1024,
|
||||
'm' or 'M' => num * 1024 * 1024,
|
||||
'g' or 'G' => num * 1024 * 1024 * 1024,
|
||||
null => num,
|
||||
'k' or 'K' => num * 1024L,
|
||||
'm' or 'M' => num * 1024L * 1024,
|
||||
'g' or 'G' => num * 1024L * 1024 * 1024,
|
||||
|
||||
_ => throw new Exception("Unsupported suffix, use one of: [K]ilobytes, [M]egabytes, [G]igabytes")
|
||||
};
|
||||
}
|
||||
|
@ -295,4 +297,4 @@ public sealed class Config
|
|||
[Range(0, int.MaxValue)] public int Completed { get; init; } = 100;
|
||||
[Range(0, int.MaxValue)] public int Failed { get; init; } = 10;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,7 +86,6 @@ public static class ServiceExtensions
|
|||
// Singleton = instantiated once across application lifetime
|
||||
services
|
||||
.AddSingleton<HttpClient, CustomHttpClient>()
|
||||
.AddSingleton<UnrestrictedHttpClient>()
|
||||
.AddSingleton<HttpRequestService>()
|
||||
.AddSingleton<CronService>()
|
||||
.AddSingleton<QueueService>()
|
||||
|
|
31
Iceshrimp.Backend/Core/Extensions/StreamExtensions.cs
Normal file
31
Iceshrimp.Backend/Core/Extensions/StreamExtensions.cs
Normal file
|
@ -0,0 +1,31 @@
|
|||
using System.Buffers;
|
||||
|
||||
namespace Iceshrimp.Backend.Core.Extensions;
|
||||
|
||||
public static class StreamExtensions
|
||||
{
|
||||
public static async Task CopyToAsync(
|
||||
this Stream source, Stream destination, long? maxLength, CancellationToken cancellationToken
|
||||
)
|
||||
{
|
||||
var buffer = ArrayPool<byte>.Shared.Rent(81920);
|
||||
try
|
||||
{
|
||||
int bytesRead;
|
||||
var totalBytesRead = 0L;
|
||||
while ((maxLength == null || totalBytesRead <= maxLength) && (bytesRead = await DoRead()) != 0)
|
||||
{
|
||||
totalBytesRead += bytesRead;
|
||||
await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationToken);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(buffer);
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
ValueTask<int> DoRead() => source.ReadAsync(new Memory<byte>(buffer), cancellationToken);
|
||||
}
|
||||
}
|
|
@ -365,16 +365,4 @@ public class CustomHttpClient : HttpClient
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class UnrestrictedHttpClient : CustomHttpClient
|
||||
{
|
||||
public UnrestrictedHttpClient(
|
||||
IOptions<Config.InstanceSection> options,
|
||||
IOptionsMonitor<Config.SecuritySection> security,
|
||||
ILoggerFactory loggerFactory
|
||||
) : base(options, security, loggerFactory)
|
||||
{
|
||||
MaxResponseContentBufferSize = int.MaxValue;
|
||||
}
|
||||
}
|
|
@ -1,3 +1,4 @@
|
|||
using System.Buffers;
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using Iceshrimp.Backend.Core.Configuration;
|
||||
using Iceshrimp.Backend.Core.Database;
|
||||
|
@ -17,7 +18,7 @@ public class DriveService(
|
|||
[SuppressMessage("ReSharper", "SuggestBaseTypeForParameterInConstructor")]
|
||||
IOptionsSnapshot<Config.StorageSection> storageConfig,
|
||||
IOptions<Config.InstanceSection> instanceConfig,
|
||||
UnrestrictedHttpClient httpClient,
|
||||
HttpClient httpClient,
|
||||
QueueService queueSvc,
|
||||
ILogger<DriveService> logger,
|
||||
ImageProcessor imageProcessor
|
||||
|
@ -89,7 +90,7 @@ public class DriveService(
|
|||
|
||||
try
|
||||
{
|
||||
var res = await httpClient.GetAsync(uri);
|
||||
var res = await httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead);
|
||||
res.EnsureSuccessStatusCode();
|
||||
|
||||
var request = new DriveFileCreationRequest
|
||||
|
@ -101,7 +102,15 @@ public class DriveService(
|
|||
MimeType = CleanMimeType(mimeType ?? res.Content.Headers.ContentType?.MediaType)
|
||||
};
|
||||
|
||||
return await StoreFile(await res.Content.ReadAsStreamAsync(), user, request);
|
||||
var input = await res.Content.ReadAsStreamAsync();
|
||||
var maxLength = user.IsLocalUser
|
||||
? storageConfig.Value.MaxUploadSizeBytes
|
||||
: storageConfig.Value.MediaRetentionTimeSpan != null
|
||||
? storageConfig.Value.MaxCacheSizeBytes
|
||||
: 0;
|
||||
|
||||
var stream = await GetSafeStreamOrNullAsync(input, maxLength, res.Content.Headers.ContentLength);
|
||||
return await StoreFile(stream, user, request);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
@ -141,8 +150,7 @@ public class DriveService(
|
|||
throw GracefulException.UnprocessableEntity("Attachment is too large.");
|
||||
|
||||
DriveFile? file;
|
||||
|
||||
if (user.IsRemoteUser && input.Length > storageConfig.Value.MaxCacheSizeBytes)
|
||||
if (input == Stream.Null || user.IsRemoteUser && input.Length > storageConfig.Value.MaxCacheSizeBytes)
|
||||
{
|
||||
file = new DriveFile
|
||||
{
|
||||
|
@ -407,6 +415,37 @@ public class DriveService(
|
|||
? "application/octet-stream"
|
||||
: mimeType;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// We can't trust the Content-Length header, and it might be null.
|
||||
/// This makes sure that we only ever read up to maxLength into memory.
|
||||
/// </summary>
|
||||
/// <param name="stream">The response content stream</param>
|
||||
/// <param name="maxLength">The maximum length to buffer (null = unlimited)</param>
|
||||
/// <param name="contentLength">The content length, if known</param>
|
||||
/// <param name="token">A CancellationToken, if applicable</param>
|
||||
/// <returns>Either a buffered MemoryStream, or Stream.Null</returns>
|
||||
private static async Task<Stream> GetSafeStreamOrNullAsync(
|
||||
Stream stream, long? maxLength, long? contentLength, CancellationToken token = default
|
||||
)
|
||||
{
|
||||
if (maxLength is 0) return Stream.Null;
|
||||
if (contentLength > maxLength) return Stream.Null;
|
||||
|
||||
MemoryStream buf = new();
|
||||
if (contentLength < maxLength)
|
||||
maxLength = contentLength.Value;
|
||||
|
||||
await stream.CopyToAsync(buf, maxLength, token);
|
||||
if (maxLength == null || buf.Length <= maxLength)
|
||||
{
|
||||
buf.Seek(0, SeekOrigin.Begin);
|
||||
return buf;
|
||||
}
|
||||
|
||||
await buf.DisposeAsync();
|
||||
return Stream.Null;
|
||||
}
|
||||
}
|
||||
|
||||
public class DriveFileCreationRequest
|
||||
|
@ -458,4 +497,4 @@ file static class DriveFileExtensions
|
|||
ThumbnailAccessKey = file.ThumbnailAccessKey
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue