📄 oopprovidermanagerrouter.cpp
字号:
// Note: Caller must lock _agentMutexvoid ProviderAgentContainer::_initialize(){ PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, "ProviderAgentContainer::_initialize"); if (_isInitialized) { PEGASUS_ASSERT(0); PEG_METHOD_EXIT(); return; } if (_maxProviderProcesses == PEG_NOT_FOUND) { String maxProviderProcesses = ConfigManager::getInstance()-> getCurrentValue("maxProviderProcesses"); CString maxProviderProcessesString = maxProviderProcesses.getCString(); char* end = 0; _maxProviderProcesses = strtol(maxProviderProcessesString, &end, 10); } { AutoMutex lock(_numProviderProcessesMutex); if ((_maxProviderProcesses != 0) && (_numProviderProcesses >= _maxProviderProcesses)) { throw PEGASUS_CIM_EXCEPTION( CIM_ERR_FAILED, MessageLoaderParms( "ProviderManager.OOPProviderManagerRouter." "MAX_PROVIDER_PROCESSES_REACHED", "The maximum number of cimprovagt processes has been " "reached.")); } else { _numProviderProcesses++; } } try { _startAgentProcess(); _isInitialized = true; _sendInitializationData(); // Start a thread to read and process responses from the Provider Agent ThreadStatus rtn = PEGASUS_THREAD_OK; while ((rtn = MessageQueueService::get_thread_pool()-> allocate_and_awaken(this, _responseProcessor)) != PEGASUS_THREAD_OK) { if (rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES) { Threads::yield(); } else { Logger::put( Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE, "Not enough threads to process responses from the " "provider agent."); Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2, "Could not allocate thread to process responses from the " "provider agent."); throw Exception(MessageLoaderParms( "ProviderManager.OOPProviderManagerRouter." "CIMPROVAGT_THREAD_ALLOCATION_FAILED", "Failed to allocate thread for cimprovagt \"$0\".", _moduleName)); } } } catch (...) { // Closing the connection causes the agent process to exit _pipeToAgent.reset(); _pipeFromAgent.reset();#if defined(PEGASUS_HAS_SIGNALS) if (_isInitialized) { // Harvest the status of the agent process to prevent a zombie pid_t status = 0; do { status = waitpid(_pid, 0, 0); } while ((status == -1) && (errno == EINTR)); if (status == -1) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ProviderAgentContainer::_initialize(): " "waitpid failed; errno = %d.", errno); } }#endif _isInitialized = false; { AutoMutex lock(_numProviderProcessesMutex); _numProviderProcesses--; } PEG_METHOD_EXIT(); throw; } PEG_METHOD_EXIT();}Boolean ProviderAgentContainer::isInitialized(){ AutoMutex lock(_agentMutex); return _isInitialized;}// Note: Caller must lock _agentMutexvoid ProviderAgentContainer::_uninitialize(Boolean cleanShutdown){ PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, "ProviderAgentContainer::_uninitialize"); if (!_isInitialized) { PEGASUS_ASSERT(0); PEG_METHOD_EXIT(); return; } try { // Close the connection with the Provider Agent _pipeFromAgent.reset(); _pipeToAgent.reset(); _providerModuleCache = CIMInstance(); { AutoMutex lock(_numProviderProcessesMutex); _numProviderProcesses--; }#if defined(PEGASUS_HAS_SIGNALS) // Harvest the status of the agent process to prevent a zombie pid_t status = 0; do { status = waitpid(_pid, 0, 0); } while ((status == -1) && (errno == EINTR)); if (status == -1) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "ProviderAgentContainer::_uninitialize(): " "waitpid failed; errno = %d.", errno); }#endif _isInitialized = false; // // Complete with null responses all outstanding requests on this // connection // { AutoMutex tableLock(_outstandingRequestTableMutex); CIMResponseMessage* response = cleanShutdown ? _REQUEST_NOT_PROCESSED : 0; for (OutstandingRequestTable::Iterator i = _outstandingRequestTable.start(); i != 0; i++) { PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL2, String("Completing messageId \"") + i.key() + "\" with a null response."); i.value()->responseMessage = response; i.value()->responseReady->signal(); } _outstandingRequestTable.clear(); } // // If not a clean shutdown, call the provider module failure callback // if (!cleanShutdown) { // // Call the provider module failure callback to communicate // the failure to the Provider Manager Service. The Provider // Manager Service will inform the Indication Service. // _providerModuleFailCallback(_moduleName, _userName, _userContext); } } catch (...) { // We're uninitializing, so do not propagate the exception PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2, "Ignoring _uninitialize() exception."); } PEG_METHOD_EXIT();}String ProviderAgentContainer::getModuleName() const{ return _moduleName;}CIMResponseMessage* ProviderAgentContainer::processMessage( CIMRequestMessage* request){ PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, "ProviderAgentContainer::processMessage"); CIMResponseMessage* response; do { response = _processMessage(request); if (response == _REQUEST_NOT_PROCESSED) { // Check for request message types that should not be retried. if ((request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) || (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE) || (request->getType() == CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) || (request->getType() == CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE)) { response = request->buildResponse(); break; } else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE) { CIMDisableModuleResponseMessage* dmResponse = dynamic_cast<CIMDisableModuleResponseMessage*>(response); PEGASUS_ASSERT(dmResponse != 0); Array<Uint16> operationalStatus; operationalStatus.append(CIM_MSE_OPSTATUS_VALUE_STOPPED); dmResponse->operationalStatus = operationalStatus; break; } } } while (response == _REQUEST_NOT_PROCESSED); if (request->getType() == CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) { _subscriptionInitComplete = true; } PEG_METHOD_EXIT(); return response;}CIMResponseMessage* ProviderAgentContainer::_processMessage( CIMRequestMessage* request){ PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, "ProviderAgentContainer::_processMessage"); CIMResponseMessage* response; String originalMessageId = request->messageId; // These three variables are used for the provider module optimization. // See the _providerModuleCache member description for more information. AutoPtr<ProviderIdContainer> origProviderId; Boolean doProviderModuleOptimization = false; Boolean updateProviderModuleCache = false; try { // The messageId attribute is used to correlate response messages // from the Provider Agent with request messages, so it is imperative // that the ID is unique for each request. The incoming ID cannot be // trusted to be unique, so we substitute a unique one. The memory // address of the request is used as the source of a unique piece of // data. (The message ID is only required to be unique while the // request is outstanding.) char messagePtrString[20]; sprintf(messagePtrString, "%p", request); String uniqueMessageId = messagePtrString; // // Set up the OutstandingRequestEntry for this request // Semaphore waitSemaphore(0); OutstandingRequestEntry outstandingRequestEntry( originalMessageId, request, response, &waitSemaphore); // // Lock the Provider Agent Container while initializing the // agent and writing the request to the connection // { AutoMutex lock(_agentMutex); // // Initialize the Provider Agent, if necessary // if (!_isInitialized) { _initialize(); } // // Add an entry to the OutstandingRequestTable for this request // { AutoMutex tableLock(_outstandingRequestTableMutex); _outstandingRequestTable.insert( uniqueMessageId, &outstandingRequestEntry); } // Get the provider module from the ProviderIdContainer to see if // we can optimize out the transmission of this instance to the // Provider Agent. (See the _providerModuleCache description.) if (request->operationContext.contains(ProviderIdContainer::NAME)) { ProviderIdContainer pidc = request->operationContext.get( ProviderIdContainer::NAME); origProviderId.reset(new ProviderIdContainer( pidc.getModule(), pidc.getProvider(), pidc.isRemoteNameSpace(), pidc.getRemoteInfo())); if (_providerModuleCache.isUninitialized() || (!pidc.getModule().identical(_providerModuleCache))) { // We haven't sent this provider module instance to the // Provider Agent yet. Update our cache after we send it. updateProviderModuleCache = true; } else { // Replace the provider module in the ProviderIdContainer // with an uninitialized instance. We'll need to put the // original one back after the message is sent. request->operationContext.set(ProviderIdContainer( CIMInstance(), pidc.getProvider(), pidc.isRemoteNameSpace(), pidc.getRemoteInfo())); doProviderModuleOptimization = true; } } // // Write the message to the pipe // try { PEG_TRACE_STRING(TRC_PROVIDERMANAGER, Tracer::LEVEL3, String("Sending request to agent with messageId ") + uniqueMessageId); request->messageId = uniqueMessageId; AnonymousPipe::Status writeStatus = _pipeToAgent->writeMessage(request); request->messageId = originalMessageId; if (doProviderModuleOptimization) { request->operationContext.set(*origProviderId.get()); } if (writeStatus != AnonymousPipe::STATUS_SUCCESS) { Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -