📄 warsvrwin32eventthread.cpp
字号:
#include "StdAfx.h"
#include "WarSvrWin32EventThread.h"   // class implemented

#ifndef WAR_AUTO_LOCK_H
#   include "WarAutoLock.h"
#endif
#ifndef WAR_SVR_WIN32_NT_ENGINE_H
#   include "WarSvrWin32NtEngine.h"
#endif
#ifndef WAR_LOG_H
#   include "WarLog.h"
#endif
#ifndef WAR_ASSERT_H_INCLUDED
#   define WAR_ASSERT_H_INCLUDED
#   include <assert.h>
#endif

using namespace std;

// Declares a scoped WarAutoLock on mLock; the lock object lives until the end
// of the enclosing scope.
#define AUTO_LOCK WarAutoLock my_lock(mLock);

/////////////////////////////// PUBLIC ///////////////////////////////////////

//============================= LIFECYCLE ====================================

// Constructs an idle object: no event handle yet and the stop flag cleared.
// Create() must be called to allocate the event and start the worker thread.
WarSvrWin32EventThread::WarSvrWin32EventThread()
: mEvent(NULL),
bDone(false)
{
}// WarSvrWin32EventThread

// Closes the command event (if one was created) while holding the lock.
WarSvrWin32EventThread::~WarSvrWin32EventThread()
{
    {
        AUTO_LOCK
        if (mEvent)
        {
            ::CloseHandle(mEvent);
            mEvent = NULL;
        }
    }
}// ~WarSvrWin32EventThread

//============================= OPERATORS ====================================
//============================= OPERATIONS ===================================

// Creates the command event and starts the worker thread that executes Run().
//
// The event is manual-reset and initially signaled, so the worker's first
// wait returns at once and it builds its handle table immediately.
//
// Throws (via WarThrow) a WarSystemError if the event or the thread cannot
// be created.
void WarSvrWin32EventThread::Create()
{
    if ((mEvent = ::CreateEvent(NULL, true, true, NULL)) == NULL)
        WarThrow(WarSystemError(), "CreateEvent()");

    Attach(); // Detached in Run()

    HANDLE thread_handle = ::CreateThread(NULL, 0, ThreadProc, this, 0, NULL);
    if (thread_handle == NULL)
    {
        Detach();
        WarThrow(WarSystemError(), "CreateThread()");
    }

    // The handle is never used to join or query the thread, so release it
    // right away. Closing the handle does not terminate the thread; leaving
    // it open leaks one kernel handle per event thread.
    ::CloseHandle(thread_handle);
}

// Adds a task to the set of watched tasks and wakes the worker thread so it
// rebuilds its wait table.
//
// taskPtr: the task to watch; its mSocket is logged, and Run() waits on its
//          mEvent.
//
// Throws (via WarThrow) WarError(WAR_ERR_OBJECT_EXIST) if an equivalent task
// is already in the set.
void WarSvrWin32EventThread::QueueRequest(task_ptr_t& taskPtr)
{
    WarLog socket_log(WARLOG_SOCKET, "WarSvrWin32EventThread::QueueRequest()");
    if (socket_log)
    {
        socket_log << "Queuing socket "
            << (int)taskPtr->mSocket
            << " for I/O completion port event signaling"
            << war_endl;
    }

    AUTO_LOCK
    pair<task_set_t::iterator, bool> result = mTasks.insert(taskPtr);
    if (!result.second)
        WarThrow(WarError(WAR_ERR_OBJECT_EXIST), NULL); // A socket can only be in the list once!

    // Slot 0 of the wait table is reserved for mEvent, so the number of
    // tasks must stay strictly below MAX_HANDLES.
    assert(MAX_HANDLES > mTasks.size());

    ::SetEvent(mEvent);
}

// Removes the task registered for the given socket, if there is one.
//
// sck: socket whose queued task should be cancelled.
//
// A temporary task carrying only the socket is built to act as the lookup
// key. NOTE(review): this assumes task_set_t orders tasks by mSocket and that
// task_ptr_t takes ownership of the temporary -- confirm against the header.
// NOTE(review): the worker is not woken here, so it keeps the removed task's
// handle in its wait table until its next wake-up.
void WarSvrWin32EventThread::RemoveRequest(const war_socket_t sck)
{
    WarLog socket_log(WARLOG_SOCKET, "WarSvrWin32EventThread::RemoveRequest()");
    if (socket_log)
    {
        socket_log << "Cancelling socket "
            << (int)sck
            << " for queued I/O completion port event signaling"
            << war_endl;
    }

    AUTO_LOCK
    WarSvrWin32EventThreadTask *ptask = new WarSvrWin32EventThreadTask;
    ptask->mSocket = sck;
    task_ptr_t src_key = ptask;
    task_set_t::iterator P = mTasks.find(src_key);
    if (P != mTasks.end())
        mTasks.erase(P);
}

// Asks the worker thread to leave its loop.
//
// Besides setting the stop flag, the command event is signaled so a worker
// blocked in WaitForMultipleObjects() notices the request immediately instead
// of only after the 5 minute wait timeout expires.
void WarSvrWin32EventThread::Shutdown()
{
    bDone = true;
    if (mEvent)
        ::SetEvent(mEvent);
}

//============================= ACCESS     ===================================

// Returns the number of tasks currently queued.
//
// doLock: when true the lock is taken for the read; pass false only when the
//         caller already holds it.
size_t WarSvrWin32EventThread::GetNumQueued(bool doLock)
{
    if (doLock)
    {
        AUTO_LOCK
        return mTasks.size();
    }

    return mTasks.size();
}

//============================= INQUIRY    ===================================
/////////////////////////////// PROTECTED  ///////////////////////////////////
/////////////////////////////// PRIVATE    ///////////////////////////////////

// Win32 thread entry point: forwards to Run() on the object passed in
// lpParameter (Create() passes `this`).
DWORD WINAPI WarSvrWin32EventThread::ThreadProc(LPVOID lpParameter)
{
    WarSvrWin32EventThread *p = (WarSvrWin32EventThread *)lpParameter;
    p->Run();
    return 0;
}

// Worker loop.
//
// Each pass rebuilds a wait table consisting of the command event (slot 0)
// followed by the event of every queued task, then waits on all of them:
//   - command event signaled : loop again so the table is rebuilt;
//   - task event signaled    : Execute() that task and drop it from the set;
//   - timeout, queue empty   : report this thread to the engine as idle.
// After the stop flag is set, every task still queued is executed and removed
// before the thread ends.
void WarSvrWin32EventThread::Run()
{
    WarLog debug_log(WARLOG_DEBUG, "WarSvrWin32EventThread::Run()");
    if (debug_log)
    {
        debug_log << "Starting thread for I/O completion port simulation for non-supported operations."
            << (int)::GetCurrentThreadId()
            << war_endl;
    }

    WarPtrWrapper<WarSvrWin32EventThread> me_ptr(this); // Prevent destruction
    Detach(); // Attached in Create()

    HANDLE handles[MAX_HANDLES];
    war_socket_t sockets[MAX_HANDLES];

    // Slot 0 always holds the command event; sockets[0] is never read.
    handles[0] = mEvent;

    while(!bDone)
    {
        // Rebuild the table from slot 1 on EVERY pass. Carrying the count
        // over from the previous pass would append the same handles again
        // (WaitForMultipleObjects() rejects duplicate handles) and would
        // eventually write past the end of the arrays.
        size_t index = 1;
        {
            AUTO_LOCK
            assert(MAX_HANDLES > mTasks.size());
            for(task_set_t::iterator P = mTasks.begin()
                ; (P != mTasks.end()) && (index < MAX_HANDLES) // never overrun the arrays, even with asserts compiled out
                ; ++P)
            {
                sockets[index] = (*P)->mSocket;
                handles[index++] = (*P)->mEvent;
            }
        }

        // Sleep for 5 minutes at max. If the queue is empty after
        // that time, we exit.
        DWORD result = ::WaitForMultipleObjects(static_cast<DWORD>(index),
            handles, false, 1000 * 60 * 5); // 5 minutes timeout

        // mEvent is a manual-reset event, so it has to be cleared explicitly.
        ::ResetEvent(mEvent);

        if (WAIT_OBJECT_0 == result) // Command
        {
            continue; // reinitialize the arrays
        }
        else if ((result >= WAIT_OBJECT_0 + 1) && (result < WAIT_OBJECT_0 + index))
        {
            // We got an event!
            AUTO_LOCK
            const size_t signaled = result - WAIT_OBJECT_0;

            // Look the task up again by socket: it may have been removed
            // while this thread was waiting without the lock.
            WarSvrWin32EventThreadTask *ptask = new WarSvrWin32EventThreadTask;
            ptask->mSocket = (war_socket_t)sockets[signaled];
            task_ptr_t src_key = ptask;
            task_set_t::iterator P = mTasks.find(src_key);
            if (P != mTasks.end())
            {
                Execute(*P);
                mTasks.erase(P);
            }
        }
        else if (result == WAIT_TIMEOUT)
        {
            AUTO_LOCK
            if (mTasks.empty())
                WarSvrWin32NtEngine::GetEngine().RemoveIdleEventThread(this);
        }
        // Any other result (e.g. WAIT_FAILED) is ignored and the wait is
        // simply retried with a freshly built table.
    }

    // Shutting down: execute and remove whatever is still queued.
    {
        AUTO_LOCK
        while(!mTasks.empty())
        {
            Execute(*mTasks.begin());
            mTasks.erase(mTasks.begin());
        }
    }

    if (debug_log)
    {
        debug_log << "Ending thread for I/O completion port simulation for non-supported operations."
            << (int)::GetCurrentThreadId()
            << war_endl;
    }
}

// Posts the task's stored completion packet (byte count, completion key and
// OVERLAPPED pointer) to the task's I/O completion port.
//
// taskPtr: the task whose completion is to be signaled.
//
// A WarException raised while posting is logged as a warning and swallowed;
// when WAR_CATCH_ALL is enabled, so is any other exception.
void WarSvrWin32EventThread::Execute(task_ptr_t& taskPtr)
{
    WarLog socket_log(WARLOG_SOCKET, "WarSvrWin32EventThread::Execute()");
    WarSvrWin32EventThreadTask *p = &(*taskPtr);

    if (socket_log)
    {
        socket_log << "Signaling I/O completion port for socket "
            << (int)p->mSocket
            << war_endl;
    }

    try
    {
        PostQueuedCompletionStatus(p->mCompletionPort,
            p->mNumberOfBytesTransferred,
            p->mCompletionKey,
            p->mpOverlapped);
    }
    catch(WarException& ex)
    {
        WarLog warn_log(WARLOG_WARNINGS, "WarSvrWin32EventThread::Execute()");
        warn_log << "Caught unexpected exception during cleanup. "
            << ex.Explain()
            <<" Ignoring."
            << war_endl;
    }
#if WAR_CATCH_ALL
    catch(...)
    {
        WarLog warn_log(WARLOG_WARNINGS, "WarSvrWin32EventThread::Execute()");
        warn_log << "Caught unexpected and unknown exception during cleanup. "
            <<" Ignoring."
            << war_endl;
    }
#endif
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -