📄 messagequeueservice.cpp
字号:
MessageQueue* q, void *parm){ op->_client_sem.signal();}// callback function is responsible for cleaning up all resources// including op, op->_callback_node, and op->_callback_ptrvoid MessageQueueService::_handle_async_callback(AsyncOpNode* op){ if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) { Message *msg = op->removeRequest(); if (msg && (msg->getMask() & MessageMask::ha_async)) { if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START) { AsyncLegacyOperationStart *wrapper = static_cast<AsyncLegacyOperationStart *>(msg); msg = wrapper->get_action(); delete wrapper; } else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START) { AsyncModuleOperationStart *wrapper = static_cast<AsyncModuleOperationStart *>(msg); msg = wrapper->get_action(); delete wrapper; } else if (msg->getType() == async_messages::ASYNC_OP_START) { AsyncOperationStart *wrapper = static_cast<AsyncOperationStart *>(msg); msg = wrapper->get_action(); delete wrapper; } delete msg; } msg = op->removeResponse(); if (msg && (msg->getMask() & MessageMask::ha_async)) { if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT) { AsyncLegacyOperationResult *wrapper = static_cast<AsyncLegacyOperationResult *>(msg); msg = wrapper->get_result(); delete wrapper; } else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT) { AsyncModuleOperationResult *wrapper = static_cast<AsyncModuleOperationResult *>(msg); msg = wrapper->get_result(); delete wrapper; } } void (*callback)(Message *, void *, void *) = op->__async_callback; void *handle = op->_callback_handle; void *parm = op->_callback_parameter; op->release(); return_op(op); callback(msg, handle, parm); } else if (op->_flags & ASYNC_OPFLAGS_CALLBACK) { // note that _callback_node may be different from op // op->_callback_response_q is a "this" pointer we can use for // static callback methods op->_async_callback( op->_callback_node, op->_callback_response_q, op->_callback_ptr); }}void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation){ if (operation != 0) {// ATTN: optimization// << Tue Feb 19 14:10:38 2002 mdd >> operation->lock(); Message *rq = operation->_request.get();// optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>// move this to the bottom of the loop when the majority of// messages become async messages. // divert legacy messages to handleEnqueue if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async))) { operation->_request.release(); operation->unlock(); // delete the op node operation->release(); return_op(operation); handleEnqueue(rq); return; } if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK || operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) && (operation->_state & ASYNC_OPSTATE_COMPLETE)) { operation->unlock(); _handle_async_callback(operation); } else { PEGASUS_ASSERT(rq != 0); operation->unlock(); _handle_async_request(static_cast<AsyncRequest *>(rq)); } } return;}void MessageQueueService::_handle_async_request(AsyncRequest *req){ if (req != 0) { req->op->processing(); Uint32 type = req->getType(); if (type == async_messages::HEARTBEAT) handle_heartbeat_request(req); else if (type == async_messages::IOCTL) handle_AsyncIoctl(static_cast<AsyncIoctl *>(req)); else if (type == async_messages::CIMSERVICE_START) handle_CimServiceStart(static_cast<CimServiceStart *>(req)); else if (type == async_messages::CIMSERVICE_STOP) handle_CimServiceStop(static_cast<CimServiceStop *>(req)); else if (type == async_messages::CIMSERVICE_PAUSE) handle_CimServicePause(static_cast<CimServicePause *>(req)); else if (type == async_messages::CIMSERVICE_RESUME) handle_CimServiceResume(static_cast<CimServiceResume *>(req)); else if (type == async_messages::ASYNC_OP_START) handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req)); else { // we don't handle this request message _make_response(req, async_results::CIM_NAK); } }}Boolean MessageQueueService::_enqueueResponse( Message* request, Message* response){ PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::_enqueueResponse"); if (request->getMask() & MessageMask::ha_async) { if (response->getMask() & MessageMask::ha_async) { _completeAsyncResponse( static_cast<AsyncRequest *>(request), static_cast<AsyncReply *>(response), ASYNC_OPSTATE_COMPLETE, 0); PEG_METHOD_EXIT(); return true; } } if (request->_async != 0) { Uint32 mask = request->_async->getMask(); PEGASUS_ASSERT(mask & (MessageMask::ha_async | MessageMask::ha_request)); AsyncRequest *async = static_cast<AsyncRequest *>(request->_async); AsyncOpNode *op = async->op; request->_async = 0; // the legacy request is going to be deleted by its handler // remove it from the op node static_cast<AsyncLegacyOperationStart *>(async)->get_action(); AsyncLegacyOperationResult *async_result = new AsyncLegacyOperationResult( op, response); _completeAsyncResponse( async, async_result, ASYNC_OPSTATE_COMPLETE, 0); PEG_METHOD_EXIT(); return true; } // ensure that the destination queue is in response->dest PEG_METHOD_EXIT(); return SendForget(response);}void MessageQueueService::_make_response(Message* req, Uint32 code){ cimom::_make_response(req, code);}void MessageQueueService::_completeAsyncResponse( AsyncRequest* request, AsyncReply* reply, Uint32 state, Uint32 flag){ PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::_completeAsyncResponse"); cimom::_completeAsyncResponse(request, reply, state, flag); PEG_METHOD_EXIT();}void MessageQueueService::_complete_op_node( AsyncOpNode* op, Uint32 state, Uint32 flag, Uint32 code){ cimom::_complete_op_node(op, state, flag, code);}Boolean MessageQueueService::accept_async(AsyncOpNode* op){ if (_incoming_queue_shutdown.get() > 0) return false; if (_polling_thread == NULL) { _polling_thread = new Thread( polling_routine, reinterpret_cast<void *>(_get_polling_list()), false); ThreadStatus tr = PEGASUS_THREAD_OK; while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK) { if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES) Threads::yield(); else throw Exception(MessageLoaderParms( "Common.MessageQueueService.NOT_ENOUGH_THREAD", "Could not allocate thread for the polling thread.")); } }// ATTN optimization remove the message checking altogether in the base// << Mon Feb 18 14:02:20 2002 mdd >> op->lock(); Message *rq = op->_request.get(); Message *rp = op->_response.get(); op->unlock(); if ((rq != 0 && (true == messageOK(rq))) || (rp != 0 && (true == messageOK(rp))) && _die.get() == 0) { _incoming.enqueue_wait(op); _polling_sem.signal(); return true; } return false;}Boolean MessageQueueService::messageOK(const Message* msg){ if (_incoming_queue_shutdown.get() > 0) return false; return true;}void MessageQueueService::handle_heartbeat_request(AsyncRequest* req){ // default action is to echo a heartbeat response AsyncReply *reply = new AsyncReply( async_messages::HEARTBEAT, 0, req->op, async_results::OK, req->resp, false); _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);}void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep){}void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req){ switch (req->ctl) { case AsyncIoctl::IO_CLOSE: { MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);#ifdef MESSAGEQUEUESERVICE_DEBUG PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);#endif // respond to this message. this is fire and forget, so we // don't need to delete anything. // this takes care of two problems that were being found // << Thu Oct 9 10:52:48 2003 mdd >> _make_response(req, async_results::OK); // ensure we do not accept any further messages // ensure we don't recurse on IO_CLOSE if (_incoming_queue_shutdown.get() > 0) break; // set the closing flag service->_incoming_queue_shutdown = 1; // empty out the queue while (1) { AsyncOpNode *operation; try { operation = service->_incoming.dequeue(); } catch (IPCException&) { break; } if (operation) { operation->_service_ptr = service; service->_handle_incoming_operation(operation); } else break; } // message processing loop // shutdown the AsyncQueue service->_incoming.close(); return; } default: _make_response(req, async_results::CIM_NAK); }}void MessageQueueService::handle_CimServiceStart(CimServiceStart* req){#ifdef MESSAGEQUEUESERVICE_DEBUG PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);#endif // clear the stoped bit and update _capabilities &= (~(module_capabilities::stopped)); _make_response(req, async_results::OK); // now tell the meta dispatcher we are stopped update_service(_capabilities, _mask);}void MessageQueueService::handle_CimServiceStop(CimServiceStop* req){#ifdef MESSAGEQUEUESERVICE_DEBUG PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);#endif // set the stopeed bit and update _capabilities |= module_capabilities::stopped; _make_response(req, async_results::CIM_STOPPED); // now tell the meta dispatcher we are stopped update_service(_capabilities, _mask);}void MessageQueueService::handle_CimServicePause(CimServicePause* req){ // set the paused bit and update _capabilities |= module_capabilities::paused; update_service(_capabilities, _mask); _make_response(req, async_results::CIM_PAUSED); // now tell the meta dispatcher we are stopped}void MessageQueueService::handle_CimServiceResume(CimServiceResume* req){ // clear the paused bit and update _capabilities &= (~(module_capabilities::paused)); update_service(_capabilities, _mask); _make_response(req, async_results::OK); // now tell the meta dispatcher we are stopped}void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req){ _make_response(req, async_results::CIM_NAK);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -