📄 monitor.cpp
字号:
// coming thru. The last response to come thru after a // _connectionClosePending will reset _responsePending to false // and then cause the monitor to rerun this code and clean up. // (see HTTPConnection.cpp) if (h._responsePending == true) { Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - Ignoring connection delete request " "because responses are still pending. " "connection=0x%p, socket=%d\n", (void *)&h, h.getSocket()); continue; } h._connectionClosePending = false; MessageQueue &o = h.get_owner(); Message* message= new CloseConnectionMessage(entry.socket); message->dest = o.getQueueId(); // HTTPAcceptor is responsible for closing the connection. // The lock is released to allow HTTPAcceptor to call // unsolicitSocketMessages to free the entry. // Once HTTPAcceptor completes processing of the close // connection, the lock is re-requested and processing of // the for loop continues. This is safe with the current // implementation of the entries object. Note that the // loop condition accesses the entries.size() on each // iteration, so that a change in size while the mutex is // unlocked will not result in an ArrayIndexOutOfBounds // exception. _entry_mut.unlock(); o.enqueue(message); _entry_mut.lock(); // After enqueue a message and the autoEntryMutex has been // released and locked again, the array of _entries can be // changed. The ArrayIterator has be reset with the original // _entries. entries.reset(_entries); } } Uint32 _idleEntries = 0; /* We will keep track of the maximum socket number and pass this value to the kernel as a parameter to SELECT. This loop seems like a good place to calculate the max file descriptor (maximum socket number) because we have to traverse the entire array. */ SocketHandle maxSocketCurrentPass = 0; for (int indx = 0; indx < (int)entries.size(); indx++) { if (maxSocketCurrentPass < entries[indx].socket) maxSocketCurrentPass = entries[indx].socket; if (entries[indx]._status.get() == _MonitorEntry::IDLE) { _idleEntries++; FD_SET(entries[indx].socket, &fdread); } } /* Add 1 then assign maxSocket accordingly. We add 1 to account for descriptors starting at 0. */ maxSocketCurrentPass++; _entry_mut.unlock(); // // The first argument to select() is ignored on Windows and it is not // a socket value. The original code assumed that the number of sockets // and a socket value have the same type. On Windows they do not. //#ifdef PEGASUS_OS_TYPE_WINDOWS int events = select(0, &fdread, NULL, NULL, &tv);#else int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);#endif _entry_mut.lock(); // After enqueue a message and the autoEntryMutex has been released and // locked again, the array of _entries can be changed. The ArrayIterator // has be reset with the original _entries entries.reset(_entries); if (events == PEGASUS_SOCKET_ERROR) { Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - errorno = %d has occurred on select.", errno); // The EBADF error indicates that one or more or the file // descriptions was not valid. This could indicate that // the entries structure has been corrupted or that // we have a synchronization error. PEGASUS_ASSERT(errno != EBADF); } else if (events) { Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run select event received events = %d, monitoring %d " "idle entries", events, _idleEntries); for (int indx = 0; indx < (int)entries.size(); indx++) { // The Monitor should only look at entries in the table that are // IDLE (i.e., owned by the Monitor). if ((entries[indx]._status.get() == _MonitorEntry::IDLE) && (FD_ISSET(entries[indx].socket, &fdread))) { MessageQueue *q = MessageQueue::lookup(entries[indx].queueId); Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run indx = %d, queueId = %d, q = %p", indx, entries[indx].queueId, q); PEGASUS_ASSERT(q !=0); try { if (entries[indx]._type == Monitor::CONNECTION) { Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "entries[indx].type for indx = %d is " "Monitor::CONNECTION", indx); static_cast<HTTPConnection *>(q)->_entry_index = indx; // Do not update the entry just yet. The entry gets // updated once the request has been read. //entries[indx]._status = _MonitorEntry::BUSY; // If allocate_and_awaken failure, retry on next // iteration/* Removed for PEP 183. if (!MessageQueueService::get_thread_pool()-> allocate_and_awaken((void *)q, _dispatch)) { Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, "Monitor::run: Insufficient resources to " "process request."); entries[indx]._status = _MonitorEntry::IDLE; return true; }*/// Added for PEP 183 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q); Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::_dispatch: entering run() for " "indx = %d, queueId = %d, q = %p", dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst); try { dst->run(1); } catch (...) { Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::_dispatch: exception received"); } Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::_dispatch: exited run() for index %d", dst->_entry_index); // It is possible the entry status may not be set to // busy. The following will fail in that case. // PEGASUS_ASSERT(dst->_monitor->_entries[ // dst->_entry_index]._status.get() == // _MonitorEntry::BUSY); // Once the HTTPConnection thread has set the status // value to either Monitor::DYING or Monitor::IDLE, // it has returned control of the connection to the // Monitor. It is no longer permissible to access // the connection or the entry in the _entries table. // The following is not relevant as the worker thread // or the reader thread will update the status of the // entry. //if (dst->_connectionClosePending) //{ // dst->_monitor->_entries[dst->_entry_index]._status = // _MonitorEntry::DYING; //} //else //{ // dst->_monitor->_entries[dst->_entry_index]._status = // _MonitorEntry::IDLE; //}// end Added for PEP 183 } else if (entries[indx]._type == Monitor::INTERNAL) { // set ourself to BUSY, // read the data // and set ourself back to IDLE entries[indx]._status = _MonitorEntry::BUSY; static char buffer[2]; Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2); if (amt == PEGASUS_SOCKET_ERROR && getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED) { Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run: Tickler socket got an IO error. " "Going to re-create Socket and wait for " "TCP/IP restart."); uninitializeTickler(); initializeTickler(); } else { entries[indx]._status = _MonitorEntry::IDLE; } } else { Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Non-connection entry, indx = %d, has been " "received.", indx); int events = 0; events |= SocketMessage::READ; Message* msg = new SocketMessage( entries[indx].socket, events); entries[indx]._status = _MonitorEntry::BUSY; _entry_mut.unlock(); q->enqueue(msg); _entry_mut.lock(); // After enqueue a message and the autoEntryMutex has // been released and locked again, the array of // entries can be changed. The ArrayIterator has be // reset with the original _entries entries.reset(_entries); entries[indx]._status = _MonitorEntry::IDLE; } } catch (...) { } } } }}void Monitor::stopListeningForConnections(Boolean wait){ PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()"); // set boolean then tickle the server to recognize _stopConnections _stopConnections = 1; tickle(); if (wait) { // Wait for the monitor to notice _stopConnections. Otherwise the // caller of this function may unbind the ports while the monitor // is still accepting connections on them. _stopConnectionsSem.wait(); } PEG_METHOD_EXIT();}int Monitor::solicitSocketMessages( SocketHandle socket, Uint32 events, Uint32 queueId, int type){ PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages"); AutoMutex autoMut(_entry_mut); // Check to see if we need to dynamically grow the _entries array // We always want the _entries array to 2 bigger than the // current connections requested _solicitSocketCount++; // bump the count int size = (int)_entries.size(); if ((int)_solicitSocketCount >= (size-1)) { for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++) { _MonitorEntry entry(0, 0, 0); _entries.append(entry); } } int index; for (index = 1; index < (int)_entries.size(); index++) { try { if (_entries[index]._status.get() == _MonitorEntry::EMPTY) { _entries[index].socket = socket; _entries[index].queueId = queueId; _entries[index]._type = type; _entries[index]._status = _MonitorEntry::IDLE; return index; } } catch (...) { } } // decrease the count, if we are here we didn't do anything meaningful _solicitSocketCount--; PEG_METHOD_EXIT(); return -1;}void Monitor::unsolicitSocketMessages(SocketHandle socket){ PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages"); AutoMutex autoMut(_entry_mut); /* Start at index = 1 because _entries[0] is the tickle entry which never needs to be EMPTY; */ unsigned int index; for (index = 1; index < _entries.size(); index++) { if (_entries[index].socket == socket) { _entries[index]._status = _MonitorEntry::EMPTY; _entries[index].socket = PEGASUS_INVALID_SOCKET; _solicitSocketCount--; break; } } /* Dynamic Contraction: To remove excess entries we will start from the end of the _entries array and remove all entries with EMPTY status until we find the first NON EMPTY. This prevents the positions, of the NON EMPTY entries, from being changed. */ index = _entries.size() - 1; while (_entries[index]._status.get() == _MonitorEntry::EMPTY) { if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES) _entries.remove(index); index--; } PEG_METHOD_EXIT();}// Note: this is no longer called with PEP 183.ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm){ HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm); Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, " "q = %p", dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst); try { dst->run(1); } catch (...) { Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::_dispatch: exception received"); } Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::_dispatch: exited run() for index %d", dst->_entry_index); PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY); // Once the HTTPConnection thread has set the status value to either // Monitor::DYING or Monitor::IDLE, it has returned control of the // connection to the Monitor. It is no longer permissible to access the // connection or the entry in the _entries table. if (dst->_connectionClosePending) { dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING; } else { dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE; } return 0;}PEGASUS_NAMESPACE_END
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -