⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 warsvrwin32eventthread.cpp

📁 ftpserver very good sample
💻 CPP
字号:
#include "StdAfx.h"
#include "WarSvrWin32EventThread.h"   // class implemented

#ifndef WAR_AUTO_LOCK_H
#   include "WarAutoLock.h"
#endif
#ifndef WAR_SVR_WIN32_NT_ENGINE_H
#   include "WarSvrWin32NtEngine.h"
#endif
#ifndef WAR_LOG_H
#   include "WarLog.h"
#endif
#ifndef WAR_ASSERT_H_INCLUDED
#   define WAR_ASSERT_H_INCLUDED
#   include <assert.h>
#endif

using namespace std;

// Declares a scoped WarAutoLock on mLock; it lives until the end of the
// enclosing block.
#define AUTO_LOCK WarAutoLock my_lock(mLock);

/////////////////////////////// PUBLIC ///////////////////////////////////////

//============================= LIFECYCLE ====================================

// Constructs an idle object: no event handle yet, not flagged as done.
// Create() must be called to allocate the event and start the worker thread.
WarSvrWin32EventThread::WarSvrWin32EventThread()
: mEvent(NULL),
bDone(false)
{
}// WarSvrWin32EventThread

// Closes the command event (if one was created) while holding mLock.
WarSvrWin32EventThread::~WarSvrWin32EventThread()
{
    {
        AUTO_LOCK

        if (mEvent)
        {
            ::CloseHandle(mEvent);
            mEvent = NULL;
        }
    }
}// ~WarSvrWin32EventThread

//============================= OPERATORS ====================================
//============================= OPERATIONS ===================================

// Creates the command event and starts the worker thread (ThreadProc -> Run).
//
// The event is manual-reset and initially signaled, so the worker's first
// wait returns immediately.
//
// Throws (via WarThrow) a WarSystemError if either CreateEvent() or
// CreateThread() fails.
void WarSvrWin32EventThread::Create()
{
    if ((mEvent = ::CreateEvent(NULL, true, true, NULL)) == NULL)
        WarThrow(WarSystemError(), "CreateEvent()");

    Attach(); // Detached in Run()

    HANDLE thread_handle = ::CreateThread(NULL, 0, ThreadProc, this, 0, NULL);
    if (thread_handle == NULL)
    {
        Detach();
        WarThrow(WarSystemError(), "CreateThread()");
    }

    // The handle is never used to join or query the thread. Closing it here
    // does not stop the thread; it only avoids leaking one handle per
    // created thread.
    ::CloseHandle(thread_handle);
}

// Adds a task to the set watched by the worker thread and signals the
// command event so the worker rebuilds its wait arrays.
//
// Throws (via WarThrow) WarError(WAR_ERR_OBJECT_EXIST) if an equivalent task
// is already present in mTasks.
void WarSvrWin32EventThread::QueueRequest(task_ptr_t& taskPtr)
{
    WarLog socket_log(WARLOG_SOCKET, "WarSvrWin32EventThread::QueueRequest()");

    if (socket_log)
    {
        socket_log << "Queuing socket "
            << (int)taskPtr->mSocket
            << " for I/O completion port event signaling"
            << war_endl;
    }

    AUTO_LOCK

    pair<task_set_t::iterator, bool> result = mTasks.insert(taskPtr);
    if (!result.second)
        WarThrow(WarError(WAR_ERR_OBJECT_EXIST), NULL); // A socket can only be in the list once!

    // Slot 0 of the wait array is reserved for mEvent, hence strictly less.
    assert(MAX_HANDLES > mTasks.size());

    ::SetEvent(mEvent);
}

// Removes the queued task for socket `sck`, if there is one.
//
// A temporary task carrying only the socket is used as the lookup key
// (presumably task_set_t orders its elements by mSocket -- confirm against
// the header). The command event is not signaled here, so the worker's
// current wait arrays are not rebuilt by this call.
void WarSvrWin32EventThread::RemoveRequest(const war_socket_t sck)
{
    WarLog socket_log(WARLOG_SOCKET, "WarSvrWin32EventThread::RemoveRequest()");

    if (socket_log)
    {
        socket_log << "Cancelling socket "
            << (int)sck
            << " for queued I/O completion port event signaling"
            << war_endl;
    }

    AUTO_LOCK

    WarSvrWin32EventThreadTask *ptask = new WarSvrWin32EventThreadTask;
    ptask->mSocket = sck;

    task_ptr_t src_key = ptask;
    task_set_t::iterator P = mTasks.find(src_key);
    if (P != mTasks.end())
        mTasks.erase(P);
}

// Asks the worker thread to leave its loop.
//
// Besides setting the flag, the command event is signaled so that a worker
// blocked in WaitForMultipleObjects() notices the request right away instead
// of after the next socket event or the 5 minute timeout.
// mLock is deliberately not taken here: Run() calls into the engine while
// holding it, and whether that path ends up in Shutdown() is not visible
// from this file.
void WarSvrWin32EventThread::Shutdown()
{
    bDone = true;

    if (mEvent)
        ::SetEvent(mEvent);
}

//============================= ACCESS     ===================================

// Returns the number of tasks currently queued.
//
// doLock: when true, mLock is held while reading the size; pass false only
//         if the caller already holds the lock.
size_t WarSvrWin32EventThread::GetNumQueued(bool doLock)
{
    if (doLock)
    {
        AUTO_LOCK
        return mTasks.size();
    }

    return mTasks.size();
}

//============================= INQUIRY    ===================================
/////////////////////////////// PROTECTED  ///////////////////////////////////
/////////////////////////////// PRIVATE    ///////////////////////////////////

// Thread entry point handed to CreateThread(); lpParameter is the
// WarSvrWin32EventThread instance passed by Create().
DWORD WINAPI WarSvrWin32EventThread::ThreadProc(LPVOID lpParameter)
{
    WarSvrWin32EventThread *p = (WarSvrWin32EventThread *)lpParameter;
    p->Run();
    return 0;
}

// Worker loop.
//
// Each iteration rebuilds the wait arrays from mTasks (slot 0 is always the
// command event), waits for any handle, and then:
//   - command event : loops again so the arrays are rebuilt,
//   - task event    : posts the task's completion packet and removes it,
//   - timeout       : reports itself to the engine as idle if nothing is queued.
// When bDone is set the loop ends and every task still queued is executed
// and removed.
void WarSvrWin32EventThread::Run()
{
    WarLog debug_log(WARLOG_DEBUG, "WarSvrWin32EventThread::Run()");

    if (debug_log)
    {
        debug_log << "Starting thread for I/O completion port simulation for non-supported operations."
            << (int)::GetCurrentThreadId()
            << war_endl;
    }

    WarPtrWrapper<WarSvrWin32EventThread> me_ptr(this); // Prevent destruction
    Detach(); // Attached in Create()

    HANDLE handles[MAX_HANDLES];
    war_socket_t sockets[MAX_HANDLES]; // sockets[0] is unused (command slot)
    handles[0] = mEvent;

    while(!bDone)
    {
        // Number of valid entries in handles[]. This must restart at 1 on
        // every pass; otherwise entries from the previous pass are kept and
        // the task set is appended again, eventually overrunning the arrays.
        size_t count = 1;

        {
            AUTO_LOCK

            assert(MAX_HANDLES > mTasks.size());

            // The explicit bound keeps release builds (where the assert is
            // compiled out) from writing past the arrays. Tasks that do not
            // fit are picked up on a later pass.
            for(task_set_t::iterator P = mTasks.begin()
                ; (P != mTasks.end()) && (count < static_cast<size_t>(MAX_HANDLES))
                ; ++P)
            {
                sockets[count] = (*P)->mSocket;
                handles[count] = (*P)->mEvent;
                ++count;
            }
        }

        // Sleep for 5 minutes at max. If the queue is empty after
        // that time, we exit.
        DWORD result = ::WaitForMultipleObjects(static_cast<DWORD>(count), handles, false,
            1000 * 60 * 5); // 5 minutes timeout

        // mEvent is a manual-reset event; clear it before the next pass.
        ::ResetEvent(mEvent);

        if (WAIT_OBJECT_0 == result) // Command
        {
            continue; // reinitialize the arrays
        }
        else if ((result >= WAIT_OBJECT_0 + 1)
            && (result < WAIT_OBJECT_0 + count))
        {
            // We got an event!
            AUTO_LOCK

            const size_t signaled = result - WAIT_OBJECT_0;

            // Look the task up again by socket: it may have been removed
            // by RemoveRequest() while we were waiting.
            WarSvrWin32EventThreadTask *ptask = new WarSvrWin32EventThreadTask;
            ptask->mSocket = sockets[signaled];
            task_ptr_t src_key = ptask;

            task_set_t::iterator P = mTasks.find(src_key);
            if (P != mTasks.end())
            {
                // Set elements are const; hand Execute() a local copy.
                task_ptr_t task(*P);
                Execute(task);
                mTasks.erase(P);
            }
        }
        else if (result == WAIT_TIMEOUT)
        {
            AUTO_LOCK

            if (mTasks.empty())
                WarSvrWin32NtEngine::GetEngine().RemoveIdleEventThread(this);
        }
        // NOTE(review): WAIT_FAILED / WAIT_ABANDONED_x are not handled; the
        // loop simply rebuilds the arrays and waits again.
    }

    // Flush whatever is still queued so that no completion packet is lost.
    {
        AUTO_LOCK

        while(!mTasks.empty())
        {
            task_ptr_t task(*mTasks.begin());
            Execute(task);
            mTasks.erase(mTasks.begin());
        }
    }

    if (debug_log)
    {
        debug_log << "Ending thread for I/O completion port simulation for non-supported operations."
            << (int)::GetCurrentThreadId()
            << war_endl;
    }
}

// Posts the completion packet described by the task to the task's I/O
// completion port.
//
// A WarException is caught and logged; with WAR_CATCH_ALL enabled any other
// exception is caught and logged as well. A failed post is logged instead of
// being silently ignored.
void WarSvrWin32EventThread::Execute(task_ptr_t& taskPtr)
{
    WarLog socket_log(WARLOG_SOCKET, "WarSvrWin32EventThread::Execute()");

    WarSvrWin32EventThreadTask *p = &(*taskPtr);

    if (socket_log)
    {
        socket_log << "Signaling I/O completion port for socket "
            << (int)p->mSocket
            << war_endl;
    }

    try
    {
        if (!PostQueuedCompletionStatus(p->mCompletionPort,
            p->mNumberOfBytesTransferred,
            p->mCompletionKey,
            p->mpOverlapped))
        {
            const int last_error = (int)::GetLastError();

            WarLog warn_log(WARLOG_WARNINGS, "WarSvrWin32EventThread::Execute()");
            warn_log << "PostQueuedCompletionStatus() failed for socket "
                << (int)p->mSocket
                << ", error "
                << last_error
                << war_endl;
        }
    }
    catch(WarException& ex)
    {
        WarLog warn_log(WARLOG_WARNINGS, "WarSvrWin32EventThread::Execute()");
        warn_log << "Caught unexpected exception during cleanup. "
            << ex.Explain()
            <<" Ignoring."
            << war_endl;
    }
#if WAR_CATCH_ALL
    catch(...)
    {
        WarLog warn_log(WARLOG_WARNINGS, "WarSvrWin32EventThread::Execute()");
        warn_log << "Caught unexpected and unknown exception during cleanup. "
            <<" Ignoring."
            << war_endl;
    }
#endif
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -