⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagequeueservice.cpp

📁 Pegasus is an open-source implementationof the DMTF CIM and WBEM standards. It is designed to be por
💻 CPP
📖 第 1 页 / 共 3 页
字号:
//%2006//////////////////////////////////////////////////////////////////////////// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;// IBM Corp.; EMC Corporation, The Open Group.// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;// EMC Corporation; VERITAS Software Corporation; The Open Group.// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;// EMC Corporation; Symantec Corporation; The Open Group.//// Permission is hereby granted, free of charge, to any person obtaining a copy// of this software and associated documentation files (the "Software"), to// deal in the Software without restriction, including without limitation the// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or// sell copies of the Software, and to permit persons to whom the Software is// furnished to do so, subject to the following conditions:// // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.////==============================================================================////%/////////////////////////////////////////////////////////////////////////////#include "MessageQueueService.h"#include <Pegasus/Common/Tracer.h>#include <Pegasus/Common/MessageLoader.h>PEGASUS_NAMESPACE_BEGINcimom *MessageQueueService::_meta_dispatcher = 0;AtomicInt MessageQueueService::_service_count(0);Mutex MessageQueueService::_meta_dispatcher_mutex;static struct timeval deallocateWait = {300, 0};ThreadPool *MessageQueueService::_thread_pool = 0;MessageQueueService::PollingList* MessageQueueService::_polling_list;Mutex MessageQueueService::_polling_list_mutex;Thread* MessageQueueService::_polling_thread = 0;ThreadPool *MessageQueueService::get_thread_pool(){   return _thread_pool;}//// MAX_THREADS_PER_SVC_QUEUE//// JR Wunderlich Jun 6, 2005//#define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000#define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5#ifndef MAX_THREADS_PER_SVC_QUEUE# define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT#endifUint32 max_threads_per_svc_queue;ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(    void* parm){    Thread *myself = reinterpret_cast<Thread *>(parm);    List<MessageQueueService, Mutex> *list =        reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());    while (_stop_polling.get()  == 0)    {        _polling_sem.wait();        if (_stop_polling.get() != 0)        {            break;        }        // The polling_routine thread must hold the lock on the        // _polling_list while processing incoming messages.        // This lock is used to give this thread ownership of        // services on the _polling_routine list.        // This is necessary to avoid confict with other threads        // processing the _polling_list        // (e.g., MessageQueueServer::~MessageQueueService).        list->lock();        MessageQueueService *service = list->front();        ThreadStatus rtn = PEGASUS_THREAD_OK;        while (service != NULL)        {            if ((service->_incoming.count() > 0) &&                (service->_die.get() == 0) &&                (service->_threads.get() < max_threads_per_svc_queue))            {                // The _threads count is used to track the                // number of active threads that have been allocated                // to process messages for this service.                // The _threads count MUST be incremented while                // the polling_routine owns the _polling_thread                // lock and has ownership of the service object.                service->_threads++;                try                {                    rtn = _thread_pool->allocate_and_awaken(                        service, _req_proc, &_polling_sem);                }                catch (...)                {                    service->_threads--;                    // allocate_and_awaken should never generate an exception.                    PEGASUS_ASSERT(0);                }                // if no more threads available, break from processing loop                if (rtn != PEGASUS_THREAD_OK )                {                    service->_threads--;                    Logger::put(                        Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,                        "Not enough threads to process this request. "                            "Skipping.");                    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,                        "Could not allocate thread for %s.  Queue has %d "                            "messages waiting and %d threads servicing."                            "Skipping the service for right now. ",                        service->getQueueName(),                        service->_incoming.count(),                        service->_threads.get());                    Threads::yield();                    service = NULL;                }            }            if (service != NULL)            {                service = list->next_of(service);            }        }        list->unlock();    }    myself->exit_self( (ThreadReturnType) 1 );    return 0;}Semaphore MessageQueueService::_polling_sem(0);AtomicInt MessageQueueService::_stop_polling(0);MessageQueueService::MessageQueueService(    const char* name,    Uint32 queueID,    Uint32 capabilities,    Uint32 mask)    : Base(name, true,  queueID),      _mask(mask),      _die(0),      _threads(0),      _incoming(),      _incoming_queue_shutdown(0){    _capabilities = (capabilities | module_capabilities::async);    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_usec = 100;    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;    // if requested thread max is out of range, then set to    // MAX_THREADS_PER_SVC_QUEUE_LIMIT    if ((max_threads_per_svc_queue < 1) ||        (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))    {        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;    }    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,       "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue);    AutoMutex autoMut(_meta_dispatcher_mutex);    if (_meta_dispatcher == 0)    {        _stop_polling = 0;        PEGASUS_ASSERT(_service_count.get() == 0);        _meta_dispatcher = new cimom();        //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",        //   minimum_cnt, maximum_cnt, deallocateWait);        //        _thread_pool =            new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);    }    _service_count++;    if (false == register_service(name, _capabilities, _mask))    {        MessageLoaderParms parms(            "Common.MessageQueueService.UNABLE_TO_REGISTER",            "CIM base message queue service is unable to register with the "                "CIMOM dispatcher.");        throw BindFailedException(parms);    }    _get_polling_list()->insert_back(this);}MessageQueueService::~MessageQueueService(){    _die = 1;    // The polling_routine locks the _polling_list while    // processing the incoming messages for services on the    // list.  Deleting the service from the _polling_list    // prior to processing, avoids synchronization issues    // with the _polling_routine.    // ATTN: added to prevent assertion in List in which the list does not    // contain this element.    if (_get_polling_list()->contains(this))        _get_polling_list()->remove(this);    // ATTN: The code for closing the _incoming queue    // is not working correctly. In OpenPegasus 2.5,    // execution of the following code is very timing    // dependent. This needs to be fix.    // See Bug 4079 for details.    if (_incoming_queue_shutdown.get() == 0)    {        _shutdown_incoming_queue();    }    // Wait until all threads processing the messages    // for this service have completed.    while (_threads.get() > 0)    {        Threads::yield();    }    {        AutoMutex autoMut(_meta_dispatcher_mutex);        _service_count--;        if (_service_count.get() == 0)        {            _stop_polling++;            _polling_sem.signal();            if (_polling_thread)            {                _polling_thread->join();                delete _polling_thread;                _polling_thread = 0;            }            _meta_dispatcher->_shutdown_routed_queue();            delete _meta_dispatcher;            _meta_dispatcher = 0;            delete _thread_pool;            _thread_pool = 0;        }    } // mutex unlocks here    // Clean up in case there are extra stuff on the queue.    while (_incoming.count())    {        try        {            delete _incoming.dequeue();        }        catch (const ListClosed&)        {            // If the list is closed, there is nothing we can do.            break;        }    }}void MessageQueueService::_shutdown_incoming_queue(){    if (_incoming_queue_shutdown.get() > 0)        return;    AsyncIoctl *msg = new AsyncIoctl(        0,        _queueId,        _queueId,        true,        AsyncIoctl::IO_CLOSE,        0,        0);    msg->op = get_op();    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK        | ASYNC_OPFLAGS_SIMPLE_STATUS);    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;    msg->op->_op_dest = this;    msg->op->_request.reset(msg);    try    {        _incoming.enqueue_wait(msg->op);        _polling_sem.signal();    }    catch (const ListClosed&)    {        // This means the queue has already been shut-down (happens  when there        // are two AsyncIoctrl::IO_CLOSE messages generated and one got first        // processed.        delete msg;    }    catch (const Permission&)    {        delete msg;    }}void MessageQueueService::enqueue(Message* msg){    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");    Base::enqueue(msg);    PEG_METHOD_EXIT();}ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(    void* parm){    MessageQueueService* service =        reinterpret_cast<MessageQueueService*>(parm);    PEGASUS_ASSERT(service != 0);    try    {        if (service->_die.get() != 0)        {            service->_threads--;            return 0;        }        // pull messages off the incoming queue and dispatch them. then        // check pending messages that are non-blocking        AsyncOpNode *operation = 0;        // many operations may have been queued.        do        {            try            {                operation = service->_incoming.dequeue();            }            catch (ListClosed&)            {                // ATTN: This appears to be a common loop exit path.                //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,                //    "Caught ListClosed exception.  Exiting _req_proc.");                break;            }            if (operation)            {               operation->_service_ptr = service;               service->_handle_incoming_operation(operation);            }        } while (operation);    }    catch (const Exception& e)    {        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,            String("Caught exception: \"") + e.getMessage() +                "\".  Exiting _req_proc.");    }    catch (...)    {        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,            "Caught unrecognized exception.  Exiting _req_proc.");    }    service->_threads--;    return 0;}void MessageQueueService::_sendwait_callback(    AsyncOpNode* op,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -