⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 wartransfersocket.cpp

📁 ftpserver very good sample
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include "StdAfx.h"#include "WarTransferSocket.h"   // class implemented#ifndef WAR_PERFMON_DEF_H#   include "WarPerfmonDef.h"#endif#ifndef WAR_FUNCTION_PROFILER_H#   include "WarFunctionProfiler.h"#endif#ifndef WAR_SOCKET_CPS_LIMIT_H#   include "WarSocketCpsLimit.h"#endif#ifndef WAR_AUTO_LOCK_H#   include "WarAutoLock.h"#endif#ifndef WAR_LOG_H#   include "WarLog.h"#endif#ifndef WAR_FILE_DRIVER_FILE_H#   include "WarFileDriverFile.h"#endif#define AUTO_LOCK WarAutoLock my_lock((WarCriticalSection *)pmLock)/////////////////////////////// PUBLIC /////////////////////////////////////////============================= LIFECYCLE ====================================WarTransferSocket::WarTransferSocket(war_socket_io_ptr_t& companionPtr): WarSocket(companionPtr),pmLock(&mSvrLock),mSvrLock(true){	Reset();}// WarTransferSocketWarTransferSocket::~WarTransferSocket(){	mLockOwner = NULL;}// ~WarTransferSocket//============================= OPERATORS ====================================//============================= OPERATIONS ===================================void WarTransferSocket::SendFile(war_svrfile_ptr_t& filePtr){	mDirection = D_SENDING;	StartTransfer(filePtr);}void WarTransferSocket::ReceiveFile(war_svrfile_ptr_t& filePtr){	mDirection = D_RECEIVING;	 	StartTransfer(filePtr);}void WarTransferSocket::Abort(const war_error_definitions errReason){	OnTransferDone(WarError(errReason));}void WarTransferSocket::SetTransferMode(const TransferModeE mode) throw(WarException){	WarLog netdb_log(WARLOG_NETWORK, "WarTransferSocket::SetTransferMode()");	if (netdb_log)	{		netdb_log << "Changing transfer-mode from "			<< mTransferMode			<< " to "			<< (TransferModeE)mode			<< war_endl;	}	mTransferMode = mode;}void WarTransferSocket::QueueOutputData(war_transfer_buffer_ptr_t buf){	AUTO_LOCK;	if (mPendingOutputQueue.size() > 512)	{		WarThrow(WarError(WAR_ERR_BUFFER_OVERFLOW), 			"Too many queued output buffers in WarTransferSocket");	}	mPendingOutputQueue.push_back(buf);}//============================= CALLBACK   ===================================//============================= ACCESS     ===================================//============================= INQUIRY    ===================================war_uint32_t WarTransferSocket::GetFileFlags() const throw(WarException){	return mFilePtr->GetFlags();}bool WarTransferSocket::IsTimeOut() const{    return GetIdleTimeSince().IsTimeOut(mTimeOutSeconds);}bool WarTransferSocket::HasFinished() const{	return mHasFinished;}WarTransferSocket::TransferModeE WarTransferSocket::GetTransferMode() const{	return mTransferMode;}/////////////////////////////// PROTECTED  ///////////////////////////////////int WarTransferSocket::GetNumBuffers(){    return mNumPendingNetIoRequests        + mNumPendingFileIoRequests        + mFreeQueue.size()        + mPendingInQueue.size();}void WarTransferSocket::OnTransferDone(const WarError& status){    WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnTransferDone);    AUTO_LOCK;    if (mHasFinished)        return;    // Kill transfer plugins    GetSocketIo().RemovePlugins(WarSocketIo::PLUGIN_ON_RECEIVED);    GetSocketIo().RemovePlugins(WarSocketIo::PLUGIN_ON_SENT);        // Clean up databuffers    KillBuffers();    mHasFinished = true;    if (!mFilePtr.IsEmpty() && mFilePtr->IsOpen())        mFilePtr->Close();    mFilePtr = NULL;}void WarTransferSocket::StartTransfer(war_svrfile_ptr_t& filePtr){    WarLog debug_log(WARLOG_DEBUG,         "WarTransferSocket::StartTransfer()", this);    if (debug_log)    {        debug_log << "Starting file transfer of \""            << filePtr->GetUrl()			<< "\""            << war_endl;    }			// Allocate transfer buffers		    filePtr->GetFileDriverFile().OnFtpBufferSize(		mNumBuffers,		mBufferLen,		mNumBufSegments);		mSegmentLen = mBufferLen / mNumBufSegments;		for(int i = 0; i < mNumBuffers; i++)		mFreeQueue.push_back(		new WarTransferBuffer(mBufferLen, this, i));		mFilePtr = filePtr;	if (mFilePtr->GetFlags() & WarFileEnums::F_CALLBACK)	{		mFilePtr->AssignCallback(CallbackFunc, this);		mUseFileCallback = true;	}		mIsInitialized = true;		if (mpCompanion->GetCurrentState() != WAR_SCKSTATE_CONNECTED)		return;        PumpData();}void WarTransferSocket::KillBuffers(){    while(!mFreeQueue.empty())    {        mFreeQueue.pop_front();    }    while(!mPendingInQueue.empty())    {        mPendingInQueue.erase(mPendingInQueue.begin());    }    WarLog debug_log(WARLOG_DEBUG, "WarTransferSocket::KillBuffers()", this);    if (debug_log)    {        debug_log << GetNumBuffers() << " buffers and "             << mNumPendingNetIoRequests            << " + "            << mNumPendingFileIoRequests			<< " + "			<< mNumExtraBuffers            << " IO requests left after kill."            << war_endl;    }}void WarTransferSocket::PumpData(){    WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP);	AUTO_LOCK;    if (!mIsInitialized)        return;    while(!mHaveSeenEof)    {        if (D_SENDING == mDirection)        {            // Send data            try            {				if (!mPendingOutputQueue.empty())				{					SendWithCallback(mPendingOutputQueue.front());					mPendingOutputQueue.pop_front();					++mNumExtraBuffers;					++mNumPendingNetIoRequests;				}                else if (mUseFileCallback)                {                    WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_FREAD_ASYNCH);                    while(QueueFileRead())                        ;                    break;                }                else                {                    WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_FREAD_SYNCH);                    if (mHaveSeenEof || mFreeQueue.empty())                        break;                    WarTransferBuffer *pbuf = &(*mFreeQueue.front());                    pbuf->mBytesUsed = mFilePtr->Read(                        pbuf->mBuffer,                         pbuf->GetLength());                                        if (0 == pbuf->mBytesUsed)                    {                        WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_FREAD_CHKEOF);                        if (mFilePtr->IsEof())                            WarThrow(WarError(WAR_FERR_END_OF_FILE), NULL);                        else                        {                            // No data fetched, - not callback-mode                            // and no error.                             // We must handle this as a error-situation                            WarLog err_log(WARLOG_ERROR, "WarTransferSocket::PumpData()", this);                                                        err_log << "Read() from file returen zero bytes in non-callback mode. No error was reported. This should never happen!. Aborting transfer."                                << war_endl;                                                        WarThrow(WarError(WAR_ERR_SYSTEM_ERROR), NULL);                        }                    }                    {                        WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_SCK_OUT);                        QueueNetWrite(mFreeQueue.front());                        mFreeQueue.pop_front();                    }                }            }            catch(WarException& ex)            {                if (ex.LocalError() == WAR_FERR_END_OF_FILE)                {                    mHaveSeenEof = true;                    break;                }                                // Abort                WarLog warn_log(WARLOG_WARNINGS, "WarTransferSocket::PumpData()", this);                warn_log << "Caught unexpected exception "                    << ex.Explain()                    << " Aborting transfer"                    << war_endl;                OnTransferDone(ex);                return;            }        }        else // Direction        {            // Receiving             WAR_PROFILE_FUNC(WAR_FP_DATA_PUMP_RECEIVING);            // Receive data            try            {                while(QueueNetRead())                        ;                    break;            }            catch(WarException& ex)            {                if (ex.LocalError() == WAR_FERR_END_OF_FILE)                {                    mHaveSeenEof = true;                    break;                }                                // Abort                WarLog warn_log(WARLOG_WARNINGS, "WarTransferSocket::PumpData()", this);                warn_log << "Caught unexpected exception from QueueNetRead() "                    << ex.Explain()                    << " Aborting transfer"                    << war_endl;                OnTransferDone(ex);                return;            }        }            } // while(!mFreeQueue.empty())    ProcessPendingBuffers();            if (mHaveSeenEof)    {#ifdef DEBUG        WarLog db_log(WARLOG_DEBUG, "WarTransferSocket::PumpData()", this);        if (db_log)        {            db_log << "EOF has been set. "                << GetReferenceCount() << " references left and "                << mNumPendingNetIoRequests                << " + "                << mNumPendingFileIoRequests                << " pendig IO requests and "                << (int)mPendingInQueue.size()                << " buffers pending for full sequence."                << war_endl;        }#endif        if (D_SENDING == mDirection)        {            if ((0 >= mNumPendingFileIoRequests)                && (0 >= mNumPendingNetIoRequests)                && mPendingInQueue.empty())            {                OnTransferDone(WarError());            }        }        else        {             if ((0 >= mNumPendingFileIoRequests)                 && (0 >= mNumPendingNetIoRequests)                 && mPendingInQueue.empty())             {                 OnTransferDone(WarError());             }        }    }}void WarTransferSocket::PreOnAccept(const WarError& status,        war_socket_t newSocket,        const WarNetAddress& remoteAddress,        const WarNetAddress& localAddress){	if (GetTransferMode() == TM_NORMAL)	{		OnAccept(status, newSocket, remoteAddress, localAddress);		return;	}    WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnAccept);    AUTO_LOCK;    if (status)    {        OnTransferDone(status);        return;    }    // We take over the new socket, and call OnConnect()    mpCompanion->Close();    mpCompanion->AssignSocket(newSocket);    SetCurrentState(WAR_SCKSTATE_CONNECTED);    OnConnect(status);}void WarTransferSocket::PreOnConnect(const WarError& status){	if (GetTransferMode() == TM_NORMAL)	{		OnConnect(status);		return;	}    WAR_PROFILE_FUNC(WAR_FP_WarTransferSocket_OnConnect);    WarLog debug_log(WARLOG_DEBUG, "WarTransferSocket::OnConnect()", this);    AUTO_LOCK;    if (status)    {        if (debug_log)        {            debug_log << "Data connection with peer failed."                << status                << war_endl;        }        OnTransferDone(status);        return;    }    if (debug_log)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -