⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rudpstack.cs

📁 rudp可靠保障得udp传输
💻 CS
📖 第 1 页 / 共 4 页
字号:
			if (sacksSlotCount > 1)
			{
				startPacketId = BinaryHelper.ReadInt(payload, offset);
				offset += 4;
				endPacketId = BinaryHelper.ReadInt(payload, offset);
				offset += 4;
				slot2 = new SACKSlot(startPacketId, endPacketId);
			}
			else offset += 8;
			if (sacksSlotCount > 2)
			{
				startPacketId = BinaryHelper.ReadInt(payload, offset);
				offset += 4;
				endPacketId = BinaryHelper.ReadInt(payload, offset);
				offset += 4;
				slot3 = new SACKSlot(startPacketId, endPacketId);
			}
			else offset += 8;
			if (sacksSlotCount > 3)
			{
				startPacketId = BinaryHelper.ReadInt(payload, offset);
				offset += 4;
				endPacketId = BinaryHelper.ReadInt(payload, offset);
				offset += 4;
				slot4 = new SACKSlot(startPacketId, endPacketId);
			}
			else offset += 8;

			//-- Payload
			packetPayload = new byte[payloadLength];
			if (payloadLength > 0/* && channel != RUDPPacketChannel.UserPacket*/)
				Buffer.BlockCopy(payload, offset, packetPayload, 0, payloadLength);
		}

		#endregion

		#region HandlePacket

		/// <summary>
		/// Dispatches one decoded incoming packet: resolves the RUDP socket associated with
		/// the sender, forwards the advertised window and the SACK slots, then routes the
		/// packet depending on its channel and on whether its packet id is negative.
		/// </summary>
		/// <param name="physical">Physical socket whose connected-socket table is searched.</param>
		/// <param name="sender">Remote end point; used as the key in the connected-socket table.</param>
		/// <param name="channel">Channel of the packet.</param>
		/// <param name="packetId">Packet id; a negative value is treated as a non reliable message.</param>
		/// <param name="advertisedWindowSize">Value forwarded to the control window of the socket.</param>
		/// <param name="slot1">First SACK slot, forwarded to HandleACKs.</param>
		/// <param name="slot2">Second SACK slot, forwarded to HandleACKs.</param>
		/// <param name="slot3">Third SACK slot, forwarded to HandleACKs.</param>
		/// <param name="slot4">Fourth SACK slot, forwarded to HandleACKs.</param>
		/// <param name="payload">Payload bytes of the packet.</param>
		private static void HandlePacket(PhysicalSocket physical,
										IPEndPoint sender,
										RUDPPacketChannel channel,
										int packetId,
										int advertisedWindowSize,
										SACKSlot slot1, SACKSlot slot2, SACKSlot slot3, SACKSlot slot4,
										byte[] payload)
		{
			RUDPSocket rudp = null;

			//---- PING
			if (channel == RUDPPacketChannel.Ping || channel == RUDPPacketChannel.PingRendezVous)
			{
				rudp = HandlePing(physical, sender, packetId, channel);

				// Do not handle this message
				if (rudp == null)
					return;
			}

			//---- Search the socket
			if (rudp == null)
			{
				physical._connectedRDUPsLock.AcquireReaderLock(RUDPSocket.LockTimeOut);
				try
				{
					physical._connectedRDUPs.TryGetValue(sender, out rudp);
				}
				finally
				{
					// Always give the reader lock back, even if the lookup throws
					physical._connectedRDUPsLock.ReleaseReaderLock();
				}
			}

			//---- Direct send of ACK, because socket can be shutdowned and removed
			// This runs before the null check below, so rudp may still be null at this point.
			if (channel == RUDPPacketChannel.TearDown)
			{
				byte[] packetPayload = MakePacketPayload(rudp, -1, RUDPPacketChannel.ACK, new SACKSlot(packetId, packetId), null, null, null, null, 0, 0);
				SocketSendACK(rudp, physical, sender, packetPayload);
				PayloadManager.Deallocate(RUDPPacketChannel.ACK, packetPayload);
			}

			//---- Released socket
			if (rudp == null)
				return;

#if CONSOLE_TRACE
			if (packetId > -1)
				Trace("Handle packet (" + rudp.Handle + ")(" + channel + "):" + packetId);
#endif

			//---- Advertised window
			rudp._controlWindow.OnReceiveAdvertisedWindow(advertisedWindowSize);

			//---- Handle ACKs
			HandleACKs(rudp, slot1, slot2, slot3, slot4);

			if (channel == RUDPPacketChannel.ACK)
				return;

			//---- Non reliable messages
			if (packetId < 0)
			{
				//-- Bandwidth
				if (channel == RUDPPacketChannel.Bandwidth01)
				{
					PushPacketToSend(rudp, false, RUDPPacketChannel.BandwidthResponse01, null, 0, 0);
					return;
				}
				else if (channel == RUDPPacketChannel.Bandwidth02)
				{
					PushPacketToSend(rudp, false, RUDPPacketChannel.BandwidthResponse02, payload, 0, 8);
					return;
				}
				else if (channel == RUDPPacketChannel.BandwidthResponse01)
				{
					// No return here: the packet continues to the non reliable queue below
					rudp._bandwidthResponse01TS = HiResTimer.MicroSeconds;
				}
				else if (channel == RUDPPacketChannel.BandwidthResponse02)
				{
					//---- Calculate bandwidth
					// Bdw (Bytes / milli-sec)
					long now = HiResTimer.MicroSeconds;

					// Divide by 1000.0 (not 1000) so the micro-second difference keeps its
					// fractional milli-seconds; an integer division would truncate every
					// sub-millisecond delay to 0 and make the clamp below the common case.
					double delay = (now - rudp._bandwidthResponse01TS) / 1000.0;
					if (delay < 0.001)
						delay = 0.001;

					// Arrival Speed
					double arrivalSpeed = (RUDPHeaderLength + UDPHeaderLength) / delay;

					// RTT : the first 8 bytes of the payload are read as the reference time stamp
					double currentRtt = (now - BitConverter.ToInt64(payload, 0)) / 1000.0;
					if (currentRtt < 0.001)
						currentRtt = 0.001;

					// BDP = Bandwidth(Byte / Ms) * RTT;
					double bandwidth = (long)(arrivalSpeed * currentRtt);

					// Weighted average : 7/8 of the previous value, 1/8 of the new sample
					rudp._bandwidth = (long)(rudp._bandwidth * 0.875f + bandwidth * 0.125f);

					// No return here either: the packet continues to the non reliable queue below
				}

				//-- MTU Tuning
				else if (channel == RUDPPacketChannel.MTUTuning)
				{
					rudp._pmtuDiscovery.OnReceiveProbe(payload.Length);
					return;
				}
				else if (channel == RUDPPacketChannel.MTUTuningACK)
				{
					rudp._pmtuDiscovery.OnReceiveProbeACK(payload);
					return;
				}

				//if ((rudp._incomingNonReliablePackets.Count * rudp._mtu) >= rudp._receiveSize)
				//return;

				RUDPIngoingPacket nonReliablePacket = new RUDPIngoingPacket(rudp, packetId, payload, channel, HiResTimer.MicroSeconds);
				rudp._incomingNonReliablePackets.Enqueue(nonReliablePacket);

				rudp.HandleNextUserPacket(false);
				return;
			}

			//---- Check if we can handle this message
			if (!rudp._controlWindow.CanReceive(payload.Length))
			{
				Trace("Return :" + packetId + " " + rudp._controlWindow._rwnd + " " + rudp._receiveSize);
				return;
			}

			//---- Send the ACK
			// Ping channels are skipped here: HandlePing already reported them to the SACK window
			if (channel != RUDPPacketChannel.Ping && channel != RUDPPacketChannel.PingRendezVous)
				rudp._sackWindow.OnReceivePacket(packetId);

			//---- Do not process a duplicated packets
			bool isDuplicatedPacket;
			lock (rudp._incomingPacketsLock)
			{
				// Duplicate when the id is not above _incomingPacketId or is already stored
				isDuplicatedPacket = (packetId <= rudp._incomingPacketId);
				if (!isDuplicatedPacket)
					isDuplicatedPacket = rudp._incomingPackets.ContainsKey(packetId);
			}
			if (isDuplicatedPacket)
				return;

			//---- If we are not connected, we cannot handle messages ! We need a connection before.
			if (rudp._status != RUDPSocketStatus.Connected && channel == RUDPPacketChannel.UserPacket)
				return;

			//---- TEAR DOWN
			if (channel == RUDPPacketChannel.TearDown)
			{
				// Initiate the close process
				if (rudp._status == RUDPSocketStatus.Connected)
				{
					// Notify control window
					rudp._controlWindow.OnReceive(null);

					// Start shutdown
					AsyncShutdown(rudp);
				}

				return;
			}

			//---- Add the packet to incoming list
			RUDPIngoingPacket packet = new RUDPIngoingPacket(rudp, packetId, payload, channel, HiResTimer.MicroSeconds);

			// Notify control window
			rudp._controlWindow.OnReceive(packet);

			lock (rudp._incomingPacketsLock)
				rudp._incomingPackets.Add(packetId, packet);

			//------ Handle the ordered ingoing packets
			rudp.HandleNextUserPacket(false);
		}

		#endregion

		#region HandleACKs

		/// <summary>
		/// Marks as acknowledged every pending outgoing packet whose id falls inside one of
		/// the supplied SACK slots, passing along a round trip time value for each of them.
		/// </summary>
		/// <param name="rudp">Socket whose sending list is scanned.</param>
		/// <param name="slot1">First slot; when null nothing is done.</param>
		/// <param name="slot2">Optional second slot (may be null).</param>
		/// <param name="slot3">Optional third slot (may be null).</param>
		/// <param name="slot4">Optional fourth slot (may be null).</param>
		private static void HandleACKs(RUDPSocket rudp,
										SACKSlot slot1,
										SACKSlot slot2,
										SACKSlot slot3,
										SACKSlot slot4)
		{
			// Without a first slot there is nothing to acknowledge
			if (slot1 == null)
				return;

			// The highest numbered slot that is present provides the upper bound of the scan
			SACKSlot topSlot = slot4 ?? slot3 ?? slot2 ?? slot1;
			int upperBound = topSlot.EndPacketId;

#if CONSOLE_TRACE
			Trace("Handle ACK[1](" + rudp.Handle + "): " + slot1.StartPacketId + " <-> " + slot1.EndPacketId);
			if (slot2 != null)
				Trace("Handle ACK[2](" + rudp.Handle + "): " + slot2.StartPacketId + " <-> " + slot2.EndPacketId);
			if (slot3 != null)
				Trace("Handle ACK[3](" + rudp.Handle + "): " + slot3.StartPacketId + " <-> " + slot3.EndPacketId);
			if (slot4 != null)
				Trace("Handle ACK[4](" + rudp.Handle + "): " + slot4.StartPacketId + " <-> " + slot4.EndPacketId);
#endif

			//---- Collect the matching packets while holding the reader lock
			List<RUDPOutgoingPacket> acknowledged = new List<RUDPOutgoingPacket>();
			bool hasRttSample = false;
			double rttSample = Double.MaxValue;

			rudp._sendingPacketsLock.AcquireReaderLock(RUDPSocket.LockTimeOut);
			try
			{
				for (int i = 0; i < rudp._sendingPackets.Count; i++)
				{
					RUDPOutgoingPacket candidate = rudp._sendingPackets[i];

					// Stop at the first packet id above the upper bound
					if (candidate.PacketId > upperBound)
						break;

					if (candidate.IsACKed)
						continue;

					// Slots are tested from the fourth down to the first
					bool covered =
						(slot4 != null && candidate.PacketId >= slot4.StartPacketId && candidate.PacketId <= slot4.EndPacketId) ||
						(slot3 != null && candidate.PacketId >= slot3.StartPacketId && candidate.PacketId <= slot3.EndPacketId) ||
						(slot2 != null && candidate.PacketId >= slot2.StartPacketId && candidate.PacketId <= slot2.EndPacketId) ||
						(candidate.PacketId >= slot1.StartPacketId && candidate.PacketId <= slot1.EndPacketId);

					if (!covered)
						continue;

					// Only packets with Retransmission < 1 contribute a round trip sample;
					// the smallest sample seen is kept
					if (candidate.Retransmission < 1)
					{
						hasRttSample = true;
						rttSample = Math.Min(rttSample, HiResTimer.MicroSeconds - candidate.TSFirstSend);
					}

					acknowledged.Add(candidate);
				}
			}
			finally
			{
				rudp._sendingPacketsLock.ReleaseReaderLock();
			}

			//---- No sample collected : fall back on the RTT stored on the socket
			if (!hasRttSample)
				rttSample = rudp._rtt;

			// Never report a value below 1
			if (rttSample < 1)
				rttSample = 1;

			//---- Flag the collected packets, outside of the lock
			foreach (RUDPOutgoingPacket ackedPacket in acknowledged)
				SetPacketACKed(rudp, ackedPacket, rttSample);
		}

		#endregion

		#region SetPacketACKed

		/// <summary>
		/// Flags one outgoing packet as acknowledged and applies the socket status change
		/// attached to its channel: Connecting to Connected for the ping channels,
		/// Closing to ClosingACKed for the tear down channel.
		/// </summary>
		/// <param name="rudp">Socket owning the packet.</param>
		/// <param name="packet">Outgoing packet that has been acknowledged.</param>
		/// <param name="currentRTT">Round trip time value forwarded to the control window.</param>
		private static void SetPacketACKed(RUDPSocket rudp, RUDPOutgoingPacket packet, double currentRTT)
		{
			// The control window is notified first, then the packet is flagged
			rudp._controlWindow.OnACK(packet, currentRTT);
			packet.IsACKed = true;

			Trace("Packet ACKed(" + rudp.Handle + "): " + packet.PacketId + " " + packet.Channel);

			//---- ACK of a ping while connecting : the connection is established
			bool isPingChannel = packet.Channel == RUDPPacketChannel.Ping
								|| packet.Channel == RUDPPacketChannel.PingRendezVous;

			if (isPingChannel && rudp._status == RUDPSocketStatus.Connecting)
			{
				rudp._status = RUDPSocketStatus.Connected;

				// Start the MTU tuning when it is enabled on this socket
				if (rudp._usePMTUDiscovery)
					rudp._pmtuDiscovery.StartTuning();

				// Report the successful end of the connection
				rudp.OnEndConnect(RUDPSocketError.Success);
				return;
			}

			//---- Anything other than the ACK of our tear down while closing : nothing more to do
			if (packet.Channel != RUDPPacketChannel.TearDown || rudp._status != RUDPSocketStatus.Closing)
				return;

			rudp._status = RUDPSocketStatus.ClosingACKed;

			// Take the socket out of the "connected" sockets when it has a remote end point
			if (rudp._remoteEndPoint != null)
				rudp._physical.UnregisterConnectedSocket(rudp);
		}

		#endregion

		#region HandlePing

		/// <summary>
		/// Handles a Ping or PingRendezVous packet and returns the socket on which the
		/// caller should continue processing, or null when the packet must be dropped.
		/// </summary>
		/// <param name="physical">Physical socket whose connected-socket table and accepting socket are consulted.</param>
		/// <param name="sender">Remote end point; used as the key in the connected-socket table.</param>
		/// <param name="packetId">Id of the ping packet, reported to the SACK window.</param>
		/// <param name="channel">Ping selects the accept path; any other value takes the rendez-vous path.</param>
		/// <returns>
		/// The newly accepted socket (Ping), the socket that just completed its rendez-vous
		/// connection, or null for a duplicated ping, a missing accepting socket, or a
		/// rendez-vous ping that matches no connecting rendez-vous socket.
		/// </returns>
		private static RUDPSocket HandlePing(PhysicalSocket physical, IPEndPoint sender, int packetId, RUDPPacketChannel channel)
		{
			RUDPSocket rudp = null;

			physical._connectedRDUPsLock.AcquireReaderLock(RUDPSocket.LockTimeOut);
			try
			{
				physical._connectedRDUPs.TryGetValue(sender, out rudp);
			}
			finally
			{
				// Always give the reader lock back, even if the lookup throws
				physical._connectedRDUPsLock.ReleaseReaderLock();
			}

			//---- Ping
			if (channel == RUDPPacketChannel.Ping)
			{
				//-- This connection already exist, duplicated Ping
				if (rudp != null)
				{
					// Resend the ACK
					rudp._sackWindow.OnReceivePacket(packetId);
					return null;
				}

				//-- No accepting socket
				if (physical._acceptingRDUP == null)
				{
					// Maybe the socket is not yet ready for accepting, do nothing
					return null;
				}

				//-- Accept
				rudp = physical.OnEndAccept(sender, packetId);

				//-- ACK connection
				rudp._sackWindow.OnReceivePacket(packetId);

				//return physical._acceptingRDUP;
				return rudp;
			}

			//---- Ping , with Rendez vous
			if (rudp != null && rudp._status == RUDPSocketStatus.Connecting && rudp._isRendezVousMode)
			{
				//---- End of connection
				rudp._status = RUDPSocketStatus.Connected;
				rudp.OnEndConnect(RUDPSocketError.Success);

				//---- Accept the rendez vous connection
				rudp._sackWindow.OnReceivePacket(packetId);

				return rudp;
			}

			return null;
		}

		#endregion

		#region OnDisconnected

		/// <summary>
		/// Resets a socket to the Closed status unless it is already Closed.
		/// </summary>
		/// <param name="rudp">Socket being disconnected.</param>
		/// <param name="reason">Reason of the disconnection.</param>
		internal static void OnDisconnected(RUDPSocket rudp, DisconnectionReason reason)
		{
			if (rudp._status != RUDPSocketStatus.Closed)
			{
				//---- Reset
				rudp.Reset(RUDPSocketStatus.Closed);

				//---- Notify
				// NOTE(review): this call re-enters this same static method. If Reset leaves
				// _status at Closed the nested call returns at once and nothing is notified;
				// otherwise it recurses again. Confirm which notification was intended here.
				if (reason != DisconnectionReason.ConnectionClosed)
					OnDisconnected(rudp, reason);
			}
		}

		#endregion

		#region HeartBeat

		/// <summary>
		/// Runs UpdateAffinity and then the heart beat processing; any exception raised by
		/// either call is handed over to StackFatalException.
		/// </summary>
		private static void HeartBeat()
		{
			try
			{
				UpdateAffinity();
				HeartBeatProcessing();
			}
			catch (Exception fatal)
			{
				// Nothing is rethrown from here: the exception is only reported
				StackFatalException(fatal);
			}
		}

		private static void HeartBeatProcessing()
		{

			while (Thread.CurrentThread.IsAlive)
			{
				//---- When we are disconnected of the overlay
				if (!_isStackRunning)
					return;

				//---- Processing
				long now = HiResTimer.MicroSeconds;
				for (int index = _rudpSockets.Count - 1; index > -1; index--)
				{
					RUDPSocket rudp = _rudpSockets[index];
					if (rudp._status == RUDPSocketStatus.Closed)
					{
						UnregisterRUDPSocket(rudp);
						continue;
					}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -