a first file was sent by websocket

This commit is contained in:
2019-06-25 17:11:16 +01:00
parent e826a4edc9
commit cc54fb2083
6 changed files with 87 additions and 36 deletions

View File

@ -1,5 +1,6 @@
namespace Yavsc namespace Yavsc
{ {
using Microsoft.AspNet.Http;
using Yavsc.Models.Auth; using Yavsc.Models.Auth;
public static class Constants public static class Constants
@ -36,7 +37,7 @@ namespace Yavsc
YavscConnectionStringEnvName = "YAVSC_DB_CONNECTION"; YavscConnectionStringEnvName = "YAVSC_DB_CONNECTION";
public const int WebSocketsMaxBufLen = 6*1024; public const int WebSocketsMaxBufLen = 4*1024;
public static readonly long DefaultFSQ = 1024*1024*500; public static readonly long DefaultFSQ = 1024*1024*500;
@ -62,5 +63,6 @@ namespace Yavsc
public const int MaxUserNameLength = 26; public const int MaxUserNameLength = 26;
public const string LivePath = "/live/cast";
} }
} }

View File

@ -16,10 +16,12 @@ namespace Yavsc
{ {
var webSocketOptions = new WebSocketOptions() var webSocketOptions = new WebSocketOptions()
{ {
KeepAliveInterval = TimeSpan.FromSeconds(120), KeepAliveInterval = TimeSpan.FromSeconds(320),
ReceiveBufferSize = Constants.WebSocketsMaxBufLen, ReceiveBufferSize = Constants.WebSocketsMaxBufLen,
ReplaceFeature = true ReplaceFeature = false
}; };
app.UseWebSockets(webSocketOptions); app.UseWebSockets(webSocketOptions);
app.UseSignalR(Constants.SignalRPath); app.UseSignalR(Constants.SignalRPath);
} }

View File

@ -62,7 +62,7 @@ namespace Yavsc
// leave the final slash // leave the final slash
PathString liveCastingPath = "/live/cast"; PathString liveCastingPath = Constants.LivePath;
public Startup(IHostingEnvironment env, IApplicationEnvironment appEnv) public Startup(IHostingEnvironment env, IApplicationEnvironment appEnv)
{ {
@ -451,26 +451,28 @@ namespace Yavsc
if (context.WebSockets.IsWebSocketRequest) if (context.WebSockets.IsWebSocketRequest)
{ {
if (!context.User.Identity.IsAuthenticated) if (!context.User.Identity.IsAuthenticated)
context.Response.StatusCode = 403; context.Abort();
else else
{ {
// get the flow id from request path // get the flow id from request path
var castid = long.Parse(context.Request.Path.Value.Substring(liveCastingPath.Value.Length + 1)); var castid = long.Parse(context.Request.Path.Value.Substring(liveCastingPath.Value.Length + 1));
logger.LogInformation("Cast id : "+castid);
var uname = context.User.GetUserName(); var uname = context.User.GetUserName();
// ensure uniqueness of casting stream from this user // ensure uniqueness of casting stream from this user
var uid = context.User.GetUserId(); var uid = context.User.GetUserId();
// get some setup from user // get some setup from user
var flow = _dbContext.LiveFlow.Include(f => f.Owner).SingleOrDefault(f => (f.OwnerId == uid && f.Id == castid)); var flow = _dbContext.LiveFlow.Include(f => f.Owner).SingleOrDefault(f => (f.OwnerId == uid && f.Id == castid));
if (flow == null) if (flow == null)
{ {
context.Response.StatusCode = 400; logger.LogWarning("Aborting. Flow info was not found.");
var socket = await context.WebSockets.AcceptWebSocketAsync();
await socket.CloseAsync(WebSocketCloseStatus.PolicyViolation, "flow id invalid", CancellationToken.None);
} }
else else
{ {
logger.LogInformation("flow : "+flow.Title+" for "+uname);
LiveCastMeta meta = null; LiveCastMeta meta = null;
if (LiveApiController.Casters.ContainsKey(uname)) if (LiveApiController.Casters.ContainsKey(uname))
{ {
@ -478,6 +480,7 @@ namespace Yavsc
if (meta.Socket.State != WebSocketState.Closed) if (meta.Socket.State != WebSocketState.Closed)
{ {
// FIXME loosed connexion should be detected & disposed else where // FIXME loosed connexion should be detected & disposed else where
await meta.Socket.CloseAsync( WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None);
meta.Socket.Dispose(); meta.Socket.Dispose();
meta.Socket = await context.WebSockets.AcceptWebSocketAsync(); meta.Socket = await context.WebSockets.AcceptWebSocketAsync();
} }
@ -497,22 +500,20 @@ namespace Yavsc
// Dispatch the flow // Dispatch the flow
try try
{ {
if (meta.Socket != null && meta.Socket.State == WebSocketState.Open) if (meta.Socket != null && meta.Socket.State == WebSocketState.Open)
{ {
LiveApiController.Casters[uname] = meta; LiveApiController.Casters[uname] = meta;
// TODO: Handle the socket here. // TODO: Handle the socket here.
// Find receivers: others in the chat room // Find receivers: others in the chat room
// send them the flow // send them the flow
var buffer = new byte[Constants.WebSocketsMaxBufLen];
var sBuffer = new ArraySegment<byte>(new byte[1024]); var sBuffer = new ArraySegment<byte>(buffer);
logger.LogInformation("Receiving bytes..."); logger.LogInformation("Receiving bytes...");
WebSocketReceiveResult received = await meta.Socket.ReceiveAsync(sBuffer, CancellationToken.None); WebSocketReceiveResult received = await meta.Socket.ReceiveAsync(sBuffer, CancellationToken.None);
logger.LogInformation("Received bytes!!!!"); logger.LogInformation($"Received bytes : {received.Count}");
logger.LogInformation($"Is the end : {received.EndOfMessage}");
/* TODO
var hubContext = GlobalHost.ConnectionManager.GetHubContext<ChatHub>(); var hubContext = GlobalHost.ConnectionManager.GetHubContext<ChatHub>();
hubContext.Clients.All.addPublicStream(new hubContext.Clients.All.addPublicStream(new
@ -523,42 +524,59 @@ namespace Yavsc
url = flow.GetFileUrl(), url = flow.GetFileUrl(),
mediaType = flow.MediaType mediaType = flow.MediaType
}, $"{flow.Owner.UserName} is starting a stream!"); }, $"{flow.Owner.UserName} is starting a stream!");
*/
// FIXME do we really need to close those one in invalid state ? // FIXME do we really need to close those one in invalid state ?
Stack<string> ToClose = new Stack<string>(); // Stack<string> ToClose = new Stack<string>();
try try
{ {
while (received.MessageType != WebSocketMessageType.Close) /*
{
logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}"); logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}");
// Echo anything we receive // Echo anything we receive
// and send to all listner found // and send to all listner found
foreach (var cliItem in meta.Listeners) foreach (var cliItem in meta.Listeners)
{ {
var listenningSocket = cliItem.Value; var listenningSocket = cliItem.Value;
if (listenningSocket.State == WebSocketState.Open) if (listenningSocket.State == WebSocketState.Open) {
await listenningSocket.SendAsync( await listenningSocket.SendAsync(
sBuffer, received.MessageType, received.EndOfMessage, CancellationToken.None); sBuffer, received.MessageType, received.EndOfMessage, CancellationToken.None);
}
else else
if (listenningSocket.State == WebSocketState.CloseReceived || listenningSocket.State == WebSocketState.CloseSent) if (listenningSocket.State == WebSocketState.CloseReceived || listenningSocket.State == WebSocketState.CloseSent)
{ {
ToClose.Push(cliItem.Key); ToClose.Push(cliItem.Key);
} }
} }
received = await meta.Socket.ReceiveAsync(sBuffer, CancellationToken.None); */
// logger.LogInformation("replying...");
string no; while (!received.CloseStatus.HasValue)
do
{ {
no = ToClose.Pop();
// await meta.Socket.SendAsync(new ArraySegment<byte>(buffer), received.MessageType, received.EndOfMessage, CancellationToken.None);
logger.LogInformation("Receiving new bytes...");
received = await meta.Socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
logger.LogInformation($"Received new bytes : {received.Count}");
logger.LogInformation($"Is the end : {received.EndOfMessage}");
}
logger.LogInformation("Closing connection");
await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None);
/* while (ToClose.Count >0)
{
string no = ToClose.Pop();
WebSocket listenningSocket; WebSocket listenningSocket;
if (meta.Listeners.TryRemove(no, out listenningSocket)) if (meta.Listeners.TryRemove(no, out listenningSocket))
await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None); await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None);
} while (no != null); } */
}
await meta.Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "eof", CancellationToken.None);
LiveApiController.Casters[uname] = null; LiveApiController.Casters[uname] = null;
} }
catch (Exception ex) catch (Exception ex)
@ -597,6 +615,9 @@ namespace Yavsc
} }
} }
} }
else {
context.Response.StatusCode = 400;
}
} }
else else
{ {

View File

@ -49,6 +49,7 @@ namespace cli {
private async Task <int> DoExecute() private async Task <int> DoExecute()
{ {
if (_fileOption.HasValue()){ if (_fileOption.HasValue()){
var fi = new FileInfo(_fileOption.Value()); var fi = new FileInfo(_fileOption.Value());
if (!fi.Exists) { if (!fi.Exists) {
@ -57,6 +58,7 @@ namespace cli {
} }
using (var stream = fi.OpenRead()) using (var stream = fi.OpenRead())
{ {
_logger.LogInformation("DoExecute from given file");
await DoStream(stream); await DoStream(stream);
} }
return 0; return 0;
@ -65,6 +67,7 @@ namespace cli {
{ {
using(var stream = Console.OpenStandardInput()) using(var stream = Console.OpenStandardInput())
{ {
_logger.LogInformation("DoExecute from standard input");
await DoStream(stream); await DoStream(stream);
} }
return 0; return 0;
@ -72,20 +75,39 @@ namespace cli {
} }
async Task DoStream (Stream stream) async Task DoStream (Stream stream)
{ {
_tokenSource = new CancellationTokenSource(); _tokenSource = new CancellationTokenSource();
await _client.ConnectAsync( var url = _cxSettings.StreamingUrl+"/"+_flowIdArg.Value;
new Uri(_cxSettings.StreamingUrl+"/"+_flowIdArg.Value),
_tokenSource.Token); _logger.LogInformation("Connecting to "+url);
await _client.ConnectAsync(new Uri(url), _tokenSource.Token);
_logger.LogInformation("Connected");
const int bufLen = Constants.WebSocketsMaxBufLen; const int bufLen = Constants.WebSocketsMaxBufLen;
byte [] buffer = new byte[bufLen]; byte [] buffer = new byte[bufLen];
const int offset=0; const int offset=0;
int read = 0; int read = 0;
/*
var reciving = Task.Run(async ()=> {
byte [] readbuffer = new byte[bufLen];
var rb = new ArraySegment<byte>(readbuffer, 0, bufLen);
bool continueReading = false;
do {
var result = await _client.ReceiveAsync(rb, _tokenSource.Token);
_logger.LogInformation($"received {result.Count} bytes");
continueReading = !result.CloseStatus.HasValue;
} while (continueReading);
} ); */
do { do {
read = await stream.ReadAsync(buffer, offset, bufLen); read = await stream.ReadAsync(buffer, offset, bufLen);
var segment = new ArraySegment<byte>(buffer, offset, read); var segment = new ArraySegment<byte>(buffer, offset, read);
await _client.SendAsync(new ArraySegment<byte>(buffer), bool end = read < bufLen;
WebSocketMessageType.Binary, false, _tokenSource.Token); await _client.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, end, _tokenSource.Token);
} while (read>0); _logger.LogInformation($"sent {read} bytes end:{end} ");
} while (read>0 && stream.CanRead );
// reciving.Wait();
await _client.CloseAsync(WebSocketCloseStatus.NormalClosure, "EOF", _tokenSource.Token);
} }
} }
} }

View File

@ -105,6 +105,8 @@ namespace cli
// calling a Startup sequence // calling a Startup sequence
var appBuilder = ConfigureApplication(); var appBuilder = ConfigureApplication();
var loggerFactory = appBuilder.ApplicationServices.GetRequiredService<ILoggerFactory>(); var loggerFactory = appBuilder.ApplicationServices.GetRequiredService<ILoggerFactory>();
var cxSettings = appBuilder.ApplicationServices.GetRequiredService<IOptions<ConnectionSettings>>();
var usercxSettings = appBuilder.ApplicationServices.GetRequiredService<IOptions<UserConnectionSettings>>();
CommandOption rootCommandHelpOption = cliapp.HelpOption("-? | -h | --help"); CommandOption rootCommandHelpOption = cliapp.HelpOption("-? | -h | --help");
@ -113,6 +115,7 @@ namespace cli
(new AuthCommander(loggerFactory)).Integrate(cliapp); (new AuthCommander(loggerFactory)).Integrate(cliapp);
(new CiBuildCommand()).Integrate(cliapp); (new CiBuildCommand()).Integrate(cliapp);
(new GenerationCommander()).Integrate(cliapp); (new GenerationCommander()).Integrate(cliapp);
(new Streamer(loggerFactory, cxSettings, usercxSettings )).Integrate(cliapp);
if (args.Length == 0) if (args.Length == 0)
{ {

View File

@ -3,6 +3,7 @@ namespace cli
using System.ComponentModel.DataAnnotations.Schema; using System.ComponentModel.DataAnnotations.Schema;
using System.Runtime.Serialization; using System.Runtime.Serialization;
using Newtonsoft.Json; using Newtonsoft.Json;
using Yavsc;
public class ConnectionSettings public class ConnectionSettings
{ {
@ -38,8 +39,8 @@ namespace cli
[NotMapped] [NotMapped]
[JsonIgnore] [JsonIgnore]
public string StreamingUrl { get { public string StreamingUrl { get {
return Port==0 ? $"{SiteAccessSheme}://{Authority}/ws": return Port==0 ? $"ws://{Authority}"+Constants.LivePath:
$"{SiteAccessSheme}://{Authority}:{Port}/ws"; $"ws://{Authority}:{Port}"+Constants.LivePath;
} } } }
} }
} }