📄 warthreadpool.cpp
字号:
#include "StdAfx.h"
#include "WarThreadPool.h"   // class implemented

#ifndef WAR_THREAD_FOR_POOL_H
#   include "WarThreadForPool.h"
#endif
#ifndef WAR_THREAD_POOL_TASK_H
#   include "WarThreadPoolTask.h"
#endif
#ifndef WAR_TIME_H
#   include "WarTime.h"
#endif
#ifndef WAR_LOG_H
#   include "WarLog.h"
#endif
#ifndef WAR_STRSTREAM_INCLUDED
#   include <strstream>
#endif
#ifndef WAR_AUTO_LOCK_H
#   include "WarAutoLock.h"
#endif

#include <cstdio>   // snprintf

using namespace std;

/////////////////////////////// PUBLIC ///////////////////////////////////////

// Declares a scoped WarAutoLock named MyLock on mLock. The macro already ends
// with ';', so it is used without a trailing semicolon.
#define AUTO_LOCK WarAutoLock MyLock(mLock);

//============================= LIFECYCLE ====================================

// Constructs an empty pool with both thread limits set to 0.
// Create() must be called to start worker threads.
WarThreadPool::WarThreadPool()
    : mMaxThreads(0),
      mMinThreads(0)
{
}

// Terminates all worker threads, then empties the pending, processing and
// done queues while holding mLock.
WarThreadPool::~WarThreadPool()
{
    TerminateAllThreads();

    AUTO_LOCK

    // Clear all queues
    while (!mPendingQueue.empty())
    {
        mPendingQueue.pop_front();
    }

    while (!mProcessingQueue.empty())
    {
        mProcessingQueue.pop_front();
    }

    while (!mDoneQueue.empty())
    {
        mDoneQueue.pop_front();
    }
}

//============================= OPERATIONS ===================================

// Sets the abort and/or exit flag on every attached thread and signals
// mSignal once per thread.
//
// doAbort: call SetAbortFlag() on each thread.
// doExit:  call SetExitFlag() on each thread.
void WarThreadPool::FlagAllThreads(bool doAbort, bool doExit)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::FlagAllThreads()");

    thrd_debug << "Flagging all threads for:";
    if (doAbort)
        thrd_debug << " abort";
    if (doExit)
        thrd_debug << " exit";
    thrd_debug << war_endl;

    AUTO_LOCK

    for (thread_list_t::const_iterator P = mThreads.begin()
        ; P != mThreads.end()
        ; ++P)
    {
        if (doAbort)
            (*P)->SetAbortFlag();
        if (doExit)
            (*P)->SetExitFlag();

        mSignal.Signal(); // One signal for each thread
    }
}

// Repeatedly flags all threads for exit (and abort, if doAbort is set) until
// no thread is attached to the pool, then waits until mCntThreadsExist has
// dropped to zero. Polls with WarTime::Sleep(500) between attempts
// (presumably milliseconds -- confirm against WarTime).
void WarThreadPool::TerminateAllThreads(bool doAbort)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::TerminateAllThreads()");

    thrd_debug << "Terminating all threads.";
    if (doAbort)
        thrd_debug << " Aborting all processing.";
    thrd_debug << war_endl;

    // Keep flagging: threads detach themselves from mThreads as they exit.
    while (GetNumThreads())
    {
        thrd_debug << ListCurrentThreads() << war_endl;
        FlagAllThreads(doAbort, true);
        WarTime::Sleep(500);
    }

    // Detached threads may still exist; wait for the counter to reach zero.
    while (mCntThreadsExist > 0)
    {
        WarTime::Sleep(500);
    }

    thrd_debug << "All threads and tasks are stopped" << war_endl;
}

// Creates one worker thread through NewThread() and opens it.
// NOTE(review): if Open() throws, nothing here releases pThread -- confirm
// who owns the thread object in that case.
void WarThreadPool::StartNewThread()
throw(WarException)
{
    WarThreadForPool *pThread = NewThread();
    pThread->Open();
}

// Configures the pool limits and starts MinThreads worker threads.
//
// MinThreads: number of threads started immediately.
// MaxThreads: upper limit consulted by PushTask() for PRI_HIGH/PRI_NORMAL.
//
// Throws WAR_ERR_INVALID_ARGUMENT if MaxThreads < MinThreads; in that case
// the previously configured limits are left untouched. If starting a thread
// fails, all threads are terminated and the original exception is rethrown.
void WarThreadPool::Create(war_uint32_t MinThreads,
                           war_uint32_t MaxThreads)
throw(WarException)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::Create()");

    thrd_debug << "Starting threadpool with "
        << (int)MinThreads
        << "/"
        << (int)MaxThreads
        << " threads"
        << war_endl;

    // Validate before touching any member so a rejected call cannot leave
    // the pool with an inconsistent min/max configuration.
    if (MaxThreads < MinThreads)
        WarThrow(WarError(WAR_ERR_INVALID_ARGUMENT), NULL);

    mMinThreads = MinThreads;
    mMaxThreads = MaxThreads;

    // Create the thread pool
    try
    {
        for (war_uint32_t i = 0; i < mMinThreads; i++)
            StartNewThread();
    }
    catch (WarException& ex)
    {
        WarLogError("WarThreadPool::Create()",
            "Caught exception while creating pool",
            &ex);

        TerminateAllThreads();
        throw; // rethrow the original object (no copy, no slicing)
    }

    thrd_debug << ListCurrentThreads() << war_endl;
}

//============================= ACCESS     ===================================
//============================= INQUIRY    ===================================

// Returns the number of threads currently attached to the pool.
size_t WarThreadPool::GetNumThreads()
{
    AUTO_LOCK
    return mThreads.size();
}

// Returns the number of attached threads for which IsIdle() is true.
int WarThreadPool::GetNumIdleThreads()
{
    AUTO_LOCK

    int Cnt = 0;
    for (thread_list_t::const_iterator P = mThreads.begin()
        ; P != mThreads.end()
        ; ++P)
    {
        if ((*P)->IsIdle())
            ++Cnt;
    }

    return Cnt;
}

// Returns a textual listing of the pending, processing and done queues,
// prefixed with a header line that contains Label.
std::string WarThreadPool::ListCurrentTasks(war_ccstr_t Label)
{
    AUTO_LOCK

    ostrstream return_val;

    return_val << "Listing current task-status for the thread-pool "
        << Label
        << "\n"
        << ListCurrentTasks(mPendingQueue, "Pending")
        << ListCurrentTasks(mProcessingQueue, "Current")
        << ListCurrentTasks(mDoneQueue, "Processed")
        << '\0'; // ostrstream::str() is not NUL-terminated unless we do it

    string rbuf = return_val.str();
    return_val.freeze(false); // hand the buffer back to the stream
    return rbuf;
}

// Returns a textual listing of one task queue: a "---- QUEUE <Label>" header
// followed by one line per task (cookie, type, state, priority, elapsed
// milliseconds, then the task's info string).
//
// The caller is expected to hold mLock when List is one of the pool's own
// queues; this overload does not lock.
std::string WarThreadPool::ListCurrentTasks(const task_queue_t & List,
                                            war_ccstr_t Label) const
{
    ostrstream return_val;

    return_val << "---- QUEUE " << Label << "\n";

    for (task_queue_t::const_iterator P = List.begin()
        ; P != List.end()
        ; ++P)
    {
        const WarThreadPoolTask& rtask = *(*P);

        // Bounded formatting: the name strings have no length limit, so an
        // unbounded sprintf could overrun buf.
        char buf[128];
        snprintf(buf, sizeof(buf), "%5d %-20s %-20s %-20s %7u",
            rtask.mTaskCookie,
            rtask.GetTypeName().c_str(),
            rtask.GetStateName(),
            rtask.GetPriorityName(),
            (unsigned)rtask.GetElapsedTimeMs());

        return_val << buf;
        return_val << rtask.GetInfo().c_str() << endl;
    }

    return_val << '\0';

    string rbuf = return_val.str();
    return_val.freeze(false);
    return rbuf;
}

// Returns one line of GetThreadInfo() output per attached thread.
std::string WarThreadPool::ListCurrentThreads()
{
    AUTO_LOCK

    ostrstream return_val;

    for (thread_list_t::iterator P = mThreads.begin()
        ; P != mThreads.end()
        ; ++P)
    {
        return_val << (*P)->GetThreadInfo() << endl;
    }

    return_val << '\0';

    string rbuf = return_val.str();
    return_val.freeze(false);
    return rbuf;
}

/////////////////////////////// PROTECTED  ///////////////////////////////////

// Called to schedule a task for processing.
//
// The task is inserted into the pending queue in front of the first queued
// task whose mPriority value is greater than the new task's, so tasks with
// equal priority keep their arrival order. Depending on the priority a new
// worker thread may be started:
//   PRI_HIGHEST           - when no thread is idle (mMaxThreads is not checked)
//   PRI_HIGH / PRI_NORMAL - when no thread is idle and fewer than mMaxThreads
//                           threads exist
//   PRI_LOW / PRI_VERY_LOW - only when the pool has no threads at all
// A failure to start the thread is logged and otherwise ignored. Finally
// mSignal is signalled once.
void WarThreadPool::PushTask(WarThreadPoolTask * pTask)
throw(WarException)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::PushTask()");

    thrd_debug << "New task being scheduled: "
        << pTask->GetTaskId()
        << " "
        << pTask->GetTypeName()
        << " "
        << pTask->GetInfo()
        << " "
        << pTask->GetPriorityName();

    bool do_start_new_thread = false;

    {
        AUTO_LOCK

        // Insert in the order they appear, sorted by priority
        task_queue_t::iterator P;
        for (P = mPendingQueue.begin()
            ; P != mPendingQueue.end()
            ; ++P)
        {
            if (pTask->mPriority < (*P)->mPriority)
                break;
        }

        mPendingQueue.insert(P, pTask);
        pTask->SetState(WarThreadPoolTask::STATE_QUEUED);
    }

    switch (pTask->mPriority)
    {
    case WarThreadPoolTask::PRI_HIGHEST:
        if (!GetNumIdleThreads())
            do_start_new_thread = true;
        break;

    case WarThreadPoolTask::PRI_HIGH:
    case WarThreadPoolTask::PRI_NORMAL:
        if (!GetNumIdleThreads())
        {
            AUTO_LOCK
            size_t NumThrds = mThreads.size();
            if (NumThrds < mMaxThreads)
                do_start_new_thread = true;
        }
        break;

    case WarThreadPoolTask::PRI_LOW:
    case WarThreadPoolTask::PRI_VERY_LOW:
        {
            AUTO_LOCK
            size_t NumThrds = mThreads.size();
            if (NumThrds == 0)
                do_start_new_thread = true;
        }
        break;
    }

    if (do_start_new_thread)
    {
        thrd_debug << " [on new thread]";

        try
        {
            StartNewThread();
        }
        catch (WarException& ex)
        {
            WarLogError("WarThreadPool::PushTask()",
                "Failed to create new thread. Continuing with the current threads.",
                &ex);
        }
    }

    thrd_debug << war_endl;
    mSignal.Signal();
}

// Called from the worker thread to query for a new task.
//
// Moves the task at the front of the pending queue to the processing queue
// and returns it, or returns NULL when nothing is pending.
WarThreadPoolTask *WarThreadPool::PopTask()
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::PopTask()");

    WarThreadPoolTask *pTask = NULL;

    AUTO_LOCK

    if (!mPendingQueue.empty())
    {
        // Take from pending queue
        WarPtrWrapper<WarThreadPoolTask> Task = mPendingQueue.front();
        mProcessingQueue.push_front(Task);
        mPendingQueue.pop_front();
        pTask = &(*Task);

        thrd_debug << "Task picked for processing: "
            << pTask->GetTaskId()
            << " "
            << pTask->GetTypeName()
            << war_endl;
    }

    return pTask;
}

// Removes and returns the task at the front of the done queue.
// Throws WAR_ERR_NO_OBJECT when the done queue is empty.
WarPtrWrapper<WarThreadPoolTask> WarThreadPool::PopDoneTask()
{
    AUTO_LOCK

    WarPtrWrapper<WarThreadPoolTask> Task;

    if (!mDoneQueue.empty())
    {
        // Take from done queue
        Task = mDoneQueue.front();
        mDoneQueue.pop_front();
        return Task;
    }

    WarThrow(WarError(WAR_ERR_NO_OBJECT), NULL);
    return Task; // Compiler food
}

// Called from the worker thread to notify that a task is processed.
//
// Removes the task from the processing queue, sets its state to
// STATE_ABORTED or STATE_PROCESSED and invokes OnAborted() / OnProcessed().
// When that callback returns false the task is put on the done queue.
// Note that the callback runs while mLock is held.
void WarThreadPool::TaskDone(WarPtrWrapper<WarThreadPoolTask>& rWrappedTask,
                             bool isAborted)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::TaskDone()");

    AUTO_LOCK

    WarThreadPoolTask *pTask = &(*rWrappedTask);

    thrd_debug << "Task ended processing: "
        << pTask->GetTaskId()
        << " "
        << pTask->GetTypeName()
        << " after "
        << (unsigned)pTask->GetElapsedTimeMs()
        << " milliseconds";
    if (isAborted)
        thrd_debug << ". The task was aborted.";
    thrd_debug << war_endl;

    CheckIfExist(mProcessingQueue, pTask, true); // Remove from queue

    pTask->SetState(
        isAborted
        ? WarThreadPoolTask::STATE_ABORTED
        : WarThreadPoolTask::STATE_PROCESSED);

    if (!(isAborted ? pTask->OnAborted() : pTask->OnProcessed()))
        mDoneQueue.push_front(pTask);
}

// Return true if the task was removed, false if it was flagged for abort.
//
// A task found in the pending or done queue is removed from it. A task found
// in the processing queue stays there; its thread (if mpThread is set) gets
// the cancel flag and false is returned. A task found in none of the queues
// also yields true.
bool WarThreadPool::AbortTask(WarThreadPoolTask * pTask)
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::AbortTask()");

    bool return_val = true;

    thrd_debug << "Tagging task for abort: "
        << pTask->GetTaskId()
        << pTask->GetTypeName()
        << war_endl;

    AUTO_LOCK

    if (!CheckIfExist(mPendingQueue, pTask, true)
        && !CheckIfExist(mDoneQueue, pTask, true))
    {
        if (CheckIfExist(mProcessingQueue, pTask))
        {
            return_val = false;
            if (pTask->mpThread)
                pTask->mpThread->SetCancelFlag();
        }
    }

    return return_val;
}

// Empties the pending and done queues and sets the cancel flag on the thread
// of every task in the processing queue. Tasks being processed are not
// removed from the processing queue here.
void WarThreadPool::KillAllTasks()
{
    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::KillAllTasks()");

    thrd_debug << "Killing all tasks" << war_endl;

    AUTO_LOCK

    // Delete the pending queue
    while (!mPendingQueue.empty())
        mPendingQueue.pop_front();

    // Flag the threads of the tasks in the current queue for cancel
    for (task_queue_t::iterator P = mProcessingQueue.begin()
        ; P != mProcessingQueue.end()
        ; ++P)
    {
        if ((*P)->mpThread)
            (*P)->mpThread->SetCancelFlag();
    }

    // Delete the done queue
    while (!mDoneQueue.empty())
        mDoneQueue.pop_front();

    thrd_debug << "All tasks are killed" << war_endl;
}

// Removes pTask from the done queue if it is there; otherwise does nothing.
void WarThreadPool::RemoveFromDoneQueue(WarThreadPoolTask * pTask)
{
    AUTO_LOCK
    CheckIfExist(mDoneQueue, pTask, true);
}

// Adds pThread to the front of the pool's thread list.
void WarThreadPool::Attach(WarThreadForPool *pThread)
{
    AUTO_LOCK

    mThreads.push_front(pThread);

    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::Attach()");

    if (thrd_debug)
    {
        thrd_debug << "Attaching thread "
            << (unsigned) pThread->GetThreadId()
            << " to the list of active threads. "
            << mThreads.size()
            << " threads are now available in this thread-pool."
            << war_endl;
    }
}

// Removes the first list entry that compares equal to *pThread from the
// pool's thread list. A missing thread is only reported in the debug log.
void WarThreadPool::Detach(WarThreadForPool *pThread)
{
    AUTO_LOCK

    const char *found = " The thread was not found!";

    for (WarThreadPool::thread_list_t::iterator P = mThreads.begin()
        ; P != mThreads.end()
        ; ++P)
    {
        if (*pThread == **P)
        {
            mThreads.erase(P);
            found = "";
            break; // P is invalid after erase()
        }
    }

    WarLog thrd_debug(WARLOG_THREADS, "WarThreadPool::Detach()");

    if (thrd_debug)
    {
        thrd_debug << "Detaching thread "
            << (unsigned) pThread->GetThreadId()
            << " from the list of active threads. "
            << found
            << mThreads.size()
            << " threads are now available in this thread-pool."
            << war_endl;
    }
}

// Factory for worker threads: allocates a WarThreadForPool bound to this
// pool with new and returns the raw pointer.
WarThreadForPool *WarThreadPool::NewThread()
{
    return new WarThreadForPool(this);
}

/////////////////////////////// PRIVATE    ///////////////////////////////////

// Returns true if List contains an entry that refers to pTask. When doRemove
// is set, the first such entry is erased from List. Does not lock; callers
// in this file hold mLock.
bool WarThreadPool::CheckIfExist(task_queue_t & List,
                                 WarThreadPoolTask *pTask,
                                 bool doRemove)
{
    bool return_val = false;

    for (task_queue_t::iterator P = List.begin()
        ; P != List.end()
        ; ++P)
    {
        if (&(**P) == pTask)
        {
            return_val = true;
            if (doRemove)
                List.erase(P);
            break; // P may be invalid after erase()
        }
    }

    return return_val;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -