📄 cxferthread.cpp
字号:
/*____________________________________________________________________________
Copyright (C) 1996-1999 Network Associates, Inc.
All rights reserved.
$Id: CXferThread.cpp,v 1.15 1999/03/10 02:41:57 heller Exp $
____________________________________________________________________________*/
#include <stdio.h>
#include <string.h>
#include "CXferThread.h"
#include "CPFPackets.h"
#include "CXferWindow.h"
#include "CControlThread.h"
#include "CPacketWatch.h"
#include "CPFWindow.h"
#include "CStatusPane.h"
#include "CPipe.h"
#include "fastpool.h"
#include "SHA.h"
#define noDEBUGXFERLOG
#ifdef PGP_MACINTOSH
#include <UThread.h>
#include "PGPFMacUtils.h"
#include "CMacBinaryPipe.h"
#define CFileSendPipe CMacBinarySendPipe
#define CFileReceivePipe CMacBinaryReceivePipe
#elif PGP_WIN32
#include "CWinFilePipe.h"
#define CFileSendPipe CWinFileSendPipe
#define CFileReceivePipe CWinFileReceivePipe
#endif
CXferThread::CXferThread(CPFWindow *window,
CXferWindow *xferWindow,
CPFPacketsOut *packetOut,
CMessageQueue *inQueue,
CMessageQueue *outQueue,
void **outResult)
#ifdef PGP_MACINTOSH
: LThread(FALSE, thread_DefaultStack, threadOption_UsePool, outResult)
#else
: LThread(outResult)
#endif // PGP_WIN32
{
pgpAssertAddrValid(window, CPFWindow);
pgpAssertAddrValid(xferWindow, VoidAlign);
pgpAssertAddrValid(packetOut, VoidAlign);
pgpAssertAddrValid(inQueue, CMessageQueue);
pgpAssertAddrValid(outQueue, CMessageQueue);
mPFWindow = window;
mXferWindow = xferWindow;
mPacketThread = packetOut;
mInQueue = inQueue;
mOutQueue = outQueue;
mNextStreamID = 0;
mSending = NIL;
mReceiving = NIL;
mOverflowSends = NIL;
mNumSending = mNumReceiving = mOpeningStream = 0;
mSendMutex = new LMutexSemaphore;
}
CXferThread::~CXferThread()
{
delete mSendMutex;
}
void *
CXferThread::Run(void)
{
PFMessage *msg;
Boolean done = false;
long subLen;
ulong salt[2];
uchar *packet, ptype, streamID, *push, str[128];
XSendFile *xsf, *walk;
XRcvFile *xrf;
while(!done)
{
// We track send requests from CXferWindow and handle all incoming data pkts here
if(msg = mInQueue->Recv())
{
switch(msg->type)
{
case _mt_sendFile:
mSendMutex->Wait();
xsf = InitSend(msg->data);
pgpAssertAddrValid(xsf, XSendFile);
if(xsf)
{
if(!mOpeningStream && (mNumSending<MAXSNDFILES))
{
OpenStream(xsf);
#ifdef DEBUGXFERLOG
GetFileName(&xsf->xi, str);
DebugLog("Send: OpenStream(_mt_sendFile): %s", str);
#endif
}
else
{
if(IsNull(mOverflowSends))
mOverflowSends = xsf;
else
{
pgpAssertAddrValid(mOverflowSends, XSendFile);
for(walk=mOverflowSends;walk->next;)
{
if(IsntNull(walk->next))
{
pgpAssertAddrValid(walk->next, XSendFile);
}
walk=walk->next;
}
walk->next = xsf;
}
}
}
mSendMutex->Signal();
break;
case _mt_abortSend:
mSendMutex->Wait();
if(!msg->data)
xsf = mSending;
else
for(xsf=mSending;xsf && xsf!=msg->data;)
{
if(IsntNull(xsf->next))
{
pgpAssertAddrValid(xsf->next, XSendFile);
}
xsf=xsf->next;
}
if(xsf)
{
pgpAssertAddrValid(xsf, XSendFile);
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Sent: _xst_AbortStream - user aborted xfer",
(int)xsf->localStreamID);
#endif
if(xsf->xi.pipe)
{
xsf->xi.pipe->DoAbortPipe();
if(!xsf->xi.fatalErr)
xsf->xi.fatalErr=_pge_InternalAbort;
xsf->xi.pipe->DoAckEndPipe();
}
AbortSend(xsf->localStreamID, &xsf->salt[0]);
if(xsf == mSending)
EndSend();
else
DisposeSend(xsf);
if(!mOpeningStream)
CheckForNewStreams();
}
mSendMutex->Signal();
break;
case _mt_abortReceive:
if(IsNull(msg->data))
xrf = mReceiving;
else
for(xrf=mReceiving;IsntNull(xrf) && xrf!=msg->data;)
{
if(IsntNull(xrf->next))
{
pgpAssertAddrValid(xrf->next, XRcvFile);
}
xrf=xrf->next;
}
if(xrf)
{
pgpAssertAddrValid(xrf, XRcvFile);
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Sent: _xst_RemoteAbortStream - user aborted xfer",
(int)xrf->remoteStreamID);
#endif
AbortReceive(xrf->remoteStreamID, &xrf->salt[0]);
if(xrf->xi.pipe)
{
xrf->xi.pipe->DoAbortPipe();
mXferWindow->EndReceive();
}
DisposeRecv(xrf);
}
break;
case _mt_filePacket:
packet = (uchar *)msg->data;
pgpAssertAddrValid(packet, uchar *);
if(msg->len < 2)
break;
ptype = *packet++;
streamID = *packet++;
subLen = msg->len - 2;
// Find the stream
if((ptype == _xst_AckOpenStream) || (ptype == _xst_RemoteAbortStream))
{
mSendMutex->Wait();
if(IsntNull(mSending))
{
pgpAssertAddrValid(mSending, XSendFile);
}
for(xsf=mSending;xsf;)
{
if(xsf->localStreamID == streamID)
{
switch(ptype)
{
case _xst_AckOpenStream:{
ulong startPos;
uchar hash[SHS_DIGESTSIZE];
// Sanity checks and interpretations on packet
// Packet format: StartPos(32)/Hash(>=16bytes) - if Start>0
pgpAssert(!xsf->openAckd);
xsf->openAckd = TRUE;
BUFFER_TO_LONG(packet, startPos); packet+=4;
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Rcvd: _xst_AckOpenStream (pos:%ld)",
(int)streamID, startPos);
#endif
xsf->xi.bytesDone = startPos;
xsf->xi.cpsBase = startPos;
if((subLen >= 4) && (startPos<xsf->xi.bytesTotal) && !xsf->xi.pipe
&& (!startPos ||
((subLen >= 4+SHS_DIGESTSIZE) &&
HashPartialFile(&xsf->xi, xsf->hash, &hash[0]) &&
!memcmp(&hash, packet, SHS_DIGESTSIZE))))
{
// If a waiting file is at the head of the queue,
// start sending it now.
if(mSending == xsf)
{
StartSend();
}
}
else
{
AbortSend(xsf->localStreamID, &xsf->salt[0]);
DisposeSend(xsf);
if(mSending && mSending->openAckd && !mSending->xi.pipe)
StartSend();
#ifdef DEBUGXFERLOG
DebugLog("_xst_AckOpenStream: Error in file position or hash");
#endif
}
mOpeningStream = FALSE;
if(mNumSending<MAXSNDFILES)
CheckForNewStreams();
break;}
case _xst_RemoteAbortStream:
pgp_memcpy( &salt[0], packet, XFERSALTCHECKSIZE );
packet += XFERSALTCHECKSIZE;
if((subLen >= XFERSALTCHECKSIZE) &&
!memcmp(salt, &xsf->salt[0], XFERSALTCHECKSIZE))
{
if((mSending==xsf) && xsf->xi.pipe)
{
// Remote has aborted xfer in progress
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)_xst_RemoteAbortStream: "
"Aborting current xfer.", (int)streamID);
#endif
xsf->xi.pipe->DoAbortPipe();
if(!xsf->xi.fatalErr)
xsf->xi.fatalErr=_pge_InternalAbort;
xsf->xi.pipe->DoAckEndPipe();
EndSend();
}
else
{
if(!xsf->openAckd)
mOpeningStream = FALSE;
DisposeSend(xsf);
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)_xst_RemoteAbortStream: "
"Aborting future xfer", (int)streamID);
#endif
}
if(!mOpeningStream)
CheckForNewStreams();
}
break;
}
break;
}
if(IsntNull(xsf->next))
{
pgpAssertAddrValid(xsf->next, XSendFile);
}
xsf=xsf->next;
}
mSendMutex->Signal();
}
else if(ptype == _xst_OpenStream)
{
// Confirm that this stream does not exist
for(xrf=mReceiving;xrf;xrf=xrf->next)
if(xrf->remoteStreamID == streamID)
{
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Rcvd: _xst_OpenStream FAILED, dup stream",
(int)streamID);
#endif
break;
}
if(!xrf)
{
if(!(xrf=InitRecv(packet, streamID, subLen)))
{
pgp_memcpy( &salt[0], packet, XFERSALTCHECKSIZE );
packet += XFERSALTCHECKSIZE;
AbortReceive(streamID, &salt[0]);
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Rcvd: _xst_OpenStream FAILED", (int)streamID);
#endif
}
else
{
AckOpenStream(xrf);
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Rcvd: _xst_OpenStream, sent _xst_AckOpenStream", (int)streamID);
#endif
}
}
}
else for(xrf=mReceiving;xrf;xrf=xrf->next)
if(xrf->remoteStreamID == streamID)
{
switch(ptype)
{
case _xst_AbortStream:
// Packet format is simply the original salt
pgp_memcpy( &salt[0], packet, XFERSALTCHECKSIZE );
packet += XFERSALTCHECKSIZE;
if((subLen >= XFERSALTCHECKSIZE) &&
!memcmp(&salt[0], &xrf->salt[0], XFERSALTCHECKSIZE))
{
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Rcvd: _xst_AbortStream salt matched",
(int)streamID);
#endif
if(xrf->xi.pipe)
{
xrf->xi.pipe->DoAbortPipe();
mXferWindow->EndReceive();
}
DisposeRecv(xrf);
}
else
{
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Rcvd: _xst_AbortStream FAILED SALT MATCH",
(int)streamID);
#endif
}
break;
case _xst_DataStream:
if(!xrf->xi.pipe)
{
// This is the first data packet of a transfer
// that has already been setup, so we need
// to create the pipe. Theoretically, we could
// receive more than one file at a time.
xrf->xi.startTime=xrf->xi.lastTime=pgp_getticks();
xrf->xi.pipe = new CPipe(STDPIPESIZE, STDPIPEEXTRA);
xrf->fileThreadResult = 0;
xrf->fileThread = new CFileReceivePipe(&xrf->xi,
(void **)&xrf->fileThreadResult);
xrf->xi.pipe->SetPusher(this);
xrf->xi.pipe->SetPuller(xrf->fileThread);
xrf->fileThread->Resume();
mXferWindow->StartReceive(&xrf->xi);
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Rcvd: _xst_DataStream - creating pipe",
(int)streamID);
#endif
}
// Push the data into the pipe.
xrf->xi.bytesDone += subLen;
xrf->xi.lastTime=pgp_getticks();
xrf->hash->Update(packet, subLen);
xrf->xi.pipe->DoStartPipePush(subLen, (void **)&push);
pgp_memcpy(push, packet, subLen);
xrf->xi.pipe->DoEndPipePush(-1L);
if(xrf->xi.pipe->DoPipeAborted())
mInQueue->Send(_mt_abortReceive);
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Rcvd: _xst_DataStream - pushed %d bytes",
(int)streamID, (int)subLen);
#endif
break;
case _xst_EndStream:{
Boolean goodHash = FALSE;
pgp_memcpy( &salt[0], packet, XFERSALTCHECKSIZE );
packet += XFERSALTCHECKSIZE;
#ifdef DEBUGXFERLOG
DebugLog("(ID:%d)Rcvd: _xst_EndStream", (int)streamID );
#endif
if((subLen >= XFERSALTCHECKSIZE + SHS_DIGESTSIZE) &&
!memcmp(&salt[0], &xrf->salt[0], XFERSALTCHECKSIZE))
{
xrf->hash->Final(str);
if(!memcmp(str, packet, SHS_DIGESTSIZE))
{
goodHash = TRUE;
xrf->xi.pipe->DoEndPipe((long *)&xrf->xi.bytesDone);
GetFileName(&xrf->xi, str);
CStatusPane::GetStatusPane()->AddStatus(0,
"Rcvd: %s (%ld K) - SHA matched.",
str, maxl(xrf->xi.bytesTotal / 1024,1));
}
}
if(!goodHash)
{
xrf->xi.pipe->DoAbortPipe();
CStatusPane::GetStatusPane()->AddStatus(0,
"Receive aborted due to SHA mismatch!");
}
DisposeRecv(xrf);
mXferWindow->EndReceive();
break;}
}
break;
}
break;
case _mt_abort:
done = TRUE;
break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -