📄 wartransfersocket.cpp
字号:
{ debug_log << "Data connection with peer is established." << war_endl; } PumpData();}void WarTransferSocket::CallbackFunc( war_cptr_t ptrToObject, const WarError&, war_transfer_buffer_ptr_t& bufferPtr){ WarTransferSocket *pObject = (WarTransferSocket *)ptrToObject; pObject->OnFileIo(WarError(), bufferPtr);}void WarTransferSocket::OnFileIo(const WarError& status, war_transfer_buffer_ptr_t& bufferPtr){ WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnFileIo); WarError my_status = status; WarLog debug_log(WARLOG_DEBUG, "WarTransferSocket::OnFileIo()", this); if (mHasFinished) { // Just ignore the event if (debug_log) { AUTO_LOCK; --mNumPendingFileIoRequests; debug_log << "Trashing I/O callback. The connection is already closed. " << GetReferenceCount() << " references left and " << mNumPendingNetIoRequests << " + " << mNumPendingFileIoRequests << " pendig IO requests." << war_endl; } return; } if (D_SENDING == mDirection) { // Sending to client // Pass the buffer to the socket if (my_status) { WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnFileIo_ERROR); if (my_status.LocalError() == WAR_FERR_END_OF_FILE) { AUTO_LOCK; --mNumPendingFileIoRequests; mHaveSeenEof = true; } else { goto abort_transfer; } } else { AUTO_LOCK; if (bufferPtr->mSerialNum == mIoSerialFirst) { WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnFileIo_SendWithCallback); ++mIoSerialFirst; try { QueueNetWrite(bufferPtr); } catch(WarException& ex) { WarLog warn_log(WARLOG_WARNINGS, "WarTransferSocket::OnFileIo()", this); warn_log << "Caught exception during QueueNetWrite() " << ex.Explain() << " Aborting transfer" << war_endl; my_status = ex; goto abort_transfer; } } else { WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnFileIo_QUEUE); mPendingInQueue.insert(bufferPtr); } --mNumPendingFileIoRequests; } } else { // Receiving from client WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnFileIo_RECV); if (my_status) goto abort_transfer; AUTO_LOCK; if (!mHaveSeenEof) { // Add the buffer to the free-queue mFreeQueue.push_front(bufferPtr); } --mNumPendingFileIoRequests; } PumpData(); return; abort_transfer: { // Fatal error. Abort the transfer! try { --mNumPendingFileIoRequests; OnTransferDone(my_status); } catch(WarException& ex) { WarLog warn_log(WARLOG_WARNINGS, "WarTransferSocket::OnFileIo()", this); warn_log << "Caught exception during OnTransferDone() " << ex.Explain() << " Ignoring" << war_endl; } return; } }void WarTransferSocket::PreOnReceived(const WarError& status, war_transfer_buffer_ptr_t& buffer){ if ((GetTransferMode() == TM_NORMAL) || (D_RECEIVING != mDirection)) { OnReceived(status, buffer); return; } WarLog debug_log(WARLOG_DEBUG, "WarTransferSocket::PreOnReceived()", this); { WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnReceived); AUTO_LOCK; if (mHasFinished) { --mNumPendingNetIoRequests; // Just ignore the event if (debug_log) { debug_log << "Trashing I/O callback. The connection is already closed. " << GetReferenceCount() << " references left and " << mNumPendingNetIoRequests << " + " << mNumPendingFileIoRequests << " pendig IO requests." << war_endl; } return; } if (status) { if (status.LocalError() == WAR_FERR_END_OF_FILE) { if (debug_log) { debug_log << "EOF on input file. " << GetReferenceCount() << " references left and " << mNumPendingNetIoRequests << " + " << mNumPendingFileIoRequests << " pendig IO requests. " << "Buffer serial # " << buffer->mSerialNum << ", first serial # " << mIoSerialFirst << ", last serial # " << mIoSerialLast << war_endl; } mHaveSeenEof = true; } else { if (debug_log) { debug_log << "Error on input file Aborting the transfer. " << GetReferenceCount() << " references left and " << mNumPendingNetIoRequests << " + " << mNumPendingFileIoRequests << " pendig IO requests." << war_endl; } // Fatal error. Abort the transfer! OnTransferDone(status); } } else // status { // No error if (buffer->mSerialNum == mIoSerialFirst) { WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnReceived_SIMPLE); ++mIoSerialFirst; WriteToFile(buffer); } else { // Buffer is out of sync. Queue it. WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnReceived_QUEUE); mPendingInQueue.insert(buffer); } } --mNumPendingNetIoRequests; } PumpData();}void WarTransferSocket::WriteToFile(war_transfer_buffer_ptr_t& buffer) { // Must be locked#ifdef _DEBUG assert(pmLock->GetLockCount() >= 1);#endif try { if (mUseFileCallback) { QueueFileWrite(buffer); } else { mFilePtr->Write(buffer->mBuffer, buffer->mBytesUsed); mFreeQueue.push_front(buffer); } } catch(WarException& ex) { WarLog err_log(WARLOG_ERROR, "WarTransferSocket::WriteToFile()", this); err_log << "Failed to write to destination file " << ex.Explain() << " Aborting transfer." << war_endl; OnTransferDone(ex); throw ex; }}void WarTransferSocket::ProcessPendingBuffers(){ // Must be locked#ifdef _DEBUG assert(pmLock->GetLockCount() >= 1);#endif WAR_PROFILE_FUNC(WAR_FP_WarSvrProtocolFtpDataConn_ProcessPendingBuffers); while(!mPendingInQueue.empty() && ((*mPendingInQueue.begin())->mSerialNum == mIoSerialFirst)) { ++mIoSerialFirst; war_transfer_buffer_ptr_t buffer = *mPendingInQueue.begin(); mPendingInQueue.erase(mPendingInQueue.begin()); if (D_SENDING == mDirection) { // Sending QueueNetWrite(buffer); } else { // Receiving WriteToFile(buffer); } }} void WarTransferSocket::PreOnSent(const WarError& status, war_transfer_buffer_ptr_t& buffer){ if ((GetTransferMode() == TM_NORMAL) || (D_SENDING != mDirection)) { OnSent(status, buffer); return; } { WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnSent); assert(D_SENDING == mDirection); AUTO_LOCK; WarLog debug_log(WARLOG_DEBUG, "WarTransferSocket::OnSent()", this); if (mHasFinished) { --mNumPendingNetIoRequests; // Just ignore the event if (debug_log) { debug_log << "Trashing I/O callback. The connection is already closed. " << GetReferenceCount() << " references left and " << mNumPendingNetIoRequests << " + " << mNumPendingFileIoRequests << " pendig IO requests." << war_endl; } return; } if (status) { // Fatal error. Abort the transfer! --mNumPendingNetIoRequests; OnTransferDone(status); return; } if (mNumExtraBuffers) --mNumExtraBuffers; else if (!mHaveSeenEof) { mFreeQueue.push_front(buffer); } --mNumPendingNetIoRequests; } PumpData();}bool WarTransferSocket::QueueNetRead(){ war_transfer_buffer_ptr_t buffer; AUTO_LOCK; if (mFreeQueue.empty() || mHaveSeenEof) return false; buffer = mFreeQueue.front(); buffer->mSerialNum = mIoSerialLast++; mFreeQueue.pop_front(); RecvWithCallback(buffer, mNumBufSegments); ++mNumPendingNetIoRequests; return true;}void WarTransferSocket::QueueNetWrite(war_transfer_buffer_ptr_t& buffer){ //AUTO_LOCK; // Must be locked#ifdef _DEBUG assert(pmLock->GetLockCount() >= 1);#endif SendWithCallback(buffer, mNumBufSegments); ++mNumPendingNetIoRequests;}bool WarTransferSocket::QueueFileRead(){ war_transfer_buffer_ptr_t buffer; AUTO_LOCK; if (mFreeQueue.empty() || mHaveSeenEof) return false; buffer = mFreeQueue.front(); buffer->mSerialNum = mIoSerialLast++; mFreeQueue.pop_front(); mFilePtr->ReadWithCallback(buffer); ++mNumPendingFileIoRequests; return true;}void WarTransferSocket::QueueFileWrite(war_transfer_buffer_ptr_t& buffer){ mFilePtr->WriteWithCallback(buffer); ++mNumPendingFileIoRequests;}void WarTransferSocket::Reset(){ mDirection = D_INVALID; mHaveSeenEof = false; mIoSerialLast = 0; mIoSerialFirst = 0; mNumBuffers = 16; mNumBufSegments = 1; mBufferLen = (1024 * 16); mTimeOutSeconds = 120; mHasFinished = false; mUseFileCallback = false; mIsInitialized = false; mNumPendingNetIoRequests = 0; mNumPendingFileIoRequests = 0; mTransferMode = TM_NORMAL; mNumExtraBuffers = 0;}/////////////////////////////// PRIVATE ///////////////////////////////////
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -