📄 consumermanager.cpp
字号:
Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE, "Not enough threads for consumer."); Tracer::trace(TRC_LISTENER, Tracer::LEVEL2, "Could not allocate thread for consumer."); consumer->setShutdownSemaphore(0); delete semaphore; throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD", "Not enough threads for consumer worker routine.")); } //wait until the listening thread has started. Otherwise, there is a miniscule chance that the first event will be enqueued //before the consumer is waiting for it and the first indication after loading the consumer will be lost consumer->waitForEventThread(); //load any outstanding requests Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName); if (outstandingIndications.size()) { //the consumer will signal itself in _loadOustandingIndications consumer->_loadOutstandingIndications(outstandingIndications); } PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName); } catch (...) { module->unloadModule(); consumer->reset(); throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER", "Cannot initialize consumer ($0).", consumerName)); } PEG_METHOD_EXIT(); }/** Returns the ConsumerModule with the given library name. If it already exists, we return the one in the cache. If it * DNE, we create it and add it to the table. * @throws Exception if we cannot successfully create and initialize the consumer */ ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) { PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule"); AutoMutex lock(_moduleTableMutex); ConsumerModule* module = 0; //see if consumer module is cached if (_modules.lookup(libraryName, module)) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found Consumer Module" + libraryName + " in Consumer Manager Cache"); } else { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Creating Consumer Provider Module " + libraryName); module = new ConsumerModule(); _modules.insert(libraryName, module); } PEG_METHOD_EXIT(); return(module);}/** Returns true if there are active consumers */ Boolean ConsumerManager::hasActiveConsumers(){ PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers"); AutoMutex lock(_consumerTableMutex); DynamicConsumer* consumer = 0; try { for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++) { consumer = i.value(); if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0)) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name); PEG_METHOD_EXIT(); return true; } } } catch (...) { // Unexpected exception; do not assume that no providers are loaded PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers."); PEG_METHOD_EXIT(); return true; } PEG_METHOD_EXIT(); return false;}/** Returns true if there are loaded consumers */ Boolean ConsumerManager::hasLoadedConsumers(){ PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers"); AutoMutex lock(_consumerTableMutex); DynamicConsumer* consumer = 0; try { for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++) { consumer = i.value(); if (consumer && consumer->isLoaded()) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name); PEG_METHOD_EXIT(); return true; } } } catch (...) { // Unexpected exception; do not assume that no providers are loaded PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers."); PEG_METHOD_EXIT(); return true; } PEG_METHOD_EXIT(); return false;}/** Shutting down a consumer consists of four major steps: * 1) Send the shutdown signal. This causes the worker routine to break out of the loop and exit. * 2) Wait for the worker thread to end. This may take a while if it's processing an indication. This * is optional in a shutdown scenario. If the listener is shutdown with a -f force, the listener * will not wait for the consumer to finish before shutting down. Note that a normal shutdown only allows * the current consumer indication to finish. All other queued indications are serialized to a log and * are sent when the consumer is reoaded. * 3) Terminate the consumer provider interface. * 4) Decrement the module refcount (the module will automatically unload when it's refcount == 0) * * In a scenario where more multiple consumers are loaded, the shutdown signal should be sent to all * of the consumers so the threads can finish simultaneously. * * ATTN: Should the normal shutdown wait for everything in the queue to be processed? Just new indications * to be processed? I am not inclined to this solution since it could take a LOT of time. By serializing * and deserialing indications between shutdown and startup, I feel like we do not need to process ALL * queued indications on shutdown. */ /** Unloads all consumers. */ void ConsumerManager::unloadAllConsumers(){ PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers"); AutoMutex lock(_consumerTableMutex); if (!_consumers.size()) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload."); PEG_METHOD_EXIT(); return; } if (!_forceShutdown) { //wait until all the consumers have finished processing the events in their queue //ATTN: Should this have a timeout even though it's a force?? while (hasActiveConsumers()) { Threads::sleep(500); } } Array<DynamicConsumer*> loadedConsumers; ConsumerTable::Iterator i = _consumers.start(); DynamicConsumer* consumer = 0; for (; i!=0; i++) { consumer = i.value(); if (consumer && consumer->isLoaded()) { loadedConsumers.append(consumer); } } if (loadedConsumers.size()) { try { _unloadConsumers(loadedConsumers); } catch (Exception&) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers."); } } else { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload."); } PEG_METHOD_EXIT();}/** Unloads idle consumers. */ void ConsumerManager::unloadIdleConsumers(){ PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers"); AutoMutex lock(_consumerTableMutex); if (!_consumers.size()) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload."); PEG_METHOD_EXIT(); return; } Array<DynamicConsumer*> loadedConsumers; ConsumerTable::Iterator i = _consumers.start(); DynamicConsumer* consumer = 0; for (; i!=0; i++) { consumer = i.value(); if (consumer && consumer->isLoaded() && consumer->isIdle()) { loadedConsumers.append(consumer); } } if (loadedConsumers.size()) { try { _unloadConsumers(loadedConsumers); } catch (Exception&) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers."); } } else { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload."); } PEG_METHOD_EXIT();}/** Unloads a single consumer. */ void ConsumerManager::unloadConsumer(const String& consumerName){ PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer"); AutoMutex lock(_consumerTableMutex); DynamicConsumer* consumer = 0; //check whether the consumer exists if (!_consumers.lookup(consumerName, consumer)) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName); return; } //check whether the consumer is loaded if (consumer && consumer->isLoaded()) //ATTN: forceShutdown? { //unload the consumer Array<DynamicConsumer*> loadedConsumers; loadedConsumers.append(consumer); try { _unloadConsumers(loadedConsumers); } catch (Exception&) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers."); } } else { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName); } PEG_METHOD_EXIT();}/** Unloads the consumers in the given array. * The consumerTable mutex MUST be locked prior to entering this method. */ void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload){ PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers"); //tell consumers to shutdown for (Uint32 i = 0; i < consumersToUnload.size(); i++) { consumersToUnload[i]->sendShutdownSignal(); } PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers."); //wait for all the consumer worker threads to complete //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop //processing its requests. for (Uint32 i = 0; i < consumersToUnload.size(); i++) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName()); //wait for the consumer worker thread to end try { Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore(); if (_shutdownSemaphore) { _shutdownSemaphore->time_wait(10000); } } catch (TimeOut &) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread."); } PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer."); try { //terminate consumer provider interface consumersToUnload[i]->terminate(); //unload consumer provider module PEGASUS_ASSERT(consumersToUnload[i]->_module != 0); consumersToUnload[i]->_module->unloadModule(); //serialize outstanding indications
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -