A first flow was saved to disk

This commit is contained in:
2019-06-27 10:30:28 +01:00
parent cc54fb2083
commit 64832d6fef
13 changed files with 365 additions and 243 deletions

View File

@ -1,34 +0,0 @@
using System;
using System.Collections.Generic;
using Yavsc.ViewModels.Chat;
namespace Yavsc.Services
{
public interface IConnexionManager {
void SetUserName(string cxId, string userName);
string GetUserName (string cxId);
void OnConnected(string userName, bool isCop);
bool IsConnected(string candidate);
bool IsPresent(string roomName, string userName);
ChatRoomInfo Join(string roomName, string userName);
bool Part(string cxId, string roomName, string reason);
bool Kick(string cxId, string userName, string roomName, string reason);
bool Op(string roomName, string userName);
bool Deop(string roomName, string userName);
bool Hop(string roomName, string userName);
bool Dehop(string roomName, string userName);
bool TryGetChanInfo(string room, out ChatRoomInfo chanInfo);
IEnumerable<string> GetConnexionIds(string userName);
void Abort(string connectionId);
void SetErrorHandler(Action<string,string> errorHandler);
IEnumerable<ChannelShortInfo> ListChannels(string pattern);
}
}

View File

@ -0,0 +1,212 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Http;
using Microsoft.AspNet.SignalR;
using Microsoft.Data.Entity;
using Microsoft.Extensions.Logging;
using Yavsc.Helpers;
using Yavsc.Models;
using Yavsc.ViewModels.Streaming;
using Yavsc.Models.Messaging;
namespace Yavsc.Services
{
public class LiveProcessor : ILiveProcessor {
IHubContext _hubContext;
private ILogger _logger;
ApplicationDbContext _dbContext;
public PathString LiveCastingPath {get; set;} = Constants.LivePath;
public ConcurrentDictionary<string, LiveCastHandler> Casters {get;} = new ConcurrentDictionary<string, LiveCastHandler>();
public LiveProcessor(ApplicationDbContext dbContext, ILoggerFactory loggerFactory)
{
_dbContext = dbContext;
_hubContext = GlobalHost.ConnectionManager.GetHubContext<ChatHub>();
_logger = loggerFactory.CreateLogger<LiveProcessor>();
}
public async Task<bool> AcceptStream (HttpContext context)
{
// TODO defer request handling
var liveId = long.Parse(context.Request.Path.Value.Substring(LiveCastingPath.Value.Length + 1));
var userId = context.User.GetUserId();
var user = await _dbContext.Users.FirstAsync(u => u.Id == userId);
var uname = user.UserName;
var flow = _dbContext.LiveFlow.Include(f => f.Owner).SingleOrDefault(f => (f.OwnerId == userId && f.Id == liveId));
if (flow == null)
{
_logger.LogWarning("Aborting. Flow info was not found.");
context.Response.StatusCode = 400;
return false;
}
_logger.LogInformation("flow : "+flow.Title+" for "+uname);
LiveCastHandler meta = null;
if (Casters.ContainsKey(uname))
{
meta = Casters[uname];
if (meta.Socket.State == WebSocketState.Open || meta.Socket.State == WebSocketState.Connecting )
{
// FIXME loosed connexion should be detected & disposed else where
await meta.Socket.CloseAsync( WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None);
}
if (!meta.TokenSource.IsCancellationRequested) {
meta.TokenSource.Cancel();
}
meta.Socket.Dispose();
meta.Socket = await context.WebSockets.AcceptWebSocketAsync();
meta.TokenSource = new CancellationTokenSource();
}
else
{
// Accept the socket
meta = new LiveCastHandler { Socket = await context.WebSockets.AcceptWebSocketAsync() };
}
_logger.LogInformation("Accepted web socket");
// Dispatch the flow
try
{
if (meta.Socket != null && meta.Socket.State == WebSocketState.Open)
{
Casters[uname] = meta;
// TODO: Handle the socket here.
// Find receivers: others in the chat room
// send them the flow
var buffer = new byte[Constants.WebSocketsMaxBufLen];
var sBuffer = new ArraySegment<byte>(buffer);
_logger.LogInformation("Receiving bytes...");
WebSocketReceiveResult received = await meta.Socket.ReceiveAsync(sBuffer, meta.TokenSource.Token);
_logger.LogInformation($"Received bytes : {received.Count}");
_logger.LogInformation($"Is the end : {received.EndOfMessage}");
string destDir = context.User.InitPostToFileSystem("live");
_logger.LogInformation($"Saving flow to {destDir}");
string fileName = Path.Combine(destDir, flow.DifferedFileName);
FileInfo iFile = new FileInfo(fileName);
var fsio = new Queue<ArraySegment<byte>>();
var toFs = Task.Run( ()=> user.ReceiveUserFile(destDir, fsio, flow.DifferedFileName, flow.MediaType, meta.TokenSource.Token));
var hubContext = GlobalHost.ConnectionManager.GetHubContext<ChatHub>();
hubContext.Clients.All.addPublicStream(new PublicStreamInfo
{
id = flow.Id,
sender = flow.Owner.UserName,
title = flow.Title,
url = flow.GetFileUrl(),
mediaType = flow.MediaType
}, $"{flow.Owner.UserName} is starting a stream!");
Stack<string> ToClose = new Stack<string>();
try
{
_logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}");
// Echo anything we receive
// and send to all listner found
foreach (var cliItem in meta.Listeners)
{
var listenningSocket = cliItem.Value;
if (listenningSocket.State == WebSocketState.Open) {
await listenningSocket.SendAsync(
sBuffer, received.MessageType, received.EndOfMessage, meta.TokenSource.Token);
}
else
if (listenningSocket.State == WebSocketState.CloseReceived || listenningSocket.State == WebSocketState.CloseSent)
{
ToClose.Push(cliItem.Key);
}
}
fsio.Enqueue(sBuffer);
// logger.LogInformation("replying...");
while (!received.CloseStatus.HasValue)
{
// reply echo
// await meta.Socket.SendAsync(new ArraySegment<byte>(buffer), received.MessageType, received.EndOfMessage, meta.TokenSource.Token);
_logger.LogInformation("Receiving new bytes...");
buffer = new byte[Constants.WebSocketsMaxBufLen];
sBuffer = new ArraySegment<byte>(buffer);
received = await meta.Socket.ReceiveAsync(sBuffer, meta.TokenSource.Token);
foreach (var cliItem in meta.Listeners)
{
var listenningSocket = cliItem.Value;
if (listenningSocket.State == WebSocketState.Open) {
await listenningSocket.SendAsync(
sBuffer, received.MessageType, received.EndOfMessage, meta.TokenSource.Token);
}
else
if (listenningSocket.State == WebSocketState.CloseReceived || listenningSocket.State == WebSocketState.CloseSent)
{
ToClose.Push(cliItem.Key);
}
}
fsio.Enqueue(sBuffer);
_logger.LogInformation($"Received new bytes : {received.Count}");
_logger.LogInformation($"Is the end : {received.EndOfMessage}");
while (ToClose.Count >0)
{
string no = ToClose.Pop();
_logger.LogInformation("Closing follower connection");
WebSocket listenningSocket;
if (meta.Listeners.TryRemove(no, out listenningSocket))
await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None);
}
}
_logger.LogInformation("Closing connection");
await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None);
meta.TokenSource.Cancel();
Casters[uname] = null;
}
catch (Exception ex)
{
_logger.LogError($"Exception occured : {ex.Message}");
_logger.LogError(ex.StackTrace);
meta.Socket.Dispose();
Casters[uname] = null;
}
}
else
{ // not meta.Socket != null && meta.Socket.State == WebSocketState.Open
if (meta.Socket != null)
{
_logger.LogError($"meta.Socket.State not Open: {meta.Socket.State.ToString()} ");
meta.Socket.Dispose();
}
else
_logger.LogError("socket object is null");
}
}
catch (IOException ex)
{
if (ex.Message == "Unexpected end of stream")
{
_logger.LogError($"Unexpected end of stream");
}
else
{
_logger.LogError($"Really unexpected end of stream");
}
await meta.Socket?.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, ex.Message, CancellationToken.None);
meta.Socket?.Dispose();
Casters[uname] = null;
}
return true;
}
}
}