📄 consumermanager.cpp
字号:
_serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications()); //reset the consumer consumersToUnload[i]->reset(); PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded."); } catch (Exception& e) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage()); //ATTN: throw exception? log warning? } } PEG_METHOD_EXIT();}/** Serializes oustanding indications to a <MyConsumer>.dat file */void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications){ PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications"); if (!indications.size()) { PEG_METHOD_EXIT(); return; } String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat")); PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName); Buffer buffer; // Open the log file and serialize remaining FILE* fileHandle = 0; fileHandle = fopen((const char*)fileName.getCString(), "w"); if (!fileHandle) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName); } else { Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3, "Serializing %d outstanding requests for %s", indications.size(), (const char*)consumerName.getCString()); //we have to put the array of instances under a valid root element or the parser complains XmlWriter::append(buffer, "<IRETURNVALUE>\n"); CIMInstance cimInstance; for (Uint32 i = 0; i < indications.size(); i++) { //set the URL string property on the serializable instance CIMValue cimValue(CIMTYPE_STRING, false); cimValue.set(indications[i].getURL()); cimInstance = indications[i].getIndicationInstance(); CIMProperty cimProperty(URL_PROPERTY, cimValue); cimInstance.addProperty(cimProperty); XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance); } XmlWriter::append(buffer, "</IRETURNVALUE>\0"); fputs((const char*)buffer.getData(), fileHandle); fclose(fileHandle); } PEG_METHOD_EXIT();}/** Reads outstanding indications from a <MyConsumer>.dat file */ Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName){ PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications"); String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat")); PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName); Array<CIMInstance> cimInstances; Array<String> urlStrings; Array<IndicationDispatchEvent> indications; // Open the log file and serialize remaining indications if (FileSystem::exists(fileName) && FileSystem::canRead(fileName)) { Buffer text; CIMInstance cimInstance; CIMProperty cimProperty; CIMValue cimValue; String urlString; XmlEntry entry; try { FileSystem::loadFileToMemory(text, fileName); //ATTN: Is this safe to use; what about CRLFs? text.append('\0'); //parse the file XmlParser parser((char*)text.getData()); XmlReader::expectStartTag(parser, entry, "IRETURNVALUE"); while (XmlReader::getNamedInstanceElement(parser, cimInstance)) { Uint32 index = cimInstance.findProperty(URL_PROPERTY); if (index != PEG_NOT_FOUND) { //get the URL string property from the serialized instance and remove the property cimProperty = cimInstance.getProperty(index); cimValue = cimProperty.getValue(); cimValue.get(urlString); cimInstance.removeProperty(index); } IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance); indications.append(*indicationEvent); } XmlReader::expectEndTag(parser, "IRETURNVALUE"); Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3, "Consumer %s has %d outstanding indications", (const char*)consumerName.getCString(), indications.size()); //delete the file FileSystem::removeFile(fileName); } catch (Exception& ex) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName); } catch (...) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName); } } PEG_METHOD_EXIT(); return indications;}/** * This is the main worker thread of the consumer. By having only one thread per consumer, we eliminate a ton * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive * operations at once. This also prevents one bad consumer from taking the entire listener down. That being said, * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread. * * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to * complete, it may make sense to make the consumer multi-threaded. The individual consumer can immediately spawn off * new threads to handle indications, and return immediately to catch the next indication. In this way, a consumer * can attain extremely high performance. * * There are three different events that can signal us: * 1) A new indication (signalled by DynamicListenerIndicationDispatcher) * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state) * 3) A retry signal (signalled from this routine itself) * * The idea is that all new indications are put on the front of the queue and processed first. All of the retry * indications are put on the back of the queue and are only processed AFTER all new indications are sent. * Before processing each indication, we check to see whether or not the shutdown signal was given. If so, * we immediately break out of the loop, and another compenent serializes the remaining indications to a file. * * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception. * * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario: * 20 new indications come in, 10 of them are successful, 10 are not. * We were signalled 20 times, so we will pass the time_wait 20 times. Perceivably, the process time on each indication * could be minimal. We could potentially proceed to process the retries after a very small time interval since * we would never hit the wait for the retry timeout. * */ ThreadReturnType PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param){ PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine"); DynamicConsumer* myself = static_cast<DynamicConsumer*>(param); String name = myself->getName(); List<IndicationDispatchEvent,Mutex> tmpEventQueue; PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name); myself->_listeningSemaphore->signal(); while (true) { try { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name); //wait to be signalled myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE); PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name); //check whether we received the shutdown signal if (myself->_dieNow) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name); break; } //create a temporary queue to store failed indications tmpEventQueue.clear(); //continue processing events until the queue is empty //make sure to check for the shutdown signal before every iteration // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process. // Because we are popping off the front and new events are being thrown on the back if events are failing when we start // But are succeeding by the end of the processing, events may be sent out of chronological order. // However. Once we complete the current queue of events, we will always send old events to be retried before sending any // new events added afterwards. while (myself->_eventqueue.size()) { //check for shutdown signal //this only breaks us out of the queue loop, but we will immediately get through the next wait from //the shutdown signal itself, at which time we break out of the main loop if (myself->_dieNow) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name); break; } //pop next indication off the queue IndicationDispatchEvent* event = 0; event = myself->_eventqueue.remove_front(); //what exceptions/errors can this throw? if (!event) { //this should never happen continue; } PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name); try { myself->consumeIndication(event->getContext(), event->getURL(), event->getIndicationInstance()); PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name); delete event; continue; } catch (CIMException & ce) { //check for failure if (ce.getCode() == CIM_ERR_FAILED) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name); // Here we simply determine if we should increment the retry count or not. // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for // order's sake. If the retry Lapse has lapsed on this event then increment the counter. if (event->getRetries() > 0) { Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime()); if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000)) event->increaseRetries(); } else { event->increaseRetries(); } //determine if we have hit the max retry count if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: the maximum retry count has been exceeded. Removing the event from the queue."); Logger::put( Logger::ERROR_LOG, System::CIMLISTENER, Logger::SEVERE, "The following indication did not get processed successfully: $0", event->getIndicationInstance().getPath().toString()); delete event; continue; } else { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue"); tmpEventQueue.insert_back(event); } } else { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage()); delete event; continue; } } catch (Exception & ex) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage()); delete event; continue; } catch (...) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception."); delete event; continue; } //end try } //while eventqueue // Copy the failed indications back to the main queue // We now lock the queue while adding the retries on to the queue so that new events can't get in in front // Of those events we are retrying. Retried events happened before any new events coming in. IndicationDispatchEvent* tmpEvent = 0; myself->_eventqueue.try_lock(); while (tmpEventQueue.size()) { tmpEvent = tmpEventQueue.remove_front(); myself->_eventqueue.insert_back(tmpEvent); } myself->_eventqueue.unlock(); } catch (TimeOut&) { PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications."); //signal the queue in the same way we would if we received a new indication //this allows the thread to fall into the queue processing code myself->_check_queue->signal(); } //time_wait } //shutdown PEG_METHOD_EXIT(); return 0;}PEGASUS_NAMESPACE_END
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -