diff --git a/ElectronNET.API/ElectronNET.API.csproj b/ElectronNET.API/ElectronNET.API.csproj index 520786a..2cb3799 100644 --- a/ElectronNET.API/ElectronNET.API.csproj +++ b/ElectronNET.API/ElectronNET.API.csproj @@ -43,8 +43,10 @@ This package contains the API to access the "native" electron API. - - + + + diff --git a/ElectronNET.API/SocketIO/ByteArrayConverter.cs b/ElectronNET.API/SocketIO/ByteArrayConverter.cs new file mode 100644 index 0000000..206b266 --- /dev/null +++ b/ElectronNET.API/SocketIO/ByteArrayConverter.cs @@ -0,0 +1,64 @@ +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Linq; + +namespace SocketIOClient.Newtonsoft.Json +{ + class ByteArrayConverter : JsonConverter + { + public ByteArrayConverter() + { + Bytes = new List(); + } + + internal List Bytes { get; } + + public override bool CanConvert(Type objectType) + { + return objectType == typeof(byte[]); + } + + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, global::Newtonsoft.Json.JsonSerializer serializer) + { + byte[] bytes = null; + if (reader.TokenType == JsonToken.StartObject) + { + reader.Read(); + if (reader.TokenType == JsonToken.PropertyName && reader.Value?.ToString() == "_placeholder") + { + reader.Read(); + if (reader.TokenType == JsonToken.Boolean && (bool)reader.Value) + { + reader.Read(); + if (reader.TokenType == JsonToken.PropertyName && reader.Value?.ToString() == "num") + { + reader.Read(); + if (reader.Value != null) + { + if (int.TryParse(reader.Value.ToString(), out int num)) + { + bytes = Bytes[num]; + reader.Read(); + } + } + } + } + } + } + return bytes; + } + + public override void WriteJson(JsonWriter writer, object value, global::Newtonsoft.Json.JsonSerializer serializer) + { + var source = value as byte[]; + Bytes.Add(source.ToArray()); + writer.WriteStartObject(); + writer.WritePropertyName("_placeholder"); + writer.WriteValue(true); + writer.WritePropertyName("num"); + writer.WriteValue(Bytes.Count - 1); + writer.WriteEndObject(); + } + } +} diff --git a/ElectronNET.API/SocketIO/DisconnectReason.cs b/ElectronNET.API/SocketIO/DisconnectReason.cs new file mode 100644 index 0000000..ed66d95 --- /dev/null +++ b/ElectronNET.API/SocketIO/DisconnectReason.cs @@ -0,0 +1,11 @@ +namespace SocketIOClient +{ + public class DisconnectReason + { + public static string IOServerDisconnect = "io server disconnect"; + public static string IOClientDisconnect = "io client disconnect"; + public static string PingTimeout = "ping timeout"; + public static string TransportClose = "transport close"; + public static string TransportError = "transport error"; + } +} diff --git a/ElectronNET.API/SocketIO/EventHandlers.cs b/ElectronNET.API/SocketIO/EventHandlers.cs new file mode 100644 index 0000000..48d5f02 --- /dev/null +++ b/ElectronNET.API/SocketIO/EventHandlers.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; + +namespace SocketIOClient +{ + public delegate void OnAnyHandler(string eventName, SocketIOResponse response); + public delegate void OnOpenedHandler(string sid, int pingInterval, int pingTimeout); + //public delegate void OnDisconnectedHandler(string sid, int pingInterval, int pingTimeout); + public delegate void OnAck(int packetId, List array); + public delegate void OnBinaryAck(int packetId, int totalCount, List array); + public delegate void OnBinaryReceived(int packetId, int totalCount, string eventName, List array); + public delegate void OnDisconnected(); + public delegate void OnError(string error); + public delegate void OnEventReceived(int packetId, string eventName, List array); + public delegate void OnOpened(string sid, int pingInterval, int pingTimeout); + public delegate void OnPing(); + public delegate void OnPong(); +} diff --git a/ElectronNET.API/SocketIO/Extensions/CancellationTokenSourceExtensions.cs b/ElectronNET.API/SocketIO/Extensions/CancellationTokenSourceExtensions.cs new file mode 100644 index 0000000..ff5ae99 --- /dev/null +++ b/ElectronNET.API/SocketIO/Extensions/CancellationTokenSourceExtensions.cs @@ -0,0 +1,18 @@ +using System; +using System.Threading; + +namespace SocketIOClient.Extensions +{ + internal static class CancellationTokenSourceExtensions + { + public static void TryDispose(this CancellationTokenSource cts) + { + cts?.Dispose(); + } + + public static void TryCancel(this CancellationTokenSource cts) + { + cts?.Cancel(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Extensions/DisposableExtensions.cs b/ElectronNET.API/SocketIO/Extensions/DisposableExtensions.cs new file mode 100644 index 0000000..a0f8722 --- /dev/null +++ b/ElectronNET.API/SocketIO/Extensions/DisposableExtensions.cs @@ -0,0 +1,12 @@ +using System; + +namespace SocketIOClient.Extensions +{ + internal static class DisposableExtensions + { + public static void TryDispose(this IDisposable disposable) + { + disposable?.Dispose(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Extensions/EventHandlerExtensions.cs b/ElectronNET.API/SocketIO/Extensions/EventHandlerExtensions.cs new file mode 100644 index 0000000..b05b8f3 --- /dev/null +++ b/ElectronNET.API/SocketIO/Extensions/EventHandlerExtensions.cs @@ -0,0 +1,17 @@ +using System; + +namespace SocketIOClient.Extensions +{ + internal static class EventHandlerExtensions + { + public static void TryInvoke(this EventHandler handler, object sender, T args) + { + handler?.Invoke(sender, args); + } + + public static void TryInvoke(this EventHandler handler, object sender, EventArgs args) + { + handler?.Invoke(sender, args); + } + } +} diff --git a/ElectronNET.API/SocketIO/Extensions/SocketIOEventExtensions.cs b/ElectronNET.API/SocketIO/Extensions/SocketIOEventExtensions.cs new file mode 100644 index 0000000..a03fadf --- /dev/null +++ b/ElectronNET.API/SocketIO/Extensions/SocketIOEventExtensions.cs @@ -0,0 +1,30 @@ +using System; + +namespace SocketIOClient.Extensions +{ + internal static class SocketIOEventExtensions + { + public static void TryInvoke(this OnAnyHandler handler, string eventName, SocketIOResponse response) + { + try + { + handler(eventName, response); + } + catch + { + // The exception is thrown by the user code, so it can be swallowed + } + } + public static void TryInvoke(this Action handler, SocketIOResponse response) + { + try + { + handler(response); + } + catch + { + // The exception is thrown by the user code, so it can be swallowed + } + } + } +} diff --git a/ElectronNET.API/SocketIO/JsonSerializer/ByteArrayConverter.cs b/ElectronNET.API/SocketIO/JsonSerializer/ByteArrayConverter.cs new file mode 100644 index 0000000..30bc704 --- /dev/null +++ b/ElectronNET.API/SocketIO/JsonSerializer/ByteArrayConverter.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace SocketIOClient.JsonSerializer +{ + class ByteArrayConverter : JsonConverter + { + public ByteArrayConverter() + { + Bytes = new List(); + } + + + public List Bytes { get; } + + public override byte[] Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + byte[] bytes = null; + if (reader.TokenType == JsonTokenType.StartObject) + { + reader.Read(); + if (reader.TokenType == JsonTokenType.PropertyName && reader.GetString() == "_placeholder") + { + reader.Read(); + if (reader.TokenType == JsonTokenType.True && reader.GetBoolean()) + { + reader.Read(); + if (reader.TokenType == JsonTokenType.PropertyName && reader.GetString() == "num") + { + reader.Read(); + int num = reader.GetInt32(); + bytes = Bytes[num]; + reader.Read(); + } + } + } + } + return bytes; + } + + public override void Write(Utf8JsonWriter writer, byte[] value, JsonSerializerOptions options) + { + Bytes.Add(value); + writer.WriteStartObject(); + writer.WritePropertyName("_placeholder"); + writer.WriteBooleanValue(true); + writer.WritePropertyName("num"); + writer.WriteNumberValue(Bytes.Count - 1); + writer.WriteEndObject(); + } + } +} diff --git a/ElectronNET.API/SocketIO/JsonSerializer/IJsonSerializer.cs b/ElectronNET.API/SocketIO/JsonSerializer/IJsonSerializer.cs new file mode 100644 index 0000000..8cfe4b8 --- /dev/null +++ b/ElectronNET.API/SocketIO/JsonSerializer/IJsonSerializer.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; + +namespace SocketIOClient.JsonSerializer +{ + public interface IJsonSerializer + { + JsonSerializeResult Serialize(object[] data); + T Deserialize(string json); + T Deserialize(string json, IList incomingBytes); + } +} diff --git a/ElectronNET.API/SocketIO/JsonSerializer/JsonSerializeResult.cs b/ElectronNET.API/SocketIO/JsonSerializer/JsonSerializeResult.cs new file mode 100644 index 0000000..c4bc1e8 --- /dev/null +++ b/ElectronNET.API/SocketIO/JsonSerializer/JsonSerializeResult.cs @@ -0,0 +1,10 @@ +using System.Collections.Generic; + +namespace SocketIOClient.JsonSerializer +{ + public class JsonSerializeResult + { + public string Json { get; set; } + public IList Bytes { get; set; } + } +} diff --git a/ElectronNET.API/SocketIO/JsonSerializer/SystemTextJsonSerializer.cs b/ElectronNET.API/SocketIO/JsonSerializer/SystemTextJsonSerializer.cs new file mode 100644 index 0000000..9a90981 --- /dev/null +++ b/ElectronNET.API/SocketIO/JsonSerializer/SystemTextJsonSerializer.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; + +namespace SocketIOClient.JsonSerializer +{ + public class SystemTextJsonSerializer : IJsonSerializer + { + public JsonSerializeResult Serialize(object[] data) + { + var converter = new ByteArrayConverter(); + var options = GetOptions(); + options.Converters.Add(converter); + string json = System.Text.Json.JsonSerializer.Serialize(data, options); + return new JsonSerializeResult + { + Json = json, + Bytes = converter.Bytes + }; + } + + public T Deserialize(string json) + { + var options = GetOptions(); + return System.Text.Json.JsonSerializer.Deserialize(json, options); + } + + public T Deserialize(string json, IList bytes) + { + var options = GetOptions(); + var converter = new ByteArrayConverter(); + options.Converters.Add(converter); + converter.Bytes.AddRange(bytes); + return System.Text.Json.JsonSerializer.Deserialize(json, options); + } + + private JsonSerializerOptions GetOptions() + { + JsonSerializerOptions options = null; + if (OptionsProvider != null) + { + options = OptionsProvider(); + } + if (options == null) + { + options = new JsonSerializerOptions(); + } + return options; + } + + public Func OptionsProvider { get; set; } + } +} diff --git a/ElectronNET.API/SocketIO/LICENSE b/ElectronNET.API/SocketIO/LICENSE new file mode 100644 index 0000000..c0584dd --- /dev/null +++ b/ElectronNET.API/SocketIO/LICENSE @@ -0,0 +1,23 @@ +Code from https://github.com/doghappy/socket.io-client-csharp + +MIT License + +Copyright (c) 2019 HeroWong + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/ElectronNET.API/SocketIO/Messages/BinaryMessage.cs b/ElectronNET.API/SocketIO/Messages/BinaryMessage.cs new file mode 100644 index 0000000..df2a7c0 --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/BinaryMessage.cs @@ -0,0 +1,100 @@ +using SocketIOClient.Transport; +using System.Collections.Generic; +using System.Text; +using System.Text.Json; + +namespace SocketIOClient.Messages +{ + public class BinaryMessage : IMessage + { + public MessageType Type => MessageType.BinaryMessage; + + public string Namespace { get; set; } + + public string Event { get; set; } + + public int Id { get; set; } + + public List JsonElements { get; set; } + + public string Json { get; set; } + + public int BinaryCount { get; set; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public void Read(string msg) + { + int index1 = msg.IndexOf('-'); + BinaryCount = int.Parse(msg.Substring(0, index1)); + + int index2 = msg.IndexOf('['); + + int index3 = msg.LastIndexOf(',', index2); + if (index3 > -1) + { + Namespace = msg.Substring(index1 + 1, index3 - index1 - 1); + int idLength = index2 - index3 - 1; + if (idLength > 0) + { + Id = int.Parse(msg.Substring(index3 + 1, idLength)); + } + } + else + { + int idLength = index2 - index1 - 1; + if (idLength > 0) + { + Id = int.Parse(msg.Substring(index1 + 1, idLength)); + } + } + + string json = msg.Substring(index2); + + var array = JsonDocument.Parse(json).RootElement.EnumerateArray(); + int i = -1; + foreach (var item in array) + { + i++; + if (i == 0) + { + Event = item.GetString(); + JsonElements = new List(); + } + else + { + JsonElements.Add(item); + } + } + } + + public string Write() + { + var builder = new StringBuilder(); + builder + .Append("45") + .Append(OutgoingBytes.Count) + .Append('-'); + if (!string.IsNullOrEmpty(Namespace)) + { + builder.Append(Namespace).Append(','); + } + if (string.IsNullOrEmpty(Json)) + { + builder.Append("[\"").Append(Event).Append("\"]"); + } + else + { + string data = Json.Insert(1, $"\"{Event}\","); + builder.Append(data); + } + return builder.ToString(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/ClientAckMessage.cs b/ElectronNET.API/SocketIO/Messages/ClientAckMessage.cs new file mode 100644 index 0000000..2b4dcda --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/ClientAckMessage.cs @@ -0,0 +1,75 @@ +using SocketIOClient.Transport; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; + +namespace SocketIOClient.Messages +{ + /// + /// The server calls the client's callback + /// + public class ClientAckMessage : IMessage + { + public MessageType Type => MessageType.AckMessage; + + public string Namespace { get; set; } + + public string Event { get; set; } + + public List JsonElements { get; set; } + + public string Json { get; set; } + + public int Id { get; set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public void Read(string msg) + { + int index = msg.IndexOf('['); + int lastIndex = msg.LastIndexOf(',', index); + if (lastIndex > -1) + { + string text = msg.Substring(0, index); + Namespace = text.Substring(0, lastIndex); + Id = int.Parse(text.Substring(lastIndex + 1)); + } + else + { + Id = int.Parse(msg.Substring(0, index)); + } + msg = msg.Substring(index); + JsonElements = JsonDocument.Parse(msg).RootElement.EnumerateArray().ToList(); + } + + public string Write() + { + var builder = new StringBuilder(); + builder.Append("42"); + if (!string.IsNullOrEmpty(Namespace)) + { + builder.Append(Namespace).Append(','); + } + builder.Append(Id); + if (string.IsNullOrEmpty(Json)) + { + builder.Append("[\"").Append(Event).Append("\"]"); + } + else + { + string data = Json.Insert(1, $"\"{Event}\","); + builder.Append(data); + } + return builder.ToString(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/ClientBinaryAckMessage.cs b/ElectronNET.API/SocketIO/Messages/ClientBinaryAckMessage.cs new file mode 100644 index 0000000..f151268 --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/ClientBinaryAckMessage.cs @@ -0,0 +1,82 @@ +using SocketIOClient.Transport; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; + +namespace SocketIOClient.Messages +{ + /// + /// The server calls the client's callback with binary + /// + public class ClientBinaryAckMessage : IMessage + { + public MessageType Type => MessageType.BinaryAckMessage; + + public string Namespace { get; set; } + + public string Event { get; set; } + + public List JsonElements { get; set; } + + public string Json { get; set; } + + public int Id { get; set; } + + public int BinaryCount { get; set; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public void Read(string msg) + { + int index1 = msg.IndexOf('-'); + BinaryCount = int.Parse(msg.Substring(0, index1)); + + int index2 = msg.IndexOf('['); + + int index3 = msg.LastIndexOf(',', index2); + if (index3 > -1) + { + Namespace = msg.Substring(index1 + 1, index3 - index1 - 1); + Id = int.Parse(msg.Substring(index3 + 1, index2 - index3 - 1)); + } + else + { + Id = int.Parse(msg.Substring(index1 + 1, index2 - index1 - 1)); + } + + string json = msg.Substring(index2); + JsonElements = JsonDocument.Parse(json).RootElement.EnumerateArray().ToList(); + } + + public string Write() + { + var builder = new StringBuilder(); + builder + .Append("45") + .Append(OutgoingBytes.Count) + .Append('-'); + if (!string.IsNullOrEmpty(Namespace)) + { + builder.Append(Namespace).Append(','); + } + builder.Append(Id); + if (string.IsNullOrEmpty(Json)) + { + builder.Append("[\"").Append(Event).Append("\"]"); + } + else + { + string data = Json.Insert(1, $"\"{Event}\","); + builder.Append(data); + } + return builder.ToString(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/ConnectedMessage.cs b/ElectronNET.API/SocketIO/Messages/ConnectedMessage.cs new file mode 100644 index 0000000..0663ad5 --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/ConnectedMessage.cs @@ -0,0 +1,128 @@ +using System; +using SocketIOClient.Transport; +using System.Collections.Generic; +using System.Text; +using System.Text.Json; + +namespace SocketIOClient.Messages +{ + public class ConnectedMessage : IMessage + { + public MessageType Type => MessageType.Connected; + + public string Namespace { get; set; } + + public string Sid { get; set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public IEnumerable> Query { get; set; } + public string AuthJsonStr { get; set; } + + public void Read(string msg) + { + if (Eio == 3) + { + Eio3Read(msg); + } + else + { + Eio4Read(msg); + } + } + + public string Write() + { + if (Eio == 3) + { + return Eio3Write(); + } + return Eio4Write(); + } + + public void Eio4Read(string msg) + { + int index = msg.IndexOf('{'); + if (index > 0) + { + Namespace = msg.Substring(0, index - 1); + msg = msg.Substring(index); + } + else + { + Namespace = string.Empty; + } + Sid = JsonDocument.Parse(msg).RootElement.GetProperty("sid").GetString(); + } + + public string Eio4Write() + { + var builder = new StringBuilder("40"); + if (!string.IsNullOrEmpty(Namespace)) + { + builder.Append(Namespace).Append(','); + } + builder.Append(AuthJsonStr); + return builder.ToString(); + } + + public void Eio3Read(string msg) + { + if (msg.Length >= 2) + { + int startIndex = msg.IndexOf('/'); + if (startIndex == -1) + { + return; + } + int endIndex = msg.IndexOf('?', startIndex); + if (endIndex == -1) + { + endIndex = msg.IndexOf(',', startIndex); + } + if (endIndex == -1) + { + endIndex = msg.Length; + } + Namespace = msg.Substring(startIndex, endIndex); + } + } + + public string Eio3Write() + { + if (string.IsNullOrEmpty(Namespace)) + { + return string.Empty; + } + var builder = new StringBuilder("40"); + builder.Append(Namespace); + if (Query != null) + { + int i = -1; + foreach (var item in Query) + { + i++; + if (i == 0) + { + builder.Append('?'); + } + else + { + builder.Append('&'); + } + builder.Append(item.Key).Append('=').Append(item.Value); + } + } + builder.Append(','); + return builder.ToString(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/DisconnectedMessage.cs b/ElectronNET.API/SocketIO/Messages/DisconnectedMessage.cs new file mode 100644 index 0000000..4bceba9 --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/DisconnectedMessage.cs @@ -0,0 +1,36 @@ +using SocketIOClient.Transport; +using System.Collections.Generic; + +namespace SocketIOClient.Messages +{ + public class DisconnectedMessage : IMessage + { + public MessageType Type => MessageType.Disconnected; + + public string Namespace { get; set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public void Read(string msg) + { + Namespace = msg.TrimEnd(','); + } + + public string Write() + { + if (string.IsNullOrEmpty(Namespace)) + { + return "41"; + } + return "41" + Namespace + ","; + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/ErrorMessage.cs b/ElectronNET.API/SocketIO/Messages/ErrorMessage.cs new file mode 100644 index 0000000..f36d78b --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/ErrorMessage.cs @@ -0,0 +1,50 @@ +using SocketIOClient.Transport; +using System; +using System.Collections.Generic; +using System.Text.Json; + +namespace SocketIOClient.Messages +{ + public class ErrorMessage : IMessage + { + public MessageType Type => MessageType.ErrorMessage; + + public string Message { get; set; } + + public string Namespace { get; set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public void Read(string msg) + { + if (Eio == 3) + { + Message = msg.Trim('"'); + } + else + { + int index = msg.IndexOf('{'); + if (index > 0) + { + Namespace = msg.Substring(0, index - 1); + msg = msg.Substring(index); + } + var doc = JsonDocument.Parse(msg); + Message = doc.RootElement.GetProperty("message").GetString(); + } + } + + public string Write() + { + throw new NotImplementedException(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/EventMessage.cs b/ElectronNET.API/SocketIO/Messages/EventMessage.cs new file mode 100644 index 0000000..522bd20 --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/EventMessage.cs @@ -0,0 +1,97 @@ +using SocketIOClient.Transport; +using System.Collections.Generic; +using System.Text; +using System.Text.Json; + +namespace SocketIOClient.Messages +{ + public class EventMessage : IMessage + { + public MessageType Type => MessageType.EventMessage; + + public string Namespace { get; set; } + + public string Event { get; set; } + + public int Id { get; set; } + + public List JsonElements { get; set; } + + public string Json { get; set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public void Read(string msg) + { + int index = msg.IndexOf('['); + int lastIndex = msg.LastIndexOf(',', index); + if (lastIndex > -1) + { + string text = msg.Substring(0, index); + Namespace = text.Substring(0, lastIndex); + if (index - lastIndex > 1) + { + Id = int.Parse(text.Substring(lastIndex + 1)); + } + } + else + { + if (index > 0) + { + Id = int.Parse(msg.Substring(0, index)); + } + } + msg = msg.Substring(index); + + //int index = msg.IndexOf('['); + //if (index > 0) + //{ + // Namespace = msg.Substring(0, index - 1); + // msg = msg.Substring(index); + //} + var array = JsonDocument.Parse(msg).RootElement.EnumerateArray(); + int i = -1; + foreach (var item in array) + { + i++; + if (i == 0) + { + Event = item.GetString(); + JsonElements = new List(); + } + else + { + JsonElements.Add(item); + } + } + } + + public string Write() + { + var builder = new StringBuilder(); + builder.Append("42"); + if (!string.IsNullOrEmpty(Namespace)) + { + builder.Append(Namespace).Append(','); + } + if (string.IsNullOrEmpty(Json)) + { + builder.Append("[\"").Append(Event).Append("\"]"); + } + else + { + string data = Json.Insert(1, $"\"{Event}\","); + builder.Append(data); + } + return builder.ToString(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/IMessage.cs b/ElectronNET.API/SocketIO/Messages/IMessage.cs new file mode 100644 index 0000000..c7f0e25 --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/IMessage.cs @@ -0,0 +1,30 @@ +using SocketIOClient.Transport; +using System.Collections.Generic; + +namespace SocketIOClient.Messages +{ + public interface IMessage + { + MessageType Type { get; } + + List OutgoingBytes { get; set; } + + List IncomingBytes { get; set; } + + int BinaryCount { get; } + + int Eio { get; set; } + + TransportProtocol Protocol { get; set; } + + void Read(string msg); + + //void Eio3WsRead(string msg); + + //void Eio3HttpRead(string msg); + + string Write(); + + //string Eio3WsWrite(); + } +} diff --git a/ElectronNET.API/SocketIO/Messages/MessageFactory.cs b/ElectronNET.API/SocketIO/Messages/MessageFactory.cs new file mode 100644 index 0000000..227a6f7 --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/MessageFactory.cs @@ -0,0 +1,75 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace SocketIOClient.Messages +{ + public static class MessageFactory + { + private static IMessage CreateMessage(MessageType type) + { + switch (type) + { + case MessageType.Opened: + return new OpenedMessage(); + case MessageType.Ping: + return new PingMessage(); + case MessageType.Pong: + return new PongMessage(); + case MessageType.Connected: + return new ConnectedMessage(); + case MessageType.Disconnected: + return new DisconnectedMessage(); + case MessageType.EventMessage: + return new EventMessage(); + case MessageType.AckMessage: + return new ClientAckMessage(); + case MessageType.ErrorMessage: + return new ErrorMessage(); + case MessageType.BinaryMessage: + return new BinaryMessage(); + case MessageType.BinaryAckMessage: + return new ClientBinaryAckMessage(); + } + return null; + } + + private static readonly Dictionary _messageTypes = Enum.GetValues().ToDictionary(v => ((int)v).ToString(), v => v); + + public static IMessage CreateMessage(int eio, string msg) + { + foreach (var (prefix,item) in _messageTypes) + { + if (msg.StartsWith(prefix)) + { + IMessage result = CreateMessage(item); + if (result != null) + { + result.Eio = eio; + result.Read(msg.Substring(prefix.Length)); + return result; + } + } + } + return null; + } + + public static OpenedMessage CreateOpenedMessage(string msg) + { + var openedMessage = new OpenedMessage(); + if (msg[0] == '0') + { + openedMessage.Eio = 4; + openedMessage.Read(msg.Substring(1)); + } + else + { + openedMessage.Eio = 3; + int index = msg.IndexOf(':'); + openedMessage.Read(msg.Substring(index + 2)); + } + return openedMessage; + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/MessageType.cs b/ElectronNET.API/SocketIO/Messages/MessageType.cs new file mode 100644 index 0000000..345c98f --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/MessageType.cs @@ -0,0 +1,16 @@ +namespace SocketIOClient.Messages +{ + public enum MessageType + { + Opened, + Ping = 2, + Pong, + Connected = 40, + Disconnected, + EventMessage, + AckMessage, + ErrorMessage, + BinaryMessage, + BinaryAckMessage + } +} diff --git a/ElectronNET.API/SocketIO/Messages/OpenedMessage.cs b/ElectronNET.API/SocketIO/Messages/OpenedMessage.cs new file mode 100644 index 0000000..df297ea --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/OpenedMessage.cs @@ -0,0 +1,79 @@ +using System; +using System.Text.Json; +using System.Collections.Generic; +using SocketIOClient.Transport; + +namespace SocketIOClient.Messages +{ + public class OpenedMessage : IMessage + { + public MessageType Type => MessageType.Opened; + + public string Sid { get; set; } + + public string Namespace { get; set; } + + public List Upgrades { get; private set; } + + public int PingInterval { get; private set; } + + public int PingTimeout { get; private set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + private int GetInt32FromJsonElement(JsonElement element, string msg, string name) + { + var p = element.GetProperty(name); + int val; + switch (p.ValueKind) + { + case JsonValueKind.String: + val = int.Parse(p.GetString()); + break; + case JsonValueKind.Number: + val = p.GetInt32(); + break; + default: + throw new ArgumentException($"Invalid message: '{msg}'"); + } + return val; + } + + public void Read(string msg) + { + var doc = JsonDocument.Parse(msg); + var root = doc.RootElement; + Sid = root.GetProperty("sid").GetString(); + + PingInterval = GetInt32FromJsonElement(root, msg, "pingInterval"); + PingTimeout = GetInt32FromJsonElement(root, msg, "pingTimeout"); + + Upgrades = new List(); + var upgrades = root.GetProperty("upgrades").EnumerateArray(); + foreach (var item in upgrades) + { + Upgrades.Add(item.GetString()); + } + } + + public string Write() + { + //var builder = new StringBuilder(); + //builder.Append("40"); + //if (!string.IsNullOrEmpty(Namespace)) + //{ + // builder.Append(Namespace).Append(','); + //} + //return builder.ToString(); + throw new NotImplementedException(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/PingMessage.cs b/ElectronNET.API/SocketIO/Messages/PingMessage.cs new file mode 100644 index 0000000..fa2b134 --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/PingMessage.cs @@ -0,0 +1,26 @@ +using SocketIOClient.Transport; +using System.Collections.Generic; + +namespace SocketIOClient.Messages +{ + public class PingMessage : IMessage + { + public MessageType Type => MessageType.Ping; + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public void Read(string msg) + { + } + + public string Write() => "2"; + } +} diff --git a/ElectronNET.API/SocketIO/Messages/PongMessage.cs b/ElectronNET.API/SocketIO/Messages/PongMessage.cs new file mode 100644 index 0000000..fb4ccfa --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/PongMessage.cs @@ -0,0 +1,29 @@ +using SocketIOClient.Transport; +using System; +using System.Collections.Generic; + +namespace SocketIOClient.Messages +{ + public class PongMessage : IMessage + { + public MessageType Type => MessageType.Pong; + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public TimeSpan Duration { get; set; } + + public void Read(string msg) + { + } + + public string Write() => "3"; + } +} diff --git a/ElectronNET.API/SocketIO/Messages/ServerAckMessage.cs b/ElectronNET.API/SocketIO/Messages/ServerAckMessage.cs new file mode 100644 index 0000000..ce8ce1a --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/ServerAckMessage.cs @@ -0,0 +1,54 @@ +using SocketIOClient.Transport; +using System.Collections.Generic; +using System.Text; + +namespace SocketIOClient.Messages +{ + /// + /// The client calls the server's callback + /// + public class ServerAckMessage : IMessage + { + public MessageType Type => MessageType.AckMessage; + + public string Namespace { get; set; } + + public string Json { get; set; } + + public int Id { get; set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public void Read(string msg) + { + } + + public string Write() + { + var builder = new StringBuilder(); + builder.Append("43"); + if (!string.IsNullOrEmpty(Namespace)) + { + builder.Append(Namespace).Append(','); + } + builder.Append(Id); + if (string.IsNullOrEmpty(Json)) + { + builder.Append("[]"); + } + else + { + builder.Append(Json); + } + return builder.ToString(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Messages/ServerBinaryAckMessage.cs b/ElectronNET.API/SocketIO/Messages/ServerBinaryAckMessage.cs new file mode 100644 index 0000000..199f309 --- /dev/null +++ b/ElectronNET.API/SocketIO/Messages/ServerBinaryAckMessage.cs @@ -0,0 +1,60 @@ +using SocketIOClient.Transport; +using System.Collections.Generic; +using System.Text; +using System.Text.Json; + +namespace SocketIOClient.Messages +{ + /// + /// The client calls the server's callback with binary + /// + public class ServerBinaryAckMessage : IMessage + { + public MessageType Type => MessageType.BinaryAckMessage; + + public string Namespace { get; set; } + + public List JsonElements { get; set; } + + public string Json { get; set; } + + public int Id { get; set; } + + public int BinaryCount { get; } + + public int Eio { get; set; } + + public TransportProtocol Protocol { get; set; } + + public List OutgoingBytes { get; set; } + + public List IncomingBytes { get; set; } + + public void Read(string msg) + { + } + + public string Write() + { + var builder = new StringBuilder(); + builder + .Append("46") + .Append(OutgoingBytes.Count) + .Append('-'); + if (!string.IsNullOrEmpty(Namespace)) + { + builder.Append(Namespace).Append(','); + } + builder.Append(Id); + if (string.IsNullOrEmpty(Json)) + { + builder.Append("[]"); + } + else + { + builder.Append(Json); + } + return builder.ToString(); + } + } +} diff --git a/ElectronNET.API/SocketIO/NewtonsoftJsonSerializer.cs b/ElectronNET.API/SocketIO/NewtonsoftJsonSerializer.cs new file mode 100644 index 0000000..9691ffc --- /dev/null +++ b/ElectronNET.API/SocketIO/NewtonsoftJsonSerializer.cs @@ -0,0 +1,56 @@ +using System; +using Newtonsoft.Json; +using SocketIOClient.JsonSerializer; +using System.Collections.Generic; + +namespace SocketIOClient.Newtonsoft.Json +{ + public class NewtonsoftJsonSerializer : IJsonSerializer + { + public Func JsonSerializerOptions { get; } + + public JsonSerializeResult Serialize(object[] data) + { + var converter = new ByteArrayConverter(); + var settings = GetOptions(); + settings.Converters.Add(converter); + string json = JsonConvert.SerializeObject(data, settings); + return new JsonSerializeResult + { + Json = json, + Bytes = converter.Bytes + }; + } + + public T Deserialize(string json) + { + var settings = GetOptions(); + return JsonConvert.DeserializeObject(json, settings); + } + + public T Deserialize(string json, IList bytes) + { + var converter = new ByteArrayConverter(); + converter.Bytes.AddRange(bytes); + var settings = GetOptions(); + settings.Converters.Add(converter); + return JsonConvert.DeserializeObject(json, settings); + } + + private JsonSerializerSettings GetOptions() + { + JsonSerializerSettings options = null; + if (OptionsProvider != null) + { + options = OptionsProvider(); + } + if (options == null) + { + options = new JsonSerializerSettings(); + } + return options; + } + + public Func OptionsProvider { get; set; } + } +} diff --git a/ElectronNET.API/SocketIO/SocketIO.cs b/ElectronNET.API/SocketIO/SocketIO.cs new file mode 100644 index 0000000..edf2f1c --- /dev/null +++ b/ElectronNET.API/SocketIO/SocketIO.cs @@ -0,0 +1,769 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net.Http; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; +using SocketIOClient.Extensions; +using SocketIOClient.JsonSerializer; +using SocketIOClient.Messages; +using SocketIOClient.Transport; +using SocketIOClient.UriConverters; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace SocketIOClient +{ + /// + /// socket.io client class + /// + public class SocketIO : IDisposable + { + /// + /// Create SocketIO object with default options + /// + /// + public SocketIO(string uri) : this(new Uri(uri)) { } + + /// + /// Create SocketIO object with options + /// + /// + public SocketIO(Uri uri) : this(uri, new SocketIOOptions()) { } + + /// + /// Create SocketIO object with options + /// + /// + /// + public SocketIO(string uri, SocketIOOptions options) : this(new Uri(uri), options) { } + + /// + /// Create SocketIO object with options + /// + /// + /// + public SocketIO(Uri uri, SocketIOOptions options) + { + ServerUri = uri ?? throw new ArgumentNullException("uri"); + Options = options ?? throw new ArgumentNullException("options"); + Initialize(); + } + + Uri _serverUri; + private Uri ServerUri + { + get => _serverUri; + set + { + if (_serverUri != value) + { + _serverUri = value; + if (value != null && value.AbsolutePath != "/") + { + _namespace = value.AbsolutePath; + } + } + } + } + + /// + /// An unique identifier for the socket session. Set after the connect event is triggered, and updated after the reconnect event. + /// + public string Id { get; private set; } + + string _namespace; + + /// + /// Whether or not the socket is connected to the server. + /// + public bool Connected { get; private set; } + + int _attempts; + + [Obsolete] + /// + /// Whether or not the socket is disconnected from the server. + /// + public bool Disconnected => !Connected; + + public SocketIOOptions Options { get; } + + public IJsonSerializer JsonSerializer { get; set; } + + public IUriConverter UriConverter { get; set; } + + internal ILogger Logger { get; set; } + + ILoggerFactory _loggerFactory; + public ILoggerFactory LoggerFactory + { + get => _loggerFactory; + set + { + _loggerFactory = value ?? throw new ArgumentNullException(nameof(LoggerFactory)); + Logger = _loggerFactory.CreateLogger(); + } + } + + public HttpClient HttpClient { get; set; } + + public Func ClientWebSocketProvider { get; set; } + private IClientWebSocket _clientWebsocket; + + BaseTransport _transport; + + List _expectedExceptions; + + int _packetId; + bool _isConnectCoreRunning; + Uri _realServerUri; + Exception _connectCoreException; + Dictionary> _ackHandlers; + List _onAnyHandlers; + Dictionary> _eventHandlers; + CancellationTokenSource _connectionTokenSource; + double _reconnectionDelay; + bool _hasError; + bool _isFaild; + readonly static object _connectionLock = new object(); + + #region Socket.IO event + public event EventHandler OnConnected; + //public event EventHandler OnConnectError; + //public event EventHandler OnConnectTimeout; + public event EventHandler OnError; + public event EventHandler OnDisconnected; + + /// + /// Fired upon a successful reconnection. + /// + public event EventHandler OnReconnected; + + /// + /// Fired upon an attempt to reconnect. + /// + public event EventHandler OnReconnectAttempt; + + /// + /// Fired upon a reconnection attempt error. + /// + public event EventHandler OnReconnectError; + + /// + /// Fired when couldn’t reconnect within reconnectionAttempts + /// + public event EventHandler OnReconnectFailed; + public event EventHandler OnPing; + public event EventHandler OnPong; + + #endregion + + #region Observable Event + //Subject _onConnected; + //public IObservable ConnectedObservable { get; private set; } + #endregion + + private void Initialize() + { + _packetId = -1; + _ackHandlers = new Dictionary>(); + _eventHandlers = new Dictionary>(); + _onAnyHandlers = new List(); + + JsonSerializer = new SystemTextJsonSerializer(); + UriConverter = new UriConverter(); + + HttpClient = new HttpClient(); + ClientWebSocketProvider = () => new SystemNetWebSocketsClientWebSocket(Options.EIO); + _expectedExceptions = new List + { + typeof(TimeoutException), + typeof(WebSocketException), + typeof(HttpRequestException), + typeof(OperationCanceledException), + typeof(TaskCanceledException) + }; + LoggerFactory = NullLoggerFactory.Instance; + } + + private async Task CreateTransportAsync() + { + Options.Transport = await GetProtocolAsync(); + if (Options.Transport == TransportProtocol.Polling) + { + HttpPollingHandler handler; + if (Options.EIO == 3) + handler = new Eio3HttpPollingHandler(HttpClient); + else + handler = new Eio4HttpPollingHandler(HttpClient); + _transport = new HttpTransport(HttpClient, handler, Options, JsonSerializer, Logger); + } + else + { + _clientWebsocket = ClientWebSocketProvider(); + _transport = new WebSocketTransport(_clientWebsocket, Options, JsonSerializer, Logger); + } + _transport.Namespace = _namespace; + SetHeaders(); + } + + private void SetHeaders() + { + if (Options.ExtraHeaders != null) + { + foreach (var item in Options.ExtraHeaders) + { + _transport.AddHeader(item.Key, item.Value); + } + } + } + + private void SyncExceptionToMain(Exception e) + { + _connectCoreException = e; + _isConnectCoreRunning = false; + } + + private void ConnectCore() + { + DisposeForReconnect(); + _reconnectionDelay = Options.ReconnectionDelay; + _connectionTokenSource = new CancellationTokenSource(); + var cct = _connectionTokenSource.Token; + Task.Factory.StartNew(async () => + { + while (true) + { + _clientWebsocket.TryDispose(); + _transport.TryDispose(); + CreateTransportAsync().Wait(); + _realServerUri = UriConverter.GetServerUri(Options.Transport == TransportProtocol.WebSocket, ServerUri, Options.EIO, Options.Path, Options.Query); + try + { + if (cct.IsCancellationRequested) + break; + if (_attempts > 0) + OnReconnectAttempt.TryInvoke(this, _attempts); + var timeoutCts = new CancellationTokenSource(Options.ConnectionTimeout); + _transport.Subscribe(OnMessageReceived, OnErrorReceived); + await _transport.ConnectAsync(_realServerUri, timeoutCts.Token).ConfigureAwait(false); + break; + } + catch (Exception e) + { + if (_expectedExceptions.Contains(e.GetType())) + { + if (!Options.Reconnection) + { + SyncExceptionToMain(e); + throw; + } + if (_attempts > 0) + { + OnReconnectError.TryInvoke(this, e); + } + _attempts++; + if (_attempts <= Options.ReconnectionAttempts) + { + if (_reconnectionDelay < Options.ReconnectionDelayMax) + { + _reconnectionDelay += 2 * Options.RandomizationFactor; + } + if (_reconnectionDelay > Options.ReconnectionDelayMax) + { + _reconnectionDelay = Options.ReconnectionDelayMax; + } + Thread.Sleep((int)_reconnectionDelay); + } + else + { + _isFaild = true; + OnReconnectFailed.TryInvoke(this, EventArgs.Empty); + break; + } + } + else + { + SyncExceptionToMain(e); + throw; + } + } + } + _isConnectCoreRunning = false; + }); + } + + private async Task GetProtocolAsync() + { + if (Options.Transport == TransportProtocol.Polling && Options.AutoUpgrade) + { + Uri uri = UriConverter.GetServerUri(false, ServerUri, Options.EIO, Options.Path, Options.Query); + try + { + string text = await HttpClient.GetStringAsync(uri); + if (text.Contains("websocket")) + { + return TransportProtocol.WebSocket; + } + } + catch (Exception e) + { + Logger.LogWarning(e, e.Message); + } + } + return Options.Transport; + } + + public async Task ConnectAsync() + { + if (Connected || _isConnectCoreRunning) + return; + + lock (_connectionLock) + { + if (_isConnectCoreRunning) + return; + _isConnectCoreRunning = true; + } + ConnectCore(); + while (_isConnectCoreRunning) + { + await Task.Delay(100); + } + if (_connectCoreException != null) + { + Logger.LogError(_connectCoreException, _connectCoreException.Message); + throw _connectCoreException; + } + int ms = 0; + while (!Connected) + { + if (_hasError) + { + Logger.LogWarning($"Got a connection error, try to use '{nameof(OnError)}' to detect it."); + break; + } + if (_isFaild) + { + Logger.LogWarning($"Reconnect failed, try to use '{nameof(OnReconnectFailed)}' to detect it."); + break; + } + ms += 100; + if (ms > Options.ConnectionTimeout.TotalMilliseconds) + { + throw new TimeoutException(); + } + await Task.Delay(100); + } + } + + private void PingHandler() + { + OnPing.TryInvoke(this, EventArgs.Empty); + } + + private void PongHandler(PongMessage msg) + { + OnPong.TryInvoke(this, msg.Duration); + } + + private void ConnectedHandler(ConnectedMessage msg) + { + Id = msg.Sid; + Connected = true; + OnConnected.TryInvoke(this, EventArgs.Empty); + if (_attempts > 0) + { + OnReconnected.TryInvoke(this, _attempts); + } + _attempts = 0; + } + + private void DisconnectedHandler() + { + _ = InvokeDisconnect(DisconnectReason.IOServerDisconnect); + } + + private void EventMessageHandler(EventMessage m) + { + var res = new SocketIOResponse(m.JsonElements, this) + { + PacketId = m.Id + }; + foreach (var item in _onAnyHandlers) + { + item.TryInvoke(m.Event, res); + } + if (_eventHandlers.ContainsKey(m.Event)) + { + _eventHandlers[m.Event].TryInvoke(res); + } + } + + private void AckMessageHandler(ClientAckMessage m) + { + if (_ackHandlers.ContainsKey(m.Id)) + { + var res = new SocketIOResponse(m.JsonElements, this); + _ackHandlers[m.Id].TryInvoke(res); + _ackHandlers.Remove(m.Id); + } + } + + private void ErrorMessageHandler(ErrorMessage msg) + { + _hasError = true; + OnError.TryInvoke(this, msg.Message); + } + + private void BinaryMessageHandler(BinaryMessage msg) + { + var response = new SocketIOResponse(msg.JsonElements, this) + { + PacketId = msg.Id, + }; + response.InComingBytes.AddRange(msg.IncomingBytes); + foreach (var item in _onAnyHandlers) + { + item.TryInvoke(msg.Event, response); + } + if (_eventHandlers.ContainsKey(msg.Event)) + { + _eventHandlers[msg.Event].TryInvoke(response); + } + } + + private void BinaryAckMessageHandler(ClientBinaryAckMessage msg) + { + if (_ackHandlers.ContainsKey(msg.Id)) + { + var response = new SocketIOResponse(msg.JsonElements, this) + { + PacketId = msg.Id, + }; + response.InComingBytes.AddRange(msg.IncomingBytes); + _ackHandlers[msg.Id].TryInvoke(response); + } + } + + private void OnErrorReceived(Exception ex) + { + Logger.LogError(ex, ex.Message); + _ = InvokeDisconnect(DisconnectReason.TransportClose); + } + + private void OnMessageReceived(IMessage msg) + { + try + { + switch (msg.Type) + { + case MessageType.Ping: + PingHandler(); + break; + case MessageType.Pong: + PongHandler(msg as PongMessage); + break; + case MessageType.Connected: + ConnectedHandler(msg as ConnectedMessage); + break; + case MessageType.Disconnected: + DisconnectedHandler(); + break; + case MessageType.EventMessage: + EventMessageHandler(msg as EventMessage); + break; + case MessageType.AckMessage: + AckMessageHandler(msg as ClientAckMessage); + break; + case MessageType.ErrorMessage: + ErrorMessageHandler(msg as ErrorMessage); + break; + case MessageType.BinaryMessage: + BinaryMessageHandler(msg as BinaryMessage); + break; + case MessageType.BinaryAckMessage: + BinaryAckMessageHandler(msg as ClientBinaryAckMessage); + break; + } + } + catch (Exception e) + { + Logger.LogError(e, e.Message); + } + } + + public async Task DisconnectAsync() + { + if (Connected) + { + var msg = new DisconnectedMessage + { + Namespace = _namespace + }; + try + { + await _transport.SendAsync(msg, CancellationToken.None).ConfigureAwait(false); + } + catch (Exception e) + { + Logger.LogError(e, e.Message); + } + await InvokeDisconnect(DisconnectReason.IOClientDisconnect); + } + } + + /// + /// Register a new handler for the given event. + /// + /// + /// + public void On(string eventName, Action callback) + { + if (_eventHandlers.ContainsKey(eventName)) + { + _eventHandlers.Remove(eventName); + } + _eventHandlers.Add(eventName, callback); + } + + + + /// + /// Unregister a new handler for the given event. + /// + /// + public void Off(string eventName) + { + if (_eventHandlers.ContainsKey(eventName)) + { + _eventHandlers.Remove(eventName); + } + } + + public void OnAny(OnAnyHandler handler) + { + if (handler != null) + { + _onAnyHandlers.Add(handler); + } + } + + public void PrependAny(OnAnyHandler handler) + { + if (handler != null) + { + _onAnyHandlers.Insert(0, handler); + } + } + + public void OffAny(OnAnyHandler handler) + { + if (handler != null) + { + _onAnyHandlers.Remove(handler); + } + } + + public OnAnyHandler[] ListenersAny() => _onAnyHandlers.ToArray(); + + internal async Task ClientAckAsync(int packetId, CancellationToken cancellationToken, params object[] data) + { + IMessage msg; + if (data != null && data.Length > 0) + { + var result = JsonSerializer.Serialize(data); + if (result.Bytes.Count > 0) + { + msg = new ServerBinaryAckMessage + { + Id = packetId, + Namespace = _namespace, + Json = result.Json + }; + msg.OutgoingBytes = new List(result.Bytes); + } + else + { + msg = new ServerAckMessage + { + Namespace = _namespace, + Id = packetId, + Json = result.Json + }; + } + } + else + { + msg = new ServerAckMessage + { + Namespace = _namespace, + Id = packetId + }; + } + await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false); + } + + /// + /// Emits an event to the socket + /// + /// + /// Any other parameters can be included. All serializable datastructures are supported, including byte[] + /// + public async Task EmitAsync(string eventName, params object[] data) + { + await EmitAsync(eventName, CancellationToken.None, data).ConfigureAwait(false); + } + + public async Task EmitAsync(string eventName, CancellationToken cancellationToken, params object[] data) + { + if (data != null && data.Length > 0) + { + var result = JsonSerializer.Serialize(data); + if (result.Bytes.Count > 0) + { + var msg = new BinaryMessage + { + Namespace = _namespace, + OutgoingBytes = new List(result.Bytes), + Event = eventName, + Json = result.Json + }; + await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false); + } + else + { + var msg = new EventMessage + { + Namespace = _namespace, + Event = eventName, + Json = result.Json + }; + await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false); + } + } + else + { + var msg = new EventMessage + { + Namespace = _namespace, + Event = eventName + }; + await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false); + } + } + + /// + /// Emits an event to the socket + /// + /// + /// will be called with the server answer. + /// Any other parameters can be included. All serializable datastructures are supported, including byte[] + /// + public async Task EmitAsync(string eventName, Action ack, params object[] data) + { + await EmitAsync(eventName, CancellationToken.None, ack, data).ConfigureAwait(false); + } + + public async Task EmitAsync(string eventName, CancellationToken cancellationToken, Action ack, params object[] data) + { + _ackHandlers.Add(++_packetId, ack); + if (data != null && data.Length > 0) + { + var result = JsonSerializer.Serialize(data); + if (result.Bytes.Count > 0) + { + var msg = new ClientBinaryAckMessage + { + Event = eventName, + Namespace = _namespace, + Json = result.Json, + Id = _packetId, + OutgoingBytes = new List(result.Bytes) + }; + await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false); + } + else + { + var msg = new ClientAckMessage + { + Event = eventName, + Namespace = _namespace, + Id = _packetId, + Json = result.Json + }; + await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false); + } + } + else + { + var msg = new ClientAckMessage + { + Event = eventName, + Namespace = _namespace, + Id = _packetId + }; + await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false); + } + } + + private async Task InvokeDisconnect(string reason) + { + if (Connected) + { + Connected = false; + Id = null; + OnDisconnected.TryInvoke(this, reason); + try + { + await _transport.DisconnectAsync(CancellationToken.None).ConfigureAwait(false); + } + catch (Exception e) + { + Logger.LogError(e, e.Message); + } + if (reason != DisconnectReason.IOServerDisconnect && reason != DisconnectReason.IOClientDisconnect) + { + //In the this cases (explicit disconnection), the client will not try to reconnect and you need to manually call socket.connect(). + if (Options.Reconnection) + { + ConnectCore(); + } + } + } + } + + public void AddExpectedException(Type type) + { + if (!_expectedExceptions.Contains(type)) + { + _expectedExceptions.Add(type); + } + } + + private void DisposeForReconnect() + { + _hasError = false; + _isFaild = false; + _packetId = -1; + _ackHandlers.Clear(); + _connectCoreException = null; + _hasError = false; + _connectionTokenSource.TryCancel(); + _connectionTokenSource.TryDispose(); + } + + public void Dispose() + { + HttpClient.Dispose(); + _transport.TryDispose(); + _ackHandlers.Clear(); + _onAnyHandlers.Clear(); + _eventHandlers.Clear(); + _connectionTokenSource.TryCancel(); + _connectionTokenSource.TryDispose(); + } + } +} \ No newline at end of file diff --git a/ElectronNET.API/SocketIO/SocketIOOptions.cs b/ElectronNET.API/SocketIO/SocketIOOptions.cs new file mode 100644 index 0000000..dc154bb --- /dev/null +++ b/ElectronNET.API/SocketIO/SocketIOOptions.cs @@ -0,0 +1,65 @@ +using SocketIOClient.Transport; +using System; +using System.Collections.Generic; + +namespace SocketIOClient +{ + public sealed class SocketIOOptions + { + public SocketIOOptions() + { + RandomizationFactor = 0.5; + ReconnectionDelay = 1000; + ReconnectionDelayMax = 5000; + ReconnectionAttempts = int.MaxValue; + Path = "/socket.io"; + ConnectionTimeout = TimeSpan.FromSeconds(20); + Reconnection = true; + Transport = TransportProtocol.Polling; + EIO = 4; + AutoUpgrade = true; + } + + public string Path { get; set; } + + public TimeSpan ConnectionTimeout { get; set; } + + public IEnumerable> Query { get; set; } + + /// + /// Whether to allow reconnection if accidentally disconnected + /// + public bool Reconnection { get; set; } + + public double ReconnectionDelay { get; set; } + public int ReconnectionDelayMax { get; set; } + public int ReconnectionAttempts { get; set; } + + double _randomizationFactor; + public double RandomizationFactor + { + get => _randomizationFactor; + set + { + if (value >= 0 && value <= 1) + { + _randomizationFactor = value; + } + else + { + throw new ArgumentException($"{nameof(RandomizationFactor)} should be greater than or equal to 0.0, and less than 1.0."); + } + } + } + + public Dictionary ExtraHeaders { get; set; } + + public TransportProtocol Transport { get; set; } + + public int EIO { get; set; } + + public bool AutoUpgrade { get; set; } + + public object Auth { get; set; } + } +} diff --git a/ElectronNET.API/SocketIO/SocketIOResponse.cs b/ElectronNET.API/SocketIO/SocketIOResponse.cs new file mode 100644 index 0000000..42d979c --- /dev/null +++ b/ElectronNET.API/SocketIO/SocketIOResponse.cs @@ -0,0 +1,62 @@ +using System.Collections.Generic; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace SocketIOClient +{ + public class SocketIOResponse + { + public SocketIOResponse(IList array, SocketIO socket) + { + _array = array; + InComingBytes = new List(); + SocketIO = socket; + PacketId = -1; + } + + readonly IList _array; + + public List InComingBytes { get; } + public SocketIO SocketIO { get; } + public int PacketId { get; set; } + + public T GetValue(int index = 0) + { + var element = GetValue(index); + string json = element.GetRawText(); + return SocketIO.JsonSerializer.Deserialize(json, InComingBytes); + } + + public JsonElement GetValue(int index = 0) => _array[index]; + + public int Count => _array.Count; + + public override string ToString() + { + var builder = new StringBuilder(); + builder.Append('['); + foreach (var item in _array) + { + builder.Append(item.GetRawText()); + if (_array.IndexOf(item) < _array.Count - 1) + { + builder.Append(','); + } + } + builder.Append(']'); + return builder.ToString(); + } + + public async Task CallbackAsync(params object[] data) + { + await SocketIO.ClientAckAsync(PacketId, CancellationToken.None, data).ConfigureAwait(false); + } + + public async Task CallbackAsync(CancellationToken cancellationToken, params object[] data) + { + await SocketIO.ClientAckAsync(PacketId, cancellationToken, data).ConfigureAwait(false); + } + } +} diff --git a/ElectronNET.API/SocketIO/Transport/BaseTransport.cs b/ElectronNET.API/SocketIO/Transport/BaseTransport.cs new file mode 100644 index 0000000..efbdc64 --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/BaseTransport.cs @@ -0,0 +1,245 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Reactive.Subjects; +using Microsoft.Extensions.Logging; +using SocketIOClient.JsonSerializer; +using SocketIOClient.Messages; +using SocketIOClient.UriConverters; + +namespace SocketIOClient.Transport +{ + public abstract class BaseTransport : IObserver, IObserver, IObservable, IDisposable + { + public BaseTransport(SocketIOOptions options, IJsonSerializer jsonSerializer, ILogger logger) + { + Options = options; + MessageSubject = new Subject(); + JsonSerializer = jsonSerializer; + UriConverter = new UriConverter(); + _messageQueue = new Queue(); + _logger = logger; + } + + DateTime _pingTime; + readonly Queue _messageQueue; + readonly ILogger _logger; + + protected SocketIOOptions Options { get; } + protected Subject MessageSubject { get; } + + protected IJsonSerializer JsonSerializer { get; } + protected CancellationTokenSource PingTokenSource { get; private set; } + protected OpenedMessage OpenedMessage { get; private set; } + + public string Namespace { get; set; } + public IUriConverter UriConverter { get; set; } + + public async Task SendAsync(IMessage msg, CancellationToken cancellationToken) + { + msg.Eio = Options.EIO; + msg.Protocol = Options.Transport; + var payload = new Payload + { + Text = msg.Write() + }; + if (msg.OutgoingBytes != null) + { + payload.Bytes = msg.OutgoingBytes; + } + await SendAsync(payload, cancellationToken).ConfigureAwait(false); + } + + protected virtual async Task OpenAsync(OpenedMessage msg) + { + OpenedMessage = msg; + if (Options.EIO == 3 && string.IsNullOrEmpty(Namespace)) + { + return; + } + var connectMsg = new ConnectedMessage + { + Namespace = Namespace, + Eio = Options.EIO, + Query = Options.Query, + }; + if (Options.EIO == 4) + { + if (Options.Auth != null) + { + connectMsg.AuthJsonStr = JsonSerializer.Serialize(new[] { Options.Auth }).Json.TrimStart('[').TrimEnd(']'); + } + } + + for (int i = 1; i <= 3; i++) + { + try + { + await SendAsync(connectMsg, CancellationToken.None).ConfigureAwait(false); + break; + } + catch (Exception e) + { + if (i == 3) + OnError(e); + else + await Task.Delay(TimeSpan.FromMilliseconds(Math.Pow(2, i) * 100)); + } + } + } + + /// + /// Eio3 ping is sent by the client + /// Eio4 ping is sent by the server + /// + /// + private void StartPing(CancellationToken cancellationToken) + { + _logger.LogDebug($"[Ping] Interval: {OpenedMessage.PingInterval}"); + Task.Factory.StartNew(async () => + { + while (!cancellationToken.IsCancellationRequested) + { + await Task.Delay(OpenedMessage.PingInterval); + if (cancellationToken.IsCancellationRequested) + { + break; + } + try + { + var ping = new PingMessage(); + _logger.LogDebug($"[Ping] Sending"); + await SendAsync(ping, CancellationToken.None).ConfigureAwait(false); + _logger.LogDebug($"[Ping] Has been sent"); + _pingTime = DateTime.Now; + MessageSubject.OnNext(ping); + } + catch (Exception e) + { + _logger.LogDebug($"[Ping] Failed to send, {e.Message}"); + MessageSubject.OnError(e); + break; + } + } + }, TaskCreationOptions.LongRunning); + } + + public abstract Task ConnectAsync(Uri uri, CancellationToken cancellationToken); + + public abstract Task DisconnectAsync(CancellationToken cancellationToken); + + public abstract void AddHeader(string key, string val); + + public virtual void Dispose() + { + MessageSubject.Dispose(); + _messageQueue.Clear(); + if (PingTokenSource != null) + { + PingTokenSource.Cancel(); + PingTokenSource.Dispose(); + } + } + + public abstract Task SendAsync(Payload payload, CancellationToken cancellationToken); + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + MessageSubject.OnError(error); + } + + public void OnNext(string text) + { + _logger.LogDebug($"[Receive] {text}"); + var msg = MessageFactory.CreateMessage(Options.EIO, text); + if (msg == null) + { + return; + } + if (msg.BinaryCount > 0) + { + msg.IncomingBytes = new List(msg.BinaryCount); + _messageQueue.Enqueue(msg); + return; + } + if (msg.Type == MessageType.Opened) + { + OpenAsync(msg as OpenedMessage).ConfigureAwait(false); + } + + if (Options.EIO == 3) + { + if (msg.Type == MessageType.Connected) + { + var connectMsg = msg as ConnectedMessage; + connectMsg.Sid = OpenedMessage.Sid; + if ((string.IsNullOrEmpty(Namespace) && string.IsNullOrEmpty(connectMsg.Namespace)) || connectMsg.Namespace == Namespace) + { + if (PingTokenSource != null) + { + PingTokenSource.Cancel(); + } + PingTokenSource = new CancellationTokenSource(); + StartPing(PingTokenSource.Token); + } + else + { + return; + } + } + else if (msg.Type == MessageType.Pong) + { + var pong = msg as PongMessage; + pong.Duration = DateTime.Now - _pingTime; + } + } + + MessageSubject.OnNext(msg); + + if (msg.Type == MessageType.Ping) + { + _pingTime = DateTime.Now; + try + { + SendAsync(new PongMessage(), CancellationToken.None).ConfigureAwait(false); + MessageSubject.OnNext(new PongMessage + { + Eio = Options.EIO, + Protocol = Options.Transport, + Duration = DateTime.Now - _pingTime + }); + } + catch (Exception e) + { + OnError(e); + } + } + } + + public void OnNext(byte[] bytes) + { + _logger.LogDebug($"[Receive] binary message"); + if (_messageQueue.Count > 0) + { + var msg = _messageQueue.Peek(); + msg.IncomingBytes.Add(bytes); + if (msg.IncomingBytes.Count == msg.BinaryCount) + { + MessageSubject.OnNext(msg); + _messageQueue.Dequeue(); + } + } + } + + public IDisposable Subscribe(IObserver observer) + { + return MessageSubject.Subscribe(observer); + } + } +} diff --git a/ElectronNET.API/SocketIO/Transport/Eio3HttpPollingHandler.cs b/ElectronNET.API/SocketIO/Transport/Eio3HttpPollingHandler.cs new file mode 100644 index 0000000..8134c3f --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/Eio3HttpPollingHandler.cs @@ -0,0 +1,76 @@ +using System.Collections.Generic; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using System.Linq; +using System.Net.Http.Headers; + +namespace SocketIOClient.Transport +{ + public class Eio3HttpPollingHandler : HttpPollingHandler + { + public Eio3HttpPollingHandler(HttpClient httpClient) : base(httpClient) { } + + public override async Task PostAsync(string uri, IEnumerable bytes, CancellationToken cancellationToken) + { + var list = new List(); + foreach (var item in bytes) + { + list.Add(1); + var length = SplitInt(item.Length + 1).Select(x => (byte)x); + list.AddRange(length); + list.Add(byte.MaxValue); + list.Add(4); + list.AddRange(item); + } + var content = new ByteArrayContent(list.ToArray()); + content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); + await HttpClient.PostAsync(AppendRandom(uri), content, cancellationToken).ConfigureAwait(false); + } + + private List SplitInt(int number) + { + List list = new List(); + while (number > 0) + { + list.Add(number % 10); + number /= 10; + } + list.Reverse(); + return list; + } + + protected override void ProduceText(string text) + { + int p = 0; + while (true) + { + int index = text.IndexOf(':', p); + if (index == -1) + { + break; + } + if (int.TryParse(text.Substring(p, index - p), out int length)) + { + string msg = text.Substring(index + 1, length); + TextSubject.OnNext(msg); + } + else + { + break; + } + p = index + length + 1; + if (p >= text.Length) + { + break; + } + } + } + + public override Task PostAsync(string uri, string content, CancellationToken cancellationToken) + { + content = content.Length + ":" + content; + return base.PostAsync(uri, content, cancellationToken); + } + } +} diff --git a/ElectronNET.API/SocketIO/Transport/Eio4HttpPollingHandler.cs b/ElectronNET.API/SocketIO/Transport/Eio4HttpPollingHandler.cs new file mode 100644 index 0000000..a8b1208 --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/Eio4HttpPollingHandler.cs @@ -0,0 +1,48 @@ +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace SocketIOClient.Transport +{ + public class Eio4HttpPollingHandler : HttpPollingHandler + { + public Eio4HttpPollingHandler(HttpClient httpClient) : base(httpClient) { } + + const char Separator = '\u001E'; //1E  + + public override async Task PostAsync(string uri, IEnumerable bytes, CancellationToken cancellationToken) + { + var builder = new StringBuilder(); + foreach (var item in bytes) + { + builder.Append('b').Append(Convert.ToBase64String(item)).Append(Separator); + } + if (builder.Length == 0) + { + return; + } + string text = builder.ToString().TrimEnd(Separator); + await PostAsync(uri, text, cancellationToken); + } + + protected override void ProduceText(string text) + { + string[] items = text.Split(new[] { Separator }, StringSplitOptions.RemoveEmptyEntries); + foreach (var item in items) + { + if (item[0] == 'b') + { + byte[] bytes = Convert.FromBase64String(item.Substring(1)); + BytesSubject.OnNext(bytes); + } + else + { + TextSubject.OnNext(item); + } + } + } + } +} diff --git a/ElectronNET.API/SocketIO/Transport/HttpPollingHandler.cs b/ElectronNET.API/SocketIO/Transport/HttpPollingHandler.cs new file mode 100644 index 0000000..48147dc --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/HttpPollingHandler.cs @@ -0,0 +1,118 @@ +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace SocketIOClient.Transport +{ + public abstract class HttpPollingHandler : IHttpPollingHandler + { + public HttpPollingHandler(HttpClient httpClient) + { + HttpClient = httpClient; + TextSubject = new Subject(); + BytesSubject = new Subject(); + TextObservable = TextSubject.AsObservable(); + BytesObservable = BytesSubject.AsObservable(); + } + + protected HttpClient HttpClient { get; } + protected Subject TextSubject{get;} + protected Subject BytesSubject{get;} + + public IObservable TextObservable { get; } + public IObservable BytesObservable { get; } + + protected string AppendRandom(string uri) + { + return uri + "&t=" + DateTimeOffset.Now.ToUnixTimeSeconds(); + } + + public async Task GetAsync(string uri, CancellationToken cancellationToken) + { + var req = new HttpRequestMessage(HttpMethod.Get, AppendRandom(uri)); + var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false); + if (!resMsg.IsSuccessStatusCode) + { + throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}"); + } + await ProduceMessageAsync(resMsg).ConfigureAwait(false); + } + + public async Task SendAsync(HttpRequestMessage req, CancellationToken cancellationToken) + { + var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false); + if (!resMsg.IsSuccessStatusCode) + { + throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}"); + } + await ProduceMessageAsync(resMsg).ConfigureAwait(false); + } + + public async virtual Task PostAsync(string uri, string content, CancellationToken cancellationToken) + { + var httpContent = new StringContent(content); + var resMsg = await HttpClient.PostAsync(AppendRandom(uri), httpContent, cancellationToken).ConfigureAwait(false); + await ProduceMessageAsync(resMsg).ConfigureAwait(false); + } + + public abstract Task PostAsync(string uri, IEnumerable bytes, CancellationToken cancellationToken); + + private async Task ProduceMessageAsync(HttpResponseMessage resMsg) + { + if (resMsg.Content.Headers.ContentType.MediaType == "application/octet-stream") + { + byte[] bytes = await resMsg.Content.ReadAsByteArrayAsync().ConfigureAwait(false); + ProduceBytes(bytes); + } + else + { + string text = await resMsg.Content.ReadAsStringAsync().ConfigureAwait(false); + ProduceText(text); + } + } + + protected abstract void ProduceText(string text); + + private void ProduceBytes(byte[] bytes) + { + int i = 0; + while (bytes.Length > i + 4) + { + byte type = bytes[i]; + var builder = new StringBuilder(); + i++; + while (bytes[i] != byte.MaxValue) + { + builder.Append(bytes[i]); + i++; + } + i++; + int length = int.Parse(builder.ToString()); + if (type == 0) + { + var buffer = new byte[length]; + Buffer.BlockCopy(bytes, i, buffer, 0, buffer.Length); + TextSubject.OnNext(Encoding.UTF8.GetString(buffer)); + } + else if (type == 1) + { + var buffer = new byte[length - 1]; + Buffer.BlockCopy(bytes, i + 1, buffer, 0, buffer.Length); + BytesSubject.OnNext(buffer); + } + i += length; + } + } + + public void Dispose() + { + TextSubject.Dispose(); + BytesSubject.Dispose(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Transport/HttpTransport.cs b/ElectronNET.API/SocketIO/Transport/HttpTransport.cs new file mode 100644 index 0000000..4e7c6bf --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/HttpTransport.cs @@ -0,0 +1,121 @@ +using System; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using SocketIOClient.JsonSerializer; +using SocketIOClient.Messages; + +namespace SocketIOClient.Transport +{ + public class HttpTransport : BaseTransport + { + public HttpTransport(HttpClient http, + IHttpPollingHandler pollingHandler, + SocketIOOptions options, + IJsonSerializer jsonSerializer, + ILogger logger) : base(options, jsonSerializer, logger) + { + _http = http; + _httpPollingHandler = pollingHandler; + _httpPollingHandler.TextObservable.Subscribe(this); + _httpPollingHandler.BytesObservable.Subscribe(this); + } + + string _httpUri; + CancellationTokenSource _pollingTokenSource; + + readonly HttpClient _http; + readonly IHttpPollingHandler _httpPollingHandler; + + private void StartPolling(CancellationToken cancellationToken) + { + Task.Factory.StartNew(async () => + { + int retry = 0; + while (!cancellationToken.IsCancellationRequested) + { + if (!_httpUri.Contains("&sid=")) + { + await Task.Delay(20); + continue; + } + try + { + await _httpPollingHandler.GetAsync(_httpUri, CancellationToken.None).ConfigureAwait(false); + } + catch (Exception e) + { + retry++; + if (retry >= 3) + { + MessageSubject.OnError(e); + break; + } + await Task.Delay(100 * (int)Math.Pow(2, retry)); + } + } + }, TaskCreationOptions.LongRunning); + } + + public override async Task ConnectAsync(Uri uri, CancellationToken cancellationToken) + { + var req = new HttpRequestMessage(HttpMethod.Get, uri); + // if (_options.ExtraHeaders != null) + // { + // foreach (var item in _options.ExtraHeaders) + // { + // req.Headers.Add(item.Key, item.Value); + // } + // } + + _httpUri = uri.ToString(); + await _httpPollingHandler.SendAsync(req, new CancellationTokenSource(Options.ConnectionTimeout).Token).ConfigureAwait(false); + if (_pollingTokenSource != null) + { + _pollingTokenSource.Cancel(); + } + _pollingTokenSource = new CancellationTokenSource(); + StartPolling(_pollingTokenSource.Token); + } + + public override Task DisconnectAsync(CancellationToken cancellationToken) + { + _pollingTokenSource.Cancel(); + if (PingTokenSource != null) + { + PingTokenSource.Cancel(); + } + return Task.CompletedTask; + } + + public override void AddHeader(string key, string val) + { + _http.DefaultRequestHeaders.Add(key, val); + } + + public override void Dispose() + { + base.Dispose(); + _httpPollingHandler.Dispose(); + } + + public override async Task SendAsync(Payload payload, CancellationToken cancellationToken) + { + await _httpPollingHandler.PostAsync(_httpUri, payload.Text, cancellationToken); + if (payload.Bytes != null && payload.Bytes.Count > 0) + { + await _httpPollingHandler.PostAsync(_httpUri, payload.Bytes, cancellationToken); + } + } + + protected override async Task OpenAsync(OpenedMessage msg) + { + //if (!_httpUri.Contains("&sid=")) + //{ + //} + _httpUri += "&sid=" + msg.Sid; + await base.OpenAsync(msg); + } + } +} diff --git a/ElectronNET.API/SocketIO/Transport/IClientWebSocket.cs b/ElectronNET.API/SocketIO/Transport/IClientWebSocket.cs new file mode 100644 index 0000000..a3f75fd --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/IClientWebSocket.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace SocketIOClient.Transport +{ + public interface IClientWebSocket : IDisposable + { + IObservable TextObservable { get; } + IObservable BytesObservable { get; } + Task ConnectAsync(Uri uri, CancellationToken cancellationToken); + Task DisconnectAsync(CancellationToken cancellationToken); + Task SendAsync(byte[] bytes, TransportMessageType type, bool endOfMessage, CancellationToken cancellationToken); + void AddHeader(string key, string val); + } +} diff --git a/ElectronNET.API/SocketIO/Transport/IHttpPollingHandler.cs b/ElectronNET.API/SocketIO/Transport/IHttpPollingHandler.cs new file mode 100644 index 0000000..f117010 --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/IHttpPollingHandler.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; + +namespace SocketIOClient.Transport +{ + public interface IHttpPollingHandler : IDisposable + { + IObservable TextObservable { get; } + IObservable BytesObservable { get; } + Task GetAsync(string uri, CancellationToken cancellationToken); + Task SendAsync(HttpRequestMessage req, CancellationToken cancellationToken); + Task PostAsync(string uri, string content, CancellationToken cancellationToken); + Task PostAsync(string uri, IEnumerable bytes, CancellationToken cancellationToken); + } +} diff --git a/ElectronNET.API/SocketIO/Transport/Payload.cs b/ElectronNET.API/SocketIO/Transport/Payload.cs new file mode 100644 index 0000000..e906f67 --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/Payload.cs @@ -0,0 +1,10 @@ +using System.Collections.Generic; + +namespace SocketIOClient.Transport +{ + public class Payload + { + public string Text { get; set; } + public List Bytes { get; set; } + } +} diff --git a/ElectronNET.API/SocketIO/Transport/SystemNetWebSocketsClientWebSocket.cs b/ElectronNET.API/SocketIO/Transport/SystemNetWebSocketsClientWebSocket.cs new file mode 100644 index 0000000..5a49dbd --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/SystemNetWebSocketsClientWebSocket.cs @@ -0,0 +1,143 @@ +using System; +using System.Net.WebSockets; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace SocketIOClient.Transport +{ + public class SystemNetWebSocketsClientWebSocket : IClientWebSocket + { + public SystemNetWebSocketsClientWebSocket(int eio) + { + _eio = eio; + _textSubject = new Subject(); + _bytesSubject = new Subject(); + TextObservable = _textSubject.AsObservable(); + BytesObservable = _bytesSubject.AsObservable(); + _ws = new ClientWebSocket(); + _listenCancellation = new CancellationTokenSource(); + _sendLock = new SemaphoreSlim(1, 1); + } + + const int ReceiveChunkSize = 1024 * 8; + + readonly int _eio; + readonly ClientWebSocket _ws; + readonly Subject _textSubject; + readonly Subject _bytesSubject; + readonly CancellationTokenSource _listenCancellation; + readonly SemaphoreSlim _sendLock; + + public IObservable TextObservable { get; } + public IObservable BytesObservable { get; } + + private void Listen() + { + Task.Factory.StartNew(async() => + { + while (true) + { + if (_listenCancellation.IsCancellationRequested) + { + break; + } + var buffer = new byte[ReceiveChunkSize]; + int count = 0; + WebSocketReceiveResult result = null; + + while (_ws.State == WebSocketState.Open) + { + var subBuffer = new byte[ReceiveChunkSize]; + try + { + result = await _ws.ReceiveAsync(new ArraySegment(subBuffer), CancellationToken.None).ConfigureAwait(false); + + // resize + if (buffer.Length - count < result.Count) + { + Array.Resize(ref buffer, buffer.Length + result.Count); + } + Buffer.BlockCopy(subBuffer, 0, buffer, count, result.Count); + count += result.Count; + if (result.EndOfMessage) + { + break; + } + } + catch (Exception e) + { + _textSubject.OnError(e); + break; + } + } + + if (result == null) + { + break; + } + + switch (result.MessageType) + { + case WebSocketMessageType.Text: + string text = Encoding.UTF8.GetString(buffer, 0, count); + _textSubject.OnNext(text); + break; + case WebSocketMessageType.Binary: + byte[] bytes; + if (_eio == 3) + { + bytes = new byte[count - 1]; + Buffer.BlockCopy(buffer, 1, bytes, 0, bytes.Length); + } + else + { + bytes = new byte[count]; + Buffer.BlockCopy(buffer, 0, bytes, 0, bytes.Length); + } + _bytesSubject.OnNext(bytes); + break; + case WebSocketMessageType.Close: + _textSubject.OnError(new WebSocketException("Received a Close message")); + break; + } + } + }); + } + + public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken) + { + await _ws.ConnectAsync(uri, cancellationToken); + Listen(); + } + + public async Task DisconnectAsync(CancellationToken cancellationToken) + { + await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken); + } + + public async Task SendAsync(byte[] bytes, TransportMessageType type, bool endOfMessage, CancellationToken cancellationToken) + { + var msgType = WebSocketMessageType.Text; + if (type == TransportMessageType.Binary) + { + msgType = WebSocketMessageType.Binary; + } + await _ws.SendAsync(new ArraySegment(bytes), msgType, endOfMessage, cancellationToken).ConfigureAwait(false); + } + + public void AddHeader(string key, string val) + { + _ws.Options.SetRequestHeader(key, val); + } + + public void Dispose() + { + _textSubject.Dispose(); + _bytesSubject.Dispose(); + _ws.Dispose(); + } + } +} diff --git a/ElectronNET.API/SocketIO/Transport/TransportMessageType.cs b/ElectronNET.API/SocketIO/Transport/TransportMessageType.cs new file mode 100644 index 0000000..24f9aeb --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/TransportMessageType.cs @@ -0,0 +1,8 @@ +namespace SocketIOClient.Transport +{ + public enum TransportMessageType + { + Text = 0, + Binary = 1 + } +} diff --git a/ElectronNET.API/SocketIO/Transport/TransportProtocol.cs b/ElectronNET.API/SocketIO/Transport/TransportProtocol.cs new file mode 100644 index 0000000..50ad89b --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/TransportProtocol.cs @@ -0,0 +1,8 @@ +namespace SocketIOClient.Transport +{ + public enum TransportProtocol + { + Polling, + WebSocket + } +} diff --git a/ElectronNET.API/SocketIO/Transport/WebSocketTransport.cs b/ElectronNET.API/SocketIO/Transport/WebSocketTransport.cs new file mode 100644 index 0000000..532ac66 --- /dev/null +++ b/ElectronNET.API/SocketIO/Transport/WebSocketTransport.cs @@ -0,0 +1,92 @@ +using System; +using System.Reactive.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using SocketIOClient.JsonSerializer; + +namespace SocketIOClient.Transport +{ + public class WebSocketTransport : BaseTransport + { + public WebSocketTransport(IClientWebSocket ws, SocketIOOptions options, IJsonSerializer jsonSerializer, ILogger logger) + : base(options, jsonSerializer, logger) + { + _ws = ws; + _sendLock = new SemaphoreSlim(1, 1); + _ws.TextObservable.Subscribe(this); + _ws.BytesObservable.Subscribe(this); + } + + const int ReceiveChunkSize = 1024 * 8; + const int SendChunkSize = 1024 * 8; + + readonly IClientWebSocket _ws; + readonly SemaphoreSlim _sendLock; + + private async Task SendAsync(TransportMessageType type, byte[] bytes, CancellationToken cancellationToken) + { + try + { + await _sendLock.WaitAsync().ConfigureAwait(false); + if (type == TransportMessageType.Binary && Options.EIO == 3) + { + byte[] buffer = new byte[bytes.Length + 1]; + buffer[0] = 4; + Buffer.BlockCopy(bytes, 0, buffer, 1, bytes.Length); + bytes = buffer; + } + int pages = (int)Math.Ceiling(bytes.Length * 1.0 / SendChunkSize); + for (int i = 0; i < pages; i++) + { + int offset = i * SendChunkSize; + int length = SendChunkSize; + if (offset + length > bytes.Length) + { + length = bytes.Length - offset; + } + byte[] subBuffer = new byte[length]; + Buffer.BlockCopy(bytes, offset, subBuffer, 0, subBuffer.Length); + bool endOfMessage = pages - 1 == i; + await _ws.SendAsync(subBuffer, type, endOfMessage, cancellationToken).ConfigureAwait(false); + } + } + finally + { + _sendLock.Release(); + } + } + + public override async Task ConnectAsync(Uri uri, CancellationToken cancellationToken) + { + await _ws.ConnectAsync(uri, cancellationToken); + } + + public override async Task DisconnectAsync(CancellationToken cancellationToken) + { + await _ws.DisconnectAsync(cancellationToken); + } + + public override async Task SendAsync(Payload payload, CancellationToken cancellationToken) + { + byte[] bytes = Encoding.UTF8.GetBytes(payload.Text); + await SendAsync(TransportMessageType.Text, bytes, cancellationToken); + if (payload.Bytes != null) + { + foreach (var item in payload.Bytes) + { + await SendAsync(TransportMessageType.Binary, item, cancellationToken); + } + } + } + + public override void AddHeader(string key, string val) => _ws.AddHeader(key, val); + + public override void Dispose() + { + base.Dispose(); + _sendLock.Dispose(); + } + } +} diff --git a/ElectronNET.API/SocketIO/UriConverters/IUriConverter.cs b/ElectronNET.API/SocketIO/UriConverters/IUriConverter.cs new file mode 100644 index 0000000..923f813 --- /dev/null +++ b/ElectronNET.API/SocketIO/UriConverters/IUriConverter.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; + +namespace SocketIOClient.UriConverters +{ + public interface IUriConverter + { + Uri GetServerUri(bool ws, Uri serverUri, int eio, string path, IEnumerable> queryParams); + } +} diff --git a/ElectronNET.API/SocketIO/UriConverters/UriConverter.cs b/ElectronNET.API/SocketIO/UriConverters/UriConverter.cs new file mode 100644 index 0000000..922ee9d --- /dev/null +++ b/ElectronNET.API/SocketIO/UriConverters/UriConverter.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace SocketIOClient.UriConverters +{ + public class UriConverter : IUriConverter + { + public Uri GetServerUri(bool ws, Uri serverUri, int eio, string path, IEnumerable> queryParams) + { + var builder = new StringBuilder(); + if (serverUri.Scheme == "https" || serverUri.Scheme == "wss") + { + builder.Append(ws ? "wss://" : "https://"); + } + else if (serverUri.Scheme == "http" || serverUri.Scheme == "ws") + { + builder.Append(ws ? "ws://" : "http://"); + } + else + { + throw new ArgumentException("Only supports 'http, https, ws, wss' protocol"); + } + builder.Append(serverUri.Host); + if (!serverUri.IsDefaultPort) + { + builder.Append(":").Append(serverUri.Port); + } + if (string.IsNullOrEmpty(path)) + { + builder.Append("/socket.io"); + } + else + { + builder.Append(path); + } + builder + .Append("/?EIO=") + .Append(eio) + .Append("&transport=") + .Append(ws ? "websocket" : "polling"); + + if (queryParams != null) + { + foreach (var item in queryParams) + { + builder.Append('&').Append(item.Key).Append('=').Append(item.Value); + } + } + + return new Uri(builder.ToString()); + } + } +}