⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 warsocketcpslimit.h

📁 ftpserver very good sample
💻 H
字号:
/** Plugin to enable CPS restrictions on async socket  * connections*/#ifndef WAR_SOCKET_CPS_LIMIT_H#define WAR_SOCKET_CPS_LIMIT_H/* SYSTEM INCLUDES *//* PROJECT INCLUDES */#ifndef WAR_PLUGIN_SUPPORT_H#   include "WarPluginSupport.h"#endif/* LOCAL INCLUDES *//* FORWARD REFERENCES */class WarSocket;class WarSocketIo;#ifdef __cplusplusextern "C" {#endif/****************** BEGIN OLD STYLE C spesific ********//****************** END OLD STYLE C spesific **********/#ifdef __cplusplus }#endif/****************** BEGIN C++ spesific ****************/#ifdef __cplusplusclass WarSocketCpsLimit :public WarPluginModule // This is a plugin module{public:    enum DirectionE    {        SENDING,        RECEIVING    };    // LIFECYCLE        /**    * Default constructor.    */    WarSocketCpsLimit();        /**    * Destructor.    */    ~WarSocketCpsLimit();        // OPERATORS    // OPERATIONS    void EnableCps(        WarSocket *pSocket,        const unsigned delayMs,        const unsigned holdMs,        const unsigned minCnt,        const unsigned maxCnt,        const unsigned minCps,        const unsigned maxCps,        const DirectionE forDirection);    // CALLBACK    virtual void OnClassInstance(WarPluginBaseSupport *pClassInstance,        const NotificationE notificationType)        throw(WarException); // Should not throw    // ACCESS    static WarSocketCpsLimit& GetModule()    {        if (NULL == spThis)            WarThrow(WarError(WAR_ERR_INTERNAL_DATA_NOT_INITIALIZED), NULL);                return *spThis;    }    // INQUIRY    protected:private:    static WarSocketCpsLimit *spThis;};#if WAR_CPS_LIMIT_WANT_PLUGINS#ifndef WAR_TRANSFER_BUFFER_H#   include "WarTransferBuffer.h"#endif#ifndef WAR_CPS_H#   include "WarCps.h"#endif#ifndef WAR_TIMER_GENERIC_EVENT_H#   include "WarTimerGenericEvent.h"#endif#ifndef WAR_AUTO_COUNTER_H#   include "WarAutoCounter.h"#endif#ifndef WAR_SOCKET_H#   include "WarSocket.h"#endif#ifndef WAR_AUTO_LOCK_H#   include "WarAutoLock.h"#endif#ifndef WAR_LOG_H#   include "WarLog.h"#endif#ifndef WAR_TIMER_H#   include "WarTimer.h"#endiftemplate <class classT>class WarSocketCpsLimit_WarSocketIo_OnXmitBase :public classT{public:    typedef std::pair<WarError, war_transfer_buffer_ptr_t> data_t;    typedef std::list<data_t> buffer_queue_t;    typedef WarTimerGenericEvent<WarSocketCpsLimit_WarSocketIo_OnXmitBase<classT> > timer_event_t;    typedef WarPtrWrapper<timer_event_t> timer_ptr_t;    WarSocketCpsLimit_WarSocketIo_OnXmitBase(        WarSocket *pSocket,        const unsigned delayMs,        const unsigned holdMs,        const unsigned minCnt,        const unsigned maxCnt,        const unsigned minCps,        const unsigned maxCps) :        mCps(delayMs, holdMs, minCnt, maxCnt, minCps, maxCps),        mpSocket(pSocket),        mMaxCps(maxCps)    {}    virtual void OnProcess(const WarError& status,        war_transfer_buffer_ptr_t& buffer)        throw(WarException)    {        WarAutoLock my_lock(mLock);                    WarLog cps_log(WARLOG_CPS, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::DoProcess()");        if (cps_log)            cps_log << "Evaluating CPS rate. ";                bool is_empty;                if (true == (is_empty = mBuffers.empty()))            mCps.Record(buffer->GetBytesUsed());                if (!is_empty || mCps.IsOverLimit())        {            mBuffers.push_back(data_t(status, buffer));                        if (cps_log)                cps_log << "Caching the request for delayed callback." << war_endl;                        if (is_empty)                SetTimer();                        WarThrow(WarError(WAR_ERR_PLUGIN_DONE), "suspend");        }                if (cps_log)            cps_log << "Continuing with default processing." << war_endl;    }        void OnTimer()    {        data_t my_data;        int cnt = 0;                WarLog cps_log(WARLOG_CPS, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::OnTimer()");                WarAutoLock my_lock(mLock);                if (cps_log)        {            cps_log << "Resuming transfer "                << mpSocket->GetSeqNumber()                << " Current CPS is "                << mCps.GetCps()                << war_endl;        }                mTimerPtr = NULL;                if (mBuffers.empty())            return; // Nothing to do                            /* The top buffer may be sampled, the rest is not, and we have to                    do the sampling here, before OnSent() is called. We can not                    sample the first entry!.        */                while(!mBuffers.empty())        {            my_data = mBuffers.front();                        if (0 != cnt++)            {                mCps.Record(my_data.second->GetBytesUsed());                if (mCps.IsOverLimit())                {                    SetTimer();                    return;                }            }                        mBuffers.pop_front();                        try            {                OnXmitted(my_data);            }            catch(WarException& e)            {                WarLog err_log(WARLOG_ERROR, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::OnTimer()");                                err_log << "Caught unexpected exception:"                    << e                     << " Ignoring it!"                    << war_endl;            }#if WAR_CATCH_ALL            catch(...)            {                WarLog err_log(WARLOG_ERROR, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::OnTimer()");                                err_log << "Caught unknown exception. Ignoring it!"                    << war_endl;            }#endif // WAR_CATCH_ALL        }                if (cps_log)        {            cps_log << "The queue is clear. Proceeding with undelayed processing."                << war_endl;        }        #ifdef _DEBUG        assert(mBuffers.empty());#endif    }    protected:    void SetTimer()    {        if (!mTimerPtr.IsEmpty())            return; // Timer is set                mTimerPtr = new timer_event_t(this, mCps.GetNextCpsTime());                WarLog cps_log(WARLOG_CPS, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::SetTimer()");        if (cps_log)        {            cps_log << "Suspending transfer "                << mpSocket->GetSeqNumber()                << " due to CPS limit. Current CPS is "                << mCps.GetCps()                << " and the limit is "                << mMaxCps                << ". Timer event data: "                << mTimerPtr->Explain()                << war_endl;        }                WarTimer::GetTimer().AddEvent((war_timer_event_ptr_t &)mTimerPtr);    }        virtual void OnXmitted(data_t& data) = 0;    WarCps mCps;    WarSocket *mpSocket;    const int mMaxCps;    buffer_queue_t mBuffers;    timer_ptr_t mTimerPtr;    WarCriticalSection mLock;};class WarSocketCpsLimit_WarSocketIo_OnReceived : public WarSocketCpsLimit_WarSocketIo_OnXmitBase<WarSocketIo_OnReceived>{public:        WarSocketCpsLimit_WarSocketIo_OnReceived(        WarSocket *pSocket,        const unsigned delayMs,        const unsigned holdMs,        const unsigned minCnt,        const unsigned maxCnt,        const unsigned minCps,        const unsigned maxCps)        : WarSocketCpsLimit_WarSocketIo_OnXmitBase<WarSocketIo_OnReceived>(        pSocket,        delayMs,        holdMs,        minCnt,        maxCnt,        minCps,        maxCps)    {}protected:    virtual void OnXmitted(data_t& data)    {        mpSocket->OnReceived(data.first, data.second);    }    ~WarSocketCpsLimit_WarSocketIo_OnReceived() {}};class WarSocketCpsLimit_WarSocketIo_OnSent : public WarSocketCpsLimit_WarSocketIo_OnXmitBase<WarSocketIo_OnSent>{public:        WarSocketCpsLimit_WarSocketIo_OnSent(        WarSocket *pSocket,        const unsigned delayMs,        const unsigned holdMs,        const unsigned minCnt,        const unsigned maxCnt,        const unsigned minCps,        const unsigned maxCps)        : WarSocketCpsLimit_WarSocketIo_OnXmitBase<WarSocketIo_OnSent>(        pSocket,        delayMs,        holdMs,        minCnt,        maxCnt,        minCps,        maxCps)    {}protected:    virtual void OnXmitted(data_t& data)    {        mpSocket->OnSent(data.first, data.second);    }    ~WarSocketCpsLimit_WarSocketIo_OnSent() {}};#endif // WAR_CPS_LIMIT_WANT_PLUGINS#endif /* __cplusplus *//****************** END C++ spesific ******************/#endif  /* WAR_SOCKET_CPS_LIMIT_H_ */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -