📄 monitor.cpp
字号:
//%2006//////////////////////////////////////////////////////////////////////////// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;// IBM Corp.; EMC Corporation, The Open Group.// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;// EMC Corporation; VERITAS Software Corporation; The Open Group.// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;// EMC Corporation; Symantec Corporation; The Open Group.//// Permission is hereby granted, free of charge, to any person obtaining a copy// of this software and associated documentation files (the "Software"), to// deal in the Software without restriction, including without limitation the// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or// sell copies of the Software, and to permit persons to whom the Software is// furnished to do so, subject to the following conditions:// // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.////==============================================================================////%/////////////////////////////////////////////////////////////////////////////#include "Network.h"#include <Pegasus/Common/Config.h>#include <cstring>#include "Monitor.h"#include "MessageQueue.h"#include "Socket.h"#include <Pegasus/Common/Tracer.h>#include <Pegasus/Common/HTTPConnection.h>#include <Pegasus/Common/MessageQueueService.h>#include <Pegasus/Common/Exception.h>#include "ArrayIterator.h"#include <errno.h>PEGASUS_USING_STD;PEGASUS_NAMESPACE_BEGINstatic AtomicInt _connections(0);//////////////////////////////////////////////////////////////////////////////////// Monitor//////////////////////////////////////////////////////////////////////////////////#define MAX_NUMBER_OF_MONITOR_ENTRIES 32Monitor::Monitor() : _stopConnections(0), _stopConnectionsSem(0), _solicitSocketCount(0), _tickle_client_socket(-1), _tickle_server_socket(-1), _tickle_peer_socket(-1){ int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES; Socket::initializeInterface(); _entries.reserveCapacity(numberOfMonitorEntriesToAllocate); // setup the tickler initializeTickler(); // Start the count at 1 because initilizeTickler() // has added an entry in the first position of the // _entries array for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++) { _MonitorEntry entry(0, 0, 0); _entries.append(entry); }}Monitor::~Monitor(){ uninitializeTickler(); Socket::uninitializeInterface(); Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "returning from monitor destructor");}void Monitor::uninitializeTickler(){ Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface"); try { if (_tickle_peer_socket >= 0) { Socket::close(_tickle_peer_socket); } if (_tickle_client_socket >= 0) { Socket::close(_tickle_client_socket); } if (_tickle_server_socket >= 0) { Socket::close(_tickle_server_socket); } } catch (...) { Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Failed to close tickle sockets"); }}void Monitor::initializeTickler(){ /* NOTE: On any errors trying to setup out tickle connection, throw an exception/end the server */ /* setup the tickle server/listener */ // try until the tcpip is restarted do { // get a socket for the server side if ((_tickle_server_socket = Socket::createSocket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET) { MessageLoaderParms parms( "Common.Monitor.TICKLE_CREATE", "Received error number $0 while creating the internal socket.", getSocketError()); throw Exception(parms); } // initialize the address memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));#ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM#pragma convert(37)#endif _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");#ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM#pragma convert(0)#endif _tickle_server_addr.sin_family = PF_INET; _tickle_server_addr.sin_port = 0; SocketLength _addr_size = sizeof(_tickle_server_addr); // bind server side to socket if ((::bind(_tickle_server_socket, reinterpret_cast<struct sockaddr*>(&_tickle_server_addr), sizeof(_tickle_server_addr))) < 0) {#ifdef PEGASUS_OS_ZOS MessageLoaderParms parms( "Common.Monitor.TICKLE_BIND_LONG", "Received error:$0 while binding the internal socket.", strerror(errno));#else MessageLoaderParms parms( "Common.Monitor.TICKLE_BIND", "Received error number $0 while binding the internal socket.", getSocketError());#endif throw Exception(parms); } // tell the kernel we are a server if ((::listen(_tickle_server_socket, 3)) < 0) { MessageLoaderParms parms( "Common.Monitor.TICKLE_LISTEN", "Received error number $0 while listening to the internal " "socket.", getSocketError()); throw Exception(parms); } // make sure we have the correct socket for our server int sock = ::getsockname( _tickle_server_socket, reinterpret_cast<struct sockaddr*>(&_tickle_server_addr), &_addr_size); if (sock < 0) { MessageLoaderParms parms( "Common.Monitor.TICKLE_SOCKNAME", "Received error number $0 while getting the internal socket " "name.", getSocketError()); throw Exception(parms); } /* set up the tickle client/connector */ // get a socket for our tickle client if ((_tickle_client_socket = Socket::createSocket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET) { MessageLoaderParms parms( "Common.Monitor.TICKLE_CLIENT_CREATE", "Received error number $0 while creating the internal client " "socket.", getSocketError()); throw Exception(parms); } // setup the address of the client memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));#ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM# pragma convert(37)#endif _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");#ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM# pragma convert(0)#endif _tickle_client_addr.sin_family = PF_INET; _tickle_client_addr.sin_port = 0; // bind socket to client side if ((::bind(_tickle_client_socket, reinterpret_cast<struct sockaddr*>(&_tickle_client_addr), sizeof(_tickle_client_addr))) < 0) { MessageLoaderParms parms( "Common.Monitor.TICKLE_CLIENT_BIND", "Received error number $0 while binding the internal client " "socket.", getSocketError()); throw Exception(parms); } // connect to server side if ((::connect(_tickle_client_socket, reinterpret_cast<struct sockaddr*>(&_tickle_server_addr), sizeof(_tickle_server_addr))) < 0) { MessageLoaderParms parms( "Common.Monitor.TICKLE_CLIENT_CONNECT", "Received error number $0 while connecting the internal " "client socket.", getSocketError()); throw Exception(parms); } /* set up the slave connection */ memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr)); SocketLength peer_size = sizeof(_tickle_peer_addr); Threads::sleep(1); // this call may fail, we will try a max of 20 times to establish // this peer connection if ((_tickle_peer_socket = ::accept(_tickle_server_socket, reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr), &peer_size)) < 0) { if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR && getSocketError() == PEGASUS_NETWORK_TRYAGAIN) { int retries = 0; do { Threads::sleep(1); _tickle_peer_socket = ::accept( _tickle_server_socket, reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr), &peer_size); retries++; } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR && getSocketError() == PEGASUS_NETWORK_TRYAGAIN && retries < 20); } // TCP/IP is down, destroy sockets and retry again. if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR && getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED) { // destroy everything uninitializeTickler(); // retry again. continue; } } if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR) { MessageLoaderParms parms( "Common.Monitor.TICKLE_ACCEPT", "Received error number $0 while accepting the internal " "socket connection.", getSocketError()); throw Exception(parms); } else { // socket is ok break; } } while (1); // try until TCP/IP is restarted Socket::disableBlocking(_tickle_peer_socket); Socket::disableBlocking(_tickle_client_socket); // add the tickler to the list of entries to be monitored and set to // IDLE because Monitor only // checks entries with IDLE state for events _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL); entry._status = _MonitorEntry::IDLE; // is the tickler initalized as first socket on startup ? if (_entries.size()==0) { // if yes, append a new entry _entries.append(entry); } else { // if not, overwrite the tickler entry with new socket _entries[0]=entry; }}void Monitor::tickle(){ static char _buffer[] = { '0','0' }; AutoMutex autoMutex(_tickle_mutex); Socket::write(_tickle_client_socket,&_buffer, 2);}void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status ){ // Set the state to requested state _entries[index]._status = status;}void Monitor::run(Uint32 milliseconds){ int i = 0; struct timeval tv = {milliseconds/1000, milliseconds%1000*1000}; fd_set fdread; FD_ZERO(&fdread); AutoMutex autoEntryMutex(_entry_mut); ArrayIterator<_MonitorEntry> entries(_entries); // Check the stopConnections flag. If set, clear the Acceptor monitor // entries if (_stopConnections.get() == 1) { for ( int indx = 0; indx < (int)entries.size(); indx++) { if (entries[indx]._type == Monitor::ACCEPTOR) { if ( entries[indx]._status.get() != _MonitorEntry::EMPTY) { if ( entries[indx]._status.get() == _MonitorEntry::IDLE || entries[indx]._status.get() == _MonitorEntry::DYING ) { // remove the entry entries[indx]._status = _MonitorEntry::EMPTY; } else { // set status to DYING entries[indx]._status = _MonitorEntry::DYING; } } } } _stopConnections = 0; _stopConnectionsSem.signal(); } for (int indx = 0; indx < (int)entries.size(); indx++) { const _MonitorEntry &entry = entries[indx]; if ((entry._status.get() == _MonitorEntry::DYING) && (entry._type == Monitor::CONNECTION)) { MessageQueue *q = MessageQueue::lookup(entry.queueId); PEGASUS_ASSERT(q != 0); HTTPConnection &h = *static_cast<HTTPConnection *>(q); if (h._connectionClosePending == false) continue; // NOTE: do not attempt to delete while there are pending responses
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -