⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 abstractwindow.cs

📁 rudp可靠保障得udp传输
💻 CS
字号:
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace Helper.Net.RUDP
{

	#region SWSlot

	internal sealed class SWSlot
	{
		internal int StartPacketId;
		internal int EndPacketId;

		internal long StartByte;
		internal long EndByte;

		internal SWSlot(int startPacketId,
						int endPacketId,
						long startByte,
						long endByte)
		{
			StartPacketId = startPacketId;
			EndPacketId = endPacketId;
			StartByte = startByte;
			EndByte = endByte;
		}
	}

	#endregion

	/// <summary>
	/// This window act as Congestions and Sliding window.
	/// 
	/// Because we have SACK and we handle ACK in a more efficient way, we do not really
	/// have a sliding window but another mechanism (slots) to handle ACKs.
	/// </summary>
	abstract internal class AbstractWindow
	{

		#region Variables

		internal RUDPSocket _rudp;

		//---- The slots : The AKCs we are waiting for
		private List<SWSlot> _sendSlots = new List<SWSlot>();
		private ReaderWriterLock _sendSlotsLock = new ReaderWriterLock();

		//---- Window sizes

		// Congestion Window Size
		internal double _cwnd;

		// Receive Window Size
		internal int _rwnd;

		// Advertised window
		internal int _awnd = Int32.MaxValue;

		//---- Synchronization
		internal ManualResetEvent _event = new ManualResetEvent(true);

		#endregion

		#region Constructor

		internal AbstractWindow(RUDPSocket rudp)
		{
			_rudp = rudp;
		}

		#endregion

		#region Reset

		internal virtual void Reset()
		{
			_sendSlotsLock.AcquireWriterLock(RUDPSocket.LockTimeOut);
			_sendSlots.Clear();
			_sendSlotsLock.ReleaseWriterLock();

			// Initial = MTU
			_cwnd = 2 * PMTUDiscovery.MinMTU;
			_awnd = Int32.MaxValue;
			_rwnd = 0;

			_rudp._deltaRtt = 0;
			_rudp._rtt = 0;
			_rudp._rto = 3 * 1000000; // http://www.faqs.org/rfcs/rfc2988.html
			_rudp._sto = (int)(15000000 + 4 * _rudp._rto);

			if (_event != null)
				_event.Set();
		}

		#endregion

		#region CanSend

		internal bool CanSend(int payloadLength)
		{
			_sendSlotsLock.AcquireReaderLock(RUDPSocket.LockTimeOut);

			if (_sendSlots.Count < 1)
			{
				_sendSlotsLock.ReleaseReaderLock();
				return true;
			}

			// Used Window size
			long startByte = _sendSlots[0].StartByte;
			long endByte = _sendSlots[_sendSlots.Count - 1].EndByte;

			_sendSlotsLock.ReleaseReaderLock();

			RUDPStack.Trace("Congestion => " + Math.Min(_rudp._sendSize, _cwnd) + ">" + (payloadLength + endByte - startByte));

			return Math.Min(_awnd, Math.Min(_rudp._sendSize, _cwnd)) > (payloadLength + endByte - startByte);
		}

		#endregion

		#region CanReceive

		internal bool CanReceive(int length)
		{
			return _rudp._receiveSize > (_rwnd + length);
		}

		#endregion

		#region OnSend

		internal void OnSend(int packetId, long sequence, int payloadLength)
		{
			//---- Update window (if needed)
			OnSend_UpdateWindow(payloadLength);

			//---- Sliding
			try
			{
				SWSlot slot;

				_sendSlotsLock.AcquireWriterLock(RUDPSocket.LockTimeOut);

				//---- First slot
				if (_sendSlots.Count < 1)
				{
					slot = new SWSlot(packetId, -1, sequence, sequence + payloadLength);
					_sendSlots.Add(slot);
					return;
				}

				//---- Check the last slot only, because ordered
				slot = _sendSlots[_sendSlots.Count - 1];

				// Grow the slot
				if ((slot.EndPacketId < 0 && slot.StartPacketId == packetId - 1) ||
					(slot.EndPacketId > -1 && slot.EndPacketId == packetId - 1))
				{
					slot.EndPacketId = packetId;
					slot.EndByte += payloadLength;
					return;
				}

				// New slot
				slot = new SWSlot(packetId, -1, sequence, sequence + payloadLength);
				_sendSlots.Add(slot);
			}
			finally
			{
				// Lock
				if (!CanSend(payloadLength))
					_event.Reset();

				_sendSlotsLock.ReleaseWriterLock();
			}
		}

		#endregion

		#region OnResend

		/// <summary>
		/// Called when a packet is resended
		/// </summary>
		internal void OnResend(RUDPOutgoingPacket packet)
		{
			//---- Congestion
			OnResend_UpdateParameters(packet);
			OnResend_UpdateWindow();

			// Congestion is at least one MTU
			_cwnd = Math.Max(_rudp._mtu, _cwnd);
		}

		#endregion

		#region OnACK

		internal void OnACK(RUDPOutgoingPacket packet, double currentRTT)
		{
			// Duplicated ACK
			if (packet == null)
				return;

			//---- Congestion
			OnACK_UpdateParameters(packet, currentRTT);
			OnACK_UpdateWindow(packet);

			if (_cwnd < _rudp.MTU)
				_cwnd = _rudp.MTU;

			//---- Sliding
			int packetId = packet.PacketId;

			_sendSlotsLock.AcquireWriterLock(RUDPSocket.LockTimeOut);

			SWSlot slot = null;
			try
			{
				for (int index = 0; index < _sendSlots.Count; index++)
				{
					slot = _sendSlots[index];

					// Delete this slot
					if (slot.EndPacketId < 0 && slot.StartPacketId == packetId)
					{
						_sendSlots.RemoveAt(index);
						return;
					}

					// Decrease the slot
					if (slot.EndPacketId > -1 && slot.StartPacketId == packetId)
					{
						slot.StartPacketId++;
						slot.StartByte += packet.Payload.Length;
						if (slot.StartPacketId == slot.EndPacketId)
							slot.EndPacketId = -1;
						return;
					}

					// Decrease the slot
					if (slot.EndPacketId > -1 && slot.EndPacketId == packetId)
					{
						slot.EndPacketId--;
						slot.EndByte -= packet.Payload.Length;
						if (slot.StartPacketId == slot.EndPacketId)
							slot.EndPacketId = -1;
						return;
					}

					// Split the slot
					if (slot.EndPacketId > -1 && slot.StartPacketId <= packetId && slot.EndPacketId >= packetId)
					{
						// Right slot
						SWSlot newSlot = new SWSlot(packetId + 1,
													slot.EndPacketId,
													packet.Sequence + packet.Payload.Length + 1,
													slot.EndByte);
						if (newSlot.StartPacketId == newSlot.EndPacketId)
							newSlot.EndPacketId = -1;
						_sendSlots.Insert(index + 1, newSlot);

						// Left slot
						slot.EndPacketId = packetId - 1;
						slot.EndByte = packet.Sequence - 1;
						if (slot.StartPacketId == slot.EndPacketId)
							slot.EndPacketId = -1;

						return;
					}
				}

			}
			finally
			{
				if (CanSend(0))
					_event.Set();

				_sendSlotsLock.ReleaseWriterLock();
			}
		}

		#endregion

		#region OnReceive

		internal void OnReceive(RUDPIngoingPacket packet)
		{
			if (packet != null)
				Interlocked.Exchange(ref _rwnd, _rwnd + packet.Payload.Length);
		}

		#endregion

		#region OnReceiveProcessed

		internal void OnReceiveProcessed(RUDPIngoingPacket packet)
		{
			Interlocked.Exchange(ref _rwnd, _rwnd - packet.Payload.Length);
		}

		#endregion

		#region OnReceiveAdvertisedWindow

		internal void OnReceiveAdvertisedWindow(int windowSize)
		{
			_awnd = windowSize;
		}

		#endregion

		#region WaitOne

		internal void WaitOne()
		{
			_event.WaitOne(1, true);
		}

		#endregion

		#region Properties

		internal ManualResetEvent WaitObject
		{
			get
			{
				return _event;
			}
		}

		internal int AdvertisedWindow
		{
			get
			{
				// AdvertisedWindow = remaining size in the receive buffer
				return _rudp._receiveSize - _rwnd;
			}
		}

		#endregion

		#region OnResend_UpdateParameters

		internal void OnResend_UpdateParameters(RUDPOutgoingPacket packet)
		{
			// Karn's Algorithm : On successive retransmissions, set each timeout to twice the previous one.
			if (packet.Retransmission > 0)
			{
				//-- Calculate _rto
				//traces.Add("RTO x 2 : " + _rudp._rto + " new : " + _rudp._rto * 2);
				_rudp._rto *= 2;
				if (_rudp._rto > 60000000)
					_rudp._rto = 60000000;

				//-- Calculate _sto
				_rudp._sto = (int)(15000000 + 4 * _rudp._rtt);
			}
		}

		//private List<string> traces = new List<string>();

		#endregion

		#region OnACK_UpdateParameters

		internal void OnACK_UpdateParameters(RUDPOutgoingPacket packet, double currentRTT)
		{
			//---- TCP - (RFC 2988) : http://www.faqs.org/rfcs/rfc2988.html
			if (packet.Retransmission > 0)
				return;

			//-- Calculate _rtt
			if (_rudp._rtt == 0)
			{
				_rudp._rtt = currentRTT;
				_rudp._deltaRtt = currentRTT / 2;
			}
			else
			{
				_rudp._deltaRtt = (0.75 * _rudp._deltaRtt + 0.25 * Math.Abs(_rudp._rtt - currentRTT));
				_rudp._rtt = (0.875 * _rudp._rtt + 0.125 * currentRTT);
			}

			//-- Calculate _rto
			//traces.Add("RTO ACK : " + _rudp._rto + " new : " + _rudp._rtt + Math.Max(1, 4 * _rudp._deltaRtt));
			_rudp._rto = _rudp._rtt + Math.Max(1, 4 * _rudp._deltaRtt);

			//if (_rudp._rto < 1000000)
			//    _rudp._rto = 1000000;
			//if (_rudp._rto > 60000000)
			//    _rudp._rto = 60000000;

			//Console.WriteLine("RTT : " + _rudp._rtt);
			//Console.WriteLine("_rto : " + _rudp._rto);

			//-- Calculate _sto
			_rudp._sto = (int)(15000000 + 4 * _rudp._rtt);

			//RUDPStack.Trace("ResendIntervalTimeout : " + _rudp._rto);
		}

		#endregion

		abstract internal void OnACK_UpdateWindow(RUDPOutgoingPacket packet);

		virtual internal void OnSend_UpdateWindow(int payloadLength)
		{
		}

		abstract internal void OnResend_UpdateWindow();

	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -