⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cxferthread.cpp

📁 vc环境下的pgp源码
💻 CPP
📖 第 1 页 / 共 3 页
字号:
/*____________________________________________________________________________
	Copyright (C) 1996-1999 Network Associates, Inc.
	All rights reserved.

	$Id: CXferThread.cpp,v 1.15 1999/03/10 02:41:57 heller Exp $
____________________________________________________________________________*/
#include <stdio.h>
#include <string.h>

#include "CXferThread.h"

#include "CPFPackets.h"
#include "CXferWindow.h"
#include "CControlThread.h"
#include "CPacketWatch.h"
#include "CPFWindow.h"
#include "CStatusPane.h"
#include "CPipe.h"
#include "fastpool.h"
#include "SHA.h"

#define noDEBUGXFERLOG

#ifdef	PGP_MACINTOSH
#include <UThread.h>
#include "PGPFMacUtils.h"
#include "CMacBinaryPipe.h"
#define CFileSendPipe		CMacBinarySendPipe
#define CFileReceivePipe	CMacBinaryReceivePipe
#elif	PGP_WIN32
#include "CWinFilePipe.h"
#define CFileSendPipe		CWinFileSendPipe
#define CFileReceivePipe	CWinFileReceivePipe
#endif


CXferThread::CXferThread(CPFWindow *window,
				CXferWindow *xferWindow,
				CPFPacketsOut *packetOut,
				CMessageQueue *inQueue,
				CMessageQueue *outQueue,
				void **outResult)
#ifdef	PGP_MACINTOSH
        : LThread(FALSE, thread_DefaultStack, threadOption_UsePool, outResult)
#else
        : LThread(outResult)
#endif  // PGP_WIN32
{
	pgpAssertAddrValid(window, CPFWindow);
	pgpAssertAddrValid(xferWindow, VoidAlign);
	pgpAssertAddrValid(packetOut, VoidAlign);
	pgpAssertAddrValid(inQueue, CMessageQueue);
	pgpAssertAddrValid(outQueue, CMessageQueue);
	mPFWindow = window;
	mXferWindow = xferWindow;
	mPacketThread = packetOut;
	mInQueue = inQueue;
	mOutQueue = outQueue;
	mNextStreamID = 0;
	mSending = NIL;
	mReceiving = NIL;
	mOverflowSends = NIL;
	mNumSending = mNumReceiving = mOpeningStream = 0;
	mSendMutex = new LMutexSemaphore;
}

CXferThread::~CXferThread()
{
	delete mSendMutex;
}

void *
CXferThread::Run(void)
{
	PFMessage *msg;
	Boolean done = false;
	long subLen;
	ulong salt[2];
	uchar *packet, ptype, streamID, *push, str[128];
	XSendFile *xsf, *walk;
	XRcvFile *xrf;

	while(!done)
	{
		// We track send requests from CXferWindow and handle all incoming data pkts here
		if(msg = mInQueue->Recv())
		{
			switch(msg->type)
			{
				case _mt_sendFile:
					mSendMutex->Wait();
					xsf = InitSend(msg->data);
					pgpAssertAddrValid(xsf, XSendFile);
					if(xsf)
					{
						if(!mOpeningStream && (mNumSending<MAXSNDFILES))
						{
							OpenStream(xsf);
#ifdef	DEBUGXFERLOG
							GetFileName(&xsf->xi, str);
							DebugLog("Send: OpenStream(_mt_sendFile): %s", str);
#endif
						}
						else
						{
							if(IsNull(mOverflowSends))
								mOverflowSends = xsf;
							else
							{
								pgpAssertAddrValid(mOverflowSends, XSendFile);
								for(walk=mOverflowSends;walk->next;)
								{
									if(IsntNull(walk->next))
									{
										pgpAssertAddrValid(walk->next, XSendFile);
									}
									walk=walk->next;
								}
								walk->next = xsf;
							}
						}
					}
					mSendMutex->Signal();
					break;
				case _mt_abortSend:
					mSendMutex->Wait();
					if(!msg->data)
						xsf = mSending;
					else
						for(xsf=mSending;xsf && xsf!=msg->data;)
						{
							if(IsntNull(xsf->next))
							{
								pgpAssertAddrValid(xsf->next, XSendFile);
							}
							xsf=xsf->next;
						}
					if(xsf)
					{
						pgpAssertAddrValid(xsf, XSendFile);
#ifdef	DEBUGXFERLOG
						DebugLog("(ID:%d)Sent: _xst_AbortStream - user aborted xfer",
									(int)xsf->localStreamID);
#endif
						if(xsf->xi.pipe)
						{
							xsf->xi.pipe->DoAbortPipe();
							if(!xsf->xi.fatalErr)
								xsf->xi.fatalErr=_pge_InternalAbort;
							xsf->xi.pipe->DoAckEndPipe();
						}
						AbortSend(xsf->localStreamID, &xsf->salt[0]);
						if(xsf == mSending)
							EndSend();
						else
							DisposeSend(xsf);
						if(!mOpeningStream)
							CheckForNewStreams();
					}
					mSendMutex->Signal();
					break;
				case _mt_abortReceive:
					if(IsNull(msg->data))
						xrf = mReceiving;
					else
						for(xrf=mReceiving;IsntNull(xrf) && xrf!=msg->data;)
						{
							if(IsntNull(xrf->next))
							{
								pgpAssertAddrValid(xrf->next, XRcvFile);
							}
							xrf=xrf->next;
						}
					if(xrf)
					{
						pgpAssertAddrValid(xrf, XRcvFile);
#ifdef	DEBUGXFERLOG
						DebugLog("(ID:%d)Sent: _xst_RemoteAbortStream - user aborted xfer",
									(int)xrf->remoteStreamID);
#endif
						AbortReceive(xrf->remoteStreamID, &xrf->salt[0]);
						if(xrf->xi.pipe)
						{
							xrf->xi.pipe->DoAbortPipe();
							mXferWindow->EndReceive();
						}
						DisposeRecv(xrf);
					}
					break;
				case _mt_filePacket:
					packet = (uchar *)msg->data;
					pgpAssertAddrValid(packet, uchar *);
					if(msg->len < 2)
						break;
					ptype = *packet++;
					streamID = *packet++;
					subLen = msg->len - 2;
					// Find the stream
					if((ptype == _xst_AckOpenStream) || (ptype == _xst_RemoteAbortStream))
					{
						mSendMutex->Wait();
						if(IsntNull(mSending))
						{
							pgpAssertAddrValid(mSending, XSendFile);
						}
						for(xsf=mSending;xsf;)
						{
							if(xsf->localStreamID == streamID)
							{
								switch(ptype)
								{
									case _xst_AckOpenStream:{
										ulong startPos;
										uchar hash[SHS_DIGESTSIZE];
										
										// Sanity checks and interpretations on packet
										// Packet format: StartPos(32)/Hash(>=16bytes) - if Start>0
										pgpAssert(!xsf->openAckd);
										xsf->openAckd = TRUE;
										BUFFER_TO_LONG(packet, startPos);	packet+=4;
#ifdef	DEBUGXFERLOG
										DebugLog("(ID:%d)Rcvd: _xst_AckOpenStream (pos:%ld)",
													(int)streamID, startPos);
#endif
										xsf->xi.bytesDone = startPos;
										xsf->xi.cpsBase = startPos;


										if((subLen >= 4) && (startPos<xsf->xi.bytesTotal) && !xsf->xi.pipe
											&& (!startPos ||
												((subLen >= 4+SHS_DIGESTSIZE) &&
												HashPartialFile(&xsf->xi, xsf->hash, &hash[0]) && 
												!memcmp(&hash, packet, SHS_DIGESTSIZE))))
										{
											// If a waiting file is at the head of the queue,
											// start sending it now.
											if(mSending == xsf)
											{
												StartSend();
											}
										}
										else
										{
											AbortSend(xsf->localStreamID, &xsf->salt[0]);
											DisposeSend(xsf);
											if(mSending && mSending->openAckd && !mSending->xi.pipe)
												StartSend();
#ifdef	DEBUGXFERLOG
											DebugLog("_xst_AckOpenStream: Error in file position or hash");
#endif
										}
										mOpeningStream = FALSE;
										if(mNumSending<MAXSNDFILES)
											CheckForNewStreams();
										break;}
									case _xst_RemoteAbortStream:
										pgp_memcpy( &salt[0], packet, XFERSALTCHECKSIZE );
										packet += XFERSALTCHECKSIZE;
										if((subLen >= XFERSALTCHECKSIZE) &&
											!memcmp(salt, &xsf->salt[0], XFERSALTCHECKSIZE))
										{
											if((mSending==xsf) && xsf->xi.pipe)
											{
												// Remote has aborted xfer in progress
#ifdef	DEBUGXFERLOG
												DebugLog("(ID:%d)_xst_RemoteAbortStream: "
															"Aborting current xfer.", (int)streamID);
#endif
												xsf->xi.pipe->DoAbortPipe();
												if(!xsf->xi.fatalErr)
													xsf->xi.fatalErr=_pge_InternalAbort;
												xsf->xi.pipe->DoAckEndPipe();
												EndSend();
											}
											else
											{
												if(!xsf->openAckd)
													mOpeningStream = FALSE;
												DisposeSend(xsf);
#ifdef	DEBUGXFERLOG
												DebugLog("(ID:%d)_xst_RemoteAbortStream: "
															"Aborting future xfer", (int)streamID);
#endif
											}
											if(!mOpeningStream)
												CheckForNewStreams();
										}
										break;
								}
								break;
							}
							if(IsntNull(xsf->next))
							{
								pgpAssertAddrValid(xsf->next, XSendFile);
							}
							xsf=xsf->next;
						}
						mSendMutex->Signal();
					}
					else if(ptype == _xst_OpenStream)
					{
						// Confirm that this stream does not exist
						for(xrf=mReceiving;xrf;xrf=xrf->next)
							if(xrf->remoteStreamID == streamID)
							{
#ifdef	DEBUGXFERLOG
								DebugLog("(ID:%d)Rcvd: _xst_OpenStream FAILED, dup stream",
											(int)streamID);
#endif
								break;
							}
						if(!xrf)
						{
							if(!(xrf=InitRecv(packet, streamID, subLen)))
							{
								pgp_memcpy( &salt[0], packet, XFERSALTCHECKSIZE );
								packet += XFERSALTCHECKSIZE;
								AbortReceive(streamID, &salt[0]);
#ifdef	DEBUGXFERLOG
								DebugLog("(ID:%d)Rcvd: _xst_OpenStream FAILED", (int)streamID);
#endif
							}
							else
							{
								AckOpenStream(xrf);
#ifdef	DEBUGXFERLOG
								DebugLog("(ID:%d)Rcvd: _xst_OpenStream, sent _xst_AckOpenStream", (int)streamID);
#endif
							}
						}
					 }
					 else for(xrf=mReceiving;xrf;xrf=xrf->next)
						if(xrf->remoteStreamID == streamID)
						{
							switch(ptype)
							{
								case _xst_AbortStream:
									// Packet format is simply the original salt
									pgp_memcpy( &salt[0], packet, XFERSALTCHECKSIZE );
									packet += XFERSALTCHECKSIZE;
									if((subLen >= XFERSALTCHECKSIZE) &&
										!memcmp(&salt[0], &xrf->salt[0], XFERSALTCHECKSIZE))
									{
#ifdef	DEBUGXFERLOG
										DebugLog("(ID:%d)Rcvd: _xst_AbortStream salt matched",
													(int)streamID);
#endif
										if(xrf->xi.pipe)
										{
											xrf->xi.pipe->DoAbortPipe();
											mXferWindow->EndReceive();
										}
										DisposeRecv(xrf);
									}
									else
									{
#ifdef	DEBUGXFERLOG
										DebugLog("(ID:%d)Rcvd: _xst_AbortStream FAILED SALT MATCH",
													(int)streamID);
#endif
									}
									break;
								case _xst_DataStream:
									if(!xrf->xi.pipe)
									{
										// This is the first data packet of a transfer
										// that has already been setup, so we need
										// to create the pipe.  Theoretically, we could
										// receive more than one file at a time.
										xrf->xi.startTime=xrf->xi.lastTime=pgp_getticks();
										xrf->xi.pipe = new CPipe(STDPIPESIZE, STDPIPEEXTRA);
										xrf->fileThreadResult = 0;
										xrf->fileThread = new CFileReceivePipe(&xrf->xi,
												(void **)&xrf->fileThreadResult);
										xrf->xi.pipe->SetPusher(this);
										xrf->xi.pipe->SetPuller(xrf->fileThread);
										xrf->fileThread->Resume();
										mXferWindow->StartReceive(&xrf->xi);
#ifdef	DEBUGXFERLOG
										DebugLog("(ID:%d)Rcvd: _xst_DataStream - creating pipe",
												(int)streamID);
#endif
									}
									// Push the data into the pipe.
									xrf->xi.bytesDone += subLen;
									xrf->xi.lastTime=pgp_getticks();
									xrf->hash->Update(packet, subLen);
									xrf->xi.pipe->DoStartPipePush(subLen, (void **)&push);
									pgp_memcpy(push, packet, subLen);
									xrf->xi.pipe->DoEndPipePush(-1L);
									if(xrf->xi.pipe->DoPipeAborted())
										mInQueue->Send(_mt_abortReceive);
#ifdef	DEBUGXFERLOG
									DebugLog("(ID:%d)Rcvd: _xst_DataStream - pushed %d bytes",
												(int)streamID, (int)subLen);
#endif
									break;
								case _xst_EndStream:{
									Boolean goodHash = FALSE;
									
									pgp_memcpy( &salt[0], packet, XFERSALTCHECKSIZE );
									packet += XFERSALTCHECKSIZE;
#ifdef	DEBUGXFERLOG
									DebugLog("(ID:%d)Rcvd: _xst_EndStream", (int)streamID );
#endif
									if((subLen >= XFERSALTCHECKSIZE + SHS_DIGESTSIZE) &&
										!memcmp(&salt[0], &xrf->salt[0], XFERSALTCHECKSIZE))
									{
										xrf->hash->Final(str);
										if(!memcmp(str, packet, SHS_DIGESTSIZE))
										{
											goodHash = TRUE;
											xrf->xi.pipe->DoEndPipe((long *)&xrf->xi.bytesDone);
											GetFileName(&xrf->xi, str);
											CStatusPane::GetStatusPane()->AddStatus(0,
												"Rcvd: %s (%ld K) - SHA matched.", 
												str, maxl(xrf->xi.bytesTotal / 1024,1));
										}
									}
									if(!goodHash)
									{
										xrf->xi.pipe->DoAbortPipe();
										CStatusPane::GetStatusPane()->AddStatus(0,
											"Receive aborted due to SHA mismatch!");
									}
									DisposeRecv(xrf);
									mXferWindow->EndReceive();
									break;}
							}
							break;
						}
					break;
				case _mt_abort:
					done = TRUE;
					break;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -