📄 consumermanager.cpp

📁 Pegasus is an open-source implementation of the DMTF CIM and WBEM standards. It is designed to be por
💻 CPP
📖 Page 1 of 3
            Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE,
                        "Not enough threads for consumer.");

            Tracer::trace(TRC_LISTENER, Tracer::LEVEL2,
                          "Could not allocate thread for consumer.");

            consumer->setShutdownSemaphore(0);
            delete semaphore;
            throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
                                               "Not enough threads for consumer worker routine."));
        }

        //wait until the listening thread has started.  Otherwise, there is a minuscule chance that the first event will be enqueued
        //before the consumer is waiting for it and the first indication after loading the consumer will be lost
        consumer->waitForEventThread();

        //load any outstanding requests
        Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName);
        if (outstandingIndications.size())
        {
            //the consumer will signal itself in _loadOutstandingIndications
            consumer->_loadOutstandingIndications(outstandingIndications);
        }

        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);

    } catch (...)
    {
        module->unloadModule();
        consumer->reset();
        throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
                                           "Cannot initialize consumer ($0).",
                                           consumerName));
    }

    PEG_METHOD_EXIT();
}

/** Returns the ConsumerModule with the given library name.  If it already exists, we return the one in the cache.  If it
 *  does not exist, we create it and add it to the table.
 * @throws Exception if we cannot successfully create and initialize the consumer
 */
ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
{
    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");

    AutoMutex lock(_moduleTableMutex);

    ConsumerModule* module = 0;

    //see if consumer module is cached
    if (_modules.lookup(libraryName, module))
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
                         "Found Consumer Module " + libraryName + " in Consumer Manager Cache");

    } else
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
                         "Creating Consumer Provider Module " + libraryName);

        module = new ConsumerModule();
        _modules.insert(libraryName, module);
    }

    PEG_METHOD_EXIT();
    return(module);
}

/** Returns true if there are active consumers
 */
Boolean ConsumerManager::hasActiveConsumers()
{
    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");

    AutoMutex lock(_consumerTableMutex);

    DynamicConsumer* consumer = 0;

    try
    {
        for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
        {
            consumer = i.value();

            if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
            {
                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
                PEG_METHOD_EXIT();
                return true;
            }
        }
    } catch (...)
    {
        // Unexpected exception; do not assume that no providers are loaded
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
        PEG_METHOD_EXIT();
        return true;
    }

    PEG_METHOD_EXIT();
    return false;
}

/** Returns true if there are loaded consumers
 */
Boolean ConsumerManager::hasLoadedConsumers()
{
    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");

    AutoMutex lock(_consumerTableMutex);

    DynamicConsumer* consumer = 0;

    try
    {
        for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
        {
            consumer = i.value();

            if (consumer && consumer->isLoaded())
            {
                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
                PEG_METHOD_EXIT();
                return true;
            }
        }
    } catch (...)
    {
        // Unexpected exception; do not assume that no providers are loaded
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
        PEG_METHOD_EXIT();
        return true;
    }

    PEG_METHOD_EXIT();
    return false;
}

/** Shutting down a consumer consists of four major steps:
 * 1) Send the shutdown signal.  This causes the worker routine to break out of the loop and exit.
 * 2) Wait for the worker thread to end.  This may take a while if it's processing an indication.  This
 *    is optional in a shutdown scenario.  If the listener is shut down with a -f force, the listener
 *    will not wait for the consumer to finish before shutting down.  Note that a normal shutdown only allows
 *    the current consumer indication to finish.  All other queued indications are serialized to a log and
 *    are sent when the consumer is reloaded.
 * 3) Terminate the consumer provider interface.
 * 4) Decrement the module refcount (the module will automatically unload when its refcount == 0)
 *
 * In a scenario where multiple consumers are loaded, the shutdown signal should be sent to all
 * of the consumers so the threads can finish simultaneously.
 *
 * ATTN: Should the normal shutdown wait for everything in the queue to be processed?  Just new indications
 * to be processed?  I am not inclined to this solution since it could take a LOT of time.  By serializing
 * and deserializing indications between shutdown and startup, I feel like we do not need to process ALL
 * queued indications on shutdown.
 */

/** Unloads all consumers.
 */
void ConsumerManager::unloadAllConsumers()
{
    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");

    AutoMutex lock(_consumerTableMutex);

    if (!_consumers.size())
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
        PEG_METHOD_EXIT();
        return;
    }

    if (!_forceShutdown)
    {
        //wait until all the consumers have finished processing the events in their queue
        //ATTN: Should this have a timeout even though it's a force??
        while (hasActiveConsumers())
        {
            Threads::sleep(500);
        }
    }

    Array<DynamicConsumer*> loadedConsumers;

    ConsumerTable::Iterator i = _consumers.start();
    DynamicConsumer* consumer = 0;

    for (; i!=0; i++)
    {
        consumer = i.value();
        if (consumer && consumer->isLoaded())
        {
            loadedConsumers.append(consumer);
        }
    }

    if (loadedConsumers.size())
    {
        try
        {
            _unloadConsumers(loadedConsumers);

        } catch (Exception&)
        {
            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
        }
    } else
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
    }

    PEG_METHOD_EXIT();
}

/** Unloads idle consumers.
 */
void ConsumerManager::unloadIdleConsumers()
{
    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");

    AutoMutex lock(_consumerTableMutex);

    if (!_consumers.size())
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
        PEG_METHOD_EXIT();
        return;
    }

    Array<DynamicConsumer*> loadedConsumers;

    ConsumerTable::Iterator i = _consumers.start();
    DynamicConsumer* consumer = 0;

    for (; i!=0; i++)
    {
        consumer = i.value();
        if (consumer && consumer->isLoaded() && consumer->isIdle())
        {
            loadedConsumers.append(consumer);
        }
    }

    if (loadedConsumers.size())
    {
        try
        {
            _unloadConsumers(loadedConsumers);

        } catch (Exception&)
        {
            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
        }
    } else
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
    }

    PEG_METHOD_EXIT();
}

/** Unloads a single consumer.
 */
void ConsumerManager::unloadConsumer(const String& consumerName)
{
    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");

    AutoMutex lock(_consumerTableMutex);

    DynamicConsumer* consumer = 0;

    //check whether the consumer exists
    if (!_consumers.lookup(consumerName, consumer))
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
        //balance the PEG_METHOD_ENTER above on this early return
        PEG_METHOD_EXIT();
        return;
    }

    //check whether the consumer is loaded
    if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
    {
        //unload the consumer
        Array<DynamicConsumer*> loadedConsumers;
        loadedConsumers.append(consumer);

        try
        {
            _unloadConsumers(loadedConsumers);

        } catch (Exception&)
        {
            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
        }

    } else
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
    }

    PEG_METHOD_EXIT();
}

/** Unloads the consumers in the given array.
 *  The consumerTable mutex MUST be locked prior to entering this method.
 */
void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
{
    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");

    //tell consumers to shutdown
    for (Uint32 i = 0; i < consumersToUnload.size(); i++)
    {
        consumersToUnload[i]->sendShutdownSignal();
    }

    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");

    //wait for all the consumer worker threads to complete
    //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
    //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
    //processing its requests.
    for (Uint32 i = 0; i < consumersToUnload.size(); i++)
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());

        //wait for the consumer worker thread to end
        try
        {
            Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
            if (_shutdownSemaphore)
            {
                _shutdownSemaphore->time_wait(10000);
            }

        } catch (TimeOut &)
        {
            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
        }

        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");

        try
        {
            //terminate consumer provider interface
            consumersToUnload[i]->terminate();

            //unload consumer provider module
            PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
            consumersToUnload[i]->_module->unloadModule();

            //serialize outstanding indications
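
The listing on this page stops partway through `_unloadConsumers`. Its "signal every consumer first, then wait on each with a timeout" pattern can be shown in isolation with a minimal, self-contained sketch. It uses the C++ standard library in place of the Pegasus `Semaphore` and `Thread` classes, and the names `Worker` and `unloadWorkers` are hypothetical stand-ins that do not come from the Pegasus source.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a consumer that owns one worker thread.
class Worker
{
public:
    Worker() : _thread([this] { _run(); }) {}

    ~Worker()
    {
        sendShutdownSignal();
        if (_thread.joinable())
        {
            _thread.join();
        }
    }

    // Step 1: ask the worker loop to exit; returns immediately.
    void sendShutdownSignal()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopRequested = true;
        }
        _wake.notify_all();
    }

    // Step 2: wait up to `timeout` for the worker to report that it exited.
    bool waitForShutdown(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        return _done.wait_for(lock, timeout, [this] { return _finished; });
    }

private:
    // Worker routine: block until shutdown is requested, then report completion.
    void _run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _wake.wait(lock, [this] { return _stopRequested; });
        _finished = true;
        _done.notify_all();
    }

    std::mutex _mutex;
    std::condition_variable _wake;
    std::condition_variable _done;
    bool _stopRequested = false;
    bool _finished = false;
    std::thread _thread;  // declared last so the members above exist before it starts
};

// Signals every worker before waiting on any of them, so the workers wind
// down concurrently. Returns how many stopped within the timeout.
std::size_t unloadWorkers(std::vector<std::unique_ptr<Worker> >& workers,
                          std::chrono::milliseconds timeout)
{
    for (std::size_t i = 0; i < workers.size(); i++)
    {
        workers[i]->sendShutdownSignal();
    }

    std::size_t stopped = 0;
    for (std::size_t i = 0; i < workers.size(); i++)
    {
        if (workers[i]->waitForShutdown(timeout))
        {
            ++stopped;
        }
    }
    return stopped;
}
```

Because every signal is sent before the first wait, the total wait is governed by the slowest worker and not by the sum of all shutdown times, which is the point made in the comment inside `_unloadConsumers`.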
