socketregistry.hpp

来自「MySQL数据库开发源码 值得一看哦」· HPP 代码 · 共 291 行

HPP
291
字号
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#ifndef SocketClientRegistry_H#define SocketClientRegistry_H#include <NdbTCP.h>#include <NdbOut.hpp>#include "SocketClient.hpp" template<class T>class SocketRegistry {public:  SocketRegistry(Uint32 maxSocketClients);  ~SocketRegistry();  /**   * creates and adds a SocketClient to m_socketClients[]   * @param host - host name   * @param port - port to connect to   */  bool createSocketClient(const char * host, const Uint16 port);  /**   * performReceive reads from sockets should do more stuff    */  int performReceive(T &);  /**   * performReceive reads from sockets should do more stuff    */  int syncPerformReceive(const char* ,T &, Uint32);  /**   * performSend sends a command to a host   */  bool performSend(const char * buf, Uint32 len, const char * remotehost);  /**   * pollSocketClients performs a select (for a max. of timeoutmillis) or   * until there is data to be read from any SocketClient   * @param timeOutMillis - select timeout   */  int pollSocketClients(Uint32 timeOutMillis);  /**   * reconnect tries to reconnect to a cpcd given its hostname   * @param host - name of host to reconnect to.   */  bool reconnect(const char * host);  /**   * removeSocketClient   * @param host - name of host for which to remove the SocketConnection   */  bool removeSocketClient(const char * host);private:  SocketClient** m_socketClients;  Uint32 m_maxSocketClients;  Uint32 m_nSocketClients;  int tcpReadSelectReply;  fd_set tcpReadset;  };template<class T>inlineSocketRegistry<T>::SocketRegistry(Uint32 maxSocketClients) {  m_maxSocketClients = maxSocketClients;  m_socketClients  = new SocketClient * [m_maxSocketClients];  m_nSocketClients = 0;}template<class T>inlineSocketRegistry<T>::~SocketRegistry() {  delete [] m_socketClients;}template<class T>inlinebool SocketRegistry<T>::createSocketClient(const char * host, Uint16 port) {  if(port == 0)    return false;  if(host==NULL)    return false;    SocketClient * socketClient = new SocketClient(host, port);  if(socketClient->openSocket() < 0 || socketClient == NULL) {    ndbout << "could not connect" << endl;    delete socketClient;    return false;  }  else {    m_socketClients[m_nSocketClients] = socketClient;    m_nSocketClients++;  }  return true;}template<class T>inlineint SocketRegistry<T>::pollSocketClients(Uint32 timeOutMillis) {  // Return directly if there are no TCP transporters configured  if (m_nSocketClients == 0){    tcpReadSelectReply = 0;    return 0;  }  struct timeval timeout;  timeout.tv_sec  = timeOutMillis / 1000;  timeout.tv_usec = (timeOutMillis % 1000) * 1000;  NDB_SOCKET_TYPE maxSocketValue = 0;    // Needed for TCP/IP connections  // The read- and writeset are used by select    FD_ZERO(&tcpReadset);  // Prepare for sending and receiving  for (Uint32 i = 0; i < m_nSocketClients; i++) {    SocketClient * t = m_socketClients[i];        // If the socketclient is connected    if (t->isConnected()) {            const NDB_SOCKET_TYPE socket = t->getSocket();      // Find the highest socket value. It will be used by select      if (socket > maxSocketValue)	maxSocketValue = socket;            // Put the connected transporters in the socket read-set       FD_SET(socket, &tcpReadset);    }  }    // The highest socket value plus one  maxSocketValue++;     tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);  #ifdef NDB_WIN32  if(tcpReadSelectReply == SOCKET_ERROR)  {    NdbSleep_MilliSleep(timeOutMillis);  }#endif  return tcpReadSelectReply;}template<class T>inlinebool SocketRegistry<T>::performSend(const char * buf, Uint32 len, const char * remotehost){  SocketClient * socketClient;  for(Uint32 i=0; i < m_nSocketClients; i++) {    socketClient = m_socketClients[i];    if(strcmp(socketClient->gethostname(), remotehost)==0) {      if(socketClient->isConnected()) {	if(socketClient->writeSocket(buf, len)>0)	  return true;	else	  return false;      }    }  }  return false;}template<class T>inlineint SocketRegistry<T>::performReceive(T & t) {    char buf[255] ; //temp. just for testing. must fix better  if(tcpReadSelectReply > 0){    for (Uint32 i=0; i<m_nSocketClients; i++) {      SocketClient *sc = m_socketClients[i];      const NDB_SOCKET_TYPE socket    = sc->getSocket();      if(sc->isConnected() && FD_ISSET(socket, &tcpReadset)) {	t->runSession(socket,t);      }    }    return 1;  }  return 0;  }template<class T>inlineint SocketRegistry<T>::syncPerformReceive(const char * remotehost,				      T & t,				      Uint32 timeOutMillis) {    char buf[255] ; //temp. just for testing. must fix better  struct timeval timeout;  timeout.tv_sec  = timeOutMillis / 1000;  timeout.tv_usec = (timeOutMillis % 1000) * 1000;  int reply;  SocketClient * sc;  for(Uint32 i=0; i < m_nSocketClients; i++) {    sc = m_socketClients[i];    if(strcmp(sc->gethostname(), remotehost)==0) {      if(sc->isConnected()) {	/*FD_ZERO(&tcpReadset);	  reply = select(sc->getSocket()+1, 0, 0, 0, &timeout);	reply=1;	if(reply > 0) {*/	  t.runSession(sc->getSocket(), t);	  //}      }          }  }  }template<class T>inlinebool SocketRegistry<T>::reconnect(const char * host){  for(Uint32 i=0; i < m_nSocketClients; i++) {    SocketClient * socketClient = m_socketClients[i];    if(strcmp(socketClient->gethostname(), host)==0) {      if(!socketClient->isConnected()) {	if(socketClient->openSocket() > 0)	  return true;	else return false;      }    }  }  return false;}template<class T>inlinebool SocketRegistry<T>::removeSocketClient(const char * host){  for(Uint32 i=0; i < m_nSocketClients; i++) {    SocketClient * socketClient = m_socketClients[i];    if(strcmp(socketClient->gethostname(), host)==0) {      if(!socketClient->isConnected()) {	if(socketClient->closeSocket() > 0) {	  delete socketClient;	  return true;	}	else return false;      }    }  }  return false;}#endif // Define of SocketRegistry

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?