Skip to content

Commit

Permalink
Merge pull request #35 from dimhotepus/production-feedback-1
Browse files Browse the repository at this point in the history
A bunch of fixes from production usage
  • Loading branch information
juliusfriedman authored Dec 17, 2023
2 parents 09d82d4 + 02be50a commit dd5e552
Show file tree
Hide file tree
Showing 16 changed files with 128 additions and 67 deletions.
14 changes: 5 additions & 9 deletions Common/Classes/Disposables/BaseDisposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ public abstract class BaseDisposable : IDisposed, IAsyncDisposable
/// <param name="bd"></param>
/// <returns></returns>
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
internal static long RetrieveState(BaseDisposable bd) { return bd.State; }
internal static int RetrieveState(BaseDisposable bd) { return bd.State; }

/// <summary>
/// Indicates the <see cref=""/>
/// </summary>
/// <param name="bd"></param>
/// <returns></returns>
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
internal static bool IsDestructing(BaseDisposable bd, ref long state) { return (state = RetrieveState(bd)) > Disposed; }
internal static bool IsDestructing(BaseDisposable bd, ref int state) { return (state = RetrieveState(bd)) > Disposed; }

/// <summary>
/// If the sender is of the type <see cref="BaseDisposable"/> then <see cref="SetShouldDispose"/> will be called to dispose the instance immediately.
Expand Down Expand Up @@ -133,7 +133,7 @@ public static void SetShouldDispose(BaseDisposable toDispose, bool value, bool c
/// The sign bit of the integer value is the only 'confusing' part about this and it must be understood because the Interlocked methods are CLS Compliant and do not expose unsigned counterparts.
/// See the remarks section above for more clarity.
/// </remarks>
private long State; // = Undisposed; (Todo, internal protected and can remove Statics.. or new private protected and...)
private int State; // = Undisposed; (Todo, internal protected and can remove Statics.. or new private protected and...)

#endregion

Expand Down Expand Up @@ -188,9 +188,7 @@ protected internal bool IsUndisposed
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
get
{
//return System.Threading.Thread.VolatileRead(ref State) == Undisposed;
//return (System.Threading.Interlocked.Read(ref State) & int.MaxValue).Equals(Undisposed);
return System.Threading.Interlocked.Read(ref State).Equals(Undisposed);
return System.Threading.Volatile.Read(ref State) == Undisposed;
}
}

Expand All @@ -202,9 +200,7 @@ internal bool IsFinalized
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
get
{
//return System.Threading.Thread.VolatileRead(ref State) == Finalized;
//return (System.Threading.Interlocked.Read(ref State) & int.MaxValue).Equals(Finalized);
return System.Threading.Interlocked.Read(ref State).Equals(Finalized);
return System.Threading.Volatile.Read(ref State) == Finalized;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public SuppressedFinalizerDisposable(bool shouldDispose)
internal void Resurrect(ref int managedThreadId)
{
//Need to retrieve the state from this instance.
long state = 0;
int state = 0;

//Not already disposed or destructing?
if (IsUndisposed is false | BaseDisposable.IsDestructing(this, ref state) is false) return;
Expand Down
6 changes: 3 additions & 3 deletions Common/Collections/Generic/ConcurrentLinkedQueueSlim.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The above copyright notice and this permission notice shall be included in all c
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

#endregion

Expand Down Expand Up @@ -217,7 +218,7 @@ public long Count
public bool IsEmpty
{
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
get { return Count is Common.Binary.LongZero; }
get { return Volatile.Read(ref First) is null; }
}

#endregion
Expand Down Expand Up @@ -324,8 +325,7 @@ public bool TryPeek(ref T t)
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
public void Enqueue(T t)
{
bool added = false;

bool added;
do added = TryEnqueue(ref t);
while (added is false);
}
Expand Down
8 changes: 1 addition & 7 deletions Common/Extensions/SocketExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -963,15 +963,9 @@ public static void DisableAddressReuse(System.Net.Sockets.Socket socket, bool ex

#region UnicastPortReuse

//Notes 4.6 has ReuseUnicastPort

private const int ReuseUnicastPort = 0x3007; // 12295

private static readonly System.Net.Sockets.SocketOptionName ReuseUnicastPortOption = (System.Net.Sockets.SocketOptionName)ReuseUnicastPort;

public static void SetUnicastPortReuse(System.Net.Sockets.Socket socket, int value)
{
socket.SetSocketOption(System.Net.Sockets.SocketOptionLevel.Socket, ReuseUnicastPortOption, value);
socket.SetSocketOption(System.Net.Sockets.SocketOptionLevel.Socket, System.Net.Sockets.SocketOptionName.ReuseUnicastPort, value);
}

public static void DisableUnicastPortReuse(System.Net.Sockets.Socket socket)
Expand Down
1 change: 1 addition & 0 deletions Http/HttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,7 @@ m_LastTransmitted is not null && message is not null &&
catch (Exception ex)
{
Common.ILoggingExtensions.Log(Logger, ToString() + "@SendHttpMessage: " + ex.Message);
Common.ILoggingExtensions.LogException(Logger, ex);
}
finally
{
Expand Down
5 changes: 4 additions & 1 deletion Rtp/RtpClient.Fields.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ public partial class RtpClient
internal bool m_StopRequested, m_ThreadEvents, //on or off right now, int could allow levels of threading..
m_IListSockets; //Indicates if to use the IList send overloads.

// How much time to wait between event queue checks.
private System.TimeSpan m_WaitIntervalBetweenEvents = Media.Common.Extensions.TimeSpan.TimeSpanExtensions.OneMillisecond;

//Collection to handle the dispatch of events.
//Notes that Collections.Concurrent.Queue may be better suited for this in production until the ConcurrentLinkedQueue has been thoroughly engineered and tested.
//The context, the item, final, received
private readonly Media.Common.Collections.Generic.ConcurrentLinkedQueueSlim<(RtpClient.TransportContext Context, Common.BaseDisposable Packet, bool Final, bool Received)> m_EventData = new();

//Todo, LinkedQueue and Clock.
private readonly System.Threading.ManualResetEventSlim m_EventReady = new(false, 100); //should be caluclated based on memory and speed. SpinWait uses 10 as a default.
private readonly System.Threading.ManualResetEventSlim m_EventReady = new(false, spinCount: 20); //should be caluclated based on memory and speed. SpinWait uses 10 as a default.

//Outgoing Packets, Not a Queue because you cant re-order a Queue (in place) and you can't take a range from the Queue (in a single operation)
//Those things aside, ordering is not performed here and only single packets are iterated and would eliminate the need for removing after the operation.
Expand Down
11 changes: 9 additions & 2 deletions Rtp/RtpClient.Methods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2262,7 +2262,7 @@ private void HandleEvents()
m_EventReady.Reset();

while (IsActive && m_EventData.IsEmpty)
m_EventReady.Wait(Common.Extensions.TimeSpan.TimeSpanExtensions.OneMicrosecond);
m_EventReady.Wait(WaitIntervalBetweenEvents);
}
else if (IsActive is false) break;

Expand All @@ -2273,7 +2273,13 @@ private void HandleEvents()
HandleEvent();
}
}
catch (System.Exception ex) { Media.Common.ILoggingExtensions.Log(Logger, ToString() + "@HandleEvents: " + ex.Message); goto Begin; }
catch (System.Exception ex)
{
Media.Common.ILoggingExtensions.Log(Logger, ToString() + "@HandleEvents: " + ex.Message);
Media.Common.ILoggingExtensions.LogException(Logger, ex);

goto Begin;
}
}
}

Expand Down Expand Up @@ -2784,6 +2790,7 @@ private void SendReceieve()
catch (System.Exception ex)
{
Media.Common.ILoggingExtensions.Log(Logger, ToString() + "@SendRecieve: " + ex.Message);
Media.Common.ILoggingExtensions.LogException(Logger, ex);

if (critical) System.Threading.Thread.EndCriticalRegion();

Expand Down
13 changes: 13 additions & 0 deletions Rtp/RtpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3925,6 +3925,19 @@ public bool IListSockets
}
}

/// <summary>
/// How much time to wait between event queue checks.
/// High values reduce event handling speed / FPS, but also reduce CPU consumption.
/// </summary>
public TimeSpan WaitIntervalBetweenEvents
{
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.Synchronized | System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
get => m_WaitIntervalBetweenEvents;

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.Synchronized | System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
set => m_WaitIntervalBetweenEvents = value;
}

/// <summary>
/// Gets or sets a value which indicates if events will be threaded or not.
/// If threading is enabled the call will block until the event thread has started.
Expand Down
34 changes: 28 additions & 6 deletions Rtsp/RtspClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ public bool IsPlaying
catch (Exception ex)
{
Media.Common.ILoggingExtensions.Log(Logger, ToString() + "@IsPlaying - " + ex.Message);
Media.Common.ILoggingExtensions.LogException(Logger, ex);
}
}

Expand Down Expand Up @@ -3083,7 +3084,7 @@ public void StopPlaying(IEnumerable<MediaDescription> mediaDescriptions, bool fo
public void StopPlaying(bool disconnectSocket = true)
{
try { Disconnect(disconnectSocket); }
catch (Exception ex) { Media.Common.ILoggingExtensions.Log(Logger, ex.Message); }
catch (Exception ex) { Media.Common.ILoggingExtensions.LogException(Logger, ex); }
}

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
Expand Down Expand Up @@ -3318,7 +3319,7 @@ message header.
}
catch (Exception ex)
{
Common.ILoggingExtensions.Log(Logger, ex.Message);
Common.ILoggingExtensions.LogException(Logger, ex);

throw;
}
Expand Down Expand Up @@ -4623,6 +4624,7 @@ SharesSocket is false &&
catch (Exception ex)
{
Common.ILoggingExtensions.Log(Logger, ToString() + "@SendRtspMessage: " + ex.Message);
Common.ILoggingExtensions.LogException(Logger, ex);
}
finally
{
Expand Down Expand Up @@ -5968,7 +5970,11 @@ InUse is false &&
DisableKeepAliveRequest = keepAlives;
}
}
catch (Exception ex) { Common.ILoggingExtensions.Log(Logger, ToString() + "@MonitorProtocol: " + ex.Message); }
catch (Exception ex)
{
Common.ILoggingExtensions.Log(Logger, ToString() + "@MonitorProtocol: " + ex.Message);
Common.ILoggingExtensions.LogException(Logger, ex);
}

//If not disposed AND IsConnected and if protocol switch is still allowed AND IsPlaying and not already TCP
if (Common.IDisposedExtensions.IsNullOrDisposed(this) is false &&
Expand Down Expand Up @@ -6048,6 +6054,7 @@ tc.TotalPacketsSent is Common.Binary.LongZero &&
catch (Exception ex)
{
Common.ILoggingExtensions.Log(Logger, ToString() + "@MonitorProtocol: " + ex.Message);
Common.ILoggingExtensions.LogException(Logger, ex);
}
}
}
Expand All @@ -6071,8 +6078,19 @@ tc.TotalPacketsSent is Common.Binary.LongZero &&

//If there is still a timer change it based on the last messages round trip time, should be relative to all messages...
if (Common.IDisposedExtensions.IsNullOrDisposed(this) is false && m_ProtocolMonitor is not null)
try { m_ProtocolMonitor.Change(m_ConnectionTime.Add(LastMessageRoundTripTime), Media.Common.Extensions.TimeSpan.TimeSpanExtensions.InfiniteTimeSpan); }
catch (Exception ex) { Common.ILoggingExtensions.Log(Logger, ToString() + "@MonitorProtocol: " + ex.Message); }
try
{
m_ProtocolMonitor.Change
(
m_ConnectionTime.Add(LastMessageRoundTripTime),
Media.Common.Extensions.TimeSpan.TimeSpanExtensions.InfiniteTimeSpan
);
}
catch (Exception ex)
{
Common.ILoggingExtensions.Log(Logger, ToString() + "@MonitorProtocol: " + ex.Message);
Common.ILoggingExtensions.LogException(Logger, ex);
}
}

public RtspMessage SendPlay(MediaDescription mediaDescription, TimeSpan? startTime = null, TimeSpan? endTime = null, string rangeType = "npt")
Expand Down Expand Up @@ -6515,7 +6533,11 @@ context.Goodbye is null ||
}

}
catch (Exception ex) { Common.ILoggingExtensions.Log(Logger, ToString() + "@SendKeepAlive: " + ex.Message); }
catch (Exception ex)
{
Common.ILoggingExtensions.Log(Logger, ToString() + "@SendKeepAlive: " + ex.Message);
Common.ILoggingExtensions.LogException(Logger, ex);
}

//Raise the stopping event if not playing anymore
//if (true == wasPlaying && false == IsPlaying) OnStopping();
Expand Down
4 changes: 3 additions & 1 deletion Rtsp/RtspSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ public bool IsPlaying
catch (Exception ex)
{
Media.Common.ILoggingExtensions.Log(Logger, ToString() + "@IsPlaying - " + ex.Message);
Media.Common.ILoggingExtensions.LogException(Logger, ex);
}
}

Expand Down Expand Up @@ -1702,6 +1703,7 @@ SharesSocket is false &&
catch (Exception ex)
{
Common.ILoggingExtensions.Log(Logger, ToString() + "@SendRtspMessage: " + ex.Message);
Common.ILoggingExtensions.LogException(Logger, ex);
}
finally
{
Expand Down Expand Up @@ -2396,7 +2398,7 @@ message header.
}
catch (Exception ex)
{
Common.ILoggingExtensions.Log(Logger, ex.Message);
Common.ILoggingExtensions.LogException(Logger, ex);

throw;
}
Expand Down
2 changes: 1 addition & 1 deletion RtspServer/MediaTypes/RFC2435Media.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2527,7 +2527,7 @@ internal override void SendPackets()
{
try
{
if (Frames.Count is 0 && State == StreamState.Started)
if (Frames.IsEmpty && State == StreamState.Started)
{
if (RtpClient.IsActive) RtpClient.m_WorkerThread.Priority = System.Threading.ThreadPriority.Lowest;

Expand Down
2 changes: 1 addition & 1 deletion RtspServer/MediaTypes/RtpAudioSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ internal override void SendPackets()
{
try
{
if (Frames.Count is 0 && State == StreamState.Started)
if (Frames.IsEmpty && State == StreamState.Started)
{
if (RtpClient.IsActive) RtpClient.m_WorkerThread.Priority = System.Threading.ThreadPriority.Lowest;

Expand Down
2 changes: 1 addition & 1 deletion RtspServer/MediaTypes/RtpSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ internal virtual void SendPackets()
{
try
{
if (Packets.Count is 0)
if (Packets.IsEmpty)
{
System.Threading.Thread.Sleep(0);

Expand Down
2 changes: 1 addition & 1 deletion RtspServer/MediaTypes/RtpVideoSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ internal override void SendPackets()
{
try
{
if (Frames.Count is 0 && State is StreamState.Started)
if (Frames.IsEmpty && State is StreamState.Started)
{
if (RtpClient.IsActive)
RtpClient.m_WorkerThread.Priority = ThreadPriority.Lowest;
Expand Down
Loading

0 comments on commit dd5e552

Please sign in to comment.