📄 warsocketcpslimit.h
字号:
/** Plugin to enable CPS restrictions on async socket * connections*/#ifndef WAR_SOCKET_CPS_LIMIT_H#define WAR_SOCKET_CPS_LIMIT_H/* SYSTEM INCLUDES *//* PROJECT INCLUDES */#ifndef WAR_PLUGIN_SUPPORT_H# include "WarPluginSupport.h"#endif/* LOCAL INCLUDES *//* FORWARD REFERENCES */class WarSocket;class WarSocketIo;#ifdef __cplusplusextern "C" {#endif/****************** BEGIN OLD STYLE C spesific ********//****************** END OLD STYLE C spesific **********/#ifdef __cplusplus }#endif/****************** BEGIN C++ spesific ****************/#ifdef __cplusplusclass WarSocketCpsLimit :public WarPluginModule // This is a plugin module{public: enum DirectionE { SENDING, RECEIVING }; // LIFECYCLE /** * Default constructor. */ WarSocketCpsLimit(); /** * Destructor. */ ~WarSocketCpsLimit(); // OPERATORS // OPERATIONS void EnableCps( WarSocket *pSocket, const unsigned delayMs, const unsigned holdMs, const unsigned minCnt, const unsigned maxCnt, const unsigned minCps, const unsigned maxCps, const DirectionE forDirection); // CALLBACK virtual void OnClassInstance(WarPluginBaseSupport *pClassInstance, const NotificationE notificationType) throw(WarException); // Should not throw // ACCESS static WarSocketCpsLimit& GetModule() { if (NULL == spThis) WarThrow(WarError(WAR_ERR_INTERNAL_DATA_NOT_INITIALIZED), NULL); return *spThis; } // INQUIRY protected:private: static WarSocketCpsLimit *spThis;};#if WAR_CPS_LIMIT_WANT_PLUGINS#ifndef WAR_TRANSFER_BUFFER_H# include "WarTransferBuffer.h"#endif#ifndef WAR_CPS_H# include "WarCps.h"#endif#ifndef WAR_TIMER_GENERIC_EVENT_H# include "WarTimerGenericEvent.h"#endif#ifndef WAR_AUTO_COUNTER_H# include "WarAutoCounter.h"#endif#ifndef WAR_SOCKET_H# include "WarSocket.h"#endif#ifndef WAR_AUTO_LOCK_H# include "WarAutoLock.h"#endif#ifndef WAR_LOG_H# include "WarLog.h"#endif#ifndef WAR_TIMER_H# include "WarTimer.h"#endiftemplate <class classT>class WarSocketCpsLimit_WarSocketIo_OnXmitBase :public classT{public: typedef std::pair<WarError, war_transfer_buffer_ptr_t> data_t; typedef std::list<data_t> buffer_queue_t; typedef WarTimerGenericEvent<WarSocketCpsLimit_WarSocketIo_OnXmitBase<classT> > timer_event_t; typedef WarPtrWrapper<timer_event_t> timer_ptr_t; WarSocketCpsLimit_WarSocketIo_OnXmitBase( WarSocket *pSocket, const unsigned delayMs, const unsigned holdMs, const unsigned minCnt, const unsigned maxCnt, const unsigned minCps, const unsigned maxCps) : mCps(delayMs, holdMs, minCnt, maxCnt, minCps, maxCps), mpSocket(pSocket), mMaxCps(maxCps) {} virtual void OnProcess(const WarError& status, war_transfer_buffer_ptr_t& buffer) throw(WarException) { WarAutoLock my_lock(mLock); WarLog cps_log(WARLOG_CPS, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::DoProcess()"); if (cps_log) cps_log << "Evaluating CPS rate. "; bool is_empty; if (true == (is_empty = mBuffers.empty())) mCps.Record(buffer->GetBytesUsed()); if (!is_empty || mCps.IsOverLimit()) { mBuffers.push_back(data_t(status, buffer)); if (cps_log) cps_log << "Caching the request for delayed callback." << war_endl; if (is_empty) SetTimer(); WarThrow(WarError(WAR_ERR_PLUGIN_DONE), "suspend"); } if (cps_log) cps_log << "Continuing with default processing." << war_endl; } void OnTimer() { data_t my_data; int cnt = 0; WarLog cps_log(WARLOG_CPS, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::OnTimer()"); WarAutoLock my_lock(mLock); if (cps_log) { cps_log << "Resuming transfer " << mpSocket->GetSeqNumber() << " Current CPS is " << mCps.GetCps() << war_endl; } mTimerPtr = NULL; if (mBuffers.empty()) return; // Nothing to do /* The top buffer may be sampled, the rest is not, and we have to do the sampling here, before OnSent() is called. We can not sample the first entry!. */ while(!mBuffers.empty()) { my_data = mBuffers.front(); if (0 != cnt++) { mCps.Record(my_data.second->GetBytesUsed()); if (mCps.IsOverLimit()) { SetTimer(); return; } } mBuffers.pop_front(); try { OnXmitted(my_data); } catch(WarException& e) { WarLog err_log(WARLOG_ERROR, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::OnTimer()"); err_log << "Caught unexpected exception:" << e << " Ignoring it!" << war_endl; }#if WAR_CATCH_ALL catch(...) { WarLog err_log(WARLOG_ERROR, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::OnTimer()"); err_log << "Caught unknown exception. Ignoring it!" << war_endl; }#endif // WAR_CATCH_ALL } if (cps_log) { cps_log << "The queue is clear. Proceeding with undelayed processing." << war_endl; } #ifdef _DEBUG assert(mBuffers.empty());#endif } protected: void SetTimer() { if (!mTimerPtr.IsEmpty()) return; // Timer is set mTimerPtr = new timer_event_t(this, mCps.GetNextCpsTime()); WarLog cps_log(WARLOG_CPS, "WarSocketCpsLimit_WarSocketIo_OnXmitBase::SetTimer()"); if (cps_log) { cps_log << "Suspending transfer " << mpSocket->GetSeqNumber() << " due to CPS limit. Current CPS is " << mCps.GetCps() << " and the limit is " << mMaxCps << ". Timer event data: " << mTimerPtr->Explain() << war_endl; } WarTimer::GetTimer().AddEvent((war_timer_event_ptr_t &)mTimerPtr); } virtual void OnXmitted(data_t& data) = 0; WarCps mCps; WarSocket *mpSocket; const int mMaxCps; buffer_queue_t mBuffers; timer_ptr_t mTimerPtr; WarCriticalSection mLock;};class WarSocketCpsLimit_WarSocketIo_OnReceived : public WarSocketCpsLimit_WarSocketIo_OnXmitBase<WarSocketIo_OnReceived>{public: WarSocketCpsLimit_WarSocketIo_OnReceived( WarSocket *pSocket, const unsigned delayMs, const unsigned holdMs, const unsigned minCnt, const unsigned maxCnt, const unsigned minCps, const unsigned maxCps) : WarSocketCpsLimit_WarSocketIo_OnXmitBase<WarSocketIo_OnReceived>( pSocket, delayMs, holdMs, minCnt, maxCnt, minCps, maxCps) {}protected: virtual void OnXmitted(data_t& data) { mpSocket->OnReceived(data.first, data.second); } ~WarSocketCpsLimit_WarSocketIo_OnReceived() {}};class WarSocketCpsLimit_WarSocketIo_OnSent : public WarSocketCpsLimit_WarSocketIo_OnXmitBase<WarSocketIo_OnSent>{public: WarSocketCpsLimit_WarSocketIo_OnSent( WarSocket *pSocket, const unsigned delayMs, const unsigned holdMs, const unsigned minCnt, const unsigned maxCnt, const unsigned minCps, const unsigned maxCps) : WarSocketCpsLimit_WarSocketIo_OnXmitBase<WarSocketIo_OnSent>( pSocket, delayMs, holdMs, minCnt, maxCnt, minCps, maxCps) {}protected: virtual void OnXmitted(data_t& data) { mpSocket->OnSent(data.first, data.second); } ~WarSocketCpsLimit_WarSocketIo_OnSent() {}};#endif // WAR_CPS_LIMIT_WANT_PLUGINS#endif /* __cplusplus *//****************** END C++ spesific ******************/#endif /* WAR_SOCKET_CPS_LIMIT_H_ */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -