⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagequeueservice.cpp

📁 Pegasus is an open-source implementation of the DMTF CIM and WBEM standards. It is designed to be por…
💻 CPP
📖 第 1 页 / 共 3 页
字号:
    MessageQueue* q,    void *parm){    op->_client_sem.signal();}// callback function is responsible for cleaning up all resources// including op, op->_callback_node, and op->_callback_ptrvoid MessageQueueService::_handle_async_callback(AsyncOpNode* op){    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)    {        Message *msg = op->removeRequest();        if (msg && (msg->getMask() & MessageMask::ha_async))        {            if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)            {                AsyncLegacyOperationStart *wrapper =                    static_cast<AsyncLegacyOperationStart *>(msg);                msg = wrapper->get_action();                delete wrapper;            }            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)            {                AsyncModuleOperationStart *wrapper =                    static_cast<AsyncModuleOperationStart *>(msg);                msg = wrapper->get_action();                delete wrapper;            }            else if (msg->getType() == async_messages::ASYNC_OP_START)            {                AsyncOperationStart *wrapper =                    static_cast<AsyncOperationStart *>(msg);                msg = wrapper->get_action();                delete wrapper;            }            delete msg;        }        msg = op->removeResponse();        if (msg && (msg->getMask() & MessageMask::ha_async))        {            if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)            {                AsyncLegacyOperationResult *wrapper =                    static_cast<AsyncLegacyOperationResult *>(msg);                msg = wrapper->get_result();                delete wrapper;            }            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)            {                AsyncModuleOperationResult *wrapper =                    static_cast<AsyncModuleOperationResult *>(msg);                msg = wrapper->get_result();                delete 
wrapper;            }        }        void (*callback)(Message *, void *, void *) = op->__async_callback;        void *handle = op->_callback_handle;        void *parm = op->_callback_parameter;        op->release();        return_op(op);        callback(msg, handle, parm);    }    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)    {        // note that _callback_node may be different from op        // op->_callback_response_q is a "this" pointer we can use for        // static callback methods        op->_async_callback(            op->_callback_node, op->_callback_response_q, op->_callback_ptr);    }}void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation){    if (operation != 0)    {// ATTN: optimization// << Tue Feb 19 14:10:38 2002 mdd >>        operation->lock();        Message *rq = operation->_request.get();// optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>// move this to the bottom of the loop when the majority of// messages become async messages.        // divert legacy messages to handleEnqueue        if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))        {            operation->_request.release();            operation->unlock();            // delete the op node            operation->release();            return_op(operation);            handleEnqueue(rq);            return;        }        if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||             operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&            (operation->_state & ASYNC_OPSTATE_COMPLETE))        {            operation->unlock();            _handle_async_callback(operation);        }        else        {            PEGASUS_ASSERT(rq != 0);            operation->unlock();            _handle_async_request(static_cast<AsyncRequest *>(rq));        }    }    return;}void MessageQueueService::_handle_async_request(AsyncRequest *req){    if (req != 0)    {        req->op->processing();        Uint32 type = req->getType();        if (type == 
async_messages::HEARTBEAT)            handle_heartbeat_request(req);        else if (type == async_messages::IOCTL)            handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));        else if (type == async_messages::CIMSERVICE_START)            handle_CimServiceStart(static_cast<CimServiceStart *>(req));        else if (type == async_messages::CIMSERVICE_STOP)            handle_CimServiceStop(static_cast<CimServiceStop *>(req));        else if (type == async_messages::CIMSERVICE_PAUSE)            handle_CimServicePause(static_cast<CimServicePause *>(req));        else if (type == async_messages::CIMSERVICE_RESUME)            handle_CimServiceResume(static_cast<CimServiceResume *>(req));        else if (type == async_messages::ASYNC_OP_START)            handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));        else        {            // we don't handle this request message            _make_response(req, async_results::CIM_NAK);        }    }}Boolean MessageQueueService::_enqueueResponse(    Message* request,    Message* response){    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,        "MessageQueueService::_enqueueResponse");    if (request->getMask() & MessageMask::ha_async)    {        if (response->getMask() & MessageMask::ha_async)        {            _completeAsyncResponse(                static_cast<AsyncRequest *>(request),                static_cast<AsyncReply *>(response),                ASYNC_OPSTATE_COMPLETE, 0);            PEG_METHOD_EXIT();            return true;        }    }    if (request->_async != 0)    {        Uint32 mask = request->_async->getMask();        PEGASUS_ASSERT(mask &            (MessageMask::ha_async | MessageMask::ha_request));        AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);        AsyncOpNode *op = async->op;        request->_async = 0;        // the legacy request is going to be deleted by its handler        // remove it from the op node        static_cast<AsyncLegacyOperationStart 
*>(async)->get_action();        AsyncLegacyOperationResult *async_result =            new AsyncLegacyOperationResult(                op,                response);        _completeAsyncResponse(            async,            async_result,            ASYNC_OPSTATE_COMPLETE,            0);        PEG_METHOD_EXIT();        return true;    }    // ensure that the destination queue is in response->dest    PEG_METHOD_EXIT();    return SendForget(response);}void MessageQueueService::_make_response(Message* req, Uint32 code){    cimom::_make_response(req, code);}void MessageQueueService::_completeAsyncResponse(    AsyncRequest* request,    AsyncReply* reply,    Uint32 state,    Uint32 flag){    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,        "MessageQueueService::_completeAsyncResponse");    cimom::_completeAsyncResponse(request, reply, state, flag);    PEG_METHOD_EXIT();}void MessageQueueService::_complete_op_node(    AsyncOpNode* op,    Uint32 state,    Uint32 flag,    Uint32 code){    cimom::_complete_op_node(op, state, flag, code);}Boolean MessageQueueService::accept_async(AsyncOpNode* op){    if (_incoming_queue_shutdown.get() > 0)        return false;    if (_polling_thread == NULL)    {        _polling_thread = new Thread(            polling_routine,            reinterpret_cast<void *>(_get_polling_list()),            false);        ThreadStatus tr = PEGASUS_THREAD_OK;        while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)        {            if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)                Threads::yield();            else                throw Exception(MessageLoaderParms(                    "Common.MessageQueueService.NOT_ENOUGH_THREAD",                    "Could not allocate thread for the polling thread."));        }    }// ATTN optimization remove the message checking altogether in the base// << Mon Feb 18 14:02:20 2002 mdd >>    op->lock();    Message *rq = op->_request.get();    Message *rp = op->_response.get();    op->unlock();    
if ((rq != 0 && (true == messageOK(rq))) ||        (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)    {        _incoming.enqueue_wait(op);        _polling_sem.signal();        return true;    }    return false;}Boolean MessageQueueService::messageOK(const Message* msg){    if (_incoming_queue_shutdown.get() > 0)        return false;    return true;}void MessageQueueService::handle_heartbeat_request(AsyncRequest* req){    // default action is to echo a heartbeat response    AsyncReply *reply = new AsyncReply(        async_messages::HEARTBEAT,        0,        req->op,        async_results::OK,        req->resp,        false);    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);}void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep){}void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req){    switch (req->ctl)    {        case AsyncIoctl::IO_CLOSE:        {            MessageQueueService *service =                static_cast<MessageQueueService *>(req->op->_service_ptr);#ifdef MESSAGEQUEUESERVICE_DEBUG            PEGASUS_STD(cout) << service->getQueueName() <<                " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);#endif            // respond to this message. this is fire and forget, so we            // don't need to delete anything.            
// this takes care of two problems that were being found            // << Thu Oct  9 10:52:48 2003 mdd >>            _make_response(req, async_results::OK);            // ensure we do not accept any further messages            // ensure we don't recurse on IO_CLOSE            if (_incoming_queue_shutdown.get() > 0)                break;            // set the closing flag            service->_incoming_queue_shutdown = 1;            // empty out the queue            while (1)            {                AsyncOpNode *operation;                try                {                    operation = service->_incoming.dequeue();                }                catch (IPCException&)                {                    break;                }                if (operation)                {                    operation->_service_ptr = service;                    service->_handle_incoming_operation(operation);                }                else                    break;            } // message processing loop            // shutdown the AsyncQueue            service->_incoming.close();            return;        }        default:            _make_response(req, async_results::CIM_NAK);    }}void MessageQueueService::handle_CimServiceStart(CimServiceStart* req){#ifdef MESSAGEQUEUESERVICE_DEBUG    PEGASUS_STD(cout) << getQueueName() << "received START" <<        PEGASUS_STD(endl);#endif    // clear the stoped bit and update    _capabilities &= (~(module_capabilities::stopped));    _make_response(req, async_results::OK);    // now tell the meta dispatcher we are stopped    update_service(_capabilities, _mask);}void MessageQueueService::handle_CimServiceStop(CimServiceStop* req){#ifdef MESSAGEQUEUESERVICE_DEBUG    PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);#endif    // set the stopeed bit and update    _capabilities |= module_capabilities::stopped;    _make_response(req, async_results::CIM_STOPPED);    // now tell the meta dispatcher we are stopped    
update_service(_capabilities, _mask);}void MessageQueueService::handle_CimServicePause(CimServicePause* req){    // set the paused bit and update    _capabilities |= module_capabilities::paused;    update_service(_capabilities, _mask);    _make_response(req, async_results::CIM_PAUSED);    // now tell the meta dispatcher we are stopped}void MessageQueueService::handle_CimServiceResume(CimServiceResume* req){    // clear the paused  bit and update    _capabilities &= (~(module_capabilities::paused));    update_service(_capabilities, _mask);    _make_response(req, async_results::OK);    // now tell the meta dispatcher we are stopped}void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req){    _make_response(req, async_results::CIM_NAK);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -