📄 abstractwindow.cs
字号:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;
namespace Helper.Net.RUDP
{
#region SWSlot
internal sealed class SWSlot
{
internal int StartPacketId;
internal int EndPacketId;
internal long StartByte;
internal long EndByte;
internal SWSlot(int startPacketId,
int endPacketId,
long startByte,
long endByte)
{
StartPacketId = startPacketId;
EndPacketId = endPacketId;
StartByte = startByte;
EndByte = endByte;
}
}
#endregion
/// <summary>
/// This window act as Congestions and Sliding window.
///
/// Because we have SACK and we handle ACK in a more efficient way, we do not really
/// have a sliding window but another mechanism (slots) to handle ACKs.
/// </summary>
abstract internal class AbstractWindow
{
#region Variables
internal RUDPSocket _rudp;
//---- The slots : The AKCs we are waiting for
private List<SWSlot> _sendSlots = new List<SWSlot>();
private ReaderWriterLock _sendSlotsLock = new ReaderWriterLock();
//---- Window sizes
// Congestion Window Size
internal double _cwnd;
// Receive Window Size
internal int _rwnd;
// Advertised window
internal int _awnd = Int32.MaxValue;
//---- Synchronization
internal ManualResetEvent _event = new ManualResetEvent(true);
#endregion
#region Constructor
internal AbstractWindow(RUDPSocket rudp)
{
_rudp = rudp;
}
#endregion
#region Reset
internal virtual void Reset()
{
_sendSlotsLock.AcquireWriterLock(RUDPSocket.LockTimeOut);
_sendSlots.Clear();
_sendSlotsLock.ReleaseWriterLock();
// Initial = MTU
_cwnd = 2 * PMTUDiscovery.MinMTU;
_awnd = Int32.MaxValue;
_rwnd = 0;
_rudp._deltaRtt = 0;
_rudp._rtt = 0;
_rudp._rto = 3 * 1000000; // http://www.faqs.org/rfcs/rfc2988.html
_rudp._sto = (int)(15000000 + 4 * _rudp._rto);
if (_event != null)
_event.Set();
}
#endregion
#region CanSend
internal bool CanSend(int payloadLength)
{
_sendSlotsLock.AcquireReaderLock(RUDPSocket.LockTimeOut);
if (_sendSlots.Count < 1)
{
_sendSlotsLock.ReleaseReaderLock();
return true;
}
// Used Window size
long startByte = _sendSlots[0].StartByte;
long endByte = _sendSlots[_sendSlots.Count - 1].EndByte;
_sendSlotsLock.ReleaseReaderLock();
RUDPStack.Trace("Congestion => " + Math.Min(_rudp._sendSize, _cwnd) + ">" + (payloadLength + endByte - startByte));
return Math.Min(_awnd, Math.Min(_rudp._sendSize, _cwnd)) > (payloadLength + endByte - startByte);
}
#endregion
#region CanReceive
internal bool CanReceive(int length)
{
return _rudp._receiveSize > (_rwnd + length);
}
#endregion
#region OnSend
internal void OnSend(int packetId, long sequence, int payloadLength)
{
//---- Update window (if needed)
OnSend_UpdateWindow(payloadLength);
//---- Sliding
try
{
SWSlot slot;
_sendSlotsLock.AcquireWriterLock(RUDPSocket.LockTimeOut);
//---- First slot
if (_sendSlots.Count < 1)
{
slot = new SWSlot(packetId, -1, sequence, sequence + payloadLength);
_sendSlots.Add(slot);
return;
}
//---- Check the last slot only, because ordered
slot = _sendSlots[_sendSlots.Count - 1];
// Grow the slot
if ((slot.EndPacketId < 0 && slot.StartPacketId == packetId - 1) ||
(slot.EndPacketId > -1 && slot.EndPacketId == packetId - 1))
{
slot.EndPacketId = packetId;
slot.EndByte += payloadLength;
return;
}
// New slot
slot = new SWSlot(packetId, -1, sequence, sequence + payloadLength);
_sendSlots.Add(slot);
}
finally
{
// Lock
if (!CanSend(payloadLength))
_event.Reset();
_sendSlotsLock.ReleaseWriterLock();
}
}
#endregion
#region OnResend
/// <summary>
/// Called when a packet is resended
/// </summary>
internal void OnResend(RUDPOutgoingPacket packet)
{
//---- Congestion
OnResend_UpdateParameters(packet);
OnResend_UpdateWindow();
// Congestion is at least one MTU
_cwnd = Math.Max(_rudp._mtu, _cwnd);
}
#endregion
#region OnACK
internal void OnACK(RUDPOutgoingPacket packet, double currentRTT)
{
// Duplicated ACK
if (packet == null)
return;
//---- Congestion
OnACK_UpdateParameters(packet, currentRTT);
OnACK_UpdateWindow(packet);
if (_cwnd < _rudp.MTU)
_cwnd = _rudp.MTU;
//---- Sliding
int packetId = packet.PacketId;
_sendSlotsLock.AcquireWriterLock(RUDPSocket.LockTimeOut);
SWSlot slot = null;
try
{
for (int index = 0; index < _sendSlots.Count; index++)
{
slot = _sendSlots[index];
// Delete this slot
if (slot.EndPacketId < 0 && slot.StartPacketId == packetId)
{
_sendSlots.RemoveAt(index);
return;
}
// Decrease the slot
if (slot.EndPacketId > -1 && slot.StartPacketId == packetId)
{
slot.StartPacketId++;
slot.StartByte += packet.Payload.Length;
if (slot.StartPacketId == slot.EndPacketId)
slot.EndPacketId = -1;
return;
}
// Decrease the slot
if (slot.EndPacketId > -1 && slot.EndPacketId == packetId)
{
slot.EndPacketId--;
slot.EndByte -= packet.Payload.Length;
if (slot.StartPacketId == slot.EndPacketId)
slot.EndPacketId = -1;
return;
}
// Split the slot
if (slot.EndPacketId > -1 && slot.StartPacketId <= packetId && slot.EndPacketId >= packetId)
{
// Right slot
SWSlot newSlot = new SWSlot(packetId + 1,
slot.EndPacketId,
packet.Sequence + packet.Payload.Length + 1,
slot.EndByte);
if (newSlot.StartPacketId == newSlot.EndPacketId)
newSlot.EndPacketId = -1;
_sendSlots.Insert(index + 1, newSlot);
// Left slot
slot.EndPacketId = packetId - 1;
slot.EndByte = packet.Sequence - 1;
if (slot.StartPacketId == slot.EndPacketId)
slot.EndPacketId = -1;
return;
}
}
}
finally
{
if (CanSend(0))
_event.Set();
_sendSlotsLock.ReleaseWriterLock();
}
}
#endregion
#region OnReceive
internal void OnReceive(RUDPIngoingPacket packet)
{
if (packet != null)
Interlocked.Exchange(ref _rwnd, _rwnd + packet.Payload.Length);
}
#endregion
#region OnReceiveProcessed
internal void OnReceiveProcessed(RUDPIngoingPacket packet)
{
Interlocked.Exchange(ref _rwnd, _rwnd - packet.Payload.Length);
}
#endregion
#region OnReceiveAdvertisedWindow
internal void OnReceiveAdvertisedWindow(int windowSize)
{
_awnd = windowSize;
}
#endregion
#region WaitOne
internal void WaitOne()
{
_event.WaitOne(1, true);
}
#endregion
#region Properties
internal ManualResetEvent WaitObject
{
get
{
return _event;
}
}
internal int AdvertisedWindow
{
get
{
// AdvertisedWindow = remaining size in the receive buffer
return _rudp._receiveSize - _rwnd;
}
}
#endregion
#region OnResend_UpdateParameters
internal void OnResend_UpdateParameters(RUDPOutgoingPacket packet)
{
// Karn's Algorithm : On successive retransmissions, set each timeout to twice the previous one.
if (packet.Retransmission > 0)
{
//-- Calculate _rto
//traces.Add("RTO x 2 : " + _rudp._rto + " new : " + _rudp._rto * 2);
_rudp._rto *= 2;
if (_rudp._rto > 60000000)
_rudp._rto = 60000000;
//-- Calculate _sto
_rudp._sto = (int)(15000000 + 4 * _rudp._rtt);
}
}
//private List<string> traces = new List<string>();
#endregion
#region OnACK_UpdateParameters
internal void OnACK_UpdateParameters(RUDPOutgoingPacket packet, double currentRTT)
{
//---- TCP - (RFC 2988) : http://www.faqs.org/rfcs/rfc2988.html
if (packet.Retransmission > 0)
return;
//-- Calculate _rtt
if (_rudp._rtt == 0)
{
_rudp._rtt = currentRTT;
_rudp._deltaRtt = currentRTT / 2;
}
else
{
_rudp._deltaRtt = (0.75 * _rudp._deltaRtt + 0.25 * Math.Abs(_rudp._rtt - currentRTT));
_rudp._rtt = (0.875 * _rudp._rtt + 0.125 * currentRTT);
}
//-- Calculate _rto
//traces.Add("RTO ACK : " + _rudp._rto + " new : " + _rudp._rtt + Math.Max(1, 4 * _rudp._deltaRtt));
_rudp._rto = _rudp._rtt + Math.Max(1, 4 * _rudp._deltaRtt);
//if (_rudp._rto < 1000000)
// _rudp._rto = 1000000;
//if (_rudp._rto > 60000000)
// _rudp._rto = 60000000;
//Console.WriteLine("RTT : " + _rudp._rtt);
//Console.WriteLine("_rto : " + _rudp._rto);
//-- Calculate _sto
_rudp._sto = (int)(15000000 + 4 * _rudp._rtt);
//RUDPStack.Trace("ResendIntervalTimeout : " + _rudp._rto);
}
#endregion
abstract internal void OnACK_UpdateWindow(RUDPOutgoingPacket packet);
virtual internal void OnSend_UpdateWindow(int payloadLength)
{
}
abstract internal void OnResend_UpdateWindow();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -