codes a streamer
This commit is contained in:
@ -1,14 +1,23 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Net.WebSockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using cli.Model;
|
||||
using Microsoft.Extensions.CommandLineUtils;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.OptionsModel;
|
||||
|
||||
namespace cli {
|
||||
|
||||
public class Streamer {
|
||||
public class Streamer: ICommander {
|
||||
private ClientWebSocket _client;
|
||||
private ILogger _logger;
|
||||
private ConnectionSettings _cxSettings;
|
||||
private UserConnectionSettings _userCxSettings;
|
||||
private CommandOption _fileOption;
|
||||
private CommandArgument _flowIdArg;
|
||||
private CancellationTokenSource _tokenSource;
|
||||
|
||||
public Streamer(ILoggerFactory loggerFactory,
|
||||
IOptions<ConnectionSettings> cxSettings,
|
||||
@ -21,5 +30,61 @@ namespace cli {
|
||||
_client = new ClientWebSocket();
|
||||
_client.Options.SetRequestHeader("Authorization", $"Bearer {_userCxSettings.AccessToken}");
|
||||
}
|
||||
|
||||
public CommandLineApplication Integrate(CommandLineApplication rootApp)
|
||||
{
|
||||
CommandLineApplication streamCmd = rootApp.Command("stream",
|
||||
(target) =>
|
||||
{
|
||||
target.FullName = "Stream to server";
|
||||
target.Description = "Stream arbitrary binary data to your server channel";
|
||||
_fileOption = target.Option("-f | --file", "use given file as input stream", CommandOptionType.SingleValue);
|
||||
_flowIdArg = target.Argument("flowId", "Remote Id for this channel", false);
|
||||
target.HelpOption("-? | -h | --help");
|
||||
});
|
||||
streamCmd.OnExecute(async() => await DoExecute());
|
||||
return streamCmd;
|
||||
}
|
||||
|
||||
private async Task <int> DoExecute()
|
||||
{
|
||||
if (_fileOption.HasValue()){
|
||||
var fi = new FileInfo(_fileOption.Value());
|
||||
if (!fi.Exists) {
|
||||
_logger.LogError("Input file doesn´t exist.");
|
||||
return -2;
|
||||
}
|
||||
using (var stream = fi.OpenRead())
|
||||
{
|
||||
await DoStream(stream);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
using(var stream = Console.OpenStandardInput())
|
||||
{
|
||||
await DoStream(stream);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
async Task DoStream (Stream stream)
|
||||
{
|
||||
_tokenSource = new CancellationTokenSource();
|
||||
await _client.ConnectAsync(
|
||||
new Uri(_cxSettings.StreamingUrl+"/"+_flowIdArg.Value),
|
||||
_tokenSource.Token);
|
||||
const int bufLen = 56*1024;
|
||||
byte [] buffer = new byte[bufLen];
|
||||
const int offset=0;
|
||||
int read = 0;
|
||||
do {
|
||||
read = await stream.ReadAsync(buffer, offset, bufLen);
|
||||
var segment = new ArraySegment<byte>(buffer, offset, read);
|
||||
await _client.SendAsync(new ArraySegment<byte>(buffer),
|
||||
WebSocketMessageType.Binary, false, _tokenSource.Token);
|
||||
} while (read>0);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
namespace cli
|
||||
{
|
||||
using System.ComponentModel.DataAnnotations.Schema;
|
||||
using System.Runtime.Serialization;
|
||||
using Newtonsoft.Json;
|
||||
|
||||
public class ConnectionSettings
|
||||
@ -10,25 +11,35 @@ namespace cli
|
||||
public string Authority { get; set; }
|
||||
public string Audience { get; set; }
|
||||
public string SiteAccessSheme { get; set; } = "http";
|
||||
public int Port { get; set; }
|
||||
public string Scope { get; set; } = "profile";
|
||||
|
||||
[NotMapped]
|
||||
[JsonIgnore]
|
||||
public string AuthorizeUrl {get {
|
||||
return $"{SiteAccessSheme}://{Authority}/authorize";
|
||||
return Port==0 ? $"{SiteAccessSheme}://{Authority}/authorize" :
|
||||
$"{SiteAccessSheme}://{Authority}:{Port}/authorize" ;
|
||||
} }
|
||||
|
||||
[NotMapped]
|
||||
[JsonIgnore]
|
||||
public string RedirectUrl {get {
|
||||
return $"{SiteAccessSheme}://{Authority}/oauth/success";
|
||||
return Port==0 ? $"{SiteAccessSheme}://{Authority}/oauth/success" :
|
||||
$"{SiteAccessSheme}://{Authority}:{Port}/oauth/success" ;
|
||||
} }
|
||||
|
||||
[NotMapped]
|
||||
[JsonIgnore]
|
||||
public string AccessTokenUrl {get {
|
||||
return $"{SiteAccessSheme}://{Authority}/token";
|
||||
public string AccessTokenUrl { get {
|
||||
return Port==0 ? $"{SiteAccessSheme}://{Authority}/token":
|
||||
$"{SiteAccessSheme}://{Authority}:{Port}/token";
|
||||
} }
|
||||
|
||||
[NotMapped]
|
||||
[JsonIgnore]
|
||||
public string StreamingUrl { get {
|
||||
return Port==0 ? $"{SiteAccessSheme}://{Authority}/ws":
|
||||
$"{SiteAccessSheme}://{Authority}:{Port}/ws";
|
||||
} }
|
||||
|
||||
}
|
||||
}
|
8
src/cli/Settings/UserConnectionSettings.cs
Normal file
8
src/cli/Settings/UserConnectionSettings.cs
Normal file
@ -0,0 +1,8 @@
|
||||
public class UserConnectionSettings {
|
||||
public string UserName {get; set;}
|
||||
public string AccessToken {get; set;}
|
||||
public string TokenType {get; set;}
|
||||
public string ExpiresIn {get; set;}
|
||||
public string RefreshToken {get; set;}
|
||||
|
||||
}
|
Reference in New Issue
Block a user