⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.cpp

📁 Pegasus is an open-source implementation of the DMTF CIM and WBEM standards. It is designed to be por
💻 CPP
📖 第 1 页 / 共 2 页
字号:
    {        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,            "Caught exception: \"" + e.getMessage() + "\".  Exiting _loop.");    }    catch (...)    {        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,            "Caught unrecognized exception.  Exiting _loop.");    }    PEG_METHOD_EXIT();    return (ThreadReturnType) 0;}ThreadStatus ThreadPool::allocate_and_awaken(    void* parm,    ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),    Semaphore* blocking){    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");    // Allocate_and_awaken will not run if the _dying flag is set.    // Once the lock is acquired, ~ThreadPool will not change    // the value of _dying until the lock is released.    try    {        if (_dying.get())        {            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");            return PEGASUS_THREAD_UNAVAILABLE;        }        struct timeval start;        Time::gettimeofday(&start);        Thread *th = 0;        th = _idleThreads.remove_front();        if (th == 0)        {            if ((_maxThreads == 0) ||                (_currentThreads.get() < Uint32(_maxThreads)))            {                th = _initializeThread();            }        }        if (th == 0)        {            Tracer::trace(TRC_THREAD, Tracer::LEVEL2,                "ThreadPool::allocate_and_awaken: Insufficient resources: "                    " pool = %s, running threads = %d, idle threads = %d",                _key, _runningThreads.size(), _idleThreads.size());            return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;        }        // initialize the thread data with the work function and parameters        Tracer::trace(TRC_THREAD, Tracer::LEVEL4,            "Initializing thread with work function and parameters: parm = %p",            parm);        th->delete_tsd("work func");        th->put_tsd("work func", NULL,                    sizeof 
(ThreadReturnType(PEGASUS_THREAD_CDECL *)                            (void *)), (void *) work);        th->delete_tsd("work parm");        th->put_tsd("work parm", NULL, sizeof (void *), parm);        th->delete_tsd("blocking sem");        if (blocking != 0)            th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);        // put the thread on the running list        _runningThreads.insert_front(th);        // signal the thread's sleep semaphore to awaken it        Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");        PEGASUS_ASSERT(sleep_sem != 0);        Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");        sleep_sem->signal();        th->dereference_tsd();    }    catch (...)    {        Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                      "ThreadPool::allocate_and_awaken: Operation Failed.");        PEG_METHOD_EXIT();        // ATTN: Error result has not yet been defined        return PEGASUS_THREAD_SETUP_FAILURE;    }    PEG_METHOD_EXIT();    return PEGASUS_THREAD_OK;}// caller is responsible for only calling this routine during slack periods// but should call it at least once per _deallocateWait interval.Uint32 ThreadPool::cleanupIdleThreads(){    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");    Uint32 numThreadsCleanedUp = 0;    Uint32 numIdleThreads = _idleThreads.size();    for (Uint32 i = 0; i < numIdleThreads; i++)    {        // Do not dip below the minimum thread count        if (_currentThreads.get() <= (Uint32) _minThreads)        {            break;        }        Thread *thread = _idleThreads.remove_back();        // If there are no more threads in the _idleThreads queue, we're        // done.        
if (thread == 0)        {            break;        }        struct timeval *lastActivityTime;        try        {            lastActivityTime =                (struct timeval *) thread->                try_reference_tsd("last activity time");            PEGASUS_ASSERT(lastActivityTime != 0);        }        catch (...)        {            PEGASUS_ASSERT(false);            _idleThreads.insert_back(thread);            break;        }        Boolean cleanupThisThread =            _timeIntervalExpired(lastActivityTime, &_deallocateWait);        thread->dereference_tsd();        if (cleanupThisThread)        {            _cleanupThread(thread);            _currentThreads--;            numThreadsCleanedUp++;        }        else        {            _idleThreads.insert_front(thread);        }    }    PEG_METHOD_EXIT();    return numThreadsCleanedUp;}void ThreadPool::_cleanupThread(Thread * thread){    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");    // Set the "work func" and "work parm" to 0 so _loop() knows to exit.    
thread->delete_tsd("work func");    thread->put_tsd("work func", 0,                    sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)                            (void *)), (void *) 0);    thread->delete_tsd("work parm");    thread->put_tsd("work parm", 0, sizeof (void *), 0);    // signal the thread's sleep semaphore to awaken it    Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");    PEGASUS_ASSERT(sleep_sem != 0);    sleep_sem->signal();    thread->dereference_tsd();    thread->join();    delete thread;    PEG_METHOD_EXIT();}Boolean ThreadPool::_timeIntervalExpired(    struct timeval* start,    struct timeval* interval){    // never time out if the interval is zero    if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))    {        return false;    }    struct timeval now, finish, remaining;    Uint32 usec;    Time::gettimeofday(&now);    Time::gettimeofday(&remaining);     // Avoid valgrind error    finish.tv_sec = start->tv_sec + interval->tv_sec;    usec = start->tv_usec + interval->tv_usec;    finish.tv_sec += (usec / 1000000);    usec %= 1000000;    finish.tv_usec = usec;    return (Time::subtract(&remaining, &finish, &now) != 0);}void ThreadPool::_deleteSemaphore(void *p){    delete(Semaphore *) p;}Thread *ThreadPool::_initializeThread(){    PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");    Thread *th = (Thread *) new Thread(_loop, this, false);    // allocate a sleep semaphore and pass it in the thread context    // initial count is zero, loop function will sleep until    // we signal the semaphore    Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);    th->put_tsd(        "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);    struct timeval* lastActivityTime =        (struct timeval *)::operator  new(sizeof (struct timeval));    Time::gettimeofday(lastActivityTime);    th->put_tsd(        "last activity time",        thread_data::default_delete,        sizeof(struct timeval), 
       (void*) lastActivityTime);    // thread will enter _loop() and sleep on sleep_sem until we signal it    if (th->run() != PEGASUS_THREAD_OK)    {        Tracer::trace(TRC_THREAD, Tracer::LEVEL2,            "Could not create thread. Error code is %d.", errno);        delete th;        return 0;    }    _currentThreads++;    Threads::yield();    PEG_METHOD_EXIT();    return th;}void ThreadPool::_addToIdleThreadsQueue(Thread * th){    if (th == 0)    {        Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,            "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");        throw NullPointer();    }    try    {        _idleThreads.insert_front(th);    }    catch (...)    {        Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,            "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "                "failed.");    }}PEGASUS_NAMESPACE_END

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -