⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 warthreadpool.cpp

📁 ftpserver very good sample
💻 CPP
字号:
#include "StdAfx.h"
#include "WarThreadPool.h"   // class implemented

#ifndef WAR_THREAD_FOR_POOL_H
#   include "WarThreadForPool.h"
#endif
#ifndef WAR_THREAD_POOL_TASK_H
#   include "WarThreadPoolTask.h"
#endif
#ifndef WAR_TIME_H
#   include "WarTime.h"
#endif
#ifndef WAR_LOG_H
#   include "WarLog.h"
#endif
#ifndef WAR_STRSTREAM_INCLUDED
#   include <strstream>
#endif
#ifndef WAR_AUTO_LOCK_H
#   include "WarAutoLock.h"
#endif

using namespace std;

/////////////////////////////// PUBLIC ///////////////////////////////////////

// Declares a local WarAutoLock named MyLock, constructed from mLock.
// The macro already ends with ';', so it is used without a trailing one.
// NOTE(review): presumably the lock is held until the end of the enclosing
// scope -- confirm against WarAutoLock.h.
#define AUTO_LOCK WarAutoLock MyLock(mLock);

//============================= LIFECYCLE ====================================

// Constructs an empty pool; the thread limits stay 0 until Create() is called.
WarThreadPool::WarThreadPool()
    : mMaxThreads(0),
      mMinThreads(0)
{
}

// Stops all threads, then empties the pending, processing and done queues.
WarThreadPool::~WarThreadPool()
{
    TerminateAllThreads();

    AUTO_LOCK

    // Clear all queues
    while (!mPendingQueue.empty())
    {
        mPendingQueue.pop_front();
    }

    while (!mProcessingQueue.empty())
    {
        mProcessingQueue.pop_front();
    }

    while (!mDoneQueue.empty())
    {
        mDoneQueue.pop_front();
    }
}

//============================= OPERATIONS ===================================

// Sets the abort and/or exit flag on every attached thread and signals
// mSignal once per thread.
//   doAbort - call SetAbortFlag() on each thread
//   doExit  - call SetExitFlag() on each thread
void WarThreadPool::FlagAllThreads(bool doAbort, bool doExit)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::FlagAllThreads()");

    thrd_debug << "Flagging all threads for:";
    if (doAbort)
        thrd_debug << " abort";
    if (doExit)
        thrd_debug << " exit";
    thrd_debug << war_endl;

    AUTO_LOCK

    for (thread_list_t::const_iterator P = mThreads.begin()
        ; P != mThreads.end()
        ; ++P)
    {
        if (doAbort)
            (*P)->SetAbortFlag();
        if (doExit)
            (*P)->SetExitFlag();

        mSignal.Signal();	// One signal for each thread
    }
}

// Repeatedly flags all threads for exit (and abort, if doAbort is set) until
// the thread list is empty, then waits until mCntThreadsExist drops to 0.
// Polls with a 500 ms sleep between attempts.
void WarThreadPool::TerminateAllThreads(bool doAbort)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::TerminateAllThreads()");

    thrd_debug << "Terminating all threads.";
    if (doAbort)
        thrd_debug << " Aborting all processing.";
    thrd_debug << war_endl;

    while (GetNumThreads())
    {
        thrd_debug << ListCurrentThreads() << war_endl;
        FlagAllThreads(doAbort, true);
        WarTime::Sleep(500);
    }

    while (mCntThreadsExist > 0)
    {
        WarTime::Sleep(500);
    }

    thrd_debug << "All threads and tasks are stopped" << war_endl;
}

// Creates a thread object through NewThread() and opens it.
void WarThreadPool::StartNewThread()
throw(WarException)
{
    WarThreadForPool *pThread = NewThread();
    pThread->Open();
}

// Configures the pool limits and starts MinThreads threads.
//   MinThreads - number of threads started immediately
//   MaxThreads - upper limit consulted by PushTask(); must be >= MinThreads
// Throws WAR_ERR_INVALID_ARGUMENT if MaxThreads < MinThreads. If starting a
// thread throws, all threads are terminated and the exception is rethrown.
void WarThreadPool::Create(war_uint32_t MinThreads,
                           war_uint32_t MaxThreads)
                           throw(WarException)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::Create()");

    thrd_debug << "Starting threadpool with "
        << (int)MinThreads
        << "/"
        << (int)MaxThreads
        << " threads"
        << war_endl;

    // Validate before storing so a rejected call leaves the limits untouched.
    if (MaxThreads < MinThreads)
        WarThrow(WarError(WAR_ERR_INVALID_ARGUMENT), NULL);

    mMinThreads = MinThreads;
    mMaxThreads = MaxThreads;

    // Create the thread pool
    try
    {
        for (war_uint32_t i = 0; i < mMinThreads; i++)
            StartNewThread();
    }
    catch (WarException& ex)
    {
        WarLogError("WarThreadPool::Create()",
            "Caught exception while creating pool",
            &ex);

        TerminateAllThreads();
        throw;  // rethrow the original object; 'throw ex' would copy and slice it
    }

    thrd_debug << ListCurrentThreads() << war_endl;
}

//============================= ACCESS     ===================================
//============================= INQUIRY    ===================================

// Returns the number of threads currently in mThreads.
size_t WarThreadPool::GetNumThreads()
{
    AUTO_LOCK
    return mThreads.size();
}

// Returns the number of threads in mThreads for which IsIdle() is true.
int WarThreadPool::GetNumIdleThreads()
{
    AUTO_LOCK

    int Cnt = 0;
    for (thread_list_t::const_iterator P = mThreads.begin()
        ; P != mThreads.end()
        ; ++P)
    {
        if ((*P)->IsIdle())
            ++Cnt;
    }

    return Cnt;
}

// Returns a text listing of the pending, processing and done queues,
// preceded by a header line containing Label.
std::string WarThreadPool::ListCurrentTasks(war_ccstr_t Label)
{
    AUTO_LOCK

    ostrstream return_val;

    return_val << "Listing current task-status for the thread-pool " << Label << "\n"
        << ListCurrentTasks(mPendingQueue, "Pending")
        << ListCurrentTasks(mProcessingQueue, "Current")
        << ListCurrentTasks(mDoneQueue, "Processed")
        << '\0';    // NUL-terminate: str() is read as a C string below

    string rbuf = return_val.str();
    return_val.freeze(false);   // str() froze the buffer; hand it back to the stream
    return rbuf;
}

// Returns a text listing of one queue: a "---- QUEUE <Label>" header followed
// by one line per task (cookie, type, state, priority, elapsed ms, info).
// Takes no lock itself; the public overload above calls it while holding one.
std::string WarThreadPool::ListCurrentTasks(const task_queue_t & List,
                                            war_ccstr_t Label) const
{
    ostrstream return_val;

    return_val << "---- QUEUE " << Label << "\n";

    for (task_queue_t::const_iterator P = List.begin()
        ; P != List.end()
        ; ++P)
    {
        const WarThreadPoolTask& rtask = *(*P);
        char buf[128];

        // The ".20" precision caps each name at 20 characters so the
        // formatted line always fits in buf; longer names are truncated.
        sprintf(buf, "%5d %-20.20s %-20.20s %-20.20s %7u",
            rtask.mTaskCookie,
            rtask.GetTypeName().c_str(),
            rtask.GetStateName(),
            rtask.GetPriorityName(),
            (unsigned)rtask.GetElapsedTimeMs());

        return_val << buf;
        return_val << rtask.GetInfo().c_str() << endl;
    }

    return_val << '\0';
    string rbuf = return_val.str();
    return_val.freeze(false);
    return rbuf;
}

// Returns one line of GetThreadInfo() output per attached thread.
std::string WarThreadPool::ListCurrentThreads()
{
    AUTO_LOCK

    ostrstream return_val;

    for (thread_list_t::iterator P = mThreads.begin()
        ; P != mThreads.end()
        ; ++P)
    {
        return_val << (*P)->GetThreadInfo() << endl;
    }

    return_val << '\0';
    string rbuf = return_val.str();
    return_val.freeze(false);
    return rbuf;
}

/////////////////////////////// PROTECTED  ///////////////////////////////////

// Called to schedule a task for processing.
// Queues pTask in priority order, starts an extra thread when the task's
// priority and the current thread situation call for it, then signals mSignal.
// A failure to start the extra thread is logged and otherwise ignored.
void WarThreadPool::PushTask(WarThreadPoolTask * pTask)
throw(WarException)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::PushTask()");

    thrd_debug << "New task being scheduled: "
        << pTask->GetTaskId()
        << " "
        << pTask->GetTypeName()
        << " "
        << pTask->GetInfo()
        << " "
        << pTask->GetPriorityName();

    bool do_start_new_thread = false;

    {
        AUTO_LOCK

        // Insert in the order they appear, sorted by priority: the task goes
        // in front of the first queued task whose mPriority value is greater,
        // so tasks with equal priority keep their arrival order.
        task_queue_t::iterator P;
        for (P = mPendingQueue.begin()
            ; P != mPendingQueue.end()
            ; ++P)
        {
            if (pTask->mPriority < (*P)->mPriority)
                break;
        }

        mPendingQueue.insert(P, pTask);
        pTask->SetState(WarThreadPoolTask::STATE_QUEUED);
    }

    // NOTE(review): pTask is read below after the lock has been released. If
    // a worker can pop and release the task in the meantime this is a
    // use-after-free -- confirm the task lifetime guarantees.
    switch (pTask->mPriority)
    {
    case WarThreadPoolTask::PRI_HIGHEST:
        // No idle thread: start one, without checking mMaxThreads.
        if (!GetNumIdleThreads())
            do_start_new_thread = true;
        break;

    case WarThreadPoolTask::PRI_HIGH:
    case WarThreadPoolTask::PRI_NORMAL:
        // No idle thread: start one only while below mMaxThreads.
        if (!GetNumIdleThreads())
        {
            AUTO_LOCK

            size_t NumThrds = mThreads.size();
            if (NumThrds < mMaxThreads)
                do_start_new_thread = true;
        }
        break;

    case WarThreadPoolTask::PRI_LOW:
    case WarThreadPoolTask::PRI_VERY_LOW:
        // Start a thread only if the pool has none at all.
        {
            AUTO_LOCK

            size_t NumThrds = mThreads.size();
            if (NumThrds == 0)
                do_start_new_thread = true;
        }
        break;
    }

    if (do_start_new_thread)
    {
        thrd_debug << " [on new thread]";

        try
        {
            StartNewThread();
        }
        catch(WarException& ex)
        {
            WarLogError("WarThreadPool::PushTask()",
                "Failed to create new thread. Continuing with the current threads.",
                &ex);
        }
    }

    thrd_debug << war_endl;
    mSignal.Signal();
}

// Called from the worker thread to query for a new task.
// Moves the front of the pending queue to the front of the processing queue
// and returns it, or returns NULL when nothing is pending.
WarThreadPoolTask *WarThreadPool::PopTask()
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::PopTask()");
    WarThreadPoolTask *pTask = NULL;

    AUTO_LOCK

    if (!mPendingQueue.empty())
    {
        // Take from pending queue
        WarPtrWrapper<WarThreadPoolTask> Task = mPendingQueue.front();
        mProcessingQueue.push_front(Task);
        mPendingQueue.pop_front();
        pTask = &(*Task);

        thrd_debug << "Task picked for processing: "
            << pTask->GetTaskId()
            << " "
            << pTask->GetTypeName()
            << war_endl;
    }

    return pTask;
}

// Removes and returns the task at the front of the done queue.
// Throws WAR_ERR_NO_OBJECT when the done queue is empty.
WarPtrWrapper<WarThreadPoolTask> WarThreadPool::PopDoneTask()
{
    AUTO_LOCK

    WarPtrWrapper<WarThreadPoolTask> Task;

    if (!mDoneQueue.empty())
    {
        // Take from done queue
        Task = mDoneQueue.front();
        mDoneQueue.pop_front();
        return Task;
    }

    WarThrow(WarError(WAR_ERR_NO_OBJECT), NULL);
    return Task; // Compiler food
}

// Called from the worker thread to notify that a task is processed.
// Removes the task from the processing queue, sets its state to
// STATE_ABORTED or STATE_PROCESSED, and calls OnAborted() or OnProcessed().
// The task is put on the done queue only when that callback returns false.
void WarThreadPool::TaskDone(WarPtrWrapper<WarThreadPoolTask>& rWrappedTask,
                             bool isAborted)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::TaskDone()");

    AUTO_LOCK

    WarThreadPoolTask *pTask = &(*rWrappedTask);

    thrd_debug << "Task ended processing: "
            << pTask->GetTaskId()
            << " "
            << pTask->GetTypeName()
            << " after "
            << (unsigned)pTask->GetElapsedTimeMs()
            << " milliseconds";
    if (isAborted)
        thrd_debug << ". The task was aborted.";
    thrd_debug << war_endl;

    CheckIfExist(mProcessingQueue, pTask, true);	// Remove from queue

    pTask->SetState(
        isAborted
        ? WarThreadPoolTask::STATE_ABORTED
        : WarThreadPoolTask::STATE_PROCESSED);

    if (!(isAborted ? pTask->OnAborted() : pTask->OnProcessed()))
        mDoneQueue.push_front(pTask);
}

// Return true if the task was removed, false if it was flagged for abort.
// A task found in the pending or done queue is removed from it. A task found
// in the processing queue stays there and its thread, if any, gets the cancel
// flag. A task found in none of the queues also yields true.
bool WarThreadPool::AbortTask(WarThreadPoolTask * pTask)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::AbortTask()");
    bool return_val = true;

    thrd_debug << "Tagging task for abort: "
        << pTask->GetTaskId()
        << pTask->GetTypeName()
        << war_endl;

    AUTO_LOCK

    if (!CheckIfExist(mPendingQueue, pTask, true)
        && !CheckIfExist(mDoneQueue, pTask, true))
    {
        if (CheckIfExist(mProcessingQueue, pTask))
        {
            return_val = false;
            if (pTask->mpThread)
                pTask->mpThread->SetCancelFlag();
        }
    }

    return return_val;
}

// Empties the pending and done queues and sets the cancel flag on the thread
// of every task in the processing queue. Tasks being processed stay queued.
void WarThreadPool::KillAllTasks()
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::KillAllTasks()");

    thrd_debug << "Killing all tasks" << war_endl;

    AUTO_LOCK

    // Delete the pending queue
    while (!mPendingQueue.empty())
        mPendingQueue.pop_front();

    // Flag the tasks currently being processed for cancel
    for (task_queue_t::iterator P = mProcessingQueue.begin()
        ; P != mProcessingQueue.end()
        ; ++P)
    {
        if ((*P)->mpThread)
            (*P)->mpThread->SetCancelFlag();
    }

    // Delete the done queue
    while (!mDoneQueue.empty())
        mDoneQueue.pop_front();

    thrd_debug << "All tasks are killed" << war_endl;
}

// Removes pTask from the done queue if it is there.
void WarThreadPool::RemoveFromDoneQueue(WarThreadPoolTask * pTask)
{
    AUTO_LOCK
    CheckIfExist(mDoneQueue, pTask, true);
}

// Adds pThread to the front of the list of active threads.
void WarThreadPool::Attach(WarThreadForPool *pThread)
{
    AUTO_LOCK

    mThreads.push_front(pThread);

    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::Attach()");
    if (thrd_debug)
    {
        thrd_debug << "Attaching thread "
            << (unsigned) pThread->GetThreadId()
            << " to the list of active threads. "
            << mThreads.size()
            << " threads are now available in this thread-pool."
            << war_endl;
    }
}

// Removes the first list entry that compares equal to *pThread from the list
// of active threads. The log message notes when no such entry was found.
void WarThreadPool::Detach(WarThreadForPool *pThread)
{
    AUTO_LOCK

    const char *found = " The thread was not found!";

    for (WarThreadPool::thread_list_t::iterator P = mThreads.begin()
        ; P != mThreads.end()
        ; ++P)
    {
        if (*pThread == **P)
        {
            mThreads.erase(P);
            found = "";
            break;  // P is invalid after erase(); stop iterating
        }
    }

    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::Detach()");
    if (thrd_debug)
    {
        thrd_debug << "Detaching thread "
            << (unsigned) pThread->GetThreadId()
            << " from the list of active threads. "
            << found
            << mThreads.size()
            << " threads are now available in this thread-pool."
            << war_endl;
    }
}

// Allocates a new WarThreadForPool bound to this pool.
WarThreadForPool *WarThreadPool::NewThread()
{
    return new WarThreadForPool(this);
}

/////////////////////////////// PRIVATE    ///////////////////////////////////

// Returns true if pTask is in List (compared by address). When doRemove is
// set, the matching entry is also erased. Takes no lock itself.
bool WarThreadPool::CheckIfExist(task_queue_t & List,
                                 WarThreadPoolTask *pTask,
                                 bool doRemove)
{
    bool return_val = false;

    for (task_queue_t::iterator P = List.begin()
        ; P != List.end()
        ; ++P)
    {
        if (&(**P) == pTask)
        {
            return_val = true;
            if (doRemove)
                List.erase(P);
            break;  // P may be invalid after erase(); stop iterating
        }
    }

    return return_val;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -