📄 messagequeueservice.cpp
字号:
void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req){}void MessageQueueService::handle_AsyncLegacyOperationStart( AsyncLegacyOperationStart* req){ // remove the legacy message from the request and enqueue it to its // destination Uint32 result = async_results::CIM_NAK; Message* legacy = req->_act; if (legacy != 0) { MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination); if (queue != 0) { if (queue->isAsync() == true) { (static_cast<MessageQueueService *>(queue))->handleEnqueue( legacy); } else { // Enqueue the response: queue->enqueue(req->get_action()); } result = async_results::OK; } } _make_response(req, result);}void MessageQueueService::handle_AsyncLegacyOperationResult( AsyncLegacyOperationResult* rep){}AsyncOpNode* MessageQueueService::get_op(){ AsyncOpNode* op = new AsyncOpNode(); op->_state = ASYNC_OPSTATE_UNKNOWN; op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL; return op;}void MessageQueueService::return_op(AsyncOpNode* op){ PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED); delete op;}Boolean MessageQueueService::SendAsync( AsyncOpNode* op, Uint32 destination, void (*callback)(AsyncOpNode*, MessageQueue*, void*), MessageQueue* callback_response_q, void* callback_ptr){ PEGASUS_ASSERT(op != 0 && callback != 0); // get the queue handle for the destination op->lock(); // destination of this message op->_op_dest = MessageQueue::lookup(destination); op->_flags |= ASYNC_OPFLAGS_CALLBACK; op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); // initialize the callback data // callback function to be executed by recpt. of response op->_async_callback = callback; // the op node op->_callback_node = op; // the queue that will receive the response op->_callback_response_q = callback_response_q; // user data for callback op->_callback_ptr = callback_ptr; // I am the originator of this request op->_callback_request_q = this; op->unlock(); if (op->_op_dest == 0) return false; return _meta_dispatcher->route_async(op);}Boolean MessageQueueService::SendAsync( Message* msg, Uint32 destination, void (*callback)(Message* response, void* handle, void* parameter), void* handle, void* parameter){ if (msg == NULL) return false; if (callback == NULL) return SendForget(msg); AsyncOpNode *op = get_op(); msg->dest = destination; if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest))) { op->release(); return_op(op); return false; } op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK; op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); op->_state &= ~ASYNC_OPSTATE_COMPLETE; op->__async_callback = callback; op->_callback_node = op; op->_callback_handle = handle; op->_callback_parameter = parameter; op->_callback_response_q = this; if (!(msg->getMask() & MessageMask::ha_async)) { AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart( op, destination, msg, destination); } else { op->_request.reset(msg); (static_cast<AsyncMessage *>(msg))->op = op; } return _meta_dispatcher->route_async(op);}Boolean MessageQueueService::SendForget(Message* msg){ AsyncOpNode* op = 0; Uint32 mask = msg->getMask(); if (mask & MessageMask::ha_async) { op = (static_cast<AsyncMessage *>(msg))->op; } if (op == 0) { op = get_op(); op->_request.reset(msg); if (mask & MessageMask::ha_async) { (static_cast<AsyncMessage *>(msg))->op = op; } } op->_op_dest = MessageQueue::lookup(msg->dest); op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET; op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS); op->_state &= ~ASYNC_OPSTATE_COMPLETE; if (op->_op_dest == 0) { op->release(); return_op(op); return false; } // now see if the meta dispatcher will take it return _meta_dispatcher->route_async(op);}AsyncReply *MessageQueueService::SendWait(AsyncRequest* request){ if (request == 0) return 0; Boolean destroy_op = false; if (request->op == 0) { request->op = get_op(); request->op->_request.reset(request); destroy_op = true; } request->block = false; request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK; SendAsync( request->op, request->dest, _sendwait_callback, this, (void *)0); request->op->_client_sem.wait(); AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse()); rpl->op = 0; if (destroy_op == true) { request->op->lock(); request->op->_request.release(); request->op->_state |= ASYNC_OPSTATE_RELEASED; request->op->unlock(); return_op(request->op); request->op = 0; } return rpl;}Boolean MessageQueueService::register_service( String name, Uint32 capabilities, Uint32 mask){ RegisterCimService *msg = new RegisterCimService( 0, true, name, capabilities, mask, _queueId); msg->dest = CIMOM_Q_ID; Boolean registered = false; AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg)); if (reply != 0) { if (reply->getMask() & MessageMask::ha_async) { if (reply->getMask() & MessageMask::ha_reply) { if (reply->result == async_results::OK || reply->result == async_results::MODULE_ALREADY_REGISTERED) { registered = true; } } } delete reply; } delete msg; return registered;}Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask){ UpdateCimService *msg = new UpdateCimService( 0, true, _queueId, _capabilities, _mask); Boolean registered = false; AsyncMessage* reply = SendWait(msg); if (reply) { if (reply->getMask() & MessageMask::ha_async) { if (reply->getMask() & MessageMask::ha_reply) { if (static_cast<AsyncReply *>(reply)->result == async_results::OK) { registered = true; } } } delete reply; } delete msg; return registered;}Boolean MessageQueueService::deregister_service(){ _meta_dispatcher->deregister_module(_queueId); return true;}void MessageQueueService::find_services( String name, Uint32 capabilities, Uint32 mask, Array<Uint32>* results){ if (results == 0) { throw NullPointer(); } results->clear(); FindServiceQueue *req = new FindServiceQueue( 0, _queueId, true, name, capabilities, mask); req->dest = CIMOM_Q_ID; AsyncMessage *reply = SendWait(req); if (reply) { if (reply->getMask() & MessageMask::ha_async) { if (reply->getMask() & MessageMask::ha_reply) { if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT) { if ((static_cast<FindServiceQueueResult*>(reply))->result == async_results::OK) *results = (static_cast<FindServiceQueueResult*>(reply))->qids; } } } delete reply; } delete req;}void MessageQueueService::enumerate_service( Uint32 queue, message_module* result){ if (result == 0) { throw NullPointer(); } EnumerateService *req = new EnumerateService( 0, _queueId, true, queue); AsyncMessage* reply = SendWait(req); if (reply) { Boolean found = false; if (reply->getMask() & MessageMask::ha_async) { if (reply->getMask() & MessageMask::ha_reply) { if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT) { if ((static_cast<EnumerateServiceResponse*>(reply))-> result == async_results::OK) { if (found == false) { found = true; result->put_name((static_cast< EnumerateServiceResponse*>(reply))->name); result->put_capabilities((static_cast< EnumerateServiceResponse*>(reply))-> capabilities); result->put_mask((static_cast< EnumerateServiceResponse*>(reply))->mask); result->put_queue((static_cast< EnumerateServiceResponse*>(reply))->qid); } } } } } delete reply; } delete req;}MessageQueueService::PollingList* MessageQueueService::_get_polling_list(){ _polling_list_mutex.lock(); if (!_polling_list) _polling_list = new PollingList; _polling_list_mutex.unlock(); return _polling_list;}PEGASUS_NAMESPACE_END
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -