📄 dynamicconsumer.cpp
字号:
if (!isLoaded()) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: The consumer is not loaded and therefore cannot handle events."); return; } try { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent before " + _name); // Our event queue is first in first out. _eventqueue.insert_back(event); _check_queue->signal(); PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent after " + _name); } catch (Exception& ex) { //ATTN: Log missed indication PEGASUS_STD(cout) << "Error enqueueing event" << ex.getMessage() << "\n"; } catch (...) { //ATTN: Log missed indication PEGASUS_STD(cout) << "Unknown exception"; } PEG_METHOD_EXIT();}void DynamicConsumer::getIdleTimer(struct timeval *tv){ if (tv == 0) { return; } try { AutoMutex lock(_idleTimeMutex); memcpy(tv, &_idleTime, sizeof(struct timeval)); } catch (...) { Time::gettimeofday(tv); }}void DynamicConsumer::updateIdleTimer(){ try { AutoMutex lock(_idleTimeMutex); Time::gettimeofday(&_idleTime); } catch (...) { }}Uint32 DynamicConsumer::getPendingIndications(){ return _eventqueue.size();}String DynamicConsumer::toString(){ PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::toString"); String buffer = String::EMPTY; if (_initialized) { buffer.append("Consumer " + _name + " is initialized.\n"); buffer.append("Module name " + _module->getFileName() + "\n"); } PEG_METHOD_EXIT(); return buffer;}/** Returns true if the consumer has been inactive for longer than the idle period. */ Boolean DynamicConsumer::isIdle() { PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::isIdle"); if (!isLoaded()) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer is not loaded."); return false; } struct timeval now; Time::gettimeofday(&now); struct timeval timeout = {0,0}; getIdleTimer(&timeout); //if no consumer is currently being served and there's no consumer that has pending indications, we are idle if (!_current_operations.get() && !getPendingIndications()) { PEG_METHOD_EXIT(); return true; } PEG_METHOD_EXIT(); return false;}/** This method waits until the event thread is ready to accept incoming indications. Otherwise, there is a miniscule chance that * the first event will be enqueued before the consumer is waiting for it and the first indication after loading the consumer will be lost. */ void DynamicConsumer::waitForEventThread(){ _listeningSemaphore->wait();}/** This method is called when the consumer is initialized for the first time. * It reads the outstanding requests from the dat file and enqueues them. * * ATTN: This method will only get called when a consumer is initialized. Therefore, * when the listener starts, the outstanding indications for this consumer will not get sent * UNTIL a new indication comes in. This is not really an acceptable scenario. Maybe the consumer * manager needs to check the .dat files upon startup and load if they are not empty. * */ void DynamicConsumer::_loadOutstandingIndications(Array<IndicationDispatchEvent> indications){ PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_loadOutstandingIndications"); //create dispatch events from the instances IndicationDispatchEvent* event = 0; for (Uint32 i=0; i < indications.size(); i++) { event = new IndicationDispatchEvent(OperationContext(), //ATTN: Do we need to store this? indications[i].getURL(), indications[i].getIndicationInstance()); _eventqueue.insert_back(event); } //signal the worker thread so it falls into the queue processing code if (_eventqueue.size()) { _check_queue->signal(); } PEG_METHOD_EXIT();}/** This method serializes the remaining indications in the queue. It should be called when the * consumer is shutting down. Each time the consumer is loaded, these indications will be * reloaded into the queue. Therefore, the file should be overwritten each time to eliminate * duplicating outstanding indications. * * ATTN: Should we let another method delete the instances? */ Array<IndicationDispatchEvent> DynamicConsumer::_retrieveOutstandingIndications(){ PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_retrieveOutstandingIndications"); Array<IndicationDispatchEvent> indications; IndicationDispatchEvent* temp = 0; try { _eventqueue.try_lock(); temp = _eventqueue.front(); while (temp) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "retrieving"); indications.append(*temp); temp = _eventqueue.next_of(temp); } _eventqueue.unlock(); } catch (...) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Unknown Exception"); } PEG_METHOD_EXIT(); return indications;}////////////////////////////////// IndicationDispatchEvent////////////////////////////////IndicationDispatchEvent::IndicationDispatchEvent(){}IndicationDispatchEvent::IndicationDispatchEvent(OperationContext context, String url, CIMInstance instance) :_context(context),_url(url),_instance(instance),_retries(0),_lastAttemptTime(CIMDateTime()){}IndicationDispatchEvent::IndicationDispatchEvent( const IndicationDispatchEvent &event) : Linkable(event){ _context = event._context; _url = event._url; _instance = event._instance; _retries = event._retries.get(); _lastAttemptTime = event._lastAttemptTime;}IndicationDispatchEvent::~IndicationDispatchEvent(){}OperationContext IndicationDispatchEvent::getContext() const{ return _context;}String IndicationDispatchEvent::getURL() const{ return _url;}CIMInstance IndicationDispatchEvent::getIndicationInstance() const{ return _instance;}Uint32 IndicationDispatchEvent::getRetries(){ return _retries.get();}void IndicationDispatchEvent::increaseRetries(){ PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Increasing retries\n"); _retries++; _lastAttemptTime = CIMDateTime::getCurrentDateTime(); PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Last attempt time " + _lastAttemptTime.toString());}CIMDateTime IndicationDispatchEvent::getLastAttemptTime(){ return _lastAttemptTime;}IndicationDispatchEvent& IndicationDispatchEvent::operator=(const IndicationDispatchEvent &event){ _context = event._context; _url = event._url; _instance = event._instance; _retries = event._retries.get(); _lastAttemptTime = event._lastAttemptTime; return *this;}Boolean IndicationDispatchEvent::operator==(const IndicationDispatchEvent& event) const{ if (String::equal(this->_url, event._url) && (this->_instance.identical(event._instance))) { return true; } return false;}PEGASUS_NAMESPACE_END
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -