Import SocketIO code

This commit is contained in:
theolivenbaum
2022-07-14 16:53:54 +02:00
parent 73330185de
commit e2615a8dc1
46 changed files with 3182 additions and 2 deletions

View File

@@ -0,0 +1,64 @@
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
namespace SocketIOClient.Newtonsoft.Json
{
class ByteArrayConverter : JsonConverter
{
public ByteArrayConverter()
{
Bytes = new List<byte[]>();
}
internal List<byte[]> Bytes { get; }
public override bool CanConvert(Type objectType)
{
return objectType == typeof(byte[]);
}
public override object ReadJson(JsonReader reader, Type objectType, object existingValue, global::Newtonsoft.Json.JsonSerializer serializer)
{
byte[] bytes = null;
if (reader.TokenType == JsonToken.StartObject)
{
reader.Read();
if (reader.TokenType == JsonToken.PropertyName && reader.Value?.ToString() == "_placeholder")
{
reader.Read();
if (reader.TokenType == JsonToken.Boolean && (bool)reader.Value)
{
reader.Read();
if (reader.TokenType == JsonToken.PropertyName && reader.Value?.ToString() == "num")
{
reader.Read();
if (reader.Value != null)
{
if (int.TryParse(reader.Value.ToString(), out int num))
{
bytes = Bytes[num];
reader.Read();
}
}
}
}
}
}
return bytes;
}
public override void WriteJson(JsonWriter writer, object value, global::Newtonsoft.Json.JsonSerializer serializer)
{
var source = value as byte[];
Bytes.Add(source.ToArray());
writer.WriteStartObject();
writer.WritePropertyName("_placeholder");
writer.WriteValue(true);
writer.WritePropertyName("num");
writer.WriteValue(Bytes.Count - 1);
writer.WriteEndObject();
}
}
}

View File

@@ -0,0 +1,11 @@
namespace SocketIOClient
{
public class DisconnectReason
{
public static string IOServerDisconnect = "io server disconnect";
public static string IOClientDisconnect = "io client disconnect";
public static string PingTimeout = "ping timeout";
public static string TransportClose = "transport close";
public static string TransportError = "transport error";
}
}

View File

@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
namespace SocketIOClient
{
public delegate void OnAnyHandler(string eventName, SocketIOResponse response);
public delegate void OnOpenedHandler(string sid, int pingInterval, int pingTimeout);
//public delegate void OnDisconnectedHandler(string sid, int pingInterval, int pingTimeout);
public delegate void OnAck(int packetId, List<JsonElement> array);
public delegate void OnBinaryAck(int packetId, int totalCount, List<JsonElement> array);
public delegate void OnBinaryReceived(int packetId, int totalCount, string eventName, List<JsonElement> array);
public delegate void OnDisconnected();
public delegate void OnError(string error);
public delegate void OnEventReceived(int packetId, string eventName, List<JsonElement> array);
public delegate void OnOpened(string sid, int pingInterval, int pingTimeout);
public delegate void OnPing();
public delegate void OnPong();
}

View File

@@ -0,0 +1,18 @@
using System;
using System.Threading;
namespace SocketIOClient.Extensions
{
internal static class CancellationTokenSourceExtensions
{
public static void TryDispose(this CancellationTokenSource cts)
{
cts?.Dispose();
}
public static void TryCancel(this CancellationTokenSource cts)
{
cts?.Cancel();
}
}
}

View File

@@ -0,0 +1,12 @@
using System;
namespace SocketIOClient.Extensions
{
internal static class DisposableExtensions
{
public static void TryDispose(this IDisposable disposable)
{
disposable?.Dispose();
}
}
}

View File

@@ -0,0 +1,17 @@
using System;
namespace SocketIOClient.Extensions
{
internal static class EventHandlerExtensions
{
public static void TryInvoke<T>(this EventHandler<T> handler, object sender, T args)
{
handler?.Invoke(sender, args);
}
public static void TryInvoke(this EventHandler handler, object sender, EventArgs args)
{
handler?.Invoke(sender, args);
}
}
}

View File

@@ -0,0 +1,30 @@
using System;
namespace SocketIOClient.Extensions
{
internal static class SocketIOEventExtensions
{
public static void TryInvoke(this OnAnyHandler handler, string eventName, SocketIOResponse response)
{
try
{
handler(eventName, response);
}
catch
{
// The exception is thrown by the user code, so it can be swallowed
}
}
public static void TryInvoke(this Action<SocketIOResponse> handler, SocketIOResponse response)
{
try
{
handler(response);
}
catch
{
// The exception is thrown by the user code, so it can be swallowed
}
}
}
}

View File

@@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;
namespace SocketIOClient.JsonSerializer
{
class ByteArrayConverter : JsonConverter<byte[]>
{
public ByteArrayConverter()
{
Bytes = new List<byte[]>();
}
public List<byte[]> Bytes { get; }
public override byte[] Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
byte[] bytes = null;
if (reader.TokenType == JsonTokenType.StartObject)
{
reader.Read();
if (reader.TokenType == JsonTokenType.PropertyName && reader.GetString() == "_placeholder")
{
reader.Read();
if (reader.TokenType == JsonTokenType.True && reader.GetBoolean())
{
reader.Read();
if (reader.TokenType == JsonTokenType.PropertyName && reader.GetString() == "num")
{
reader.Read();
int num = reader.GetInt32();
bytes = Bytes[num];
reader.Read();
}
}
}
}
return bytes;
}
public override void Write(Utf8JsonWriter writer, byte[] value, JsonSerializerOptions options)
{
Bytes.Add(value);
writer.WriteStartObject();
writer.WritePropertyName("_placeholder");
writer.WriteBooleanValue(true);
writer.WritePropertyName("num");
writer.WriteNumberValue(Bytes.Count - 1);
writer.WriteEndObject();
}
}
}

View File

@@ -0,0 +1,11 @@
using System.Collections.Generic;
namespace SocketIOClient.JsonSerializer
{
public interface IJsonSerializer
{
JsonSerializeResult Serialize(object[] data);
T Deserialize<T>(string json);
T Deserialize<T>(string json, IList<byte[]> incomingBytes);
}
}

View File

@@ -0,0 +1,10 @@
using System.Collections.Generic;
namespace SocketIOClient.JsonSerializer
{
public class JsonSerializeResult
{
public string Json { get; set; }
public IList<byte[]> Bytes { get; set; }
}
}

View File

@@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using System.Text.Json;
namespace SocketIOClient.JsonSerializer
{
public class SystemTextJsonSerializer : IJsonSerializer
{
public JsonSerializeResult Serialize(object[] data)
{
var converter = new ByteArrayConverter();
var options = GetOptions();
options.Converters.Add(converter);
string json = System.Text.Json.JsonSerializer.Serialize(data, options);
return new JsonSerializeResult
{
Json = json,
Bytes = converter.Bytes
};
}
public T Deserialize<T>(string json)
{
var options = GetOptions();
return System.Text.Json.JsonSerializer.Deserialize<T>(json, options);
}
public T Deserialize<T>(string json, IList<byte[]> bytes)
{
var options = GetOptions();
var converter = new ByteArrayConverter();
options.Converters.Add(converter);
converter.Bytes.AddRange(bytes);
return System.Text.Json.JsonSerializer.Deserialize<T>(json, options);
}
private JsonSerializerOptions GetOptions()
{
JsonSerializerOptions options = null;
if (OptionsProvider != null)
{
options = OptionsProvider();
}
if (options == null)
{
options = new JsonSerializerOptions();
}
return options;
}
public Func<JsonSerializerOptions> OptionsProvider { get; set; }
}
}

View File

@@ -0,0 +1,23 @@
Code from https://github.com/doghappy/socket.io-client-csharp
MIT License
Copyright (c) 2019 HeroWong
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,100 @@
using SocketIOClient.Transport;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
namespace SocketIOClient.Messages
{
public class BinaryMessage : IMessage
{
public MessageType Type => MessageType.BinaryMessage;
public string Namespace { get; set; }
public string Event { get; set; }
public int Id { get; set; }
public List<JsonElement> JsonElements { get; set; }
public string Json { get; set; }
public int BinaryCount { get; set; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public void Read(string msg)
{
int index1 = msg.IndexOf('-');
BinaryCount = int.Parse(msg.Substring(0, index1));
int index2 = msg.IndexOf('[');
int index3 = msg.LastIndexOf(',', index2);
if (index3 > -1)
{
Namespace = msg.Substring(index1 + 1, index3 - index1 - 1);
int idLength = index2 - index3 - 1;
if (idLength > 0)
{
Id = int.Parse(msg.Substring(index3 + 1, idLength));
}
}
else
{
int idLength = index2 - index1 - 1;
if (idLength > 0)
{
Id = int.Parse(msg.Substring(index1 + 1, idLength));
}
}
string json = msg.Substring(index2);
var array = JsonDocument.Parse(json).RootElement.EnumerateArray();
int i = -1;
foreach (var item in array)
{
i++;
if (i == 0)
{
Event = item.GetString();
JsonElements = new List<JsonElement>();
}
else
{
JsonElements.Add(item);
}
}
}
public string Write()
{
var builder = new StringBuilder();
builder
.Append("45")
.Append(OutgoingBytes.Count)
.Append('-');
if (!string.IsNullOrEmpty(Namespace))
{
builder.Append(Namespace).Append(',');
}
if (string.IsNullOrEmpty(Json))
{
builder.Append("[\"").Append(Event).Append("\"]");
}
else
{
string data = Json.Insert(1, $"\"{Event}\",");
builder.Append(data);
}
return builder.ToString();
}
}
}

View File

@@ -0,0 +1,75 @@
using SocketIOClient.Transport;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
namespace SocketIOClient.Messages
{
/// <summary>
/// The server calls the client's callback
/// </summary>
public class ClientAckMessage : IMessage
{
public MessageType Type => MessageType.AckMessage;
public string Namespace { get; set; }
public string Event { get; set; }
public List<JsonElement> JsonElements { get; set; }
public string Json { get; set; }
public int Id { get; set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public void Read(string msg)
{
int index = msg.IndexOf('[');
int lastIndex = msg.LastIndexOf(',', index);
if (lastIndex > -1)
{
string text = msg.Substring(0, index);
Namespace = text.Substring(0, lastIndex);
Id = int.Parse(text.Substring(lastIndex + 1));
}
else
{
Id = int.Parse(msg.Substring(0, index));
}
msg = msg.Substring(index);
JsonElements = JsonDocument.Parse(msg).RootElement.EnumerateArray().ToList();
}
public string Write()
{
var builder = new StringBuilder();
builder.Append("42");
if (!string.IsNullOrEmpty(Namespace))
{
builder.Append(Namespace).Append(',');
}
builder.Append(Id);
if (string.IsNullOrEmpty(Json))
{
builder.Append("[\"").Append(Event).Append("\"]");
}
else
{
string data = Json.Insert(1, $"\"{Event}\",");
builder.Append(data);
}
return builder.ToString();
}
}
}

View File

@@ -0,0 +1,82 @@
using SocketIOClient.Transport;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
namespace SocketIOClient.Messages
{
/// <summary>
/// The server calls the client's callback with binary
/// </summary>
public class ClientBinaryAckMessage : IMessage
{
public MessageType Type => MessageType.BinaryAckMessage;
public string Namespace { get; set; }
public string Event { get; set; }
public List<JsonElement> JsonElements { get; set; }
public string Json { get; set; }
public int Id { get; set; }
public int BinaryCount { get; set; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public void Read(string msg)
{
int index1 = msg.IndexOf('-');
BinaryCount = int.Parse(msg.Substring(0, index1));
int index2 = msg.IndexOf('[');
int index3 = msg.LastIndexOf(',', index2);
if (index3 > -1)
{
Namespace = msg.Substring(index1 + 1, index3 - index1 - 1);
Id = int.Parse(msg.Substring(index3 + 1, index2 - index3 - 1));
}
else
{
Id = int.Parse(msg.Substring(index1 + 1, index2 - index1 - 1));
}
string json = msg.Substring(index2);
JsonElements = JsonDocument.Parse(json).RootElement.EnumerateArray().ToList();
}
public string Write()
{
var builder = new StringBuilder();
builder
.Append("45")
.Append(OutgoingBytes.Count)
.Append('-');
if (!string.IsNullOrEmpty(Namespace))
{
builder.Append(Namespace).Append(',');
}
builder.Append(Id);
if (string.IsNullOrEmpty(Json))
{
builder.Append("[\"").Append(Event).Append("\"]");
}
else
{
string data = Json.Insert(1, $"\"{Event}\",");
builder.Append(data);
}
return builder.ToString();
}
}
}

View File

@@ -0,0 +1,128 @@
using System;
using SocketIOClient.Transport;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
namespace SocketIOClient.Messages
{
public class ConnectedMessage : IMessage
{
public MessageType Type => MessageType.Connected;
public string Namespace { get; set; }
public string Sid { get; set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public IEnumerable<KeyValuePair<string, string>> Query { get; set; }
public string AuthJsonStr { get; set; }
public void Read(string msg)
{
if (Eio == 3)
{
Eio3Read(msg);
}
else
{
Eio4Read(msg);
}
}
public string Write()
{
if (Eio == 3)
{
return Eio3Write();
}
return Eio4Write();
}
public void Eio4Read(string msg)
{
int index = msg.IndexOf('{');
if (index > 0)
{
Namespace = msg.Substring(0, index - 1);
msg = msg.Substring(index);
}
else
{
Namespace = string.Empty;
}
Sid = JsonDocument.Parse(msg).RootElement.GetProperty("sid").GetString();
}
public string Eio4Write()
{
var builder = new StringBuilder("40");
if (!string.IsNullOrEmpty(Namespace))
{
builder.Append(Namespace).Append(',');
}
builder.Append(AuthJsonStr);
return builder.ToString();
}
public void Eio3Read(string msg)
{
if (msg.Length >= 2)
{
int startIndex = msg.IndexOf('/');
if (startIndex == -1)
{
return;
}
int endIndex = msg.IndexOf('?', startIndex);
if (endIndex == -1)
{
endIndex = msg.IndexOf(',', startIndex);
}
if (endIndex == -1)
{
endIndex = msg.Length;
}
Namespace = msg.Substring(startIndex, endIndex);
}
}
public string Eio3Write()
{
if (string.IsNullOrEmpty(Namespace))
{
return string.Empty;
}
var builder = new StringBuilder("40");
builder.Append(Namespace);
if (Query != null)
{
int i = -1;
foreach (var item in Query)
{
i++;
if (i == 0)
{
builder.Append('?');
}
else
{
builder.Append('&');
}
builder.Append(item.Key).Append('=').Append(item.Value);
}
}
builder.Append(',');
return builder.ToString();
}
}
}

View File

@@ -0,0 +1,36 @@
using SocketIOClient.Transport;
using System.Collections.Generic;
namespace SocketIOClient.Messages
{
public class DisconnectedMessage : IMessage
{
public MessageType Type => MessageType.Disconnected;
public string Namespace { get; set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public void Read(string msg)
{
Namespace = msg.TrimEnd(',');
}
public string Write()
{
if (string.IsNullOrEmpty(Namespace))
{
return "41";
}
return "41" + Namespace + ",";
}
}
}

View File

@@ -0,0 +1,50 @@
using SocketIOClient.Transport;
using System;
using System.Collections.Generic;
using System.Text.Json;
namespace SocketIOClient.Messages
{
public class ErrorMessage : IMessage
{
public MessageType Type => MessageType.ErrorMessage;
public string Message { get; set; }
public string Namespace { get; set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public void Read(string msg)
{
if (Eio == 3)
{
Message = msg.Trim('"');
}
else
{
int index = msg.IndexOf('{');
if (index > 0)
{
Namespace = msg.Substring(0, index - 1);
msg = msg.Substring(index);
}
var doc = JsonDocument.Parse(msg);
Message = doc.RootElement.GetProperty("message").GetString();
}
}
public string Write()
{
throw new NotImplementedException();
}
}
}

View File

@@ -0,0 +1,97 @@
using SocketIOClient.Transport;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
namespace SocketIOClient.Messages
{
public class EventMessage : IMessage
{
public MessageType Type => MessageType.EventMessage;
public string Namespace { get; set; }
public string Event { get; set; }
public int Id { get; set; }
public List<JsonElement> JsonElements { get; set; }
public string Json { get; set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public void Read(string msg)
{
int index = msg.IndexOf('[');
int lastIndex = msg.LastIndexOf(',', index);
if (lastIndex > -1)
{
string text = msg.Substring(0, index);
Namespace = text.Substring(0, lastIndex);
if (index - lastIndex > 1)
{
Id = int.Parse(text.Substring(lastIndex + 1));
}
}
else
{
if (index > 0)
{
Id = int.Parse(msg.Substring(0, index));
}
}
msg = msg.Substring(index);
//int index = msg.IndexOf('[');
//if (index > 0)
//{
// Namespace = msg.Substring(0, index - 1);
// msg = msg.Substring(index);
//}
var array = JsonDocument.Parse(msg).RootElement.EnumerateArray();
int i = -1;
foreach (var item in array)
{
i++;
if (i == 0)
{
Event = item.GetString();
JsonElements = new List<JsonElement>();
}
else
{
JsonElements.Add(item);
}
}
}
public string Write()
{
var builder = new StringBuilder();
builder.Append("42");
if (!string.IsNullOrEmpty(Namespace))
{
builder.Append(Namespace).Append(',');
}
if (string.IsNullOrEmpty(Json))
{
builder.Append("[\"").Append(Event).Append("\"]");
}
else
{
string data = Json.Insert(1, $"\"{Event}\",");
builder.Append(data);
}
return builder.ToString();
}
}
}

View File

@@ -0,0 +1,30 @@
using SocketIOClient.Transport;
using System.Collections.Generic;
namespace SocketIOClient.Messages
{
public interface IMessage
{
MessageType Type { get; }
List<byte[]> OutgoingBytes { get; set; }
List<byte[]> IncomingBytes { get; set; }
int BinaryCount { get; }
int Eio { get; set; }
TransportProtocol Protocol { get; set; }
void Read(string msg);
//void Eio3WsRead(string msg);
//void Eio3HttpRead(string msg);
string Write();
//string Eio3WsWrite();
}
}

View File

@@ -0,0 +1,75 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace SocketIOClient.Messages
{
public static class MessageFactory
{
private static IMessage CreateMessage(MessageType type)
{
switch (type)
{
case MessageType.Opened:
return new OpenedMessage();
case MessageType.Ping:
return new PingMessage();
case MessageType.Pong:
return new PongMessage();
case MessageType.Connected:
return new ConnectedMessage();
case MessageType.Disconnected:
return new DisconnectedMessage();
case MessageType.EventMessage:
return new EventMessage();
case MessageType.AckMessage:
return new ClientAckMessage();
case MessageType.ErrorMessage:
return new ErrorMessage();
case MessageType.BinaryMessage:
return new BinaryMessage();
case MessageType.BinaryAckMessage:
return new ClientBinaryAckMessage();
}
return null;
}
private static readonly Dictionary<string, MessageType> _messageTypes = Enum.GetValues<MessageType>().ToDictionary(v => ((int)v).ToString(), v => v);
public static IMessage CreateMessage(int eio, string msg)
{
foreach (var (prefix,item) in _messageTypes)
{
if (msg.StartsWith(prefix))
{
IMessage result = CreateMessage(item);
if (result != null)
{
result.Eio = eio;
result.Read(msg.Substring(prefix.Length));
return result;
}
}
}
return null;
}
public static OpenedMessage CreateOpenedMessage(string msg)
{
var openedMessage = new OpenedMessage();
if (msg[0] == '0')
{
openedMessage.Eio = 4;
openedMessage.Read(msg.Substring(1));
}
else
{
openedMessage.Eio = 3;
int index = msg.IndexOf(':');
openedMessage.Read(msg.Substring(index + 2));
}
return openedMessage;
}
}
}

View File

@@ -0,0 +1,16 @@
namespace SocketIOClient.Messages
{
public enum MessageType
{
Opened,
Ping = 2,
Pong,
Connected = 40,
Disconnected,
EventMessage,
AckMessage,
ErrorMessage,
BinaryMessage,
BinaryAckMessage
}
}

View File

@@ -0,0 +1,79 @@
using System;
using System.Text.Json;
using System.Collections.Generic;
using SocketIOClient.Transport;
namespace SocketIOClient.Messages
{
public class OpenedMessage : IMessage
{
public MessageType Type => MessageType.Opened;
public string Sid { get; set; }
public string Namespace { get; set; }
public List<string> Upgrades { get; private set; }
public int PingInterval { get; private set; }
public int PingTimeout { get; private set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
private int GetInt32FromJsonElement(JsonElement element, string msg, string name)
{
var p = element.GetProperty(name);
int val;
switch (p.ValueKind)
{
case JsonValueKind.String:
val = int.Parse(p.GetString());
break;
case JsonValueKind.Number:
val = p.GetInt32();
break;
default:
throw new ArgumentException($"Invalid message: '{msg}'");
}
return val;
}
public void Read(string msg)
{
var doc = JsonDocument.Parse(msg);
var root = doc.RootElement;
Sid = root.GetProperty("sid").GetString();
PingInterval = GetInt32FromJsonElement(root, msg, "pingInterval");
PingTimeout = GetInt32FromJsonElement(root, msg, "pingTimeout");
Upgrades = new List<string>();
var upgrades = root.GetProperty("upgrades").EnumerateArray();
foreach (var item in upgrades)
{
Upgrades.Add(item.GetString());
}
}
public string Write()
{
//var builder = new StringBuilder();
//builder.Append("40");
//if (!string.IsNullOrEmpty(Namespace))
//{
// builder.Append(Namespace).Append(',');
//}
//return builder.ToString();
throw new NotImplementedException();
}
}
}

View File

@@ -0,0 +1,26 @@
using SocketIOClient.Transport;
using System.Collections.Generic;
namespace SocketIOClient.Messages
{
public class PingMessage : IMessage
{
public MessageType Type => MessageType.Ping;
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public void Read(string msg)
{
}
public string Write() => "2";
}
}

View File

@@ -0,0 +1,29 @@
using SocketIOClient.Transport;
using System;
using System.Collections.Generic;
namespace SocketIOClient.Messages
{
public class PongMessage : IMessage
{
public MessageType Type => MessageType.Pong;
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public TimeSpan Duration { get; set; }
public void Read(string msg)
{
}
public string Write() => "3";
}
}

View File

@@ -0,0 +1,54 @@
using SocketIOClient.Transport;
using System.Collections.Generic;
using System.Text;
namespace SocketIOClient.Messages
{
/// <summary>
/// The client calls the server's callback
/// </summary>
public class ServerAckMessage : IMessage
{
public MessageType Type => MessageType.AckMessage;
public string Namespace { get; set; }
public string Json { get; set; }
public int Id { get; set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public void Read(string msg)
{
}
public string Write()
{
var builder = new StringBuilder();
builder.Append("43");
if (!string.IsNullOrEmpty(Namespace))
{
builder.Append(Namespace).Append(',');
}
builder.Append(Id);
if (string.IsNullOrEmpty(Json))
{
builder.Append("[]");
}
else
{
builder.Append(Json);
}
return builder.ToString();
}
}
}

View File

@@ -0,0 +1,60 @@
using SocketIOClient.Transport;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
namespace SocketIOClient.Messages
{
/// <summary>
/// The client calls the server's callback with binary
/// </summary>
public class ServerBinaryAckMessage : IMessage
{
public MessageType Type => MessageType.BinaryAckMessage;
public string Namespace { get; set; }
public List<JsonElement> JsonElements { get; set; }
public string Json { get; set; }
public int Id { get; set; }
public int BinaryCount { get; }
public int Eio { get; set; }
public TransportProtocol Protocol { get; set; }
public List<byte[]> OutgoingBytes { get; set; }
public List<byte[]> IncomingBytes { get; set; }
public void Read(string msg)
{
}
public string Write()
{
var builder = new StringBuilder();
builder
.Append("46")
.Append(OutgoingBytes.Count)
.Append('-');
if (!string.IsNullOrEmpty(Namespace))
{
builder.Append(Namespace).Append(',');
}
builder.Append(Id);
if (string.IsNullOrEmpty(Json))
{
builder.Append("[]");
}
else
{
builder.Append(Json);
}
return builder.ToString();
}
}
}

View File

@@ -0,0 +1,56 @@
using System;
using Newtonsoft.Json;
using SocketIOClient.JsonSerializer;
using System.Collections.Generic;
namespace SocketIOClient.Newtonsoft.Json
{
public class NewtonsoftJsonSerializer : IJsonSerializer
{
public Func<JsonSerializerSettings> JsonSerializerOptions { get; }
public JsonSerializeResult Serialize(object[] data)
{
var converter = new ByteArrayConverter();
var settings = GetOptions();
settings.Converters.Add(converter);
string json = JsonConvert.SerializeObject(data, settings);
return new JsonSerializeResult
{
Json = json,
Bytes = converter.Bytes
};
}
public T Deserialize<T>(string json)
{
var settings = GetOptions();
return JsonConvert.DeserializeObject<T>(json, settings);
}
public T Deserialize<T>(string json, IList<byte[]> bytes)
{
var converter = new ByteArrayConverter();
converter.Bytes.AddRange(bytes);
var settings = GetOptions();
settings.Converters.Add(converter);
return JsonConvert.DeserializeObject<T>(json, settings);
}
private JsonSerializerSettings GetOptions()
{
JsonSerializerSettings options = null;
if (OptionsProvider != null)
{
options = OptionsProvider();
}
if (options == null)
{
options = new JsonSerializerSettings();
}
return options;
}
public Func<JsonSerializerSettings> OptionsProvider { get; set; }
}
}

View File

@@ -0,0 +1,769 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Http;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using SocketIOClient.Extensions;
using SocketIOClient.JsonSerializer;
using SocketIOClient.Messages;
using SocketIOClient.Transport;
using SocketIOClient.UriConverters;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace SocketIOClient
{
/// <summary>
/// socket.io client class
/// </summary>
public class SocketIO : IDisposable
{
/// <summary>
/// Create SocketIO object with default options
/// </summary>
/// <param name="uri"></param>
public SocketIO(string uri) : this(new Uri(uri)) { }
/// <summary>
/// Create SocketIO object with options
/// </summary>
/// <param name="uri"></param>
public SocketIO(Uri uri) : this(uri, new SocketIOOptions()) { }
/// <summary>
/// Create SocketIO object with options
/// </summary>
/// <param name="uri"></param>
/// <param name="options"></param>
public SocketIO(string uri, SocketIOOptions options) : this(new Uri(uri), options) { }
/// <summary>
/// Create SocketIO object with options
/// </summary>
/// <param name="uri"></param>
/// <param name="options"></param>
public SocketIO(Uri uri, SocketIOOptions options)
{
ServerUri = uri ?? throw new ArgumentNullException("uri");
Options = options ?? throw new ArgumentNullException("options");
Initialize();
}
Uri _serverUri;
private Uri ServerUri
{
get => _serverUri;
set
{
if (_serverUri != value)
{
_serverUri = value;
if (value != null && value.AbsolutePath != "/")
{
_namespace = value.AbsolutePath;
}
}
}
}
/// <summary>
/// An unique identifier for the socket session. Set after the connect event is triggered, and updated after the reconnect event.
/// </summary>
public string Id { get; private set; }
string _namespace;
/// <summary>
/// Whether or not the socket is connected to the server.
/// </summary>
public bool Connected { get; private set; }
int _attempts;
[Obsolete]
/// <summary>
/// Whether or not the socket is disconnected from the server.
/// </summary>
public bool Disconnected => !Connected;
public SocketIOOptions Options { get; }
public IJsonSerializer JsonSerializer { get; set; }
public IUriConverter UriConverter { get; set; }
internal ILogger Logger { get; set; }
ILoggerFactory _loggerFactory;
public ILoggerFactory LoggerFactory
{
get => _loggerFactory;
set
{
_loggerFactory = value ?? throw new ArgumentNullException(nameof(LoggerFactory));
Logger = _loggerFactory.CreateLogger<SocketIO>();
}
}
public HttpClient HttpClient { get; set; }
public Func<IClientWebSocket> ClientWebSocketProvider { get; set; }
private IClientWebSocket _clientWebsocket;
BaseTransport _transport;
List<Type> _expectedExceptions;
int _packetId;
bool _isConnectCoreRunning;
Uri _realServerUri;
Exception _connectCoreException;
Dictionary<int, Action<SocketIOResponse>> _ackHandlers;
List<OnAnyHandler> _onAnyHandlers;
Dictionary<string, Action<SocketIOResponse>> _eventHandlers;
CancellationTokenSource _connectionTokenSource;
double _reconnectionDelay;
bool _hasError;
bool _isFaild;
readonly static object _connectionLock = new object();
#region Socket.IO event
public event EventHandler OnConnected;
//public event EventHandler<string> OnConnectError;
//public event EventHandler<string> OnConnectTimeout;
public event EventHandler<string> OnError;
public event EventHandler<string> OnDisconnected;
/// <summary>
/// Fired upon a successful reconnection.
/// </summary>
public event EventHandler<int> OnReconnected;
/// <summary>
/// Fired upon an attempt to reconnect.
/// </summary>
public event EventHandler<int> OnReconnectAttempt;
/// <summary>
/// Fired upon a reconnection attempt error.
/// </summary>
public event EventHandler<Exception> OnReconnectError;
/// <summary>
/// Fired when couldnt reconnect within reconnectionAttempts
/// </summary>
public event EventHandler OnReconnectFailed;
public event EventHandler OnPing;
public event EventHandler<TimeSpan> OnPong;
#endregion
#region Observable Event
//Subject<Unit> _onConnected;
//public IObservable<Unit> ConnectedObservable { get; private set; }
#endregion
private void Initialize()
{
_packetId = -1;
_ackHandlers = new Dictionary<int, Action<SocketIOResponse>>();
_eventHandlers = new Dictionary<string, Action<SocketIOResponse>>();
_onAnyHandlers = new List<OnAnyHandler>();
JsonSerializer = new SystemTextJsonSerializer();
UriConverter = new UriConverter();
HttpClient = new HttpClient();
ClientWebSocketProvider = () => new SystemNetWebSocketsClientWebSocket(Options.EIO);
_expectedExceptions = new List<Type>
{
typeof(TimeoutException),
typeof(WebSocketException),
typeof(HttpRequestException),
typeof(OperationCanceledException),
typeof(TaskCanceledException)
};
LoggerFactory = NullLoggerFactory.Instance;
}
private async Task CreateTransportAsync()
{
Options.Transport = await GetProtocolAsync();
if (Options.Transport == TransportProtocol.Polling)
{
HttpPollingHandler handler;
if (Options.EIO == 3)
handler = new Eio3HttpPollingHandler(HttpClient);
else
handler = new Eio4HttpPollingHandler(HttpClient);
_transport = new HttpTransport(HttpClient, handler, Options, JsonSerializer, Logger);
}
else
{
_clientWebsocket = ClientWebSocketProvider();
_transport = new WebSocketTransport(_clientWebsocket, Options, JsonSerializer, Logger);
}
_transport.Namespace = _namespace;
SetHeaders();
}
private void SetHeaders()
{
if (Options.ExtraHeaders != null)
{
foreach (var item in Options.ExtraHeaders)
{
_transport.AddHeader(item.Key, item.Value);
}
}
}
private void SyncExceptionToMain(Exception e)
{
_connectCoreException = e;
_isConnectCoreRunning = false;
}
private void ConnectCore()
{
DisposeForReconnect();
_reconnectionDelay = Options.ReconnectionDelay;
_connectionTokenSource = new CancellationTokenSource();
var cct = _connectionTokenSource.Token;
Task.Factory.StartNew(async () =>
{
while (true)
{
_clientWebsocket.TryDispose();
_transport.TryDispose();
CreateTransportAsync().Wait();
_realServerUri = UriConverter.GetServerUri(Options.Transport == TransportProtocol.WebSocket, ServerUri, Options.EIO, Options.Path, Options.Query);
try
{
if (cct.IsCancellationRequested)
break;
if (_attempts > 0)
OnReconnectAttempt.TryInvoke(this, _attempts);
var timeoutCts = new CancellationTokenSource(Options.ConnectionTimeout);
_transport.Subscribe(OnMessageReceived, OnErrorReceived);
await _transport.ConnectAsync(_realServerUri, timeoutCts.Token).ConfigureAwait(false);
break;
}
catch (Exception e)
{
if (_expectedExceptions.Contains(e.GetType()))
{
if (!Options.Reconnection)
{
SyncExceptionToMain(e);
throw;
}
if (_attempts > 0)
{
OnReconnectError.TryInvoke(this, e);
}
_attempts++;
if (_attempts <= Options.ReconnectionAttempts)
{
if (_reconnectionDelay < Options.ReconnectionDelayMax)
{
_reconnectionDelay += 2 * Options.RandomizationFactor;
}
if (_reconnectionDelay > Options.ReconnectionDelayMax)
{
_reconnectionDelay = Options.ReconnectionDelayMax;
}
Thread.Sleep((int)_reconnectionDelay);
}
else
{
_isFaild = true;
OnReconnectFailed.TryInvoke(this, EventArgs.Empty);
break;
}
}
else
{
SyncExceptionToMain(e);
throw;
}
}
}
_isConnectCoreRunning = false;
});
}
private async Task<TransportProtocol> GetProtocolAsync()
{
if (Options.Transport == TransportProtocol.Polling && Options.AutoUpgrade)
{
Uri uri = UriConverter.GetServerUri(false, ServerUri, Options.EIO, Options.Path, Options.Query);
try
{
string text = await HttpClient.GetStringAsync(uri);
if (text.Contains("websocket"))
{
return TransportProtocol.WebSocket;
}
}
catch (Exception e)
{
Logger.LogWarning(e, e.Message);
}
}
return Options.Transport;
}
public async Task ConnectAsync()
{
if (Connected || _isConnectCoreRunning)
return;
lock (_connectionLock)
{
if (_isConnectCoreRunning)
return;
_isConnectCoreRunning = true;
}
ConnectCore();
while (_isConnectCoreRunning)
{
await Task.Delay(100);
}
if (_connectCoreException != null)
{
Logger.LogError(_connectCoreException, _connectCoreException.Message);
throw _connectCoreException;
}
int ms = 0;
while (!Connected)
{
if (_hasError)
{
Logger.LogWarning($"Got a connection error, try to use '{nameof(OnError)}' to detect it.");
break;
}
if (_isFaild)
{
Logger.LogWarning($"Reconnect failed, try to use '{nameof(OnReconnectFailed)}' to detect it.");
break;
}
ms += 100;
if (ms > Options.ConnectionTimeout.TotalMilliseconds)
{
throw new TimeoutException();
}
await Task.Delay(100);
}
}
private void PingHandler()
{
OnPing.TryInvoke(this, EventArgs.Empty);
}
private void PongHandler(PongMessage msg)
{
OnPong.TryInvoke(this, msg.Duration);
}
private void ConnectedHandler(ConnectedMessage msg)
{
Id = msg.Sid;
Connected = true;
OnConnected.TryInvoke(this, EventArgs.Empty);
if (_attempts > 0)
{
OnReconnected.TryInvoke(this, _attempts);
}
_attempts = 0;
}
private void DisconnectedHandler()
{
_ = InvokeDisconnect(DisconnectReason.IOServerDisconnect);
}
private void EventMessageHandler(EventMessage m)
{
var res = new SocketIOResponse(m.JsonElements, this)
{
PacketId = m.Id
};
foreach (var item in _onAnyHandlers)
{
item.TryInvoke(m.Event, res);
}
if (_eventHandlers.ContainsKey(m.Event))
{
_eventHandlers[m.Event].TryInvoke(res);
}
}
private void AckMessageHandler(ClientAckMessage m)
{
if (_ackHandlers.ContainsKey(m.Id))
{
var res = new SocketIOResponse(m.JsonElements, this);
_ackHandlers[m.Id].TryInvoke(res);
_ackHandlers.Remove(m.Id);
}
}
private void ErrorMessageHandler(ErrorMessage msg)
{
_hasError = true;
OnError.TryInvoke(this, msg.Message);
}
private void BinaryMessageHandler(BinaryMessage msg)
{
var response = new SocketIOResponse(msg.JsonElements, this)
{
PacketId = msg.Id,
};
response.InComingBytes.AddRange(msg.IncomingBytes);
foreach (var item in _onAnyHandlers)
{
item.TryInvoke(msg.Event, response);
}
if (_eventHandlers.ContainsKey(msg.Event))
{
_eventHandlers[msg.Event].TryInvoke(response);
}
}
private void BinaryAckMessageHandler(ClientBinaryAckMessage msg)
{
if (_ackHandlers.ContainsKey(msg.Id))
{
var response = new SocketIOResponse(msg.JsonElements, this)
{
PacketId = msg.Id,
};
response.InComingBytes.AddRange(msg.IncomingBytes);
_ackHandlers[msg.Id].TryInvoke(response);
}
}
private void OnErrorReceived(Exception ex)
{
Logger.LogError(ex, ex.Message);
_ = InvokeDisconnect(DisconnectReason.TransportClose);
}
private void OnMessageReceived(IMessage msg)
{
try
{
switch (msg.Type)
{
case MessageType.Ping:
PingHandler();
break;
case MessageType.Pong:
PongHandler(msg as PongMessage);
break;
case MessageType.Connected:
ConnectedHandler(msg as ConnectedMessage);
break;
case MessageType.Disconnected:
DisconnectedHandler();
break;
case MessageType.EventMessage:
EventMessageHandler(msg as EventMessage);
break;
case MessageType.AckMessage:
AckMessageHandler(msg as ClientAckMessage);
break;
case MessageType.ErrorMessage:
ErrorMessageHandler(msg as ErrorMessage);
break;
case MessageType.BinaryMessage:
BinaryMessageHandler(msg as BinaryMessage);
break;
case MessageType.BinaryAckMessage:
BinaryAckMessageHandler(msg as ClientBinaryAckMessage);
break;
}
}
catch (Exception e)
{
Logger.LogError(e, e.Message);
}
}
public async Task DisconnectAsync()
{
if (Connected)
{
var msg = new DisconnectedMessage
{
Namespace = _namespace
};
try
{
await _transport.SendAsync(msg, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
Logger.LogError(e, e.Message);
}
await InvokeDisconnect(DisconnectReason.IOClientDisconnect);
}
}
/// <summary>
/// Register a new handler for the given event.
/// </summary>
/// <param name="eventName"></param>
/// <param name="callback"></param>
public void On(string eventName, Action<SocketIOResponse> callback)
{
if (_eventHandlers.ContainsKey(eventName))
{
_eventHandlers.Remove(eventName);
}
_eventHandlers.Add(eventName, callback);
}
/// <summary>
/// Unregister a new handler for the given event.
/// </summary>
/// <param name="eventName"></param>
public void Off(string eventName)
{
if (_eventHandlers.ContainsKey(eventName))
{
_eventHandlers.Remove(eventName);
}
}
public void OnAny(OnAnyHandler handler)
{
if (handler != null)
{
_onAnyHandlers.Add(handler);
}
}
public void PrependAny(OnAnyHandler handler)
{
if (handler != null)
{
_onAnyHandlers.Insert(0, handler);
}
}
public void OffAny(OnAnyHandler handler)
{
if (handler != null)
{
_onAnyHandlers.Remove(handler);
}
}
public OnAnyHandler[] ListenersAny() => _onAnyHandlers.ToArray();
internal async Task ClientAckAsync(int packetId, CancellationToken cancellationToken, params object[] data)
{
IMessage msg;
if (data != null && data.Length > 0)
{
var result = JsonSerializer.Serialize(data);
if (result.Bytes.Count > 0)
{
msg = new ServerBinaryAckMessage
{
Id = packetId,
Namespace = _namespace,
Json = result.Json
};
msg.OutgoingBytes = new List<byte[]>(result.Bytes);
}
else
{
msg = new ServerAckMessage
{
Namespace = _namespace,
Id = packetId,
Json = result.Json
};
}
}
else
{
msg = new ServerAckMessage
{
Namespace = _namespace,
Id = packetId
};
}
await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Emits an event to the socket
/// </summary>
/// <param name="eventName"></param>
/// <param name="data">Any other parameters can be included. All serializable datastructures are supported, including byte[]</param>
/// <returns></returns>
public async Task EmitAsync(string eventName, params object[] data)
{
await EmitAsync(eventName, CancellationToken.None, data).ConfigureAwait(false);
}
public async Task EmitAsync(string eventName, CancellationToken cancellationToken, params object[] data)
{
if (data != null && data.Length > 0)
{
var result = JsonSerializer.Serialize(data);
if (result.Bytes.Count > 0)
{
var msg = new BinaryMessage
{
Namespace = _namespace,
OutgoingBytes = new List<byte[]>(result.Bytes),
Event = eventName,
Json = result.Json
};
await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
}
else
{
var msg = new EventMessage
{
Namespace = _namespace,
Event = eventName,
Json = result.Json
};
await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
}
}
else
{
var msg = new EventMessage
{
Namespace = _namespace,
Event = eventName
};
await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
}
}
/// <summary>
/// Emits an event to the socket
/// </summary>
/// <param name="eventName"></param>
/// <param name="ack">will be called with the server answer.</param>
/// <param name="data">Any other parameters can be included. All serializable datastructures are supported, including byte[]</param>
/// <returns></returns>
public async Task EmitAsync(string eventName, Action<SocketIOResponse> ack, params object[] data)
{
await EmitAsync(eventName, CancellationToken.None, ack, data).ConfigureAwait(false);
}
public async Task EmitAsync(string eventName, CancellationToken cancellationToken, Action<SocketIOResponse> ack, params object[] data)
{
_ackHandlers.Add(++_packetId, ack);
if (data != null && data.Length > 0)
{
var result = JsonSerializer.Serialize(data);
if (result.Bytes.Count > 0)
{
var msg = new ClientBinaryAckMessage
{
Event = eventName,
Namespace = _namespace,
Json = result.Json,
Id = _packetId,
OutgoingBytes = new List<byte[]>(result.Bytes)
};
await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
}
else
{
var msg = new ClientAckMessage
{
Event = eventName,
Namespace = _namespace,
Id = _packetId,
Json = result.Json
};
await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
}
}
else
{
var msg = new ClientAckMessage
{
Event = eventName,
Namespace = _namespace,
Id = _packetId
};
await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
}
}
private async Task InvokeDisconnect(string reason)
{
if (Connected)
{
Connected = false;
Id = null;
OnDisconnected.TryInvoke(this, reason);
try
{
await _transport.DisconnectAsync(CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
Logger.LogError(e, e.Message);
}
if (reason != DisconnectReason.IOServerDisconnect && reason != DisconnectReason.IOClientDisconnect)
{
//In the this cases (explicit disconnection), the client will not try to reconnect and you need to manually call socket.connect().
if (Options.Reconnection)
{
ConnectCore();
}
}
}
}
public void AddExpectedException(Type type)
{
if (!_expectedExceptions.Contains(type))
{
_expectedExceptions.Add(type);
}
}
private void DisposeForReconnect()
{
_hasError = false;
_isFaild = false;
_packetId = -1;
_ackHandlers.Clear();
_connectCoreException = null;
_hasError = false;
_connectionTokenSource.TryCancel();
_connectionTokenSource.TryDispose();
}
public void Dispose()
{
HttpClient.Dispose();
_transport.TryDispose();
_ackHandlers.Clear();
_onAnyHandlers.Clear();
_eventHandlers.Clear();
_connectionTokenSource.TryCancel();
_connectionTokenSource.TryDispose();
}
}
}

View File

@@ -0,0 +1,65 @@
using SocketIOClient.Transport;
using System;
using System.Collections.Generic;
namespace SocketIOClient
{
public sealed class SocketIOOptions
{
public SocketIOOptions()
{
RandomizationFactor = 0.5;
ReconnectionDelay = 1000;
ReconnectionDelayMax = 5000;
ReconnectionAttempts = int.MaxValue;
Path = "/socket.io";
ConnectionTimeout = TimeSpan.FromSeconds(20);
Reconnection = true;
Transport = TransportProtocol.Polling;
EIO = 4;
AutoUpgrade = true;
}
public string Path { get; set; }
public TimeSpan ConnectionTimeout { get; set; }
public IEnumerable<KeyValuePair<string, string>> Query { get; set; }
/// <summary>
/// Whether to allow reconnection if accidentally disconnected
/// </summary>
public bool Reconnection { get; set; }
public double ReconnectionDelay { get; set; }
public int ReconnectionDelayMax { get; set; }
public int ReconnectionAttempts { get; set; }
double _randomizationFactor;
public double RandomizationFactor
{
get => _randomizationFactor;
set
{
if (value >= 0 && value <= 1)
{
_randomizationFactor = value;
}
else
{
throw new ArgumentException($"{nameof(RandomizationFactor)} should be greater than or equal to 0.0, and less than 1.0.");
}
}
}
public Dictionary<string, string> ExtraHeaders { get; set; }
public TransportProtocol Transport { get; set; }
public int EIO { get; set; }
public bool AutoUpgrade { get; set; }
public object Auth { get; set; }
}
}

View File

@@ -0,0 +1,62 @@
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace SocketIOClient
{
public class SocketIOResponse
{
public SocketIOResponse(IList<JsonElement> array, SocketIO socket)
{
_array = array;
InComingBytes = new List<byte[]>();
SocketIO = socket;
PacketId = -1;
}
readonly IList<JsonElement> _array;
public List<byte[]> InComingBytes { get; }
public SocketIO SocketIO { get; }
public int PacketId { get; set; }
public T GetValue<T>(int index = 0)
{
var element = GetValue(index);
string json = element.GetRawText();
return SocketIO.JsonSerializer.Deserialize<T>(json, InComingBytes);
}
public JsonElement GetValue(int index = 0) => _array[index];
public int Count => _array.Count;
public override string ToString()
{
var builder = new StringBuilder();
builder.Append('[');
foreach (var item in _array)
{
builder.Append(item.GetRawText());
if (_array.IndexOf(item) < _array.Count - 1)
{
builder.Append(',');
}
}
builder.Append(']');
return builder.ToString();
}
public async Task CallbackAsync(params object[] data)
{
await SocketIO.ClientAckAsync(PacketId, CancellationToken.None, data).ConfigureAwait(false);
}
public async Task CallbackAsync(CancellationToken cancellationToken, params object[] data)
{
await SocketIO.ClientAckAsync(PacketId, cancellationToken, data).ConfigureAwait(false);
}
}
}

View File

@@ -0,0 +1,245 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Reactive.Subjects;
using Microsoft.Extensions.Logging;
using SocketIOClient.JsonSerializer;
using SocketIOClient.Messages;
using SocketIOClient.UriConverters;
namespace SocketIOClient.Transport
{
public abstract class BaseTransport : IObserver<string>, IObserver<byte[]>, IObservable<IMessage>, IDisposable
{
public BaseTransport(SocketIOOptions options, IJsonSerializer jsonSerializer, ILogger logger)
{
Options = options;
MessageSubject = new Subject<IMessage>();
JsonSerializer = jsonSerializer;
UriConverter = new UriConverter();
_messageQueue = new Queue<IMessage>();
_logger = logger;
}
DateTime _pingTime;
readonly Queue<IMessage> _messageQueue;
readonly ILogger _logger;
protected SocketIOOptions Options { get; }
protected Subject<IMessage> MessageSubject { get; }
protected IJsonSerializer JsonSerializer { get; }
protected CancellationTokenSource PingTokenSource { get; private set; }
protected OpenedMessage OpenedMessage { get; private set; }
public string Namespace { get; set; }
public IUriConverter UriConverter { get; set; }
public async Task SendAsync(IMessage msg, CancellationToken cancellationToken)
{
msg.Eio = Options.EIO;
msg.Protocol = Options.Transport;
var payload = new Payload
{
Text = msg.Write()
};
if (msg.OutgoingBytes != null)
{
payload.Bytes = msg.OutgoingBytes;
}
await SendAsync(payload, cancellationToken).ConfigureAwait(false);
}
protected virtual async Task OpenAsync(OpenedMessage msg)
{
OpenedMessage = msg;
if (Options.EIO == 3 && string.IsNullOrEmpty(Namespace))
{
return;
}
var connectMsg = new ConnectedMessage
{
Namespace = Namespace,
Eio = Options.EIO,
Query = Options.Query,
};
if (Options.EIO == 4)
{
if (Options.Auth != null)
{
connectMsg.AuthJsonStr = JsonSerializer.Serialize(new[] { Options.Auth }).Json.TrimStart('[').TrimEnd(']');
}
}
for (int i = 1; i <= 3; i++)
{
try
{
await SendAsync(connectMsg, CancellationToken.None).ConfigureAwait(false);
break;
}
catch (Exception e)
{
if (i == 3)
OnError(e);
else
await Task.Delay(TimeSpan.FromMilliseconds(Math.Pow(2, i) * 100));
}
}
}
/// <summary>
/// <para>Eio3 ping is sent by the client</para>
/// <para>Eio4 ping is sent by the server</para>
/// </summary>
/// <param name="cancellationToken"></param>
private void StartPing(CancellationToken cancellationToken)
{
_logger.LogDebug($"[Ping] Interval: {OpenedMessage.PingInterval}");
Task.Factory.StartNew(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(OpenedMessage.PingInterval);
if (cancellationToken.IsCancellationRequested)
{
break;
}
try
{
var ping = new PingMessage();
_logger.LogDebug($"[Ping] Sending");
await SendAsync(ping, CancellationToken.None).ConfigureAwait(false);
_logger.LogDebug($"[Ping] Has been sent");
_pingTime = DateTime.Now;
MessageSubject.OnNext(ping);
}
catch (Exception e)
{
_logger.LogDebug($"[Ping] Failed to send, {e.Message}");
MessageSubject.OnError(e);
break;
}
}
}, TaskCreationOptions.LongRunning);
}
public abstract Task ConnectAsync(Uri uri, CancellationToken cancellationToken);
public abstract Task DisconnectAsync(CancellationToken cancellationToken);
public abstract void AddHeader(string key, string val);
public virtual void Dispose()
{
MessageSubject.Dispose();
_messageQueue.Clear();
if (PingTokenSource != null)
{
PingTokenSource.Cancel();
PingTokenSource.Dispose();
}
}
public abstract Task SendAsync(Payload payload, CancellationToken cancellationToken);
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
MessageSubject.OnError(error);
}
public void OnNext(string text)
{
_logger.LogDebug($"[Receive] {text}");
var msg = MessageFactory.CreateMessage(Options.EIO, text);
if (msg == null)
{
return;
}
if (msg.BinaryCount > 0)
{
msg.IncomingBytes = new List<byte[]>(msg.BinaryCount);
_messageQueue.Enqueue(msg);
return;
}
if (msg.Type == MessageType.Opened)
{
OpenAsync(msg as OpenedMessage).ConfigureAwait(false);
}
if (Options.EIO == 3)
{
if (msg.Type == MessageType.Connected)
{
var connectMsg = msg as ConnectedMessage;
connectMsg.Sid = OpenedMessage.Sid;
if ((string.IsNullOrEmpty(Namespace) && string.IsNullOrEmpty(connectMsg.Namespace)) || connectMsg.Namespace == Namespace)
{
if (PingTokenSource != null)
{
PingTokenSource.Cancel();
}
PingTokenSource = new CancellationTokenSource();
StartPing(PingTokenSource.Token);
}
else
{
return;
}
}
else if (msg.Type == MessageType.Pong)
{
var pong = msg as PongMessage;
pong.Duration = DateTime.Now - _pingTime;
}
}
MessageSubject.OnNext(msg);
if (msg.Type == MessageType.Ping)
{
_pingTime = DateTime.Now;
try
{
SendAsync(new PongMessage(), CancellationToken.None).ConfigureAwait(false);
MessageSubject.OnNext(new PongMessage
{
Eio = Options.EIO,
Protocol = Options.Transport,
Duration = DateTime.Now - _pingTime
});
}
catch (Exception e)
{
OnError(e);
}
}
}
public void OnNext(byte[] bytes)
{
_logger.LogDebug($"[Receive] binary message");
if (_messageQueue.Count > 0)
{
var msg = _messageQueue.Peek();
msg.IncomingBytes.Add(bytes);
if (msg.IncomingBytes.Count == msg.BinaryCount)
{
MessageSubject.OnNext(msg);
_messageQueue.Dequeue();
}
}
}
public IDisposable Subscribe(IObserver<IMessage> observer)
{
return MessageSubject.Subscribe(observer);
}
}
}

View File

@@ -0,0 +1,76 @@
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using System.Net.Http.Headers;
namespace SocketIOClient.Transport
{
public class Eio3HttpPollingHandler : HttpPollingHandler
{
public Eio3HttpPollingHandler(HttpClient httpClient) : base(httpClient) { }
public override async Task PostAsync(string uri, IEnumerable<byte[]> bytes, CancellationToken cancellationToken)
{
var list = new List<byte>();
foreach (var item in bytes)
{
list.Add(1);
var length = SplitInt(item.Length + 1).Select(x => (byte)x);
list.AddRange(length);
list.Add(byte.MaxValue);
list.Add(4);
list.AddRange(item);
}
var content = new ByteArrayContent(list.ToArray());
content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
await HttpClient.PostAsync(AppendRandom(uri), content, cancellationToken).ConfigureAwait(false);
}
private List<int> SplitInt(int number)
{
List<int> list = new List<int>();
while (number > 0)
{
list.Add(number % 10);
number /= 10;
}
list.Reverse();
return list;
}
protected override void ProduceText(string text)
{
int p = 0;
while (true)
{
int index = text.IndexOf(':', p);
if (index == -1)
{
break;
}
if (int.TryParse(text.Substring(p, index - p), out int length))
{
string msg = text.Substring(index + 1, length);
TextSubject.OnNext(msg);
}
else
{
break;
}
p = index + length + 1;
if (p >= text.Length)
{
break;
}
}
}
public override Task PostAsync(string uri, string content, CancellationToken cancellationToken)
{
content = content.Length + ":" + content;
return base.PostAsync(uri, content, cancellationToken);
}
}
}

View File

@@ -0,0 +1,48 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace SocketIOClient.Transport
{
public class Eio4HttpPollingHandler : HttpPollingHandler
{
public Eio4HttpPollingHandler(HttpClient httpClient) : base(httpClient) { }
const char Separator = '\u001E'; //1E 
public override async Task PostAsync(string uri, IEnumerable<byte[]> bytes, CancellationToken cancellationToken)
{
var builder = new StringBuilder();
foreach (var item in bytes)
{
builder.Append('b').Append(Convert.ToBase64String(item)).Append(Separator);
}
if (builder.Length == 0)
{
return;
}
string text = builder.ToString().TrimEnd(Separator);
await PostAsync(uri, text, cancellationToken);
}
protected override void ProduceText(string text)
{
string[] items = text.Split(new[] { Separator }, StringSplitOptions.RemoveEmptyEntries);
foreach (var item in items)
{
if (item[0] == 'b')
{
byte[] bytes = Convert.FromBase64String(item.Substring(1));
BytesSubject.OnNext(bytes);
}
else
{
TextSubject.OnNext(item);
}
}
}
}
}

View File

@@ -0,0 +1,118 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace SocketIOClient.Transport
{
public abstract class HttpPollingHandler : IHttpPollingHandler
{
public HttpPollingHandler(HttpClient httpClient)
{
HttpClient = httpClient;
TextSubject = new Subject<string>();
BytesSubject = new Subject<byte[]>();
TextObservable = TextSubject.AsObservable();
BytesObservable = BytesSubject.AsObservable();
}
protected HttpClient HttpClient { get; }
protected Subject<string> TextSubject{get;}
protected Subject<byte[]> BytesSubject{get;}
public IObservable<string> TextObservable { get; }
public IObservable<byte[]> BytesObservable { get; }
protected string AppendRandom(string uri)
{
return uri + "&t=" + DateTimeOffset.Now.ToUnixTimeSeconds();
}
public async Task GetAsync(string uri, CancellationToken cancellationToken)
{
var req = new HttpRequestMessage(HttpMethod.Get, AppendRandom(uri));
var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false);
if (!resMsg.IsSuccessStatusCode)
{
throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}");
}
await ProduceMessageAsync(resMsg).ConfigureAwait(false);
}
public async Task SendAsync(HttpRequestMessage req, CancellationToken cancellationToken)
{
var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false);
if (!resMsg.IsSuccessStatusCode)
{
throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}");
}
await ProduceMessageAsync(resMsg).ConfigureAwait(false);
}
public async virtual Task PostAsync(string uri, string content, CancellationToken cancellationToken)
{
var httpContent = new StringContent(content);
var resMsg = await HttpClient.PostAsync(AppendRandom(uri), httpContent, cancellationToken).ConfigureAwait(false);
await ProduceMessageAsync(resMsg).ConfigureAwait(false);
}
public abstract Task PostAsync(string uri, IEnumerable<byte[]> bytes, CancellationToken cancellationToken);
private async Task ProduceMessageAsync(HttpResponseMessage resMsg)
{
if (resMsg.Content.Headers.ContentType.MediaType == "application/octet-stream")
{
byte[] bytes = await resMsg.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
ProduceBytes(bytes);
}
else
{
string text = await resMsg.Content.ReadAsStringAsync().ConfigureAwait(false);
ProduceText(text);
}
}
protected abstract void ProduceText(string text);
private void ProduceBytes(byte[] bytes)
{
int i = 0;
while (bytes.Length > i + 4)
{
byte type = bytes[i];
var builder = new StringBuilder();
i++;
while (bytes[i] != byte.MaxValue)
{
builder.Append(bytes[i]);
i++;
}
i++;
int length = int.Parse(builder.ToString());
if (type == 0)
{
var buffer = new byte[length];
Buffer.BlockCopy(bytes, i, buffer, 0, buffer.Length);
TextSubject.OnNext(Encoding.UTF8.GetString(buffer));
}
else if (type == 1)
{
var buffer = new byte[length - 1];
Buffer.BlockCopy(bytes, i + 1, buffer, 0, buffer.Length);
BytesSubject.OnNext(buffer);
}
i += length;
}
}
public void Dispose()
{
TextSubject.Dispose();
BytesSubject.Dispose();
}
}
}

View File

@@ -0,0 +1,121 @@
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using SocketIOClient.JsonSerializer;
using SocketIOClient.Messages;
namespace SocketIOClient.Transport
{
public class HttpTransport : BaseTransport
{
public HttpTransport(HttpClient http,
IHttpPollingHandler pollingHandler,
SocketIOOptions options,
IJsonSerializer jsonSerializer,
ILogger logger) : base(options, jsonSerializer, logger)
{
_http = http;
_httpPollingHandler = pollingHandler;
_httpPollingHandler.TextObservable.Subscribe(this);
_httpPollingHandler.BytesObservable.Subscribe(this);
}
string _httpUri;
CancellationTokenSource _pollingTokenSource;
readonly HttpClient _http;
readonly IHttpPollingHandler _httpPollingHandler;
private void StartPolling(CancellationToken cancellationToken)
{
Task.Factory.StartNew(async () =>
{
int retry = 0;
while (!cancellationToken.IsCancellationRequested)
{
if (!_httpUri.Contains("&sid="))
{
await Task.Delay(20);
continue;
}
try
{
await _httpPollingHandler.GetAsync(_httpUri, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
retry++;
if (retry >= 3)
{
MessageSubject.OnError(e);
break;
}
await Task.Delay(100 * (int)Math.Pow(2, retry));
}
}
}, TaskCreationOptions.LongRunning);
}
public override async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
{
var req = new HttpRequestMessage(HttpMethod.Get, uri);
// if (_options.ExtraHeaders != null)
// {
// foreach (var item in _options.ExtraHeaders)
// {
// req.Headers.Add(item.Key, item.Value);
// }
// }
_httpUri = uri.ToString();
await _httpPollingHandler.SendAsync(req, new CancellationTokenSource(Options.ConnectionTimeout).Token).ConfigureAwait(false);
if (_pollingTokenSource != null)
{
_pollingTokenSource.Cancel();
}
_pollingTokenSource = new CancellationTokenSource();
StartPolling(_pollingTokenSource.Token);
}
public override Task DisconnectAsync(CancellationToken cancellationToken)
{
_pollingTokenSource.Cancel();
if (PingTokenSource != null)
{
PingTokenSource.Cancel();
}
return Task.CompletedTask;
}
public override void AddHeader(string key, string val)
{
_http.DefaultRequestHeaders.Add(key, val);
}
public override void Dispose()
{
base.Dispose();
_httpPollingHandler.Dispose();
}
public override async Task SendAsync(Payload payload, CancellationToken cancellationToken)
{
await _httpPollingHandler.PostAsync(_httpUri, payload.Text, cancellationToken);
if (payload.Bytes != null && payload.Bytes.Count > 0)
{
await _httpPollingHandler.PostAsync(_httpUri, payload.Bytes, cancellationToken);
}
}
protected override async Task OpenAsync(OpenedMessage msg)
{
//if (!_httpUri.Contains("&sid="))
//{
//}
_httpUri += "&sid=" + msg.Sid;
await base.OpenAsync(msg);
}
}
}

View File

@@ -0,0 +1,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace SocketIOClient.Transport
{
public interface IClientWebSocket : IDisposable
{
IObservable<string> TextObservable { get; }
IObservable<byte[]> BytesObservable { get; }
Task ConnectAsync(Uri uri, CancellationToken cancellationToken);
Task DisconnectAsync(CancellationToken cancellationToken);
Task SendAsync(byte[] bytes, TransportMessageType type, bool endOfMessage, CancellationToken cancellationToken);
void AddHeader(string key, string val);
}
}

View File

@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace SocketIOClient.Transport
{
public interface IHttpPollingHandler : IDisposable
{
IObservable<string> TextObservable { get; }
IObservable<byte[]> BytesObservable { get; }
Task GetAsync(string uri, CancellationToken cancellationToken);
Task SendAsync(HttpRequestMessage req, CancellationToken cancellationToken);
Task PostAsync(string uri, string content, CancellationToken cancellationToken);
Task PostAsync(string uri, IEnumerable<byte[]> bytes, CancellationToken cancellationToken);
}
}

View File

@@ -0,0 +1,10 @@
using System.Collections.Generic;
namespace SocketIOClient.Transport
{
public class Payload
{
public string Text { get; set; }
public List<byte[]> Bytes { get; set; }
}
}

View File

@@ -0,0 +1,143 @@
using System;
using System.Net.WebSockets;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace SocketIOClient.Transport
{
public class SystemNetWebSocketsClientWebSocket : IClientWebSocket
{
public SystemNetWebSocketsClientWebSocket(int eio)
{
_eio = eio;
_textSubject = new Subject<string>();
_bytesSubject = new Subject<byte[]>();
TextObservable = _textSubject.AsObservable();
BytesObservable = _bytesSubject.AsObservable();
_ws = new ClientWebSocket();
_listenCancellation = new CancellationTokenSource();
_sendLock = new SemaphoreSlim(1, 1);
}
const int ReceiveChunkSize = 1024 * 8;
readonly int _eio;
readonly ClientWebSocket _ws;
readonly Subject<string> _textSubject;
readonly Subject<byte[]> _bytesSubject;
readonly CancellationTokenSource _listenCancellation;
readonly SemaphoreSlim _sendLock;
public IObservable<string> TextObservable { get; }
public IObservable<byte[]> BytesObservable { get; }
private void Listen()
{
Task.Factory.StartNew(async() =>
{
while (true)
{
if (_listenCancellation.IsCancellationRequested)
{
break;
}
var buffer = new byte[ReceiveChunkSize];
int count = 0;
WebSocketReceiveResult result = null;
while (_ws.State == WebSocketState.Open)
{
var subBuffer = new byte[ReceiveChunkSize];
try
{
result = await _ws.ReceiveAsync(new ArraySegment<byte>(subBuffer), CancellationToken.None).ConfigureAwait(false);
// resize
if (buffer.Length - count < result.Count)
{
Array.Resize(ref buffer, buffer.Length + result.Count);
}
Buffer.BlockCopy(subBuffer, 0, buffer, count, result.Count);
count += result.Count;
if (result.EndOfMessage)
{
break;
}
}
catch (Exception e)
{
_textSubject.OnError(e);
break;
}
}
if (result == null)
{
break;
}
switch (result.MessageType)
{
case WebSocketMessageType.Text:
string text = Encoding.UTF8.GetString(buffer, 0, count);
_textSubject.OnNext(text);
break;
case WebSocketMessageType.Binary:
byte[] bytes;
if (_eio == 3)
{
bytes = new byte[count - 1];
Buffer.BlockCopy(buffer, 1, bytes, 0, bytes.Length);
}
else
{
bytes = new byte[count];
Buffer.BlockCopy(buffer, 0, bytes, 0, bytes.Length);
}
_bytesSubject.OnNext(bytes);
break;
case WebSocketMessageType.Close:
_textSubject.OnError(new WebSocketException("Received a Close message"));
break;
}
}
});
}
public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
{
await _ws.ConnectAsync(uri, cancellationToken);
Listen();
}
public async Task DisconnectAsync(CancellationToken cancellationToken)
{
await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken);
}
public async Task SendAsync(byte[] bytes, TransportMessageType type, bool endOfMessage, CancellationToken cancellationToken)
{
var msgType = WebSocketMessageType.Text;
if (type == TransportMessageType.Binary)
{
msgType = WebSocketMessageType.Binary;
}
await _ws.SendAsync(new ArraySegment<byte>(bytes), msgType, endOfMessage, cancellationToken).ConfigureAwait(false);
}
public void AddHeader(string key, string val)
{
_ws.Options.SetRequestHeader(key, val);
}
public void Dispose()
{
_textSubject.Dispose();
_bytesSubject.Dispose();
_ws.Dispose();
}
}
}

View File

@@ -0,0 +1,8 @@
namespace SocketIOClient.Transport
{
public enum TransportMessageType
{
Text = 0,
Binary = 1
}
}

View File

@@ -0,0 +1,8 @@
namespace SocketIOClient.Transport
{
public enum TransportProtocol
{
Polling,
WebSocket
}
}

View File

@@ -0,0 +1,92 @@
using System;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using SocketIOClient.JsonSerializer;
namespace SocketIOClient.Transport
{
public class WebSocketTransport : BaseTransport
{
public WebSocketTransport(IClientWebSocket ws, SocketIOOptions options, IJsonSerializer jsonSerializer, ILogger logger)
: base(options, jsonSerializer, logger)
{
_ws = ws;
_sendLock = new SemaphoreSlim(1, 1);
_ws.TextObservable.Subscribe(this);
_ws.BytesObservable.Subscribe(this);
}
const int ReceiveChunkSize = 1024 * 8;
const int SendChunkSize = 1024 * 8;
readonly IClientWebSocket _ws;
readonly SemaphoreSlim _sendLock;
private async Task SendAsync(TransportMessageType type, byte[] bytes, CancellationToken cancellationToken)
{
try
{
await _sendLock.WaitAsync().ConfigureAwait(false);
if (type == TransportMessageType.Binary && Options.EIO == 3)
{
byte[] buffer = new byte[bytes.Length + 1];
buffer[0] = 4;
Buffer.BlockCopy(bytes, 0, buffer, 1, bytes.Length);
bytes = buffer;
}
int pages = (int)Math.Ceiling(bytes.Length * 1.0 / SendChunkSize);
for (int i = 0; i < pages; i++)
{
int offset = i * SendChunkSize;
int length = SendChunkSize;
if (offset + length > bytes.Length)
{
length = bytes.Length - offset;
}
byte[] subBuffer = new byte[length];
Buffer.BlockCopy(bytes, offset, subBuffer, 0, subBuffer.Length);
bool endOfMessage = pages - 1 == i;
await _ws.SendAsync(subBuffer, type, endOfMessage, cancellationToken).ConfigureAwait(false);
}
}
finally
{
_sendLock.Release();
}
}
public override async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
{
await _ws.ConnectAsync(uri, cancellationToken);
}
public override async Task DisconnectAsync(CancellationToken cancellationToken)
{
await _ws.DisconnectAsync(cancellationToken);
}
public override async Task SendAsync(Payload payload, CancellationToken cancellationToken)
{
byte[] bytes = Encoding.UTF8.GetBytes(payload.Text);
await SendAsync(TransportMessageType.Text, bytes, cancellationToken);
if (payload.Bytes != null)
{
foreach (var item in payload.Bytes)
{
await SendAsync(TransportMessageType.Binary, item, cancellationToken);
}
}
}
public override void AddHeader(string key, string val) => _ws.AddHeader(key, val);
public override void Dispose()
{
base.Dispose();
_sendLock.Dispose();
}
}
}

View File

@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
namespace SocketIOClient.UriConverters
{
public interface IUriConverter
{
Uri GetServerUri(bool ws, Uri serverUri, int eio, string path, IEnumerable<KeyValuePair<string, string>> queryParams);
}
}

View File

@@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace SocketIOClient.UriConverters
{
public class UriConverter : IUriConverter
{
public Uri GetServerUri(bool ws, Uri serverUri, int eio, string path, IEnumerable<KeyValuePair<string, string>> queryParams)
{
var builder = new StringBuilder();
if (serverUri.Scheme == "https" || serverUri.Scheme == "wss")
{
builder.Append(ws ? "wss://" : "https://");
}
else if (serverUri.Scheme == "http" || serverUri.Scheme == "ws")
{
builder.Append(ws ? "ws://" : "http://");
}
else
{
throw new ArgumentException("Only supports 'http, https, ws, wss' protocol");
}
builder.Append(serverUri.Host);
if (!serverUri.IsDefaultPort)
{
builder.Append(":").Append(serverUri.Port);
}
if (string.IsNullOrEmpty(path))
{
builder.Append("/socket.io");
}
else
{
builder.Append(path);
}
builder
.Append("/?EIO=")
.Append(eio)
.Append("&transport=")
.Append(ws ? "websocket" : "polling");
if (queryParams != null)
{
foreach (var item in queryParams)
{
builder.Append('&').Append(item.Key).Append('=').Append(item.Value);
}
}
return new Uri(builder.ToString());
}
}
}