From e52f813ed2194502e506576959f0793cd3cbff7b Mon Sep 17 00:00:00 2001 From: Paul Schneider Date: Fri, 28 Jun 2019 00:49:20 +0100 Subject: [PATCH] fixes live info & filename url --- src/Yavsc/Helpers/FileSystemHelpers.cs | 9 ++-- src/Yavsc/Services/LiveProcessor.cs | 62 +++++++++++++++++++------- 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/src/Yavsc/Helpers/FileSystemHelpers.cs b/src/Yavsc/Helpers/FileSystemHelpers.cs index b1241785..7715b862 100644 --- a/src/Yavsc/Helpers/FileSystemHelpers.cs +++ b/src/Yavsc/Helpers/FileSystemHelpers.cs @@ -247,14 +247,17 @@ public static FileRecievedInfo ReceiveProSignature(this ClaimsPrincipal user, st public static string GetFileUrl (this LiveFlow flow) { - if (flow.DifferedFileName==null) + if (flow.DifferedFileName==null) return null; // no server-side backup for this stream - return null; + return $"{Startup.UserFilesOptions.RequestPath}/{flow.Owner.UserName}/live/"+GetFileName(flow); + } + public static string GetFileName (this LiveFlow flow) + { var fileInfo = new FileInfo(flow.DifferedFileName); var ext = fileInfo.Extension; var namelen = flow.DifferedFileName.Length - ext.Length; var basename = flow.DifferedFileName.Substring(0,namelen); - return $"{Startup.UserFilesOptions.RequestPath}/{flow.Owner.UserName}/live/{basename}-{flow.SequenceNumber}{ext}"; + return $"{basename}-{flow.SequenceNumber}{ext}"; } } } diff --git a/src/Yavsc/Services/LiveProcessor.cs b/src/Yavsc/Services/LiveProcessor.cs index c8dd1dc2..b574f961 100644 --- a/src/Yavsc/Services/LiveProcessor.cs +++ b/src/Yavsc/Services/LiveProcessor.cs @@ -15,6 +15,8 @@ using Yavsc.Helpers; using Yavsc.Models; using Yavsc.ViewModels.Streaming; using Yavsc.Models.Messaging; +using Yavsc.Models.FileSystem; +using Newtonsoft.Json; namespace Yavsc.Services { @@ -53,9 +55,11 @@ namespace Yavsc.Services LiveCastHandler meta = null; if (Casters.ContainsKey(uname)) { + _logger.LogWarning($"Casters.ContainsKey({uname})"); meta = Casters[uname]; if (meta.Socket.State == WebSocketState.Open || meta.Socket.State == WebSocketState.Connecting ) { + _logger.LogWarning($"Closing cx"); // FIXME loosed connexion should be detected & disposed else where await meta.Socket.CloseAsync( WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None); @@ -69,6 +73,7 @@ namespace Yavsc.Services } else { + _logger.LogInformation($"new caster"); // Accept the socket meta = new LiveCastHandler { Socket = await context.WebSockets.AcceptWebSocketAsync() }; } @@ -90,13 +95,13 @@ namespace Yavsc.Services WebSocketReceiveResult received = await meta.Socket.ReceiveAsync(sBuffer, meta.TokenSource.Token); _logger.LogInformation($"Received bytes : {received.Count}"); _logger.LogInformation($"Is the end : {received.EndOfMessage}"); + const string livePath = "live"; - string destDir = context.User.InitPostToFileSystem("live"); + string destDir = context.User.InitPostToFileSystem(livePath); _logger.LogInformation($"Saving flow to {destDir}"); - string fileName = Path.Combine(destDir, flow.DifferedFileName); - FileInfo iFile = new FileInfo(fileName); - var fsio = new Queue>(); - var toFs = Task.Run( ()=> user.ReceiveUserFile(destDir, fsio, flow.DifferedFileName, flow.MediaType, meta.TokenSource.Token)); + string fileName = flow.GetFileName(); + var fsInputQueue = new Queue>(); + var taskWritingToFs = Task.Run( ()=> user.ReceiveUserFile(destDir, fsInputQueue, fileName, flow.MediaType, meta.TokenSource.Token)); var hubContext = GlobalHost.ConnectionManager.GetHubContext(); hubContext.Clients.All.addPublicStream(new PublicStreamInfo @@ -130,7 +135,7 @@ namespace Yavsc.Services ToClose.Push(cliItem.Key); } } - fsio.Enqueue(sBuffer); + fsInputQueue.Enqueue(sBuffer); // logger.LogInformation("replying..."); while (!received.CloseStatus.HasValue) { @@ -155,7 +160,7 @@ namespace Yavsc.Services ToClose.Push(cliItem.Key); } } - fsio.Enqueue(sBuffer); + fsInputQueue.Enqueue(sBuffer); _logger.LogInformation($"Received new bytes : {received.Count}"); _logger.LogInformation($"Is the end : {received.EndOfMessage}"); while (ToClose.Count >0) @@ -163,26 +168,36 @@ namespace Yavsc.Services string no = ToClose.Pop(); _logger.LogInformation("Closing follower connection"); WebSocket listenningSocket; - if (meta.Listeners.TryRemove(no, out listenningSocket)) - await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None); - + if (meta.Listeners.TryRemove(no, out listenningSocket)) { + await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, + "State != WebSocketState.Open", CancellationToken.None); + listenningSocket.Dispose(); + } } } _logger.LogInformation("Closing connection"); await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None); + meta.Socket.Dispose(); + meta.TokenSource.Cancel(); - Casters[uname] = null; + taskWritingToFs.Wait(); + _logger.LogInformation("Resulting file : " +JsonConvert.SerializeObject(taskWritingToFs.Result)); + } catch (Exception ex) { _logger.LogError($"Exception occured : {ex.Message}"); _logger.LogError(ex.StackTrace); + await meta.Socket.CloseAsync(received.CloseStatus.Value, "exception occured", CancellationToken.None); meta.Socket.Dispose(); - Casters[uname] = null; + meta.TokenSource.Cancel(); } + taskWritingToFs.Dispose(); } else - { // not meta.Socket != null && meta.Socket.State == WebSocketState.Open + { + // Socket was not accepted open ... + // not (meta.Socket != null && meta.Socket.State == WebSocketState.Open) if (meta.Socket != null) { _logger.LogError($"meta.Socket.State not Open: {meta.Socket.State.ToString()} "); @@ -191,6 +206,9 @@ namespace Yavsc.Services else _logger.LogError("socket object is null"); } + + + RemoveLiveInfo(uname); } catch (IOException ex) { @@ -201,12 +219,22 @@ namespace Yavsc.Services else { _logger.LogError($"Really unexpected end of stream"); - } - await meta.Socket?.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, ex.Message, CancellationToken.None); - meta.Socket?.Dispose(); - Casters[uname] = null; + await meta.Socket?.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, ex.Message, CancellationToken.None); + } + meta.Socket?.Dispose(); + + RemoveLiveInfo(uname); } return true; } + void RemoveLiveInfo(string userName) + { + LiveCastHandler caster; + if (Casters.TryRemove(userName, out caster)) + _logger.LogInformation("removed live info"); + else + _logger.LogError("could not remove live info"); + + } } } \ No newline at end of file