refactor Socket to use a task completion source instead of a getter with locks

This commit is contained in:
theolivenbaum
2021-09-02 20:37:22 +02:00
parent 7f5db58edd
commit ea40249a9c

View File

@@ -90,18 +90,15 @@ namespace ElectronNET.API
private static SemaphoreSlim _socketSemaphoreEmit = new SemaphoreSlim(1, 1);
private static SemaphoreSlim _socketSemaphoreHandlers = new SemaphoreSlim(1, 1);
private static TaskCompletionSource _waitForBeingConnected = new TaskCompletionSource();
private static Task _waitForConnection
private static TaskCompletionSource<SocketIO> _connectedSocketTask = new TaskCompletionSource<SocketIO>();
private static Task<SocketIO> _waitForConnection
{
get
{
if(_socket is null)
{
var _ = Socket; //Ensure we trigger the first connection before anything else
}
return _waitForBeingConnected.Task;
EnsureSocketTaskIsCreated();
return _connectedSocketTask.Task;
}
}
@@ -111,25 +108,23 @@ namespace ElectronNET.API
{
//We don't care about waiting for the event to be emitted, so this doesn't need to be async
Task.Run(async () =>
{
await EmitAsync(eventString, args);
});
Task.Run(() => EmitAsync(eventString, args));
}
private static async Task EmitAsync(string eventString, object[] args)
{
await _waitForConnection;
if (App.SocketDebug)
{
Log("Sending event {0}", eventString);
}
var socket = await _waitForConnection;
await _socketSemaphoreEmit.WaitAsync();
try
{
await Socket.EmitAsync(eventString, args);
await socket.EmitAsync(eventString, args);
}
finally
{
@@ -142,31 +137,6 @@ namespace ElectronNET.API
}
}
internal static void Log(string formatString, params object[] args)
{
if (Logger is object)
{
Logger.LogInformation(formatString, args);
}
else
{
Console.WriteLine(formatString, args);
}
}
internal static void LogError(Exception E, string formatString, params object[] args)
{
if (Logger is object)
{
Logger.LogError(E, formatString, args);
}
else
{
Console.WriteLine(formatString, args);
Console.WriteLine(E.ToString());
}
}
/// <summary>
/// This method is only used on places where we need to be sure the event was sent on the socket, such as Quit, Exit, Relaunch and QuitAndInstall methods
/// </summary>
@@ -179,18 +149,19 @@ namespace ElectronNET.API
Log("Sending event {0}", eventString);
}
_waitForConnection.Wait();
_socketSemaphoreEmit.Wait();
try
Task.Run(async () =>
{
Socket.EmitAsync(eventString, args).Wait();
}
finally
{
_socketSemaphoreEmit.Release();
}
var socket = await _waitForConnection;
try
{
await _socketSemaphoreEmit.WaitAsync();
await socket.EmitAsync(eventString, args);
}
finally
{
_socketSemaphoreEmit.Release();
}
}).Wait();
if (App.SocketDebug)
@@ -201,10 +172,12 @@ namespace ElectronNET.API
public static void Off(string eventString)
{
EnsureSocketTaskIsCreated();
_socketSemaphoreHandlers.Wait();
try
{
Socket.Off(eventString);
_socket.Off(eventString);
}
finally
{
@@ -214,10 +187,12 @@ namespace ElectronNET.API
public static void On(string eventString, Action fn)
{
EnsureSocketTaskIsCreated();
_socketSemaphoreHandlers.Wait();
try
{
Socket.On(eventString, _ =>
_socket.On(eventString, _ =>
{
try
{
@@ -237,10 +212,12 @@ namespace ElectronNET.API
public static void On<T>(string eventString, Action<T> fn)
{
EnsureSocketTaskIsCreated();
_socketSemaphoreHandlers.Wait();
try
{
Socket.On(eventString, (o) =>
_socket.On(eventString, (o) =>
{
try
{
@@ -372,101 +349,131 @@ namespace ElectronNET.API
return await taskCompletionSource.Task;
}
private static SocketIO Socket
internal static void Log(string formatString, params object[] args)
{
get
if (Logger is object)
{
if (_socket is null)
Logger.LogInformation(formatString, args);
}
else
{
Console.WriteLine(formatString, args);
}
}
internal static void LogError(Exception E, string formatString, params object[] args)
{
if (Logger is object)
{
Logger.LogError(E, formatString, args);
}
else
{
Console.WriteLine(formatString, args);
Console.WriteLine(E.ToString());
}
}
private static void EnsureSocketTaskIsCreated()
{
if (_socket is null)
{
if (HybridSupport.IsElectronActive)
{
if (HybridSupport.IsElectronActive)
lock (_syncRoot)
{
lock (_syncRoot)
if (_socket is null && HybridSupport.IsElectronActive)
{
if (_socket is null && HybridSupport.IsElectronActive)
var socket = new SocketIO($"http://localhost:{BridgeSettings.SocketPort}", new SocketIOOptions()
{
var socket = new SocketIO($"http://localhost:{BridgeSettings.SocketPort}", new SocketIOOptions()
{
EIO = 3,
Reconnection = true,
ReconnectionAttempts = int.MaxValue,
ReconnectionDelay = 1000,
ReconnectionDelayMax = 5000,
RandomizationFactor = 0.1,
ConnectionTimeout = TimeSpan.FromSeconds(10)
});
EIO = 3,
Reconnection = true,
ReconnectionAttempts = int.MaxValue,
ReconnectionDelay = 1000,
ReconnectionDelayMax = 5000,
RandomizationFactor = 0.5,
ConnectionTimeout = TimeSpan.FromSeconds(10)
});
socket.JsonSerializer = new CamelCaseNewtonsoftJsonSerializer(socket.Options.EIO);
socket.JsonSerializer = new CamelCaseNewtonsoftJsonSerializer(socket.Options.EIO);
socket.OnConnected += (_, __) =>
{
_waitForBeingConnected.TrySetResult();
Log("ElectronNET socket connected on port {0}!", BridgeSettings.SocketPort);
};
socket.OnConnected += (_, __) =>
{
_connectedSocketTask.TrySetResult(socket);
Log("ElectronNET socket connected on port {0}!", BridgeSettings.SocketPort);
};
socket.OnReconnectAttempt += (_, __) =>
{
_waitForBeingConnected = new();
Log("ElectronNET socket is trying to reconnect on port {0}...", BridgeSettings.SocketPort);
};
socket.OnReconnectAttempt += (_, __) =>
{
_connectedSocketTask = new();
Log("ElectronNET socket is trying to reconnect on port {0}...", BridgeSettings.SocketPort);
};
socket.OnReconnectError += (_, ex) =>
{
Log("ElectronNET socket failed to connect {0}", ex);
};
socket.OnReconnectError += (_, ex) =>
{
Log("ElectronNET socket failed to connect {0}", ex);
};
socket.OnReconnected += (_, __) =>
{
_waitForBeingConnected.TrySetResult();
Log("ElectronNET socket reconnected on port {0}...", BridgeSettings.SocketPort);
};
socket.OnReconnected += (_, __) =>
{
_connectedSocketTask.TrySetResult(socket);
Log("ElectronNET socket reconnected on port {0}...", BridgeSettings.SocketPort);
};
socket.OnDisconnected += async (_, reason) =>
{
Log("ElectronNET socket disconnected with reason {0}, trying to reconnect on port {1}!", reason, BridgeSettings.SocketPort);
socket.OnDisconnected += async (_, reason) =>
{
_connectedSocketTask = new();
_waitForBeingConnected = new();
Log("ElectronNET socket disconnected with reason {0}, trying to reconnect on port {1}!", reason, BridgeSettings.SocketPort);
int i = 0;
int i = 0;
double miliseconds = 500;
double miliseconds = 500;
while (true)
while (true)
{
try
{
try
if (!socket.Connected)
{
if (!socket.Connected)
{
await socket.ConnectAsync();
_waitForBeingConnected.TrySetResult(); //Probably was already on the OnConnected call
}
return;
await socket.ConnectAsync();
_connectedSocketTask.TrySetResult(socket); //Probably was already on the OnConnected call
}
catch (Exception e)
{
LogError(e, "Failed to reconnect, will try again in {0} ms.", miliseconds * 2);
}
await Task.Delay(TimeSpan.FromMilliseconds(miliseconds));
miliseconds = Math.Min(60_000, Math.Pow(2, i) + 500);
i++;
return;
}
catch (Exception e)
{
LogError(e, "Failed to reconnect, will try again in {0} ms.", miliseconds * 2);
}
};
socket.ConnectAsync().Wait();
await Task.Delay(TimeSpan.FromMilliseconds(miliseconds));
_socket = socket;
}
miliseconds = Math.Min(60_000, Math.Pow(2, i) + 500);
i++;
}
};
Task.Run(async () =>
{
await socket.ConnectAsync();
_connectedSocketTask.TrySetResult(socket); //Probably was already on the OnConnected call
});
_socket = socket;
}
else
{
throw new Exception("Missing Socket Port");
}
}
else
{
throw new Exception("Missing Socket Port");
}
}
return _socket;
else
{
throw new Exception("Missing Socket Port");
}
}
}