📄 threadpool.cpp
字号:
{ PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, "Caught exception: \"" + e.getMessage() + "\". Exiting _loop."); } catch (...) { PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, "Caught unrecognized exception. Exiting _loop."); } PEG_METHOD_EXIT(); return (ThreadReturnType) 0;}ThreadStatus ThreadPool::allocate_and_awaken( void* parm, ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*), Semaphore* blocking){ PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken"); // Allocate_and_awaken will not run if the _dying flag is set. // Once the lock is acquired, ~ThreadPool will not change // the value of _dying until the lock is released. try { if (_dying.get()) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::allocate_and_awaken: ThreadPool is dying(1)."); return PEGASUS_THREAD_UNAVAILABLE; } struct timeval start; Time::gettimeofday(&start); Thread *th = 0; th = _idleThreads.remove_front(); if (th == 0) { if ((_maxThreads == 0) || (_currentThreads.get() < Uint32(_maxThreads))) { th = _initializeThread(); } } if (th == 0) { Tracer::trace(TRC_THREAD, Tracer::LEVEL2, "ThreadPool::allocate_and_awaken: Insufficient resources: " " pool = %s, running threads = %d, idle threads = %d", _key, _runningThreads.size(), _idleThreads.size()); return PEGASUS_THREAD_INSUFFICIENT_RESOURCES; } // initialize the thread data with the work function and parameters Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Initializing thread with work function and parameters: parm = %p", parm); th->delete_tsd("work func"); th->put_tsd("work func", NULL, sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *)), (void *) work); th->delete_tsd("work parm"); th->put_tsd("work parm", NULL, sizeof (void *), parm); th->delete_tsd("blocking sem"); if (blocking != 0) th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking); // put the thread on the running list _runningThreads.insert_front(th); // signal the thread's sleep semaphore to awaken it Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem"); PEGASUS_ASSERT(sleep_sem != 0); Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken"); sleep_sem->signal(); th->dereference_tsd(); } catch (...) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::allocate_and_awaken: Operation Failed."); PEG_METHOD_EXIT(); // ATTN: Error result has not yet been defined return PEGASUS_THREAD_SETUP_FAILURE; } PEG_METHOD_EXIT(); return PEGASUS_THREAD_OK;}// caller is responsible for only calling this routine during slack periods// but should call it at least once per _deallocateWait interval.Uint32 ThreadPool::cleanupIdleThreads(){ PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads"); Uint32 numThreadsCleanedUp = 0; Uint32 numIdleThreads = _idleThreads.size(); for (Uint32 i = 0; i < numIdleThreads; i++) { // Do not dip below the minimum thread count if (_currentThreads.get() <= (Uint32) _minThreads) { break; } Thread *thread = _idleThreads.remove_back(); // If there are no more threads in the _idleThreads queue, we're // done. if (thread == 0) { break; } struct timeval *lastActivityTime; try { lastActivityTime = (struct timeval *) thread-> try_reference_tsd("last activity time"); PEGASUS_ASSERT(lastActivityTime != 0); } catch (...) { PEGASUS_ASSERT(false); _idleThreads.insert_back(thread); break; } Boolean cleanupThisThread = _timeIntervalExpired(lastActivityTime, &_deallocateWait); thread->dereference_tsd(); if (cleanupThisThread) { _cleanupThread(thread); _currentThreads--; numThreadsCleanedUp++; } else { _idleThreads.insert_front(thread); } } PEG_METHOD_EXIT(); return numThreadsCleanedUp;}void ThreadPool::_cleanupThread(Thread * thread){ PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread"); // Set the "work func" and "work parm" to 0 so _loop() knows to exit. thread->delete_tsd("work func"); thread->put_tsd("work func", 0, sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *)), (void *) 0); thread->delete_tsd("work parm"); thread->put_tsd("work parm", 0, sizeof (void *), 0); // signal the thread's sleep semaphore to awaken it Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem"); PEGASUS_ASSERT(sleep_sem != 0); sleep_sem->signal(); thread->dereference_tsd(); thread->join(); delete thread; PEG_METHOD_EXIT();}Boolean ThreadPool::_timeIntervalExpired( struct timeval* start, struct timeval* interval){ // never time out if the interval is zero if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0)) { return false; } struct timeval now, finish, remaining; Uint32 usec; Time::gettimeofday(&now); Time::gettimeofday(&remaining); // Avoid valgrind error finish.tv_sec = start->tv_sec + interval->tv_sec; usec = start->tv_usec + interval->tv_usec; finish.tv_sec += (usec / 1000000); usec %= 1000000; finish.tv_usec = usec; return (Time::subtract(&remaining, &finish, &now) != 0);}void ThreadPool::_deleteSemaphore(void *p){ delete(Semaphore *) p;}Thread *ThreadPool::_initializeThread(){ PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread"); Thread *th = (Thread *) new Thread(_loop, this, false); // allocate a sleep semaphore and pass it in the thread context // initial count is zero, loop function will sleep until // we signal the semaphore Semaphore *sleep_sem = (Semaphore *) new Semaphore(0); th->put_tsd( "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem); struct timeval* lastActivityTime = (struct timeval *)::operator new(sizeof (struct timeval)); Time::gettimeofday(lastActivityTime); th->put_tsd( "last activity time", thread_data::default_delete, sizeof(struct timeval), (void*) lastActivityTime); // thread will enter _loop() and sleep on sleep_sem until we signal it if (th->run() != PEGASUS_THREAD_OK) { Tracer::trace(TRC_THREAD, Tracer::LEVEL2, "Could not create thread. Error code is %d.", errno); delete th; return 0; } _currentThreads++; Threads::yield(); PEG_METHOD_EXIT(); return th;}void ThreadPool::_addToIdleThreadsQueue(Thread * th){ if (th == 0) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null."); throw NullPointer(); } try { _idleThreads.insert_front(th); } catch (...) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front " "failed."); }}PEGASUS_NAMESPACE_END
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -