⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 monitor.cpp

📁 Pegasus is an open-source implementationof the DMTF CIM and WBEM standards. It is designed to be por
💻 CPP
📖 第 1 页 / 共 2 页
字号:
            // coming thru. The last response to come thru after a            // _connectionClosePending will reset _responsePending to false            // and then cause the monitor to rerun this code and clean up.            // (see HTTPConnection.cpp)            if (h._responsePending == true)            {                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    "Monitor::run - Ignoring connection delete request "                        "because responses are still pending. "                        "connection=0x%p, socket=%d\n",                    (void *)&h, h.getSocket());                continue;            }            h._connectionClosePending = false;            MessageQueue &o = h.get_owner();            Message* message= new CloseConnectionMessage(entry.socket);            message->dest = o.getQueueId();            // HTTPAcceptor is responsible for closing the connection.            // The lock is released to allow HTTPAcceptor to call            // unsolicitSocketMessages to free the entry.            // Once HTTPAcceptor completes processing of the close            // connection, the lock is re-requested and processing of            // the for loop continues.  This is safe with the current            // implementation of the entries object.  Note that the            // loop condition accesses the entries.size() on each            // iteration, so that a change in size while the mutex is            // unlocked will not result in an ArrayIndexOutOfBounds            // exception.            _entry_mut.unlock();            o.enqueue(message);            _entry_mut.lock();            // After enqueue a message and the autoEntryMutex has been            // released and locked again, the array of _entries can be            // changed. The ArrayIterator has be reset with the original            // _entries.            entries.reset(_entries);        }    }    Uint32 _idleEntries = 0;    /*        We will keep track of the maximum socket number and pass this value        to the kernel as a parameter to SELECT.  This loop seems like a good        place to calculate the max file descriptor (maximum socket number)        because we have to traverse the entire array.    */    SocketHandle maxSocketCurrentPass = 0;    for (int indx = 0; indx < (int)entries.size(); indx++)    {       if (maxSocketCurrentPass < entries[indx].socket)           maxSocketCurrentPass = entries[indx].socket;       if (entries[indx]._status.get() == _MonitorEntry::IDLE)       {           _idleEntries++;           FD_SET(entries[indx].socket, &fdread);       }    }    /*        Add 1 then assign maxSocket accordingly. We add 1 to account for        descriptors starting at 0.    */    maxSocketCurrentPass++;    _entry_mut.unlock();    //    // The first argument to select() is ignored on Windows and it is not    // a socket value.  The original code assumed that the number of sockets    // and a socket value have the same type.  On Windows they do not.    //#ifdef PEGASUS_OS_TYPE_WINDOWS    int events = select(0, &fdread, NULL, NULL, &tv);#else    int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);#endif    _entry_mut.lock();    // After enqueue a message and the autoEntryMutex has been released and    // locked again, the array of _entries can be changed. The ArrayIterator    // has be reset with the original _entries    entries.reset(_entries);    if (events == PEGASUS_SOCKET_ERROR)    {        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,            "Monitor::run - errorno = %d has occurred on select.", errno);        // The EBADF error indicates that one or more or the file        // descriptions was not valid. This could indicate that        // the entries structure has been corrupted or that        // we have a synchronization error.        PEGASUS_ASSERT(errno != EBADF);    }    else if (events)    {        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,            "Monitor::run select event received events = %d, monitoring %d "                "idle entries",            events, _idleEntries);        for (int indx = 0; indx < (int)entries.size(); indx++)        {            // The Monitor should only look at entries in the table that are            // IDLE (i.e., owned by the Monitor).            if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&                (FD_ISSET(entries[indx].socket, &fdread)))            {                MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);                Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    "Monitor::run indx = %d, queueId =  %d, q = %p",                    indx, entries[indx].queueId, q);                PEGASUS_ASSERT(q !=0);                try                {                    if (entries[indx]._type == Monitor::CONNECTION)                    {                        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                            "entries[indx].type for indx = %d is "                                "Monitor::CONNECTION",                            indx);                        static_cast<HTTPConnection *>(q)->_entry_index = indx;                        // Do not update the entry just yet. The entry gets                        // updated once the request has been read.                        //entries[indx]._status = _MonitorEntry::BUSY;                        // If allocate_and_awaken failure, retry on next                        // iteration/* Removed for PEP 183.                        if (!MessageQueueService::get_thread_pool()->                                allocate_and_awaken((void *)q, _dispatch))                        {                            Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                                "Monitor::run: Insufficient resources to "                                    "process request.");                            entries[indx]._status = _MonitorEntry::IDLE;                            return true;                        }*/// Added for PEP 183                        HTTPConnection *dst =                            reinterpret_cast<HTTPConnection *>(q);                        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                            "Monitor::_dispatch: entering run() for "                                "indx = %d, queueId = %d, q = %p",                            dst->_entry_index,                            dst->_monitor->_entries[dst->_entry_index].queueId,                            dst);                        try                        {                            dst->run(1);                        }                        catch (...)                        {                            Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                                "Monitor::_dispatch: exception received");                        }                        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                            "Monitor::_dispatch: exited run() for index %d",                            dst->_entry_index);                        // It is possible the entry status may not be set to                        // busy.  The following will fail in that case.                        // PEGASUS_ASSERT(dst->_monitor->_entries[                        //     dst->_entry_index]._status.get() ==                        //    _MonitorEntry::BUSY);                        // Once the HTTPConnection thread has set the status                        // value to either Monitor::DYING or Monitor::IDLE,                        // it has returned control of the connection to the                        // Monitor.  It is no longer permissible to access                        // the connection or the entry in the _entries table.                        // The following is not relevant as the worker thread                        // or the reader thread will update the status of the                        // entry.                        //if (dst->_connectionClosePending)                        //{                        //  dst->_monitor->_entries[dst->_entry_index]._status =                        //    _MonitorEntry::DYING;                        //}                        //else                        //{                        //  dst->_monitor->_entries[dst->_entry_index]._status =                        //    _MonitorEntry::IDLE;                        //}// end Added for PEP 183                    }                    else if (entries[indx]._type == Monitor::INTERNAL)                    {                        // set ourself to BUSY,                        // read the data                        // and set ourself back to IDLE                        entries[indx]._status = _MonitorEntry::BUSY;                        static char buffer[2];                        Sint32 amt =                            Socket::read(entries[indx].socket,&buffer, 2);                        if (amt == PEGASUS_SOCKET_ERROR &&                            getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)                        {                            Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                                "Monitor::run: Tickler socket got an IO error. "                                    "Going to re-create Socket and wait for "                                    "TCP/IP restart.");                            uninitializeTickler();                            initializeTickler();                        }                        else                        {                            entries[indx]._status = _MonitorEntry::IDLE;                        }                    }                    else                    {                        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                            "Non-connection entry, indx = %d, has been "                                "received.",                            indx);                        int events = 0;                        events |= SocketMessage::READ;                        Message* msg = new SocketMessage(                            entries[indx].socket, events);                        entries[indx]._status = _MonitorEntry::BUSY;                        _entry_mut.unlock();                        q->enqueue(msg);                        _entry_mut.lock();                        // After enqueue a message and the autoEntryMutex has                        // been released and locked again, the array of                        // entries can be changed. The ArrayIterator has be                        // reset with the original _entries                        entries.reset(_entries);                        entries[indx]._status = _MonitorEntry::IDLE;                    }                }                catch (...)                {                }            }        }    }}void Monitor::stopListeningForConnections(Boolean wait){    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");    // set boolean then tickle the server to recognize _stopConnections    _stopConnections = 1;    tickle();    if (wait)    {      // Wait for the monitor to notice _stopConnections.  Otherwise the      // caller of this function may unbind the ports while the monitor      // is still accepting connections on them.      _stopConnectionsSem.wait();    }    PEG_METHOD_EXIT();}int Monitor::solicitSocketMessages(    SocketHandle socket,    Uint32 events,    Uint32 queueId,    int type){    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");    AutoMutex autoMut(_entry_mut);    // Check to see if we need to dynamically grow the _entries array    // We always want the _entries array to 2 bigger than the    // current connections requested    _solicitSocketCount++;  // bump the count    int size = (int)_entries.size();    if ((int)_solicitSocketCount >= (size-1))    {        for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)        {            _MonitorEntry entry(0, 0, 0);            _entries.append(entry);        }    }    int index;    for (index = 1; index < (int)_entries.size(); index++)    {        try        {            if (_entries[index]._status.get() == _MonitorEntry::EMPTY)            {                _entries[index].socket = socket;                _entries[index].queueId  = queueId;                _entries[index]._type = type;                _entries[index]._status = _MonitorEntry::IDLE;                return index;            }        }        catch (...)        {        }    }    // decrease the count, if we are here we didn't do anything meaningful    _solicitSocketCount--;    PEG_METHOD_EXIT();    return -1;}void Monitor::unsolicitSocketMessages(SocketHandle socket){    PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");    AutoMutex autoMut(_entry_mut);    /*        Start at index = 1 because _entries[0] is the tickle entry which        never needs to be EMPTY;    */    unsigned int index;    for (index = 1; index < _entries.size(); index++)    {        if (_entries[index].socket == socket)        {            _entries[index]._status = _MonitorEntry::EMPTY;            _entries[index].socket = PEGASUS_INVALID_SOCKET;            _solicitSocketCount--;            break;        }    }    /*        Dynamic Contraction:        To remove excess entries we will start from the end of the _entries        array and remove all entries with EMPTY status until we find the        first NON EMPTY.  This prevents the positions, of the NON EMPTY        entries, from being changed.    */    index = _entries.size() - 1;    while (_entries[index]._status.get() == _MonitorEntry::EMPTY)    {        if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)                _entries.remove(index);        index--;    }    PEG_METHOD_EXIT();}// Note: this is no longer called with PEP 183.ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm){    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "            "q = %p",        dst->_entry_index,        dst->_monitor->_entries[dst->_entry_index].queueId,        dst);    try    {        dst->run(1);    }    catch (...)    {        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,            "Monitor::_dispatch: exception received");    }    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==        _MonitorEntry::BUSY);    // Once the HTTPConnection thread has set the status value to either    // Monitor::DYING or Monitor::IDLE, it has returned control of the    // connection to the Monitor.  It is no longer permissible to access the    // connection or the entry in the _entries table.    if (dst->_connectionClosePending)    {        dst->_monitor->_entries[dst->_entry_index]._status =            _MonitorEntry::DYING;    }    else    {        dst->_monitor->_entries[dst->_entry_index]._status =            _MonitorEntry::IDLE;    }    return 0;}PEGASUS_NAMESPACE_END

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -