📄 wartransfersocket.cpp
字号:
#include "StdAfx.h"
#include "WarTransferSocket.h" // class implemented

#ifndef WAR_PERFMON_DEF_H
#   include "WarPerfmonDef.h"
#endif
#ifndef WAR_FUNCTION_PROFILER_H
#   include "WarFunctionProfiler.h"
#endif
#ifndef WAR_SOCKET_CPS_LIMIT_H
#   include "WarSocketCpsLimit.h"
#endif
#ifndef WAR_AUTO_LOCK_H
#   include "WarAutoLock.h"
#endif
#ifndef WAR_LOG_H
#   include "WarLog.h"
#endif
#ifndef WAR_FILE_DRIVER_FILE_H
#   include "WarFileDriverFile.h"
#endif

// Declares a scoped WarAutoLock named my_lock on the object pmLock points at.
#define AUTO_LOCK WarAutoLock my_lock((WarCriticalSection *)pmLock)

/////////////////////////////// PUBLIC ///////////////////////////////////////

//============================= LIFECYCLE ====================================

// Builds the transfer socket on top of the given companion socket-IO
// pointer, lets pmLock refer to the socket's own mSvrLock, and finishes by
// calling Reset().
WarTransferSocket::WarTransferSocket(war_socket_io_ptr_t& companionPtr)
    : WarSocket(companionPtr)
    , pmLock(&mSvrLock)
    , mSvrLock(true)
{
    Reset();
} // WarTransferSocket

// Clears mLockOwner; nothing else is released explicitly here.
WarTransferSocket::~WarTransferSocket()
{
    mLockOwner = NULL;
} // ~WarTransferSocket

//============================= OPERATORS ====================================
//============================= OPERATIONS ===================================

// Marks the socket as sending and starts the transfer of filePtr.
void WarTransferSocket::SendFile(war_svrfile_ptr_t& filePtr)
{
    mDirection = D_SENDING;
    StartTransfer(filePtr);
}

// Marks the socket as receiving and starts the transfer of filePtr.
void WarTransferSocket::ReceiveFile(war_svrfile_ptr_t& filePtr)
{
    mDirection = D_RECEIVING;
    StartTransfer(filePtr);
}

// Ends the transfer by reporting errReason, wrapped in a WarError, to
// OnTransferDone().
void WarTransferSocket::Abort(const war_error_definitions errReason)
{
    const WarError abort_reason(errReason);
    OnTransferDone(abort_reason);
}

// Stores the new transfer mode. When the network log channel is enabled,
// the old and the new mode are written to it first.
void WarTransferSocket::SetTransferMode(const TransferModeE mode)
    throw(WarException)
{
    WarLog mode_log(WARLOG_NETWORK, "WarTransferSocket::SetTransferMode()");

    if (mode_log)
    {
        mode_log << "Changing transfer-mode from "
                 << mTransferMode
                 << " to "
                 << static_cast<TransferModeE>(mode)
                 << war_endl;
    }

    mTransferMode = mode;
}

// Appends buf to mPendingOutputQueue while holding the lock.
// Raises WAR_ERR_BUFFER_OVERFLOW (via WarThrow) when more than 512 buffers
// are already queued.
void WarTransferSocket::QueueOutputData(war_transfer_buffer_ptr_t buf)
{
    AUTO_LOCK;

    const bool queue_is_full = (mPendingOutputQueue.size() > 512);
    if (queue_is_full)
    {
        WarThrow(WarError(WAR_ERR_BUFFER_OVERFLOW),
                 "Too many queued output buffers in WarTransferSocket");
    }

    mPendingOutputQueue.push_back(buf);
}

//============================= CALLBACK
===================================//============================= ACCESS ===================================//============================= INQUIRY ===================================war_uint32_t WarTransferSocket::GetFileFlags() const throw(WarException){ return mFilePtr->GetFlags();}bool WarTransferSocket::IsTimeOut() const{ return GetIdleTimeSince().IsTimeOut(mTimeOutSeconds);}bool WarTransferSocket::HasFinished() const{ return mHasFinished;}WarTransferSocket::TransferModeE WarTransferSocket::GetTransferMode() const{ return mTransferMode;}/////////////////////////////// PROTECTED ///////////////////////////////////int WarTransferSocket::GetNumBuffers(){ return mNumPendingNetIoRequests + mNumPendingFileIoRequests + mFreeQueue.size() + mPendingInQueue.size();}void WarTransferSocket::OnTransferDone(const WarError& status){ WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnTransferDone); AUTO_LOCK; if (mHasFinished) return; // Kill transfer plugins GetSocketIo().RemovePlugins(WarSocketIo::PLUGIN_ON_RECEIVED); GetSocketIo().RemovePlugins(WarSocketIo::PLUGIN_ON_SENT); // Clean up databuffers KillBuffers(); mHasFinished = true; if (!mFilePtr.IsEmpty() && mFilePtr->IsOpen()) mFilePtr->Close(); mFilePtr = NULL;}void WarTransferSocket::StartTransfer(war_svrfile_ptr_t& filePtr){ WarLog debug_log(WARLOG_DEBUG, "WarTransferSocket::StartTransfer()", this); if (debug_log) { debug_log << "Starting file transfer of \"" << filePtr->GetUrl() << "\"" << war_endl; } // Allocate transfer buffers filePtr->GetFileDriverFile().OnFtpBufferSize( mNumBuffers, mBufferLen, mNumBufSegments); mSegmentLen = mBufferLen / mNumBufSegments; for(int i = 0; i < mNumBuffers; i++) mFreeQueue.push_back( new WarTransferBuffer(mBufferLen, this, i)); mFilePtr = filePtr; if (mFilePtr->GetFlags() & WarFileEnums::F_CALLBACK) { mFilePtr->AssignCallback(CallbackFunc, this); mUseFileCallback = true; } mIsInitialized = true; if (mpCompanion->GetCurrentState() != WAR_SCKSTATE_CONNECTED) return; PumpData();}void 
WarTransferSocket::KillBuffers(){ while(!mFreeQueue.empty()) { mFreeQueue.pop_front(); } while(!mPendingInQueue.empty()) { mPendingInQueue.erase(mPendingInQueue.begin()); } WarLog debug_log(WARLOG_DEBUG, "WarTransferSocket::KillBuffers()", this); if (debug_log) { debug_log << GetNumBuffers() << " buffers and " << mNumPendingNetIoRequests << " + " << mNumPendingFileIoRequests << " + " << mNumExtraBuffers << " IO requests left after kill." << war_endl; }}void WarTransferSocket::PumpData(){ WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP); AUTO_LOCK; if (!mIsInitialized) return; while(!mHaveSeenEof) { if (D_SENDING == mDirection) { // Send data try { if (!mPendingOutputQueue.empty()) { SendWithCallback(mPendingOutputQueue.front()); mPendingOutputQueue.pop_front(); ++mNumExtraBuffers; ++mNumPendingNetIoRequests; } else if (mUseFileCallback) { WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_FREAD_ASYNCH); while(QueueFileRead()) ; break; } else { WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_FREAD_SYNCH); if (mHaveSeenEof || mFreeQueue.empty()) break; WarTransferBuffer *pbuf = &(*mFreeQueue.front()); pbuf->mBytesUsed = mFilePtr->Read( pbuf->mBuffer, pbuf->GetLength()); if (0 == pbuf->mBytesUsed) { WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_FREAD_CHKEOF); if (mFilePtr->IsEof()) WarThrow(WarError(WAR_FERR_END_OF_FILE), NULL); else { // No data fetched, - not callback-mode // and no error. // We must handle this as a error-situation WarLog err_log(WARLOG_ERROR, "WarTransferSocket::PumpData()", this); err_log << "Read() from file returen zero bytes in non-callback mode. No error was reported. This should never happen!. Aborting transfer." 
<< war_endl; WarThrow(WarError(WAR_ERR_SYSTEM_ERROR), NULL); } } { WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_SCK_OUT); QueueNetWrite(mFreeQueue.front()); mFreeQueue.pop_front(); } } } catch(WarException& ex) { if (ex.LocalError() == WAR_FERR_END_OF_FILE) { mHaveSeenEof = true; break; } // Abort WarLog warn_log(WARLOG_WARNINGS, "WarTransferSocket::PumpData()", this); warn_log << "Caught unexpected exception " << ex.Explain() << " Aborting transfer" << war_endl; OnTransferDone(ex); return; } } else // Direction { // Receiving WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_RECEIVING); // Receive data try { while(QueueNetRead()) ; break; } catch(WarException& ex) { if (ex.LocalError() == WAR_FERR_END_OF_FILE) { mHaveSeenEof = true; break; } // Abort WarLog warn_log(WARLOG_WARNINGS, "WarTransferSocket::PumpData()", this); warn_log << "Caught unexpected exception from QueueNetRead() " << ex.Explain() << " Aborting transfer" << war_endl; OnTransferDone(ex); return; } } } // while(!mFreeQueue.empty()) ProcessPendingBuffers(); if (mHaveSeenEof) {#ifdef DEBUG WarLog db_log(WARLOG_DEBUG, "WarTransferSocket::PumpData()", this); if (db_log) { db_log << "EOF has been set. " << GetReferenceCount() << " references left and " << mNumPendingNetIoRequests << " + " << mNumPendingFileIoRequests << " pendig IO requests and " << (int)mPendingInQueue.size() << " buffers pending for full sequence." 
<< war_endl; }#endif if (D_SENDING == mDirection) { if ((0 >= mNumPendingFileIoRequests) && (0 >= mNumPendingNetIoRequests) && mPendingInQueue.empty()) { OnTransferDone(WarError()); } } else { if ((0 >= mNumPendingFileIoRequests) && (0 >= mNumPendingNetIoRequests) && mPendingInQueue.empty()) { OnTransferDone(WarError()); } } }}void WarTransferSocket::PreOnAccept(const WarError& status, war_socket_t newSocket, const WarNetAddress& remoteAddress, const WarNetAddress& localAddress){ if (GetTransferMode() == TM_NORMAL) { OnAccept(status, newSocket, remoteAddress, localAddress); return; } WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnAccept); AUTO_LOCK; if (status) { OnTransferDone(status); return; } // We take over the new socket, and call OnConnect() mpCompanion->Close(); mpCompanion->AssignSocket(newSocket); SetCurrentState(WAR_SCKSTATE_CONNECTED); OnConnect(status);}void WarTransferSocket::PreOnConnect(const WarError& status){ if (GetTransferMode() == TM_NORMAL) { OnConnect(status); return; } WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnConnect); WarLog debug_log(WARLOG_DEBUG, "WarTransferSocket::OnConnect()", this); AUTO_LOCK; if (status) { if (debug_log) { debug_log << "Data connection with peer failed." << status << war_endl; } OnTransferDone(status); return; } if (debug_log)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -