In the previous post I talked about a custom MultipartFormDataStreamProvider and how it can help us handle custom information included in a request message.
In that example I split a file into chunks and sent them to a REST service (aka Web API), along with some additional information that was then retrieved by the custom provider.
Now I want to use that information to manage the upload session and merge all the chunks once they have been received.
To do that, I need to define the models involved in the process and the service that manages the chunks.
Models
We need to define two things. The first one is the model for the chunk:
public class ChunkMetadata
{
    public string Filename { get; set; }
    public int ChunkNumber { get; set; }

    public ChunkMetadata(string filename, int chunkNumber)
    {
        Filename = filename;
        ChunkNumber = chunkNumber;
    }
}
The ChunkNumber property deserves an explanation: it is the sequence number of the chunk, and we need it to restore the correct order when we merge the chunks.
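To illustrate, here is a minimal sketch in plain .NET (no Web API involved) where chunks are registered out of order and then put back in sequence by ChunkNumber. The ChunkMetadata class repeats the model above so the snippet compiles on its own; ChunkOrderingDemo and SortedFilenames are hypothetical names used only for this example.

```csharp
using System.Collections.Generic;
using System.Linq;

// Same shape as the ChunkMetadata model above.
public class ChunkMetadata
{
    public string Filename { get; set; }
    public int ChunkNumber { get; set; }

    public ChunkMetadata(string filename, int chunkNumber)
    {
        Filename = filename;
        ChunkNumber = chunkNumber;
    }
}

// Hypothetical helper: returns the chunk file names in merge order.
public static class ChunkOrderingDemo
{
    public static List<string> SortedFilenames(IEnumerable<ChunkMetadata> chunks)
    {
        // Arrival order is irrelevant; ChunkNumber alone decides the position.
        return chunks.OrderBy(c => c.ChunkNumber)
                     .Select(c => c.Filename)
                     .ToList();
    }
}
```

For instance, passing chunks numbered 2, 0 and 1 returns their file names in the order 0, 1, 2, regardless of the order in which they arrived.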
The second one is the model of the session, that is, the set of chunks that make up the file.
First of all we define the interface:
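using System.Collections.Concurrent;
using System.Threading.Tasks;

public interface IUploadSession
{
    ConcurrentBag<ChunkMetadata> Chunks { get; set; }
    string Filename { get; }
    long Filesize { get; }

    // Returns true when the last chunk of the file has been received.
    bool AddChunk(string filename, string chunkFileName, int chunkNumber, int totalChunks);

    // Concatenates the chunks, in order, into a single file under path.
    Task MergeChunks(string path);
}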
Filename and Filesize belong to the session itself; we also need the AddChunk and MergeChunks methods.
Finally, we need a thread-safe collection for the chunks that compose the session, so we use a ConcurrentBag<ChunkMetadata>: a thread-safe, unordered collection. Unlike a List, it does not preserve insertion order, which is why the chunks must be sorted by ChunkNumber before they are merged.
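A quick way to see both properties of ConcurrentBag<T> (safe concurrent Add, no ordering guarantee) is a sketch like the following, which relies only on the base class library; BagDemo and AddConcurrently are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo: many threads add to the same ConcurrentBag<T> without explicit locks.
public static class BagDemo
{
    public static int[] AddConcurrently(int count)
    {
        var bag = new ConcurrentBag<int>();

        // Parallel.For may run iterations on several thread-pool threads at once.
        Parallel.For(0, count, i => bag.Add(i));

        // Every item is kept, but the bag's enumeration order is unspecified,
        // so we sort explicitly, just as the merge step sorts by ChunkNumber.
        return bag.OrderBy(x => x).ToArray();
    }
}
```

Without the final OrderBy, the order in which the bag yields its items would be unspecified, which is exactly why the merge step sorts the chunks.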
Now we can implement the model:
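using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class UploadSession : IUploadSession
{
    // Guards the one-time initialization done by the first chunk.
    private readonly object _initLock = new object();
    private int _totalChunks;
    private int _chunksUploaded;

    public string Filename { get; private set; }
    public long Filesize { get; private set; }
    public ConcurrentBag<ChunkMetadata> Chunks { get; set; }

    public UploadSession()
    {
        Filesize = 0;
        _chunksUploaded = 0;
        Chunks = new ConcurrentBag<ChunkMetadata>();
    }

    public bool AddChunk(string filename, string chunkFileName, int chunkNumber, int totalChunks)
    {
        // Chunks can arrive concurrently, so the "first chunk" check must be atomic.
        lock (_initLock)
        {
            if (Filename == null)
            {
                Filename = filename;
                _totalChunks = totalChunks;
            }
        }

        Chunks.Add(new ChunkMetadata(chunkFileName, chunkNumber));

        // Use the value returned by Interlocked.Increment: writing it back into
        // _chunksUploaded would race with other threads and could lose increments.
        var uploaded = Interlocked.Increment(ref _chunksUploaded);
        return uploaded == _totalChunks;
    }

    public async Task MergeChunks(string path)
    {
        // Path.Combine works whether or not path ends with a separator.
        var filePath = Path.Combine(path, Filename);
        var orderedChunks = Chunks.OrderBy(c => c.ChunkNumber).ToList();

        using (var mainFile = new FileStream(filePath, FileMode.Create, FileAccess.Write))
        {
            foreach (var chunk in orderedChunks)
            {
                using (var chunkFile = new FileStream(chunk.Filename, FileMode.Open, FileAccess.Read))
                {
                    await chunkFile.CopyToAsync(mainFile);
                    Filesize += chunkFile.Length;
                }
            }
        }

        // Delete the temporary chunk files once the merged file is closed.
        foreach (var chunk in orderedChunks)
        {
            File.Delete(chunk.Filename);
        }
    }
}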
The implementation is quite simple.
The AddChunk method adds the new chunk to the collection and then increments the _chunksUploaded field with the thread-safe Interlocked.Increment operation; finally, it returns true if all the chunks have been received, false otherwise.
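The key detail is that Interlocked.Increment atomically returns the incremented value, so each caller gets its own count and only one of them can match the total. The sketch below, built on hypothetical CompletionCounter and CompletionDemo types, simulates many chunks being registered at the same time; it assumes the total is known up front, as it is once the first chunk has been registered.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical completion counter mirroring the counting logic of AddChunk.
public class CompletionCounter
{
    private readonly int _total;
    private int _received;

    public CompletionCounter(int total)
    {
        _total = total;
    }

    public bool Register()
    {
        // Each caller gets a distinct incremented value, so exactly one
        // caller sees the value equal to _total.
        int received = Interlocked.Increment(ref _received);
        return received == _total;
    }
}

public static class CompletionDemo
{
    // Returns how many concurrent callers were told "all chunks received".
    public static int CountCompletions(int total)
    {
        var counter = new CompletionCounter(total);
        int completions = 0;

        Parallel.For(0, total, _ =>
        {
            if (counter.Register())
                Interlocked.Increment(ref completions);
        });

        return completions;
    }
}
```

If Register re-read _received after incrementing instead of using the returned value, two threads could both, or neither, observe the final count.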
The MergeChunks method retrieves all the chunks from the file system.
It sorts the collection by chunk number, reads the bytes of each chunk and copies them to the main file stream.
Finally, the chunk files are deleted.
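The merge step does not depend on Web API, so it can be tried in isolation. This is a minimal sketch assuming each chunk is already stored in its own file; ChunkMerger and MergeAsync are hypothetical names, and the (Number, ChunkPath) tuples stand in for ChunkMetadata.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-alone version of the merge step.
public static class ChunkMerger
{
    // chunks: (chunk number, path of the chunk file), in any order.
    // Returns the number of bytes written to targetPath.
    public static async Task<long> MergeAsync(
        IEnumerable<(int Number, string ChunkPath)> chunks, string targetPath)
    {
        var ordered = chunks.OrderBy(c => c.Number).ToList();
        long size = 0;

        using (var target = new FileStream(targetPath, FileMode.Create, FileAccess.Write))
        {
            foreach (var chunk in ordered)
            {
                using (var source = new FileStream(chunk.ChunkPath, FileMode.Open, FileAccess.Read))
                {
                    await source.CopyToAsync(target);
                    size += source.Length;
                }
            }
        }

        // Only reached if every copy succeeded and the target has been closed.
        foreach (var chunk in ordered)
            File.Delete(chunk.ChunkPath);

        return size;
    }
}
```

Because the deletion loop runs only after the target stream has been disposed, an exception during the copy leaves the chunk files on disk, so the merge could in principle be retried.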
Service
The service will have an interface like this:
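using System;
using System.Net.Http;
using System.Threading.Tasks;

public interface IUploadService
{
    // Creates a new upload session and returns its correlation id.
    Guid StartNewSession();

    // Stores the chunk carried by the request; true once the whole file has been merged.
    Task<bool> UploadChunk(HttpRequestMessage request);
}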
The idea is that the StartNewSession method instantiates a new UploadSession object and assigns it a new correlation id, which is the unique identifier of the session.
This is the implementation:
using System;
using System.Collections.Concurrent;
using System.Data.Entity.Core; // ObjectNotFoundException (Entity Framework 6)
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;

public class UploadService : IUploadService
{
    private readonly string _path;
    // Sessions live in memory, so this service must be a singleton.
    private readonly ConcurrentDictionary<string, UploadSession> _uploadSessions;

    public UploadService(string path)
    {
        _path = path;
        _uploadSessions = new ConcurrentDictionary<string, UploadSession>();
    }

    public async Task<bool> UploadChunk(HttpRequestMessage request)
    {
        var provider = new CustomMultipartFormDataStreamProvider(_path);
        await request.Content.ReadAsMultipartAsync(provider);
        provider.ExtractValues();

        UploadSession uploadSession;
        if (!_uploadSessions.TryGetValue(provider.CorrelationId, out uploadSession))
            throw new ObjectNotFoundException();

        var completed = uploadSession.AddChunk(provider.Filename, provider.ChunkFilename,
            provider.ChunkNumber, provider.TotalChunks);
        if (!completed)
            return false;

        await uploadSession.MergeChunks(_path);

        var fileBlob = new FileBlob
        {
            Id = Guid.NewGuid(),
            Path = Path.Combine(_path, uploadSession.Filename),
            Name = uploadSession.Filename,
            Size = uploadSession.Filesize
        };

        // DbContext is not thread-safe: use a short-lived one per operation
        // instead of a field shared by every request.
        using (var db = new Context())
        {
            db.FileBlobs.Add(fileBlob);
            await db.SaveChangesAsync();
        }

        // The upload is complete: drop the session so the dictionary does not grow forever.
        _uploadSessions.TryRemove(provider.CorrelationId, out _);
        return true;
    }

    public Guid StartNewSession()
    {
        var correlationId = Guid.NewGuid();
        _uploadSessions.TryAdd(correlationId.ToString(), new UploadSession());
        return correlationId;
    }
}
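In the StartNewSession method we use the thread-safe TryAdd method to add the new session to the ConcurrentDictionary. Since the sessions are kept in memory, in an instance field, the service must be registered as a singleton in the dependency resolver; otherwise each request would get a new, empty dictionary and would never find its session.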
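The session bookkeeping boils down to a few ConcurrentDictionary calls. This sketch uses a hypothetical generic SessionRegistry<TSession>: Start corresponds to StartNewSession and TryGet to the lookup in UploadChunk, while End (TryRemove) is an assumption of this example, shown as one way to discard completed sessions.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical registry mirroring StartNewSession and the session lookup.
public class SessionRegistry<TSession> where TSession : new()
{
    private readonly ConcurrentDictionary<string, TSession> _sessions =
        new ConcurrentDictionary<string, TSession>();

    public Guid Start()
    {
        var correlationId = Guid.NewGuid();
        // A freshly generated Guid is practically unique, so TryAdd is expected to succeed.
        _sessions.TryAdd(correlationId.ToString(), new TSession());
        return correlationId;
    }

    public bool TryGet(string correlationId, out TSession session)
    {
        return _sessions.TryGetValue(correlationId, out session);
    }

    // Removes a finished session; returns false if it was not registered.
    public bool End(string correlationId)
    {
        return _sessions.TryRemove(correlationId, out _);
    }
}
```

Keying the dictionary by the correlation id string keeps the lookup identical to the one in UploadChunk, where the id arrives as text in the multipart request.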
We saw the first part of the UploadChunk method in the previous post.
Once the metadata has been retrieved from the request, we look up the session object with a thread-safe operation (TryGetValue).
If the session is not found, we throw an exception, because we expect the related session to exist.
If the session exists, we add the chunk to the session and check the result of the operation.
If it is the last chunk, we merge all of them and, if needed, perform a database operation (here, saving a FileBlob record).
Controller
The implementation of the controller is very simple:
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using System.Web.Http;

public class FileBlobsController : ApiController
{
    private readonly IUploadService _fileBlobsService;

    public FileBlobsController(IUploadService uploadService)
    {
        _fileBlobsService = uploadService;
    }

    [Route("api/fileblobs/getcorrelationid")]
    [HttpGet]
    public IHttpActionResult GetCorrelationId()
    {
        return Ok(_fileBlobsService.StartNewSession());
    }

    [HttpPost]
    public async Task<IHttpActionResult> PostFileBlob()
    {
        // Reject non-multipart requests with 415 Unsupported Media Type
        // instead of a generic exception (which would surface as a 500).
        if (!Request.Content.IsMimeMultipartContent())
            throw new HttpResponseException(HttpStatusCode.UnsupportedMediaType);

        var result = await _fileBlobsService.UploadChunk(Request);
        return Ok(result);
    }
}
You can find the source code here.