⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagequeueservice.cpp

📁 Pegasus is an open-source implementation of the DMTF CIM and WBEM standards. It is designed to be por
💻 CPP
📖 第 1 页 / 共 3 页
字号:
void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req){}void MessageQueueService::handle_AsyncLegacyOperationStart(    AsyncLegacyOperationStart* req){    // remove the legacy message from the request and enqueue it to its    // destination    Uint32 result = async_results::CIM_NAK;    Message* legacy = req->_act;    if (legacy != 0)    {        MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);        if (queue != 0)        {            if (queue->isAsync() == true)            {                (static_cast<MessageQueueService *>(queue))->handleEnqueue(                    legacy);            }            else            {                // Enqueue the response:                queue->enqueue(req->get_action());            }            result = async_results::OK;        }    }    _make_response(req, result);}void MessageQueueService::handle_AsyncLegacyOperationResult(    AsyncLegacyOperationResult* rep){}AsyncOpNode* MessageQueueService::get_op(){   AsyncOpNode* op = new AsyncOpNode();   op->_state = ASYNC_OPSTATE_UNKNOWN;   op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;   return op;}void MessageQueueService::return_op(AsyncOpNode* op){    PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);    delete op;}Boolean MessageQueueService::SendAsync(    AsyncOpNode* op,    Uint32 destination,    void (*callback)(AsyncOpNode*, MessageQueue*, void*),    MessageQueue* callback_response_q,    void* callback_ptr){    PEGASUS_ASSERT(op != 0 && callback != 0);    // get the queue handle for the destination    op->lock();    // destination of this message    op->_op_dest = MessageQueue::lookup(destination);    op->_flags |= ASYNC_OPFLAGS_CALLBACK;    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);    // initialize the callback data    // callback function to be executed by recpt. 
of response    op->_async_callback = callback;    // the op node    op->_callback_node = op;    // the queue that will receive the response    op->_callback_response_q = callback_response_q;    // user data for callback    op->_callback_ptr = callback_ptr;    // I am the originator of this request    op->_callback_request_q = this;    op->unlock();    if (op->_op_dest == 0)        return false;    return  _meta_dispatcher->route_async(op);}Boolean MessageQueueService::SendAsync(    Message* msg,    Uint32 destination,    void (*callback)(Message* response, void* handle, void* parameter),    void* handle,    void* parameter){    if (msg == NULL)        return false;    if (callback == NULL)        return SendForget(msg);    AsyncOpNode *op = get_op();    msg->dest = destination;    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))    {        op->release();        return_op(op);        return false;    }    op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);    op->_state &= ~ASYNC_OPSTATE_COMPLETE;    op->__async_callback = callback;    op->_callback_node = op;    op->_callback_handle = handle;    op->_callback_parameter = parameter;    op->_callback_response_q = this;    if (!(msg->getMask() & MessageMask::ha_async))    {        AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(            op,            destination,            msg,            destination);    }    else    {        op->_request.reset(msg);        (static_cast<AsyncMessage *>(msg))->op = op;    }    return _meta_dispatcher->route_async(op);}Boolean MessageQueueService::SendForget(Message* msg){    AsyncOpNode* op = 0;    Uint32 mask = msg->getMask();    if (mask & MessageMask::ha_async)    {        op = (static_cast<AsyncMessage *>(msg))->op;    }    if (op == 0)    {        op = get_op();        op->_request.reset(msg);        if (mask & MessageMask::ha_async)        {            (static_cast<AsyncMessage *>(msg))->op = op;     
   }    }    op->_op_dest = MessageQueue::lookup(msg->dest);    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK        | ASYNC_OPFLAGS_SIMPLE_STATUS);    op->_state &= ~ASYNC_OPSTATE_COMPLETE;    if (op->_op_dest == 0)    {        op->release();        return_op(op);        return false;    }    // now see if the meta dispatcher will take it    return  _meta_dispatcher->route_async(op);}AsyncReply *MessageQueueService::SendWait(AsyncRequest* request){    if (request == 0)        return 0;    Boolean destroy_op = false;    if (request->op == 0)    {        request->op = get_op();        request->op->_request.reset(request);        destroy_op = true;    }    request->block = false;    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;    SendAsync(        request->op,        request->dest,        _sendwait_callback,        this,        (void *)0);    request->op->_client_sem.wait();    AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());    rpl->op = 0;    if (destroy_op == true)    {        request->op->lock();        request->op->_request.release();        request->op->_state |= ASYNC_OPSTATE_RELEASED;        request->op->unlock();        return_op(request->op);        request->op = 0;    }    return rpl;}Boolean MessageQueueService::register_service(    String name,    Uint32 capabilities,    Uint32 mask){    RegisterCimService *msg = new RegisterCimService(        0,        true,        name,        capabilities,        mask,        _queueId);    msg->dest = CIMOM_Q_ID;    Boolean registered = false;    AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));    if (reply != 0)    {        if (reply->getMask() & MessageMask::ha_async)        {            if (reply->getMask() & MessageMask::ha_reply)            {                if (reply->result == async_results::OK ||                    reply->result == async_results::MODULE_ALREADY_REGISTERED)                {     
               registered = true;                }            }        }        delete reply;    }    delete msg;    return registered;}Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask){    UpdateCimService *msg = new UpdateCimService(        0,        true,        _queueId,        _capabilities,        _mask);    Boolean registered = false;    AsyncMessage* reply = SendWait(msg);    if (reply)    {        if (reply->getMask() & MessageMask::ha_async)        {            if (reply->getMask() & MessageMask::ha_reply)            {                if (static_cast<AsyncReply *>(reply)->result ==                        async_results::OK)                {                    registered = true;                }            }        }        delete reply;    }    delete msg;    return registered;}Boolean MessageQueueService::deregister_service(){    _meta_dispatcher->deregister_module(_queueId);    return true;}void MessageQueueService::find_services(    String name,    Uint32 capabilities,    Uint32 mask,    Array<Uint32>* results){    if (results == 0)    {        throw NullPointer();    }    results->clear();    FindServiceQueue *req = new FindServiceQueue(        0,        _queueId,        true,        name,        capabilities,        mask);    req->dest = CIMOM_Q_ID;    AsyncMessage *reply = SendWait(req);    if (reply)    {        if (reply->getMask() & MessageMask::ha_async)        {            if (reply->getMask() & MessageMask::ha_reply)            {                if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)                {                    if ((static_cast<FindServiceQueueResult*>(reply))->result ==                            async_results::OK)                        *results =                            (static_cast<FindServiceQueueResult*>(reply))->qids;                }            }        }        delete reply;    }    delete req;}void MessageQueueService::enumerate_service(    Uint32 queue,    message_module* 
result){    if (result == 0)    {        throw NullPointer();    }    EnumerateService *req = new EnumerateService(        0,        _queueId,        true,        queue);    AsyncMessage* reply = SendWait(req);    if (reply)    {        Boolean found = false;        if (reply->getMask() & MessageMask::ha_async)        {            if (reply->getMask() & MessageMask::ha_reply)            {                if (reply->getType() ==                        async_messages::ENUMERATE_SERVICE_RESULT)                {                    if ((static_cast<EnumerateServiceResponse*>(reply))->                            result == async_results::OK)                    {                        if (found == false)                        {                            found = true;                            result->put_name((static_cast<                                EnumerateServiceResponse*>(reply))->name);                            result->put_capabilities((static_cast<                                EnumerateServiceResponse*>(reply))->                                    capabilities);                            result->put_mask((static_cast<                                EnumerateServiceResponse*>(reply))->mask);                            result->put_queue((static_cast<                                EnumerateServiceResponse*>(reply))->qid);                        }                    }                }            }        }        delete reply;    }    delete req;}MessageQueueService::PollingList* MessageQueueService::_get_polling_list(){    _polling_list_mutex.lock();    if (!_polling_list)        _polling_list = new PollingList;    _polling_list_mutex.unlock();    return _polling_list;}PEGASUS_NAMESPACE_END

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -