⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpgnetconnection.cc

📁 五行MMORPG引擎系统V1.0
💻 CC
📖 第 1 页 / 共 3 页
字号:
	}
	ms_mutexConnTable.unlock();

   while(mNotifyQueueHead)
      handleNotify(false);


   //ghostOnRemove();
   //eventOnRemove();

   Parent::onRemove();
}

char NetConnection::mErrorBuffer[256];

void NetConnection::setLastError(const char *fmt, ...)
{
   va_list argptr;
   va_start(argptr, fmt);
   dVsprintf(mErrorBuffer, sizeof(mErrorBuffer), fmt, argptr);
   va_end(argptr);

#ifdef TORQUE_DEBUG_NET
   // setLastErrors assert in net_debug builds
   AssertFatal(false, mErrorBuffer);
#endif

}

//--------------------------------------------------------------------

void NetConnection::handleNotify(bool recvd)
{
//   Con::printf("NET  %d: NOTIFY - %d %s", getId(), gPacketId, recvd ? "RECVD" : "DROPPED");

   PacketNotify *note = mNotifyQueueHead;
   AssertFatal(note != NULL, "Error: got a notify with a null notify head.");
   mNotifyQueueHead = mNotifyQueueHead->nextPacket;

   if(note->rateChanged && !recvd)
      mCurRate.changed = true;
   if(note->maxRateChanged && !recvd)
      mMaxRate.changed = true;

   if(recvd) 
   {
      // Running average of roundTrip time
      U32 curTime = Platform::getVirtualMilliseconds();
      mRoundTripTime = (mRoundTripTime + (curTime - note->sendTime)) * 0.5;
      packetReceived(note);
   }
   else
      packetDropped(note);

   delete note;
}

void NetConnection::processRawPacket(BitStream *bstream)
{
#ifdef TGE_RPGTESTING

	char szBuf[256];
	bstream->readString(szBuf);
	Con::printf(" [%d %d.%d.%d.%d] : %s", getId(),
					mNetAddress.netNum[0],mNetAddress.netNum[1],
					mNetAddress.netNum[2],mNetAddress.netNum[3],
					szBuf);
	return;
#endif


   if(mDemoWriteStream)
      recordBlock(BlockTypePacket, bstream->getReadByteSize(), bstream->getBuffer());

   ConnectionProtocol::processRawPacket(bstream);
}

void NetConnection::handlePacket(BitStream *bstream)
{
//   Con::printf("NET  %d: RECV - %d", getId(), mLastSeqRecvd);
   // clear out any errors

   mErrorBuffer[0] = 0;

   if(bstream->readFlag())
   {
      mCurRate.updateDelay = bstream->readInt(10);
      mCurRate.packetSize = bstream->readInt(10);
   }

   if(bstream->readFlag())
   {
      U32 omaxDelay = bstream->readInt(10);
      S32 omaxSize = bstream->readInt(10);
      if(omaxDelay < mMaxRate.updateDelay)
         omaxDelay = mMaxRate.updateDelay;
      if(omaxSize > mMaxRate.packetSize)
         omaxSize = mMaxRate.packetSize;
      if(omaxDelay != mCurRate.updateDelay || omaxSize != mCurRate.packetSize)
      {
         mCurRate.updateDelay = omaxDelay;
         mCurRate.packetSize = omaxSize;
         mCurRate.changed = true;
      }
   }
   readPacket(bstream);

   if(mErrorBuffer[0])
      connectionError(mErrorBuffer);
}

void NetConnection::connectionError(const char *errorString)
{
   errorString;
}

//--------------------------------------------------------------------

NetConnection::PacketNotify *NetConnection::allocNotify()
{
   return new PacketNotify;
}

/// Used when simulating lag.
///
/// We post this SimEvent when we want to send a packet; it delays for a bit, then
/// sends the actual packet.
class NetDelayEvent : public SimEvent
{
   U8 buffer[MaxPacketDataSize];
   BitStream stream;
public:
   NetDelayEvent(BitStream *inStream) : stream(NULL, 0)
   {
      dMemcpy(buffer, inStream->getBuffer(), inStream->getPosition());
      stream.setBuffer(buffer, inStream->getPosition());
      stream.setPosition(inStream->getPosition());
   }
   void process(SimObject *object)
   {
      ((NetConnection *) object)->sendPacket(&stream);
   }
};


void NetConnection::checkPacketRecv(bool force)
{

   if(isLocalConnection())
		return;

	//AssertWarn(m_pAuthSocket,"需要调用AttachSocket");
	if(m_pAuthSocket == NULL)
		return;

	if(!m_pAuthSocket->IsESTABLISHED())
		return;

	//每帧处理一个数据包
	m_pAuthSocket->DispatchRecvStream(&m_streamRecv);
}



void NetConnection::checkPacketSend(bool force)
{

   if(!isLocalConnection())
	{	
		if(m_pAuthSocket == NULL)
			return;
		if(!m_pAuthSocket->IsESTABLISHED())
			return;
	}

#ifdef TGE_RPGTESTING
	static U32 uCounter	= 0;

	CSTR	pText = avar("Testing %d",uCounter);
	uCounter++;
	m_pAuthSocket->BeginSend(&m_streamSend);
	m_streamSend.writeString(pText);
	Con::printf("(Send): %s", pText);
	m_pAuthSocket->EndSend(&m_streamSend,TRUE);
	return;
#endif


   U32 curTime = Platform::getVirtualMilliseconds();
   U32 delay = isConnectionToServer() ? gPacketUpdateDelayToServer : mCurRate.updateDelay;

   if(!force)
   {
      if(curTime < mLastUpdateTime + delay - mSendDelayCredit)
         return;

      mSendDelayCredit = curTime - (mLastUpdateTime + delay - mSendDelayCredit);
      if(mSendDelayCredit > 1000)
         mSendDelayCredit = 1000;

      if(mDemoWriteStream)
         recordBlock(BlockTypeSendPacket, 0, 0);
   }
   if(windowFull())
      return;

	//////////////////////////////////////////
	///  填充数据包头信息
   BitStream *stream;

   if(isLocalConnection())
	{
		AssertWarn(m_pAuthSocket==NULL,"需要调用AttachSocket");
		stream = BitStream::getPacketStream(mCurRate.packetSize);
	}
	else
	{
		AssertWarn(m_pAuthSocket,"需要调用AttachSocket");
		if(!m_pAuthSocket->BeginSend(&m_streamSend,TRUE))
			return;
		stream = &m_streamSend;
	}

   buildSendPacketHeader(stream);

   mLastUpdateTime = curTime;

	// 建立数据包传递Notify信息
   PacketNotify *note = allocNotify();
   if(!mNotifyQueueHead)
      mNotifyQueueHead = note;
   else
      mNotifyQueueTail->nextPacket = note;
   mNotifyQueueTail = note;
   note->nextPacket = NULL;
   note->sendTime = curTime;

   note->rateChanged = mCurRate.changed;
   note->maxRateChanged = mMaxRate.changed;

   if(stream->writeFlag(mCurRate.changed))
   {
      stream->writeInt(mCurRate.updateDelay, 10);
      stream->writeInt(mCurRate.packetSize, 10);
      mCurRate.changed = false;
   }
   if(stream->writeFlag(mMaxRate.changed))
   {
      stream->writeInt(mMaxRate.updateDelay, 10);
      stream->writeInt(mMaxRate.packetSize, 10);
      mMaxRate.changed = false;
   }



	//////////////////////////////////////////
	///  填充数据包
   U32 start = stream->getCurPos();
   DEBUG_LOG(("PKLOG %d START", getId()) );

   writePacket(stream, note);

   DEBUG_LOG(("PKLOG %d END - %d", getId(), stream->getCurPos() - start) );





	/// 数据包丢失模拟
   if(mSimulatedPacketLoss && Platform::getRandom() < mSimulatedPacketLoss)
   {
      //Con::printf("NET  %d: SENDDROP - %d", getId(), mLastSendSeq);
      return;
   }
   if(mSimulatedPing)
   {
      Sim::postEvent(getId(), new NetDelayEvent(stream), Sim::getCurrentTime() + mSimulatedPing);
      return;
   }
   sendPacket(stream);
}


Net::Error NetConnection::sendPacket(BitStream *stream)
{
   //Con::printf("NET  %d: SEND - %d", getId(), mLastSendSeq);
   // do nothing on send if this is a demo replay.
   if(mDemoReadStream)
      return Net::NoError;

   gNetBitsSent = stream->getStreamSize();

   if(isLocalConnection())
   {
      // short circuit connection to the other side.
      // handle the packet, then force a notify.
      stream->setBuffer(stream->getBuffer(), stream->getPosition(), stream->getPosition());
      mRemoteConnection->processRawPacket(stream);

      return Net::NoError;
   }
   else
   {
		AssertWarn(m_pAuthSocket,"需要调用AttachSocket");
		m_pAuthSocket->EndSend(stream);

		//由socketLayer线程调用
		//m_pAuthSocket->SendOutPacket();
      //return Net::sendto(getNetAddress(), stream->getBuffer(), stream->getPosition());
   }
	return Net::NoError;
}

//--------------------------------------------------------------------
//--------------------------------------------------------------------

// these are the virtual function defs for Connection -
// if your subclass has additional data to read / write / notify, add it in these functions.

void NetConnection::readPacket(BitStream *bstream)
{
   //eventReadPacket(bstream);
   //ghostReadPacket(bstream);
}

void NetConnection::writePacket(BitStream *bstream, PacketNotify *note)
{
   //eventWritePacket(bstream, note);
   //ghostWritePacket(bstream, note);
}

void NetConnection::packetReceived(PacketNotify *note)
{
   //eventPacketReceived(note);
   //ghostPacketReceived(note);
}

void NetConnection::packetDropped(PacketNotify *note)
{
   //eventPacketDropped(note);
   //ghostPacketDropped(note);
}

//--------------------------------------------------------------------
//--------------------------------------------------------------------

void NetConnection::writeDemoStartBlock(ResizeBitStream* stream)
{
   ConnectionProtocol::writeDemoStartBlock(stream);

   stream->write(mRoundTripTime);
   stream->write(mPacketLoss);

   // Write all the current paths to the stream...
   gClientPathManager->dumpState(stream);
   stream->validate();
   mStringTable->writeDemoStartBlock(stream);

   U32 start = 0;
   PacketNotify *note = mNotifyQueueHead;
   while(note)
   {
      start++;
      note = note->nextPacket;
   }
   stream->write(start);

   //eventWriteStartBlock(stream);
   //ghostWriteStartBlock(stream);
}

bool NetConnection::readDemoStartBlock(BitStream* stream)
{
   ConnectionProtocol::readDemoStartBlock(stream);

   stream->read(&mRoundTripTime);
   stream->read(&mPacketLoss);

   // Read
   gClientPathManager->readState(stream);
   mStringTable->readDemoStartBlock(stream);
   U32 pos;
   stream->read(&pos); // notify count
   for(U32 i = 0; i < pos; i++)
   {
      PacketNotify *note = allocNotify();
      note->nextPacket = NULL;
      if(!mNotifyQueueHead)
         mNotifyQueueHead = note;
      else
         mNotifyQueueTail->nextPacket = note;
      mNotifyQueueTail = note;
   }
   //eventReadStartBlock(stream);
   //ghostReadStartBlock(stream);
   return true;
}

bool NetConnection::startDemoRecord(const char *fileName)
{
   FileStream *fs = new FileStream;

   if(!ResourceManager->openFileForWrite(*fs, fileName))
   {
      delete fs;
      return false;
   }

   mDemoWriteStream = fs;
   mDemoWriteStream->write(mProtocolVersion);
   ResizeBitStream bs;

   // then write out the start block
   writeDemoStartBlock(&bs);
   U32 size = bs.getPosition() + 1;
   mDemoWriteStream->write(size);
   mDemoWriteStream->write(size, bs.getBuffer());
   return true;
}

bool NetConnection::replayDemoRecord(const char *fileName)
{
   Stream *fs = ResourceManager->openStream(fileName);
   if(!fs)
      return false;

   mDemoReadStream = fs;
   mDemoReadStream->read(&mProtocolVersion);
   U32 size;
   mDemoReadStream->read(&size);
   U8 *block = new U8[size];
   mDemoReadStream->read(size, block);
   BitStream bs(block, size);

   bool res = readDemoStartBlock(&bs);
   delete[] block;
   if(!res)
      return false;

   // prep for first block read
   // type/size stored in U16: [type:4][size:12]
   U16 typeSize;
   mDemoReadStream->read(&typeSize);

   mDemoNextBlockType = typeSize >> 12;
   mDemoNextBlockSize = typeSize & 0xFFF;

   if(mDemoReadStream->getStatus() != Stream::Ok)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -