📄 cpfpackets.cpp
字号:
*p++ = seqbyte;
mOutQueue->Send(_mt_ack, ackpkt, 2, TRUE);
}
}
void *
CPFPacketsIn::Run(void)
{
enum HPPPacketType packetType;
uchar *packet;
ulong seq, reliableSeq;
short packetLen;
Boolean crypted;
SavedOOPacket *soop, *lsoop;
Boolean found;
while(!mAbort)
{
do
{
// make sure we haven't received any packets we were waiting for
// that will allow us to forward some saved packets
found = FALSE;
for(lsoop=NIL,soop=mSavedOOPkts;soop;soop=soop->next)
{
if(soop->seq == ((soop->type == _hpt_File)?sRFileSequence:sRControlSequence))
{
#ifdef BETA
//CStatusPane::GetStatusPane()->AddStatus(0, "Forwarded OO pkt(%ld).", soop->seq);
#endif
DeliverPacket(soop->type, soop->data, soop->len);
mNumSavedOOPkts--;
if(lsoop)
lsoop->next = soop->next;
else
mSavedOOPkts = soop->next;
safe_free(soop);
if(packetType == _hpt_File)
sRFileSequence++;
else
sRControlSequence++;
found = TRUE;
break;
}
lsoop=soop;
}
} while(found);
if(mPacketMode)
crypted=GetPacket(&packetType, &packet, &packetLen, &seq);
else
crypted=StreamGetPacket(&packetType, &packet, &packetLen, &seq);
if(!packet && mAbort)
break;
#ifdef BETA
//CStatusPane::GetStatusPane()->AddStatus(0, "Rcv(%d, %ld).", (short)packetType, seq);
#endif
if(packetType >= RELIABLESTART)
{
if(packetType == _hpt_File)
reliableSeq = sRFileSequence;
else
reliableSeq = sRControlSequence;
if(seq <= reliableSeq)
{
if(!mNoCrypto && (packetType >= _hpt_ControlEncrypt) && !crypted)
{
// We aren't ready for encrypted packets yet because the
// decryptor hasn't been set. We will drop this packet
// and expect it to be retransmitted later when we should
// hopefully be ready. This packet will not be ACKd.
safe_free(packet);
if(gPacketWatch)
gPacketWatch->GotPacket(FALSE);
#ifdef BETA
//CStatusPane::GetStatusPane()->AddStatus(0, "Out of band encrypted packet(%ld).", seq);
#endif
}
else
{
AckPacket(packetType, seq);
if(seq < reliableSeq)
{
safe_free(packet);// ignore it, already processed
if(gPacketWatch)
gPacketWatch->GotPacket(FALSE);
#ifdef BETA
//DebugLog("RXDUPE: %ld", seq);
//CStatusPane::GetStatusPane()->AddStatus(0, "Duplicate packet(%ld).", seq);
#endif
}
else // seq == reliableSeq, next expected
{
if(packetType == _hpt_File)
sRFileSequence++;
else
sRControlSequence++;
if(gPacketWatch)
gPacketWatch->GotPacket(TRUE);
DeliverPacket(packetType, packet, packetLen);
#ifdef BETA
//CStatusPane::GetStatusPane()->AddStatus(0, "Packet(%ld) OK.", seq);
#endif
}
}
}
else
{
// We received a packet out of order, record it
// so that we can automatically forward it later
// when we receive the appropriate fill-in packets.
if(mNumSavedOOPkts<MAXSAVEDOOPACKETS)
{
AckPacket(packetType, seq);
soop = (SavedOOPacket *)safe_malloc(sizeof(SavedOOPacket));pgpAssert(soop);
soop->type = packetType;
soop->data = packet;
soop->seq = seq;
soop->len = packetLen;
soop->next = mSavedOOPkts;
mSavedOOPkts = soop;
mNumSavedOOPkts++;
if(gPacketWatch)
gPacketWatch->GotPacket(TRUE);
#ifdef BETA
//CStatusPane::GetStatusPane()->AddStatus(0, "Rcvd OO packet(%ld), expected(%ld).",
// seq, reliableSeq);
#endif
}
else
{
// Too many saved packets, let some of them time out
// so we receive the ones we lost.
safe_free(packet);
if(gPacketWatch)
gPacketWatch->GotPacket(FALSE);
#ifdef BETA
//DebugLog("OO exhausted, wasting:%ld", seq);
//CStatusPane::GetStatusPane()->AddStatus(0, "OO packets exhausted, wasted:(%ld).", seq);
#endif
}
}
}
else
{
if(packetType == _hpt_Setup)
{
// We received a stray setup packet from the machine
// we're connected to. We ignore setup packets from
// the remote during a call. Setup packets from a
// different machine are handled by the Transport layer.
safe_free(packet);
continue;
}
else if(seq == mSequence)
{
gPacketWatch->GotPacket(TRUE);
mSequence++;
}
else if(seq < mSequence)
{
// we already got this or it was out of order
safe_free(packet);
if(gPacketWatch)
gPacketWatch->GotPacket(FALSE);
continue;
}
else // seq > mSequence
{
//we lost packets, who cares, automatically resync
mSequence = ++seq;
if(gPacketWatch)
{
gPacketWatch->GotPacket(FALSE); //we're really talking about the lost packet(s) here
gPacketWatch->GotPacket(TRUE); //but this packet is good
}
}
DeliverPacket(packetType, packet, packetLen);
}
(Yield)();
}
return (void *)1L;
}
void
CPFPacketsOut::SetSound(ulong intervalms)
{
sInOutMutex->Wait();
sIntervalms = intervalms + 5; // pad it a bit so we don't miss a lot
sBandwidthpp = sBandwidthcps / (1000 / sIntervalms);
sInOutMutex->Signal();
}
void
CPFPacketsOut::SetSoundStatus(Boolean status)
{
mSendingSound = status;
}
void
CPFPacketsOut::Send(enum MsgType type, uchar *data, long len)
{
Boolean isLast;
isLast = TRUE; //mOutQueue->Peek() ? FALSE : TRUE;
switch(type)
{
case _mt_controlPacket:
if(mPacketMode) // are we on a packet switched network?
SendPacket(mEncryptor ? _hpt_ControlEncrypt : _hpt_Control,
sSControlSequence++,len,data);
else // or are we on a circuit network
StreamSendPacket(mEncryptor ? _hpt_ControlEncrypt : _hpt_Control,
sSControlSequence++,len,data,isLast);
break;
case _mt_voicePacket:
if(mPacketMode)
SendPacket(_hpt_Voice, mSequence++,len,data);
else
StreamSendPacket(_hpt_Voice,mSequence++,len,data,isLast);
break;
case _mt_abort:
mAbort = TRUE;
#ifdef PGP_WIN32
if(mAbortEvent != INVALID_HANDLE_VALUE)
SetEvent(mAbortEvent);
#endif
break;
case _mt_ack:
if(mPacketMode)
SendPacket(_hpt_ACK,mSequence++,len,data);
else
StreamSendPacket(_hpt_ACK,mSequence++,len,data,isLast);
break;
case _mt_rr:
sLastSRR = pgp_getticks();
if(mPacketMode)
SendPacket(_hpt_ReceptionReport,mSequence++,len,data);
else
StreamSendPacket(_hpt_ReceptionReport,mSequence++,len,data,isLast);
break;
case _mt_rtt:
if(mPacketMode)
SendPacket(_hpt_RTT,mSequence++,len,data);
else
StreamSendPacket(_hpt_RTT,mSequence++,len,data,isLast);
break;
case _mt_filePacket:
if(mPacketMode)
SendPacket(_hpt_File,sSFileSequence++,len,data);
else
StreamSendPacket(_hpt_File,sSFileSequence++,len,data,isLast);
break;
default:
pgp_errstring("Unknown outgoing HPP packet.");
break;
}
}
void
CPFPacketsOut::ServiceControl(long *sentBytes)
{
PFMessage *msg;
HPPReliablePacket *hppr;
long now = pgp_getticks(), b;
// Now look for reliable control packets that must be retransmitted
sInOutMutex->Wait();
for(hppr=sControlReliables;hppr;hppr=hppr->next)
{
b = now - hppr->lastXmitTime;
if(b >= sACKWaitTime)
{
if(hppr->retransmits < HPPMAXRETRANSMIT)
{
mTransport->WaitAsync(&mAsyncs[mNextAsync]);
mAsyncs[mNextAsync].buffer = hppr->data;
safe_increaseCount(hppr->data);
mTransport->WriteAsync(hppr->len, _pfc_Control,
&mAsyncs[mNextAsync++]);
mNextAsync %= NUMHPPASYNCS;
hppr->retransmits++;
hppr->lastXmitTime = pgp_getticks();
*sentBytes += hppr->len;
#ifdef BETA
// Debugging code to show retransmissions
//CStatusPane::GetStatusPane()->AddStatus(0, "(C)Retransmission %ld.",
// (long)hppr->retransmits);
#endif
if(gPacketWatch)
gPacketWatch->SentPacket();
}
else
{
// The remote side has not responded to our reliable packet.
// Reliable packets are not optional. We tried to send it many
// times over a 30 second period.
// Something is wrong, so abort the protocol.
// In most cases, this just means the remote went down.
if(now - hppr->firstXmitTime >= MINACKTIMEOUT)
mPFWindow->GetControlThread()->ProtocolViolation(_pv_noRemote);
else
{
sACKWaitTime += 1000;
if(sACKWaitTime > DEFAULTACKWAITTIME)
sACKWaitTime = DEFAULTACKWAITTIME;
hppr->retransmits--;
}
}
}
}
sInOutMutex->Signal();
// Now look for new control packets. We always send all available
// control packets
while((msg = mControlQueue->Recv(0)) != NULL)
{
*sentBytes += msg->len;
Send(msg->type, (uchar *)msg->data, msg->len);
mControlQueue->Free(msg);
}
}
void *
CPFPacketsOut::Run(void)
{
PFMessage *msg;
uchar rttData[16];
long waittime, lastInterval, sentBytes;
Boolean sentSound;
#ifdef PGPXFER
long b;
HPPReliablePacket *hppr;
Boolean windowok;
PFMessage *peekmsg;
#endif
lastInterval = pgp_getticks();
while(!mAbort)
{
sentBytes = 0;
// First wait for a sound packet to send, always send all sound packets first
sentSound = FALSE;
msg = NIL;
while(mSendingSound && (!sentSound || msg) && !mAbort)
{
waittime = msg? 0 : (sIntervalms - (pgp_getticks() - lastInterval));
if(waittime<0)
waittime=0;
msg = mSoundQueue->Recv(waittime);
if(msg)
{
lastInterval = pgp_getticks();
sentBytes += msg->len;
Send(msg->type, (uchar *)msg->data, msg->len);
mSoundQueue->Free(msg);
sentSound = TRUE;
}
else
ServiceControl(&sentBytes);
(Yield)();
}
if(mAbort)
continue;
if(!sentSound)
lastInterval = pgp_getticks();
ServiceControl(&sentBytes);
#ifdef PGPXFER
// Now with any remaining bandwidth we may send file packets
// First we handle any necessary retransmission of file packets
// up to the bandwidth
if(mXferThread)
{
sInOutMutex->Wait();
for(hppr=sFileReliables;hppr;hppr=hppr->next)
{
b = lastInterval - hppr->lastXmitTime;
if(b >= sACKWaitTime)
{
if(sentBytes + hppr->len > sBandwidthpp)
continue;
if(hppr->retransmits < HPPMAXRETRANSMIT)
{
mTransport->WaitAsync(&mAsyncs[mNextAsync]);
mAsyncs[mNextAsync].buffer = hppr->data;
safe_increaseCount(hppr->data);
mTransport->WriteAsync(hppr->len, _pfc_Control,
&mAsyncs[mNextAsync++]);
mNextAsync %= NUMHPPASYNCS;
hppr->retransmits++;
hppr->lastXmitTime = pgp_getticks();
sentBytes += hppr->len;
//DebugLog("ReTX: %ld", hppr->seq);
#ifdef BETA
// Debugging code to show retransmissions
//CStatusPane::GetStatusPane()->AddStatus(0, "(F)Retransmission %ld.",
// (long)hppr->retransmits);
#endif
if(gPacketWatch)
gPacketWatch->SentPacket();
}
else
{
// See comment above
if(lastInterval - hppr->firstXmitTime >= MINACKTIMEOUT)
{
//HPPReliablePacket *bhppr;
//for(bhppr=sFileReliables;bhppr;bhppr=bhppr->next)
// DebugLog("Fatal No Response: seq (%ld), current (%ld)", bhppr->seq, sSFileSequence);
mPFWindow->GetControlThread()->ProtocolViolation(_pv_noRemote);
}
else
{
sACKWaitTime += 1000;
if(sACKWaitTime > DEFAULTACKWAITTIME)
sACKWaitTime = DEFAULTACKWAITTIME;
hppr->retransmits--;
}
}
}
}
sInOutMutex->Signal();
b=1;
windowok = TRUE;
while(mXferThread && !(peekmsg=mSoundQueue->Peek()) && b &&
((sBandwidthpp - sentBytes) >= MINXFERPACKETSIZE) &&
(!sFileReliables || (sSFileSequence - sFileReliables->seq < mReceiverWindow)))
{
b = mXferThread->GetNextSendPacket(minl(sBandwidthpp - sentBytes, sMTU-9));
sentBytes += b;
(Yield)();
}
if(b && !peekmsg && (pgp_getticks() > mLastBWIncrease+5000) &&
(pgp_getticks() > sLastRRR+6500))
{
/*+++++++
sInOutMutex->Wait();
mLastBWIncrease = pgp_getticks();
sBandwidthcps = sBandwidthcps / 10 * 11; // add 10%
sBandwidthpp = sBandwidthcps / (1000 / sIntervalms);
gPacketWatch->SetBandwidth(sBandwidthcps);
sInOutMutex->Signal();*/
}
}
#endif //PGPXFER
// And finally we'll send an RTT packet if necessary
if(lastInterval - sRTTBase >= HPPRTTINTERVAL)
{
// HPPRTTINTERVAL has passed since the last time we sent a
// round trip timing packet to measure the channel delay.
// We will send one now.
sRTTSequence++;
sRTTBase = lastInterval;
#ifdef PGP_MACINTOSH
if(UEnvironment::HasFeature(env_HasOpenTransport))
OTGetTimeStamp(&sOTRTTBase);
#endif
rttData[0] = (uchar) mPFWindow->GetControlThread()->IsOriginator();
LONG_TO_BUFFER(sRTTSequence, &rttData[1]);
if(mPacketMode)
SendPacket(_hpt_RTT,mSequence++,5,rttData);
else
StreamSendPacket(_hpt_RTT,mSequence++,5,rttData,TRUE);
//CStatusPane::GetStatusPane()->AddStatus(0, "Sent RTT");
}
(Yield)();
}
return (void *)1L;
}
void
CPFPacketsIn::AbortSync()
{
mAbort = TRUE;
#ifdef PGP_MACINTOSH
mTransport->AbortSync();
#elif PGP_WIN32
if(mAbortEvent != INVALID_HANDLE_VALUE)
SetEvent(mAbortEvent);
#endif // PGP_WIN32
}
void
CPFPacketsIn::ChangeRXKey(uchar *keyMaterial, uchar *firstEA, uchar *,
uchar )
{
// we have a mutex surrounding this operation because it will be executed
// by the control thread, not the HPP-in thread. The HPP-in thread
// may be in the middle of a read when control calls this.
mRXKeyChangeMutex.Wait();
if(mDecryptor)
delete mDecryptor;
mDecryptor = NIL;
if(memcmp(firstEA, sCryptorHash[_enc_none], 4))
mDecryptor = new CCounterEncryptor(keyMaterial, firstEA);
mRXKeyChangeMutex.Signal();
}
void
CPFPacketsOut::ChangeTXKey(uchar *keyMaterial, uchar *firstEA, uchar *,
uchar )
{
mTXKeyChangeMutex.Wait();
if(mEncryptor)
delete mEncryptor;
mEncryptor = NIL;
if(memcmp(firstEA, sCryptorHash[_enc_none], 4))
mEncryptor = new CCounterEncryptor(keyMaterial, firstEA);
mTXKeyChangeMutex.Signal();
}
void
CPFPacketsIn::SetXferQueue(CMessageQueue *xferQueue)
{
mXferQueue = xferQueue;
}
void
CPFPacketsOut::SetXferThread(CXferThread *xferThread)
{
mXferThread = xferThread;
}
void
CPFPacketsIn::SetNoCryptoMode()
{
mNoCrypto = TRUE;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -