⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.cpp

📁 dcom机制在vxworks上的实现源代码
💻 CPP
字号:
/* ThreadPool *//* Copyright (c) 1999 Wind River Systems, Inc. *//*modification history--------------------01m,22apr02,nel  SPR#76056. Correct test on task creation to test for -1                 rather than < 0.01l,17dec01,nel  Add include symbol for diab.01k,03aug01,dbs  remove usage of Thread class01j,13jul01,dbs  fix up includes01i,24feb00,dbs  fix thread-killing in dtor01h,21sep99,aim  changed API for activate01g,20sep99,aim  added Thread name parameter01f,19aug99,aim  dtor now tries to clean up all threads01e,19aug99,aim  change assert to VXDCOM_ASSERT01d,13aug99,aim  added default threadPriority01c,12aug99,aim  added queue length ctor parameter01b,11aug99,aim  now blocks when all threads are busy01a,29jul99,aim  created*/#include "ThreadPool.h"#include "EventHandler.h"#include "Syslog.h"#include "TraceCall.h"#include "private/comMisc.h"#include "taskLib.h"/* Include symbol for diab */extern "C" int include_vxdcom_ThreadPool (void)    {    return 0;    }ThreadPool::ThreadPool    (    size_t	thrStackSize,    long	thrPriority    )  : m_minThreads (0),    m_maxThreads (0),    m_threadCount (0),    m_threadCountLock (),    m_thrStackSize (thrStackSize),    m_thrPriority (thrPriority),    m_thrScavenger (0),    m_thrName (0)    {    TRACE_CALL;    }ThreadPool::~ThreadPool ()    {    TRACE_CALL;    DELZERO (m_thrScavenger);    threadNameDelete ();    }int ThreadPool::open    (    int		maxThreads,    const char*	threadName    )    {    TRACE_CALL;    threadNameSet (threadName);    for (int i = 0; i < maxThreads; ++i)	{	    if (threadAdd () < 0)		return -1;	    else {	        ++m_minThreads;		++m_maxThreads;	    }	}    return 0;    }int ThreadPool::open    (    Reactor*	reactor,    int		minThreads,    int		maxThreads,    const char*	threadName    )    {    TRACE_CALL;    if ((m_thrScavenger = createScavenger (reactor)) == 0)	return -1;    threadNameSet (threadName);    for (int i = 0; i < minThreads; ++i)	if (threadAdd () < 0) 	    {	    m_minThreads = 0;	    m_maxThreads = 0;	    return -1;	    }    m_minThreads = minThreads;    m_maxThreads = maxThreads;    return 0;    }int ThreadPool::close ()    {    TRACE_CALL;    DELZERO (m_thrScavenger);    removeAll ();    // Post a NULL event to all possible threads...    for (int i=0; i < threadCount (); ++i)	threadRemove ();        // ...and wait for threads to terminate...    cout << "waiting for threads..." << endl;    while (threadCount () > 0)        {        cout << "...sleeping..." << endl;        ::taskDelay (1);        }    return 0;    }int ThreadPool::queueFullHandler ()    {    // If we have no scavenger we cannot dynamically add threads.    if (m_thrScavenger == 0)	return 0;    if (threadCount () < maxThreads ())	return threadAdd ();    else	return 0;    }void* ThreadPool::serviceHandler ()    {    TRACE_CALL;    EventHandler* pEventHandler;    while (1)	{        ::taskPrioritySet (::taskIdSelf (), m_thrPriority);		pEventHandler = 0;	// remove() will block until a job is inserted into the Q.	if (remove (pEventHandler) < 0)	    break;	if (pEventHandler == 0)	    break;	REACTOR_HANDLE handle = pEventHandler->handleGet ();	if (pEventHandler->handleInput (handle) < 0)	    pEventHandler->handleClose (handle);	}    VxCritSec cs (m_threadCountLock);    queueSizeSet (--m_threadCount);        return 0;    }int ThreadPool::minThreads () const    {    TRACE_CALL;    return m_minThreads;    }int ThreadPool::maxThreads () const    {    TRACE_CALL;    return m_maxThreads;    }int ThreadPool::threadCount () const    {    TRACE_CALL;    return m_threadCount;    }int ThreadPool::enqueue (EventHandler* pEventHandler)    {    TRACE_CALL;    return add (pEventHandler);    }int ThreadPool::threadAdd ()    {    TRACE_CALL;    VxCritSec cs (m_threadCountLock);    // Create a new (named) thread.        int result = activate (m_thrName, m_thrPriority, m_thrStackSize);        if (result != -1)	{	queueSizeSet (++m_threadCount);	return 0;	}        return -1;    }int ThreadPool::threadRemove ()    {    TRACE_CALL;    return enqueue (0);    }int ThreadPool::threadReaper ()    {    TRACE_CALL;    VxCritSec cs (m_threadCountLock);    int unusedThreads = threadCount() - queueSize ();    if ((threadCount () - unusedThreads) < minThreads ())	unusedThreads -= minThreads ();    if (queueIsFull () || unusedThreads <= 0)	return 0;        while (unusedThreads-- > 0)	threadRemove ();    return 0;    }ThreadPool::Scavenger* ThreadPool::createScavenger    (    Reactor*	reactor,    int		scavengerTimeout    )    {    if (reactor == 0)	return 0;        ThreadPool::Scavenger* s = new ThreadPool::Scavenger (reactor, this);    if (s)	reactor->timerAdd (s, scavengerTimeout);    else	DELZERO (s);    return s;    }    ThreadPool::Scavenger::Scavenger    (    Reactor*	reactor,    ThreadPool*	threadPool    )  : m_threadPool (threadPool)    {    TRACE_CALL;    }ThreadPool::Scavenger::~Scavenger ()    {    TRACE_CALL;    Reactor* reactor = reactorGet ();    if (reactor)	reactor->timerRemove (this);    }int ThreadPool::Scavenger::handleTimeout (const TimeValue&)    {    TRACE_CALL;    COM_ASSERT (m_threadPool);    return m_threadPool->threadReaper ();    }void ThreadPool::threadNameDelete ()    {    delete [] m_thrName;    m_thrName = 0;    }void ThreadPool::threadNameSet (const char* threadName)    {    threadNameDelete ();    xstrdup (m_thrName, threadName);    }const char* ThreadPool::xstrdup (char*& dst, const char* src)    {    if (src == 0)	return 0;    dst = new char [::strlen (src) +1 ];    return dst ? ::strcpy (dst, src) : 0;    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -