⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consumermanager.cpp

📁 Pegasus is an open-source implementationof the DMTF CIM and WBEM standards. It is designed to be por
💻 CPP
📖 第 1 页 / 共 3 页
字号:
            _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());            //reset the consumer            consumersToUnload[i]->reset();            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");        } catch (Exception& e)        {            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage());             //ATTN: throw exception? log warning?        }    }    PEG_METHOD_EXIT();}/** Serializes oustanding indications to a <MyConsumer>.dat file */void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications){    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");    if (!indications.size())    {        PEG_METHOD_EXIT();        return;    }    String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);    Buffer buffer;    // Open the log file and serialize remaining     FILE* fileHandle = 0;    fileHandle = fopen((const char*)fileName.getCString(), "w");     if (!fileHandle)    {        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);    } else    {        Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,                      "Serializing %d outstanding requests for %s",                      indications.size(),                      (const char*)consumerName.getCString());        //we have to put the array of instances under a valid root element or the parser complains         XmlWriter::append(buffer, "<IRETURNVALUE>\n");		CIMInstance cimInstance;        for (Uint32 i = 0; i < indications.size(); i++)        {			//set the URL string property on the serializable instance			CIMValue cimValue(CIMTYPE_STRING, false);			cimValue.set(indications[i].getURL());			cimInstance = indications[i].getIndicationInstance();			CIMProperty cimProperty(URL_PROPERTY, cimValue);			cimInstance.addProperty(cimProperty);            XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);		}        XmlWriter::append(buffer, "</IRETURNVALUE>\0");        fputs((const char*)buffer.getData(), fileHandle);        fclose(fileHandle);    }    PEG_METHOD_EXIT();}/** Reads outstanding indications from a <MyConsumer>.dat file */ Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName){    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");    String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);    Array<CIMInstance> cimInstances;	Array<String>      urlStrings;	Array<IndicationDispatchEvent> indications;    // Open the log file and serialize remaining indications    if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))    {        Buffer text;        CIMInstance cimInstance;		CIMProperty cimProperty;		CIMValue cimValue;		String urlString;        XmlEntry entry;        try        {            FileSystem::loadFileToMemory(text, fileName);  //ATTN: Is this safe to use; what about CRLFs?            text.append('\0');            //parse the file            XmlParser parser((char*)text.getData());            XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");            while (XmlReader::getNamedInstanceElement(parser, cimInstance))            {				Uint32 index = cimInstance.findProperty(URL_PROPERTY);				if (index != PEG_NOT_FOUND)				{					//get the URL string property from the serialized instance and remove the property					cimProperty = cimInstance.getProperty(index);					cimValue = cimProperty.getValue();					cimValue.get(urlString);					cimInstance.removeProperty(index);				}				IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance);                indications.append(*indicationEvent);            }            XmlReader::expectEndTag(parser, "IRETURNVALUE");            Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,                          "Consumer %s has %d outstanding indications",                          (const char*)consumerName.getCString(),                          indications.size());            //delete the file             FileSystem::removeFile(fileName);        } catch (Exception& ex)        {            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);        } catch (...)        {            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);        }    }    PEG_METHOD_EXIT();    return indications;}/**  * This is the main worker thread of the consumer.  By having only one thread per consumer, we eliminate a ton * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive * operations at once.  This also prevents one bad consumer from taking the entire listener down.  That being said, * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread.  *  * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to * complete, it may make sense to make the consumer multi-threaded.  The individual consumer can immediately spawn off * new threads to handle indications, and return immediately to catch the next indication.  In this way, a consumer * can attain extremely high performance.  *  * There are three different events that can signal us: * 1) A new indication (signalled by DynamicListenerIndicationDispatcher) * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state) * 3) A retry signal (signalled from this routine itself) *  * The idea is that all new indications are put on the front of the queue and processed first.  All of the retry * indications are put on the back of the queue and are only processed AFTER all new indications are sent. * Before processing each indication, we check to see whether or not the shutdown signal was given.  If so, * we immediately break out of the loop, and another compenent serializes the remaining indications to a file. *  * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception. *  * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario: * 20 new indications come in, 10 of them are successful, 10 are not. * We were signalled 20 times, so we will pass the time_wait 20 times.  Perceivably, the process time on each indication * could be minimal.  We could potentially proceed to process the retries after a very small time interval since * we would never hit the wait for the retry timeout.   *  */ ThreadReturnType PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param){    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");    DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);    String name = myself->getName();    List<IndicationDispatchEvent,Mutex> tmpEventQueue;    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);    myself->_listeningSemaphore->signal();    while (true)    {        try        {            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);            //wait to be signalled            myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);            //check whether we received the shutdown signal            if (myself->_dieNow)            {                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);                break;            }            //create a temporary queue to store failed indications            tmpEventQueue.clear();            //continue processing events until the queue is empty            //make sure to check for the shutdown signal before every iteration            // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process.            // Because we are popping off the front and new events are being thrown on the back if events are failing when we start            // But are succeeding by the end of the processing, events may be sent out of chronological order.            // However. Once we complete the current queue of events, we will always send old events to be retried before sending any            // new events added afterwards.            while (myself->_eventqueue.size())            {                //check for shutdown signal                //this only breaks us out of the queue loop, but we will immediately get through the next wait from                //the shutdown signal itself, at which time we break out of the main loop                if (myself->_dieNow)                {                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);                    break;                }                //pop next indication off the queue                IndicationDispatchEvent* event = 0;                event = myself->_eventqueue.remove_front();  //what exceptions/errors can this throw?                if (!event)                {                    //this should never happen                    continue;                }                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);                try                {                    myself->consumeIndication(event->getContext(),                                              event->getURL(),                                              event->getIndicationInstance());                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);                    delete event;                    continue;                } catch (CIMException & ce)                {                    //check for failure                    if (ce.getCode() == CIM_ERR_FAILED)                    {                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);                                                // Here we simply determine if we should increment the retry count or not.                        // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for                        // order's sake. If the retry Lapse has lapsed on this event then increment the counter.                        if (event->getRetries() > 0) {                            Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());                            if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))                                event->increaseRetries();                        } else {                            event->increaseRetries();                        }                        //determine if we have hit the max retry count                        if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)                        {                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2,                                             "Error: the maximum retry count has been exceeded.  Removing the event from the queue.");                            Logger::put(                                       Logger::ERROR_LOG,                                       System::CIMLISTENER,                                       Logger::SEVERE,                                       "The following indication did not get processed successfully: $0",                                        event->getIndicationInstance().getPath().toString());                            delete event;                            continue;                        } else                        {                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");                            tmpEventQueue.insert_back(event);                        }                    } else                    {                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());                        delete event;                        continue;                    }                } catch (Exception & ex)                {                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());                    delete event;                    continue;                } catch (...)                {                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");                    delete event;                    continue;                } //end try            } //while eventqueue            // Copy the failed indications back to the main queue            // We now lock the queue while adding the retries on to the queue so that new events can't get in in front            // Of those events we are retrying. Retried events happened before any new events coming in.            IndicationDispatchEvent* tmpEvent = 0;            myself->_eventqueue.try_lock();            while (tmpEventQueue.size())            {                tmpEvent = tmpEventQueue.remove_front();                myself->_eventqueue.insert_back(tmpEvent);                            }            myself->_eventqueue.unlock();        } catch (TimeOut&)        {            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");            //signal the queue in the same way we would if we received a new indication            //this allows the thread to fall into the queue processing code            myself->_check_queue->signal();        } //time_wait    } //shutdown    PEG_METHOD_EXIT();    return 0;}PEGASUS_NAMESPACE_END

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -