📄 rudpstack.cs
字号:
//-- 1 - Send
RetransmissionTimer(rudp, now);
//-- 2 - ACKs
SendACKTimer(rudp, now);
//-- 3 - Check for keep alive
KeepAliveTimer(rudp, now);
//-- 4 - MTU Discovery
rudp._pmtuDiscovery.OnHeartBeat(now);
//-- 5 - Bandwidth
BandwidthTimer(rudp, now);
}
//---- Do not use 100%
Thread.Sleep(1);
}
}
#endregion
#region RetransmissionTimer
private static void RetransmissionTimer(RUDPSocket rudp, long now)
{
int count = rudp._sendingPackets.Count;
for (int index = 0; index < count; index++)
{
rudp._sendingPacketsLock.AcquireReaderLock(RUDPSocket.LockTimeOut);
RUDPOutgoingPacket packet = rudp._sendingPackets[index];
rudp._sendingPacketsLock.ReleaseReaderLock();
//---- Not yet sended
if (packet.TSLastSend < 0)
continue;
//---- It is ACKed
if (packet.IsACKed)
{
rudp._sendingPacketsLock.AcquireWriterLock(RUDPSocket.LockTimeOut);
rudp._sendingPackets.RemoveAt(index);
rudp._sendingPacketsLock.ReleaseWriterLock();
ReleaseOutgoingPacket(packet);
index--;
count--;
continue;
}
//---- Check for time out
if (packet.TSFirstSend > -1 && (now - packet.TSFirstSend) > rudp._sto)
{
//-- Normal time out
// Send connection Reset with ACK
OnDisconnected(rudp, DisconnectionReason.TimeOut);
return;
}
//---- Retransmission or not ?
if ((now - packet.TSLastSend) < rudp._rto)
continue;
//---- Get the SACK slots to send with
SACKSlot slot1 = null, slot2 = null, slot3 = null, slot4 = null;
rudp._sackWindow.GetSLACKSlots(out slot1, out slot2, out slot3, out slot4);
// Update the payload for the SACK slots
UpdatePacketPayload(packet.Payload, slot1, slot2, slot3, slot4);
#if CONSOLE_TRACE
string acksList = "";
if (slot1 != null)
acksList += " [" + slot1.StartPacketId + " <-> " + slot1.EndPacketId + "]";
if (slot2 != null)
acksList += " [" + slot2.StartPacketId + " <-> " + slot2.EndPacketId + "]";
if (slot3 != null)
acksList += " [" + slot3.StartPacketId + " <-> " + slot3.EndPacketId + "]";
if (slot4 != null)
acksList += " [" + slot4.StartPacketId + " <-> " + slot4.EndPacketId + "]";
#endif
// Send
#if CONSOLE_TRACE
Trace("Resend packet(" + rudp.Handle + "): " + packet.PacketId + " RTO=" + rudp._rto + "RTT=" + rudp._rtt + " ACKs:" + acksList);
#endif
if (SocketSendPacket(rudp, packet, packet.Payload, now))
{
rudp._controlWindow.OnResend(packet);
// Update
packet.Retransmission++;
}
}
}
#endregion
#region SendACKTimer
private static void SendACKTimer(RUDPSocket rudp, long now)
{
int acksCount = rudp._sackWindow.ACKCount;
if (acksCount < 1)
return;
//---- Delayed ACKs
if (acksCount < 2)
if (rudp._lastACKSendTS > -1 && (HiResTimer.MicroSeconds - rudp._lastACKSendTS) < DelayACKTime)
return;
rudp._lastACKSendTS = HiResTimer.MicroSeconds;
//---- Prepare the SACKs list
List<SACKSlot> sackSlots = rudp._sackWindow.PrepareACKList();
for (int index = 0; index < sackSlots.Count; index++)
{
//---- Get the SACK slots to send with
SACKSlot slot1 = null, slot2 = null, slot3 = null, slot4 = null;
if (sackSlots.Count > 0)
{
slot1 = sackSlots[0];
sackSlots.RemoveAt(0);
}
if (sackSlots.Count > 0)
{
slot2 = sackSlots[0];
sackSlots.RemoveAt(0);
}
if (sackSlots.Count > 0)
{
slot3 = sackSlots[0];
sackSlots.RemoveAt(0);
}
if (sackSlots.Count > 0)
{
slot4 = sackSlots[0];
sackSlots.RemoveAt(0);
}
#if CONSOLE_TRACE
if (slot1 != null)
Trace("Send ACK(" + rudp.Handle + "): " + slot1.StartPacketId + " <-> " + slot1.EndPacketId);
if (slot2 != null)
Trace("Send ACK(" + rudp.Handle + "): " + slot2.StartPacketId + " <-> " + slot2.EndPacketId);
if (slot3 != null)
Trace("Send ACK(" + rudp.Handle + "): " + slot3.StartPacketId + " <-> " + slot3.EndPacketId);
if (slot4 != null)
Trace("Send ACK(" + rudp.Handle + "): " + slot4.StartPacketId + " <-> " + slot4.EndPacketId);
#endif
byte[] packetPayload = MakePacketPayload(rudp, -1, RUDPPacketChannel.ACK, slot1, slot2, slot3, slot4, null, 0, 0);
SocketSendACK(rudp, rudp._physical, rudp._remoteEndPoint, packetPayload);
PayloadManager.Deallocate(RUDPPacketChannel.ACK, packetPayload);
}
}
#endregion
#region KeepAliveTimer
private static void KeepAliveTimer(RUDPSocket rudp, long now)
{
long lastSendTS = Math.Max(rudp._lastSendTS, rudp._lastACKSendTS);
//---- Send a keep alive (if possible)
if (rudp._status == RUDPSocketStatus.Connected &&
(now - lastSendTS) > RUDPStack.KeepAliveInterval &&
rudp._controlWindow.CanSend(0))
{
PushPacketToSend(rudp, true, RUDPPacketChannel.KeepAlive, new byte[0], 0, 0);
}
}
#endregion
#region BandwidthTimer
private static void BandwidthTimer(RUDPSocket rudp, long now)
{
//---- Send 2 packets
if (rudp._status == RUDPSocketStatus.Connected &&
(now - rudp._lastBandwidthTS) > RUDPStack.BandwidthInterval &&
rudp._controlWindow.CanSend(0))
{
PushPacketToSend(rudp, false, RUDPPacketChannel.Bandwidth01, null, 0, 0);
PushPacketToSend(rudp, false, RUDPPacketChannel.Bandwidth02, BitConverter.GetBytes(now), 0, 8);
rudp._lastBandwidthTS = now;
}
}
#endregion
#region Trace
#if CONSOLE_TRACE_MEMORY
static private List<string> _traces = new List<string>();
#endif
[Conditional("CONSOLE_TRACE")]
internal static void Trace(string text)
{
#if CONSOLE_TRACE_MEMORY
lock (_traces)
_traces.Add(text);
#else
Console.WriteLine(text);
#endif
}
#endregion
#region HandleException
internal static void HandleException(Exception exception, params object[] args)
{
string paramsText = "";
foreach (object val in args)
paramsText += " - " + val.ToString();
if (paramsText.Length > 0)
Console.WriteLine("CriticalError :" + exception.Message + '(' + paramsText + ")\n" + exception.StackTrace);
else
Console.WriteLine("CriticalError :" + exception.Message + '\n' + exception.StackTrace);
}
#endregion
#region StackFatalException
private static void StackFatalException(Exception exception)
{
HandleException(exception);
}
#endregion
#region SocketErrorToRUDPSocketError
static private RUDPSocketError SocketErrorToRUDPSocketError(SocketError socketError)
{
int error = (int)socketError;
return (RUDPSocketError)Enum.ToObject(typeof(RUDPSocketError), error);
}
#endregion
#region OnSocketUnhandledError
/// <summary>
/// Called when we have an error on a socket.
/// </summary>
static internal void OnSocketUnhandledError(PhysicalSocket physical, IPEndPoint remoteEndPoint, SocketError error)
{
//---- Get the socket
RUDPSocket rudp;
physical._connectedRDUPsLock.AcquireReaderLock(RUDPSocket.LockTimeOut);
try
{
if (!physical._connectedRDUPs.TryGetValue(remoteEndPoint, out rudp))
return; // Released socket
}
finally
{
physical._connectedRDUPsLock.ReleaseReaderLock();
}
//---- Handle the error
OnSocketUnhandledError(rudp, SocketErrorToRUDPSocketError(error), null);
}
/// <summary>
/// Called when we have an error on a socket.
/// </summary>
static internal void OnSocketUnhandledError(RUDPSocket rudp, RUDPSocketError error, RUDPSendIAsyncResult sendAsyncResult)
{
//---- Disconnect the socket
OnDisconnected(rudp, DisconnectionReason.SocketError);
//---- Handle the error and forward it to the socket
if (rudp._status == RUDPSocketStatus.Connecting)
rudp.OnEndConnect(error);
else
{
// On Send Error
if (sendAsyncResult != null)
rudp.OnEndSend(error, sendAsyncResult);
// ELSE ... HOW TO GET sendAsyncResult when NULL ?????
// On Receive Error
RUDPReceiveIAsyncResult receiveAsyncResult = null;
Interlocked.Exchange<RUDPReceiveIAsyncResult>(ref receiveAsyncResult, rudp._asyncResultReceive);
if (receiveAsyncResult != null)
{
Interlocked.Exchange<RUDPReceiveIAsyncResult>(ref rudp._asyncResultReceive, null);
rudp.OnEndReceive(error, null, true, receiveAsyncResult);
}
}
}
#endregion
#region Helpers
private static void UpdateAffinity()
{
if (Environment.ProcessorCount < 2)
return;
int processorAffinity = 0;
for (int index = 0; index < Environment.ProcessorCount; index++)
processorAffinity += 1 << index;
int threadId = Thread.CurrentThread.ManagedThreadId;
Process process = Process.GetCurrentProcess();
process.ProcessorAffinity = new IntPtr(processorAffinity);
/*
foreach (ProcessThread processThread in process.Threads)
if (processThread.Id == threadId)
processThread.ProcessorAffinity = new IntPtr(processorAffinity);*/
}
/// <summary>
/// The purpose of StallThread is to increase efficiency when contention on the SpinWaitLocks
/// object is detected.
/// On a multi-processor machine, the thread will call Thread.SpinWait, which causes the thread
/// to remain in user mode; it will not transition to kernel mode and it will never enter a
/// wait state. Thread's SpinWait method was added to support hyper-threaded CPUs.
/// If your code is running on a machine with hyper-threaded CPUs, this method kicks
/// the other thread so that it starts running (for more information about hyper-threaded CPUs,
/// see the sidebar "Hyper-Threaded CPUs").
/// When there is contention on a single-processor machine, I do force the thread to transition
/// to kernel mode (by calling SwitchToThread) for if I didn't, CPU time would be wasted as the
/// thread spun without any hope of finding the lock released.
/// </summary>
public static void StallThread()
{
if (IsSingleCpuMachine)
{
// On single-CPU system, spinning does no good
SwitchToThread();
}
else
{
// The multi-CPU system might be hyper-threaded, let the other thread run
Thread.SpinWait(1);
}
}
/// <summary>
/// Returns the local machine IP address.
/// Can be an IPv6 or IPv4
/// </summary>
static internal IPAddress LocalIPAddress(ProtocolType protocol)
{
IPHostEntry localMachineInfo = Dns.Resolve(Dns.GetHostName());
// Search IPv6 Address (if supported)
if (System.Net.Sockets.Socket.OSSupportsIPv6 && protocol == ProtocolType.IPv6)
foreach (IPAddress ipAddress in localMachineInfo.AddressList)
if (ipAddress.AddressFamily == AddressFamily.InterNetworkV6)
if (ipAddress.ToString() != "::1")
return ipAddress;
// Search for IPv4
foreach (IPAddress ipAddress in localMachineInfo.AddressList)
if (ipAddress.AddressFamily == AddressFamily.InterNetwork)
if (ipAddress.ToString() != "127.0.0.1")
return ipAddress;
// IP = "127.0.0.1" ... No IP !!
return IPAddress.Parse("127.0.0.1");
}
#endregion
#region Packets Pool
private static LockFreeQueue<RUDPOutgoingPacket> _outgoingPacketsPools = new LockFreeQueue<RUDPOutgoingPacket>();
internal static RUDPOutgoingPacket NewOutgoingPacket(int packetId, long sequence, byte[] payload, RUDPPacketChannel channel)
{
RUDPOutgoingPacket packet;
if (!_outgoingPacketsPools.TryDequeue(out packet))
{
for (int index = 0; index < 100; index++)
_outgoingPacketsPools.Enqueue(new RUDPOutgoingPacket(-1, -1, null, RUDPPacketChannel.Undefined));
return new RUDPOutgoingPacket(packetId, sequence, payload, channel);
}
packet.Reset();
packet.PacketId = packetId;
packet.Payload = payload;
packet.Channel = channel;
packet.Sequence = sequence;
return packet;
}
internal static void ReleaseOutgoingPacket(RUDPOutgoingPacket packet)
{
_outgoingPacketsPools.Enqueue(packet);
PayloadManager.Deallocate(packet.Channel, packet.Payload);
packet.Payload = null;
}
#endregion
#region Fragments Pool
private static LockFreeQueue<FragmentInformation> _fragmentsPools = new LockFreeQueue<FragmentInformation>();
internal static FragmentInformation NewFragmentInformation(RUDPSocket rudpSocket, bool isReliable, int mtu, byte[] payload, int offset, int size, RUDPSendIAsyncResult asyncResult)
{
FragmentInformation fragment;
if (!_fragmentsPools.TryDequeue(out fragment))
{
for (int index = 0; index < 100; index++)
_fragmentsPools.Enqueue(new FragmentInformation(null, false, -1, null, -1, -1, null));
return new FragmentInformation(rudpSocket, isReliable, mtu, payload, offset, size, asyncResult);
}
fragment.rudp = rudpSocket;
fragment.IsReliable = isReliable;
fragment.Offset = offset;
fragment.Size = size;
fragment.Payload = payload;
fragment.MTU = mtu;
fragment.AsyncResult = asyncResult;
fragment.IsInAsyncThread = false;
return fragment;
}
internal static void ReleaseFragmentInformation(FragmentInformation fragment)
{
_fragmentsPools.Enqueue(fragment);
}
#endregion
#region API
[DllImport("Kernel32", ExactSpelling = true)]
[return: MarshalAs(UnmanagedType.Bool)]
private static extern Boolean SwitchToThread();
#endregion
}
#region DisconnectionReason
public enum DisconnectionReason
{
TimeOut,
ConnectionClosed,
SocketError
}
#endregion
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -