⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rudpstack.cs

📁 rudp可靠保障得udp传输
💻 CS
📖 第 1 页 / 共 4 页
字号:

					//-- 1 - Send
					RetransmissionTimer(rudp, now);

					//-- 2 - ACKs
					SendACKTimer(rudp, now);

					//-- 3 - Check for keep alive
					KeepAliveTimer(rudp, now);

					//-- 4 - MTU Discovery
					rudp._pmtuDiscovery.OnHeartBeat(now);

					//-- 5 - Bandwidth
					BandwidthTimer(rudp, now);
				}

				//---- Do not use 100%
				Thread.Sleep(1);
			}

		}

		#endregion

		#region RetransmissionTimer

		/// <summary>
		/// Walks the socket's list of sent packets: removes and recycles the packets that
		/// have been ACKed, disconnects the socket when a packet's first send is older than
		/// <c>rudp._sto</c>, and resends packets whose last send is at least <c>rudp._rto</c> old.
		/// </summary>
		/// <param name="rudp">Socket whose <c>_sendingPackets</c> list is processed.</param>
		/// <param name="now">Current timestamp; compared against the packets' TSFirstSend / TSLastSend.</param>
		private static void RetransmissionTimer(RUDPSocket rudp, long now)
		{
			int count = rudp._sendingPackets.Count;
			for (int index = 0; index < count; index++)
			{
				RUDPOutgoingPacket packet;

				// try/finally: an exception thrown by the indexer must not leave the reader lock held.
				rudp._sendingPacketsLock.AcquireReaderLock(RUDPSocket.LockTimeOut);
				try
				{
					packet = rudp._sendingPackets[index];
				}
				finally
				{
					rudp._sendingPacketsLock.ReleaseReaderLock();
				}

				//---- Not yet sent
				if (packet.TSLastSend < 0)
					continue;

				//---- It is ACKed
				if (packet.IsACKed)
				{
					// Same reasoning as above: never leak the writer lock on failure.
					rudp._sendingPacketsLock.AcquireWriterLock(RUDPSocket.LockTimeOut);
					try
					{
						rudp._sendingPackets.RemoveAt(index);
					}
					finally
					{
						rudp._sendingPacketsLock.ReleaseWriterLock();
					}

					ReleaseOutgoingPacket(packet);

					// The list shrank by one: revisit the same index and shorten the walk.
					index--;
					count--;
					continue;
				}

				//---- Check for time out
				if (packet.TSFirstSend > -1 && (now - packet.TSFirstSend) > rudp._sto)
				{
					//-- Normal time out
					// Send connection Reset with ACK
					OnDisconnected(rudp, DisconnectionReason.TimeOut);
					return;
				}

				//---- Retransmission or not ?
				if ((now - packet.TSLastSend) < rudp._rto)
					continue;

				//---- Get the SACK slots to send with
				SACKSlot slot1 = null, slot2 = null, slot3 = null, slot4 = null;
				rudp._sackWindow.GetSLACKSlots(out slot1, out slot2, out slot3, out slot4);

				// Update the payload for the SACK slots
				UpdatePacketPayload(packet.Payload, slot1, slot2, slot3, slot4);

#if CONSOLE_TRACE
				string acksList = "";
				if (slot1 != null)
					acksList += " [" + slot1.StartPacketId + " <-> " + slot1.EndPacketId + "]";
				if (slot2 != null)
					acksList += " [" + slot2.StartPacketId + " <-> " + slot2.EndPacketId + "]";
				if (slot3 != null)
					acksList += " [" + slot3.StartPacketId + " <-> " + slot3.EndPacketId + "]";
				if (slot4 != null)
					acksList += " [" + slot4.StartPacketId + " <-> " + slot4.EndPacketId + "]";
#endif

				// Send
#if CONSOLE_TRACE
				Trace("Resend packet(" + rudp.Handle + "): " + packet.PacketId + " RTO=" + rudp._rto + "RTT=" + rudp._rtt + " ACKs:" + acksList);
#endif
				if (SocketSendPacket(rudp, packet, packet.Payload, now))
				{
					rudp._controlWindow.OnResend(packet);

					// Update
					packet.Retransmission++;
				}
			}
		}

		#endregion

		#region SendACKTimer

		/// <summary>
		/// Sends the pending selective ACKs of a socket, packing up to four SACK slots
		/// into each ACK packet until every prepared slot has been sent.
		/// </summary>
		/// <remarks>
		/// Nothing is sent when <c>_sackWindow.ACKCount</c> is below 1. When exactly one ACK
		/// is pending, the send is skipped while less than <c>DelayACKTime</c> has elapsed
		/// since <c>_lastACKSendTS</c>.
		/// </remarks>
		/// <param name="rudp">Socket whose SACK window is flushed.</param>
		/// <param name="now">Timer timestamp; not read here, the delay test uses HiResTimer.MicroSeconds.</param>
		private static void SendACKTimer(RUDPSocket rudp, long now)
		{
			int acksCount = rudp._sackWindow.ACKCount;
			if (acksCount < 1)
				return;

			//---- Delayed ACKs
			if (acksCount < 2)
				if (rudp._lastACKSendTS > -1 && (HiResTimer.MicroSeconds - rudp._lastACKSendTS) < DelayACKTime)
					return;

			rudp._lastACKSendTS = HiResTimer.MicroSeconds;

			//---- Prepare the SACKs list
			List<SACKSlot> sackSlots = rudp._sackWindow.PrepareACKList();

			// Drain the list completely. Comparing an incrementing index against a shrinking
			// Count stops early and silently drops the trailing slots (e.g. 5 slots -> 1 lost).
			while (sackSlots.Count > 0)
			{
				//---- Get the SACK slots to send with (up to four per packet)
				SACKSlot slot1 = TakeNextSACKSlot(sackSlots);
				SACKSlot slot2 = TakeNextSACKSlot(sackSlots);
				SACKSlot slot3 = TakeNextSACKSlot(sackSlots);
				SACKSlot slot4 = TakeNextSACKSlot(sackSlots);

#if CONSOLE_TRACE
				if (slot1 != null)
					Trace("Send ACK(" + rudp.Handle + "): " + slot1.StartPacketId + " <-> " + slot1.EndPacketId);
				if (slot2 != null)
					Trace("Send ACK(" + rudp.Handle + "): " + slot2.StartPacketId + " <-> " + slot2.EndPacketId);
				if (slot3 != null)
					Trace("Send ACK(" + rudp.Handle + "): " + slot3.StartPacketId + " <-> " + slot3.EndPacketId);
				if (slot4 != null)
					Trace("Send ACK(" + rudp.Handle + "): " + slot4.StartPacketId + " <-> " + slot4.EndPacketId);
#endif

				byte[] packetPayload = MakePacketPayload(rudp, -1, RUDPPacketChannel.ACK, slot1, slot2, slot3, slot4, null, 0, 0);
				SocketSendACK(rudp, rudp._physical, rudp._remoteEndPoint, packetPayload);
				PayloadManager.Deallocate(RUDPPacketChannel.ACK, packetPayload);
			}
		}

		/// <summary>
		/// Removes and returns the first slot of the list, or returns null when the list is empty.
		/// </summary>
		private static SACKSlot TakeNextSACKSlot(List<SACKSlot> sackSlots)
		{
			if (sackSlots.Count == 0)
				return null;

			SACKSlot slot = sackSlots[0];
			sackSlots.RemoveAt(0);
			return slot;
		}

		#endregion

		#region KeepAliveTimer

		/// <summary>
		/// Queues a keep-alive packet when the socket is connected, nothing (data or ACK)
		/// has been sent for longer than <c>RUDPStack.KeepAliveInterval</c>, and the control
		/// window accepts a zero-length send.
		/// </summary>
		/// <param name="rudp">Socket to keep alive.</param>
		/// <param name="now">Current timestamp, compared with the last send timestamps.</param>
		private static void KeepAliveTimer(RUDPSocket rudp, long now)
		{
			// Most recent outgoing activity, whether a regular send or an ACK.
			long mostRecentSendTS = Math.Max(rudp._lastSendTS, rudp._lastACKSendTS);

			if (rudp._status != RUDPSocketStatus.Connected)
				return;

			long idleTime = now - mostRecentSendTS;
			if (idleTime <= RUDPStack.KeepAliveInterval)
				return;

			if (!rudp._controlWindow.CanSend(0))
				return;

			//---- Send a keep alive
			PushPacketToSend(rudp, true, RUDPPacketChannel.KeepAlive, new byte[0], 0, 0);
		}

		#endregion

		#region BandwidthTimer

		/// <summary>
		/// Queues the two bandwidth probe packets (Bandwidth01, then Bandwidth02 carrying
		/// <paramref name="now"/> as 8 bytes) when the socket is connected, more than
		/// <c>RUDPStack.BandwidthInterval</c> has passed since the last probe, and the control
		/// window accepts a zero-length send. Records <paramref name="now"/> as the last probe time.
		/// </summary>
		/// <param name="rudp">Socket to probe.</param>
		/// <param name="now">Current timestamp.</param>
		private static void BandwidthTimer(RUDPSocket rudp, long now)
		{
			if (rudp._status != RUDPSocketStatus.Connected)
				return;

			long sinceLastProbe = now - rudp._lastBandwidthTS;
			if (sinceLastProbe <= RUDPStack.BandwidthInterval)
				return;

			if (!rudp._controlWindow.CanSend(0))
				return;

			//---- Send 2 packets
			byte[] timestampBytes = BitConverter.GetBytes(now);
			PushPacketToSend(rudp, false, RUDPPacketChannel.Bandwidth01, null, 0, 0);
			PushPacketToSend(rudp, false, RUDPPacketChannel.Bandwidth02, timestampBytes, 0, 8);

			rudp._lastBandwidthTS = now;
		}

		#endregion

		#region Trace

#if CONSOLE_TRACE_MEMORY
		// Trace lines are buffered here instead of being written to the console.
		private static List<string> _traces = new List<string>();
#endif

		/// <summary>
		/// Emits a trace line. Calls are compiled only when CONSOLE_TRACE is defined;
		/// with CONSOLE_TRACE_MEMORY the text is appended to an in-memory list,
		/// otherwise it is written to the console.
		/// </summary>
		/// <param name="text">Text to trace.</param>
		[Conditional("CONSOLE_TRACE")]
		internal static void Trace(string text)
		{
#if CONSOLE_TRACE_MEMORY
			lock (_traces)
			{
				_traces.Add(text);
			}
#else
			Console.WriteLine(text);
#endif
		}

		#endregion

		#region HandleException

		/// <summary>
		/// Writes a "CriticalError" line to the console with the exception message, the
		/// optional context values and the stack trace.
		/// </summary>
		/// <param name="exception">Exception to report.</param>
		/// <param name="args">
		/// Optional context values appended as " - value". A null array is treated as empty
		/// and a null element is printed as "null", so reporting an error never throws on its own.
		/// </param>
		internal static void HandleException(Exception exception, params object[] args)
		{
			System.Text.StringBuilder paramsText = new System.Text.StringBuilder();

			// "HandleException(ex, null)" binds null to the params array itself.
			if (args != null)
			{
				foreach (object val in args)
					paramsText.Append(" - ").Append(val == null ? "null" : val.ToString());
			}

			if (paramsText.Length > 0)
				Console.WriteLine("CriticalError :" + exception.Message + '(' + paramsText.ToString() + ")\n" + exception.StackTrace);
			else
				Console.WriteLine("CriticalError :" + exception.Message + '\n' + exception.StackTrace);
		}

		#endregion

		#region StackFatalException

		/// <summary>
		/// Reports an exception through <see cref="HandleException"/> without any context values.
		/// </summary>
		/// <param name="exception">Exception to report.</param>
		private static void StackFatalException(Exception exception)
		{
			object[] noContext = new object[0];
			HandleException(exception, noContext);
		}

		#endregion

		#region SocketErrorToRUDPSocketError

		/// <summary>
		/// Converts a <see cref="SocketError"/> to the <c>RUDPSocketError</c> member that has
		/// the same underlying integer value.
		/// </summary>
		/// <param name="socketError">Socket error code to convert.</param>
		/// <returns>The <c>RUDPSocketError</c> carrying the same numeric value.</returns>
		static private RUDPSocketError SocketErrorToRUDPSocketError(SocketError socketError)
		{
			// A direct numeric cast yields the same enum value as Enum.ToObject.
			return (RUDPSocketError)(int)socketError;
		}

		#endregion

		#region OnSocketUnhandledError

		/// <summary>
		/// Called when a physical socket reports an error for a remote end point: looks up
		/// the RUDP socket connected to that end point and forwards the converted error to it.
		/// Does nothing when no socket is registered for the end point (already released).
		/// </summary>
		/// <param name="physical">Physical socket that reported the error.</param>
		/// <param name="remoteEndPoint">Remote end point used as lookup key.</param>
		/// <param name="error">Socket error to forward.</param>
		static internal void OnSocketUnhandledError(PhysicalSocket physical, IPEndPoint remoteEndPoint, SocketError error)
		{
			RUDPSocket target;
			bool isRegistered;

			//---- Get the socket (lookup under the reader lock)
			physical._connectedRDUPsLock.AcquireReaderLock(RUDPSocket.LockTimeOut);
			try
			{
				isRegistered = physical._connectedRDUPs.TryGetValue(remoteEndPoint, out target);
			}
			finally
			{
				physical._connectedRDUPsLock.ReleaseReaderLock();
			}

			// Released socket
			if (!isRegistered)
				return;

			//---- Handle the error
			RUDPSocketError rudpError = SocketErrorToRUDPSocketError(error);
			OnSocketUnhandledError(target, rudpError, null);
		}

		/// <summary>
		/// Called when we have an error on a socket: disconnects it, then forwards the error
		/// to the pending connect, send or receive operation.
		/// </summary>
		/// <param name="rudp">Socket on which the error occurred.</param>
		/// <param name="error">Error to forward.</param>
		/// <param name="sendAsyncResult">Pending send operation to complete with the error, or null.</param>
		static internal void OnSocketUnhandledError(RUDPSocket rudp, RUDPSocketError error, RUDPSendIAsyncResult sendAsyncResult)
		{
			//---- Disconnect the socket
			OnDisconnected(rudp, DisconnectionReason.SocketError);

			//---- Handle the error and forward it to the socket
			// NOTE(review): _status is read after OnDisconnected; if OnDisconnected changes the
			// status, the Connecting branch can no longer be reached - confirm.
			if (rudp._status == RUDPSocketStatus.Connecting)
				rudp.OnEndConnect(error);
			else
			{
				// On Send Error
				if (sendAsyncResult != null)
					rudp.OnEndSend(error, sendAsyncResult);

				// TODO: there is currently no way to reach the pending send when sendAsyncResult is null.

				// On Receive Error
				// Take and clear the pending receive in a single atomic step, so that it is
				// completed at most once and a receive posted concurrently is not overwritten.
				RUDPReceiveIAsyncResult receiveAsyncResult =
					Interlocked.Exchange<RUDPReceiveIAsyncResult>(ref rudp._asyncResultReceive, null);
				if (receiveAsyncResult != null)
					rudp.OnEndReceive(error, null, true, receiveAsyncResult);
			}
		}

		#endregion

		#region Helpers

		/// <summary>
		/// On a multi-processor machine, sets the affinity of the current process to one bit
		/// per logical processor reported by <see cref="Environment.ProcessorCount"/>.
		/// Does nothing on a single-processor machine.
		/// </summary>
		private static void UpdateAffinity()
		{
			if (Environment.ProcessorCount < 2)
				return;

			// The affinity mask is pointer-sized: never set more bits than an IntPtr can hold.
			int maskBits = Math.Min(Environment.ProcessorCount, IntPtr.Size * 8);

			// Built in 64 bits so that 32 or more processors do not overflow or wrap the shift;
			// (1L << 64) would wrap as well, hence the special case for a full mask.
			long mask = (maskBits >= 64) ? -1L : (1L << maskBits) - 1L;

			// new IntPtr(long) throws in a 32-bit process when the value does not fit in an Int32,
			// so reinterpret the low 32 bits there instead.
			IntPtr processorAffinity = (IntPtr.Size == 4)
				? new IntPtr(unchecked((int)mask))
				: new IntPtr(mask);

			// Process is IDisposable: release the handle once the affinity is set.
			using (Process process = Process.GetCurrentProcess())
			{
				process.ProcessorAffinity = processorAffinity;
			}
		}

		/// <summary>
		/// Backs off the calling thread when contention on a spin-wait lock is detected.
		/// On a multi-processor machine the thread spins briefly with Thread.SpinWait, staying
		/// in user mode; on a hyper-threaded CPU this lets the other hardware thread run.
		/// On a single-processor machine spinning cannot help, because the lock holder is not
		/// running, so the thread gives up its time slice through SwitchToThread instead.
		/// </summary>
		public static void StallThread()
		{
			if (!IsSingleCpuMachine)
			{
				// The multi-CPU system might be hyper-threaded, let the other thread run
				Thread.SpinWait(1);
				return;
			}

			// On single-CPU system, spinning does no good
			SwitchToThread();
		}

		/// <summary>
		/// Returns an IP address of the local machine.
		/// An IPv6 address is preferred when <paramref name="protocol"/> is
		/// <see cref="ProtocolType.IPv6"/> and the OS supports IPv6; otherwise, or when no
		/// non-loopback IPv6 address is found, the first non-loopback IPv4 address is returned.
		/// </summary>
		/// <param name="protocol">Requested protocol; only ProtocolType.IPv6 selects the IPv6 search.</param>
		/// <returns>A non-loopback local address, or the IPv4 loopback address when none is found.</returns>
		static internal IPAddress LocalIPAddress(ProtocolType protocol)
		{
			// Dns.Resolve is obsolete; Dns.GetHostEntry is its documented replacement.
			IPHostEntry localMachineInfo = Dns.GetHostEntry(Dns.GetHostName());

			// Search IPv6 Address (if supported)
			if (System.Net.Sockets.Socket.OSSupportsIPv6 && protocol == ProtocolType.IPv6)
			{
				foreach (IPAddress ipAddress in localMachineInfo.AddressList)
				{
					if (ipAddress.AddressFamily == AddressFamily.InterNetworkV6 && !IPAddress.IsLoopback(ipAddress))
						return ipAddress;
				}
			}

			// Search for IPv4
			foreach (IPAddress ipAddress in localMachineInfo.AddressList)
			{
				// IsLoopback covers the whole loopback range, not only the literal "127.0.0.1".
				if (ipAddress.AddressFamily == AddressFamily.InterNetwork && !IPAddress.IsLoopback(ipAddress))
					return ipAddress;
			}

			// No usable address: fall back to the IPv4 loopback (127.0.0.1)
			return IPAddress.Loopback;
		}

		#endregion

		#region Packets Pool

		// Pool of recycled outgoing packets, shared by NewOutgoingPacket / ReleaseOutgoingPacket.
		private static LockFreeQueue<RUDPOutgoingPacket> _outgoingPacketsPools = new LockFreeQueue<RUDPOutgoingPacket>();

		/// <summary>
		/// Returns an outgoing packet initialized with the given values, reusing a pooled
		/// instance when one is available. When the pool is empty it is refilled with
		/// 100 blank packets and a freshly constructed packet is returned.
		/// </summary>
		/// <param name="packetId">Packet identifier.</param>
		/// <param name="sequence">Packet sequence value.</param>
		/// <param name="payload">Payload buffer attached to the packet.</param>
		/// <param name="channel">Channel of the packet.</param>
		/// <returns>The initialized packet.</returns>
		internal static RUDPOutgoingPacket NewOutgoingPacket(int packetId, long sequence, byte[] payload, RUDPPacketChannel channel)
		{
			RUDPOutgoingPacket pooled;

			if (_outgoingPacketsPools.TryDequeue(out pooled))
			{
				// Recycle: wipe the previous state, then apply the new values.
				pooled.Reset();
				pooled.PacketId = packetId;
				pooled.Payload = payload;
				pooled.Channel = channel;
				pooled.Sequence = sequence;

				return pooled;
			}

			// Pool exhausted: pre-allocate a batch for the next callers.
			int toCreate = 100;
			while (toCreate-- > 0)
				_outgoingPacketsPools.Enqueue(new RUDPOutgoingPacket(-1, -1, null, RUDPPacketChannel.Undefined));

			return new RUDPOutgoingPacket(packetId, sequence, payload, channel);
		}

		/// <summary>
		/// Returns a packet to the pool and hands its payload back to the PayloadManager.
		/// </summary>
		/// <remarks>
		/// The payload is detached and deallocated before the packet is enqueued. Once the
		/// packet is in the pool another thread may dequeue it and attach a new payload, which
		/// must not be deallocated or cleared by this method.
		/// </remarks>
		/// <param name="packet">Packet to recycle; it must not be used by the caller afterwards.</param>
		internal static void ReleaseOutgoingPacket(RUDPOutgoingPacket packet)
		{
			// Snapshot what has to be released while this thread still owns the packet.
			RUDPPacketChannel channel = packet.Channel;
			byte[] payload = packet.Payload;
			packet.Payload = null;

			PayloadManager.Deallocate(channel, payload);

			// Publish to the pool last: from here on the packet may be reused at any time.
			_outgoingPacketsPools.Enqueue(packet);
		}

		#endregion

		#region Fragments Pool

		// Pool of recycled fragment descriptors, shared by NewFragmentInformation / ReleaseFragmentInformation.
		private static LockFreeQueue<FragmentInformation> _fragmentsPools = new LockFreeQueue<FragmentInformation>();

		/// <summary>
		/// Returns a fragment descriptor initialized with the given values, reusing a pooled
		/// instance when one is available. When the pool is empty it is refilled with
		/// 100 blank descriptors and a freshly constructed one is returned.
		/// </summary>
		/// <param name="rudpSocket">Socket the fragment belongs to.</param>
		/// <param name="isReliable">Stored in the descriptor's IsReliable member.</param>
		/// <param name="mtu">Stored in the descriptor's MTU member.</param>
		/// <param name="payload">Payload buffer.</param>
		/// <param name="offset">Offset within <paramref name="payload"/>.</param>
		/// <param name="size">Size stored in the descriptor.</param>
		/// <param name="asyncResult">Send operation associated with the fragment.</param>
		/// <returns>The initialized descriptor.</returns>
		internal static FragmentInformation NewFragmentInformation(RUDPSocket rudpSocket, bool isReliable, int mtu, byte[] payload, int offset, int size, RUDPSendIAsyncResult asyncResult)
		{
			FragmentInformation pooled;

			if (_fragmentsPools.TryDequeue(out pooled))
			{
				// Recycle: overwrite every member with the new values.
				pooled.rudp = rudpSocket;
				pooled.IsReliable = isReliable;
				pooled.Offset = offset;
				pooled.Size = size;
				pooled.Payload = payload;
				pooled.MTU = mtu;
				pooled.AsyncResult = asyncResult;
				pooled.IsInAsyncThread = false;

				return pooled;
			}

			// Pool exhausted: pre-allocate a batch for the next callers.
			int toCreate = 100;
			while (toCreate-- > 0)
				_fragmentsPools.Enqueue(new FragmentInformation(null, false, -1, null, -1, -1, null));

			return new FragmentInformation(rudpSocket, isReliable, mtu, payload, offset, size, asyncResult);
		}

		/// <summary>
		/// Returns a fragment descriptor to the pool so that NewFragmentInformation can reuse it.
		/// </summary>
		/// <param name="fragment">Descriptor to recycle.</param>
		// NOTE(review): the descriptor's members (socket, payload, async result) are not cleared
		// here, so a pooled descriptor keeps referencing them until it is reused - confirm this
		// retention is acceptable.
		internal static void ReleaseFragmentInformation(FragmentInformation fragment)
		{
			_fragmentsPools.Enqueue(fragment);
		}

		#endregion

		#region API

		/// <summary>
		/// P/Invoke declaration of the SwitchToThread function exported by Kernel32.
		/// Called by StallThread on single-CPU machines. The native BOOL result is
		/// marshaled to a managed Boolean.
		/// </summary>
		[DllImport("Kernel32", ExactSpelling = true)]
		[return: MarshalAs(UnmanagedType.Bool)]
		private static extern Boolean SwitchToThread();

		#endregion

	}

	#region DisconnectionReason

	/// <summary>
	/// Reason passed to OnDisconnected when a socket is disconnected.
	/// </summary>
	public enum DisconnectionReason
	{
		// Used by RetransmissionTimer when a packet's first send is older than the socket's _sto.
		TimeOut,
		// Not used in the visible part of this file; presumably a regular close - TODO confirm.
		ConnectionClosed,
		// Used by OnSocketUnhandledError when an error is reported on the socket.
		SocketError
	}

	#endregion

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -