⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transporterregistry.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_pthread.h>

#include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp"

#include "Transporter.hpp"
#include <SocketAuthenticator.hpp>

#ifdef NDB_TCP_TRANSPORTER
#include "TCP_Transporter.hpp"
#endif

#ifdef NDB_OSE_TRANSPORTER
#include "OSE_Receiver.hpp"
#include "OSE_Transporter.hpp"
#endif

#ifdef NDB_SCI_TRANSPORTER
#include "SCI_Transporter.hpp"
#endif

#ifdef NDB_SHM_TRANSPORTER
#include "SHM_Transporter.hpp"
extern int g_ndb_shm_signum;
#endif

#include "TransporterCallback.hpp"
#include "NdbOut.hpp"
#include <NdbSleep.h>
#include <NdbTick.h>
#include <InputStream.hpp>
#include <OutputStream.hpp>

#include <mgmapi/mgmapi.h>
#include <mgmapi_internal.h>
#include <mgmapi/mgmapi_debug.h>

#include <EventLogger.hpp>
extern EventLogger g_eventLogger;

/**
 * Return the connect address stored on the transporter of a node.
 *
 * @param node_id  index into theTransporters. It is neither range-checked
 *                 nor null-checked here, so the caller must pass a node id
 *                 for which a transporter has been created.
 * @return a copy of the transporter's m_connect_address member.
 */
struct in_addr
TransporterRegistry::get_connect_address(NodeId node_id) const
{
  return theTransporters[node_id]->m_connect_address;
}

/**
 * Handle a freshly accepted socket.
 *
 * If an authenticator is configured (m_auth) the peer is authenticated
 * first; then the socket is passed to TransporterRegistry::connect_server().
 * When either step fails the socket is closed here.
 *
 * @param sockfd  the accepted socket.
 * @return always 0: no Session object is created on any path.
 *         NOTE(review): presumably intentional, since on success the
 *         transporter takes over the socket (see connect_server) - confirm
 *         against SocketServer's handling of a null session.
 */
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{
  DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
  if (m_auth && !m_auth->server_authenticate(sockfd)){
    NDB_CLOSE_SOCKET(sockfd);
    DBUG_RETURN(0);
  }

  if (!m_transporter_registry->connect_server(sockfd))
  {
    NDB_CLOSE_SOCKET(sockfd);
    DBUG_RETURN(0);
  }

  DBUG_RETURN(0);
}

/**
 * Construct an empty registry with room for _maxTransporters nodes.
 *
 * Allocates one array per transporter kind plus the per-node type, state
 * and generic-transporter arrays, and resets every slot to "no transporter,
 * DISCONNECTED, NoHalt". theTransporterTypes is allocated but not
 * initialised here; it is only written when a transporter is created.
 *
 * @param callback                stored in callbackObj.
 * @param _maxTransporters        length of every per-node array.
 * @param sizeOfLongSignalMemory  not referenced by this constructor body.
 */
TransporterRegistry::TransporterRegistry(void * callback,
                                         unsigned _maxTransporters,
                                         unsigned sizeOfLongSignalMemory)
{
  DBUG_ENTER("TransporterRegistry::TransporterRegistry");

  nodeIdSpecified = false;
  maxTransporters = _maxTransporters;
  sendCounter = 1;
  m_mgm_handle= 0;

  callbackObj=callback;

  theTCPTransporters  = new TCP_Transporter * [maxTransporters];
  theSCITransporters  = new SCI_Transporter * [maxTransporters];
  theSHMTransporters  = new SHM_Transporter * [maxTransporters];
  theOSETransporters  = new OSE_Transporter * [maxTransporters];
  theTransporterTypes = new TransporterType   [maxTransporters];
  theTransporters     = new Transporter     * [maxTransporters];
  performStates       = new PerformState      [maxTransporters];
  ioStates            = new IOState           [maxTransporters];

  // Initialize member variables
  nTransporters    = 0;
  nTCPTransporters = 0;
  nSCITransporters = 0;
  nSHMTransporters = 0;
  nOSETransporters = 0;

  // Initialize the transporter arrays
  for (unsigned i=0; i<maxTransporters; i++) {
    theTCPTransporters[i] = NULL;
    theSCITransporters[i] = NULL;
    theSHMTransporters[i] = NULL;
    theOSETransporters[i] = NULL;
    theTransporters[i]    = NULL;
    performStates[i]      = DISCONNECTED;
    ioStates[i]           = NoHalt;
  }
  theOSEReceiver = 0;
  theOSEJunkSocketSend = 0;
  theOSEJunkSocketRecv = 0;

  DBUG_VOID_RETURN;
}

/**
 * Replace the management-server handle held by the registry.
 *
 * Any handle already held is destroyed with ndb_mgm_destroy_handle() before
 * the new one is stored; the destructor destroys whatever handle is held at
 * that time, so the registry owns the handle passed in.
 *
 * @param h  new handle, may be 0 to clear it.
 */
void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
{
  DBUG_ENTER("TransporterRegistry::set_mgm_handle");
  if (m_mgm_handle)
    ndb_mgm_destroy_handle(&m_mgm_handle);
  m_mgm_handle= h;
#ifndef DBUG_OFF
  // Trace-only: report which connect string the new handle carries.
  if (h)
  {
    char buf[256];
    DBUG_PRINT("info",("handle set with connectstring: %s",
                       ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
  }
  else
  {
    DBUG_PRINT("info",("handle set to NULL"));
  }
#endif
  DBUG_VOID_RETURN;
}

/**
 * Tear the registry down: remove every transporter, free the arrays
 * allocated by the constructor, destroy the OSE receiver (OSE builds only)
 * and destroy the management handle if one is held.
 */
TransporterRegistry::~TransporterRegistry()
{
  DBUG_ENTER("TransporterRegistry::~TransporterRegistry");

  removeAll();

  delete[] theTCPTransporters;
  delete[] theSCITransporters;
  delete[] theSHMTransporters;
  delete[] theOSETransporters;
  delete[] theTransporterTypes;
  delete[] theTransporters;
  delete[] performStates;
  delete[] ioStates;

#ifdef NDB_OSE_TRANSPORTER
  if(theOSEReceiver != NULL){
    theOSEReceiver->destroyPhantom();
    delete theOSEReceiver;
    theOSEReceiver = 0;
  }
#endif
  if (m_mgm_handle)
    ndb_mgm_destroy_handle(&m_mgm_handle);

  DBUG_VOID_RETURN;
}

/**
 * Call removeTransporter() for every node slot that holds a transporter.
 */
void
TransporterRegistry::removeAll(){
  for(unsigned i = 0; i<maxTransporters; i++){
    if(theTransporters[i] != NULL)
      removeTransporter(theTransporters[i]->getRemoteNodeId());
  }
}

/**
 * Call doDisconnect() on every transporter currently registered.
 * The transporters stay in the registry.
 */
void
TransporterRegistry::disconnectAll(){
  for(unsigned i = 0; i<maxTransporters; i++){
    if(theTransporters[i] != NULL)
      theTransporters[i]->doDisconnect();
  }
}

/**
 * Record the local node id and mark it as specified.
 *
 * @param nodeId  id of the local node.
 * @return always true.
 */
bool
TransporterRegistry::init(NodeId nodeId) {
  DBUG_ENTER("TransporterRegistry::init");
  nodeIdSpecified = true;
  localNodeId = nodeId;

  DEBUG("TransporterRegistry started node: " << localNodeId);

  DBUG_RETURN(true);
}

/**
 * Server side of the transporter handshake on an accepted socket.
 *
 * Reads one line "<nodeId> [<transporter type>]" from the client, checks
 * that the node id is in range, has a transporter and is in state
 * CONNECTING, answers with "<local node id> <local transporter type>",
 * and - when the client sent a type - rejects the connection if the types
 * differ. On success the socket is handed to the node's transporter.
 *
 * @param sockfd  accepted socket. It is NOT closed here on failure; the
 *                visible caller (TransporterService::newSession) closes it
 *                when false is returned.
 * @return false when the handshake is rejected, true once the socket has
 *         been passed to Transporter::connect_server(). The result of that
 *         call is not inspected.
 */
bool
TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
{
  DBUG_ENTER("TransporterRegistry::connect_server");

  // read node id from client
  // read transporter type
  int nodeId, remote_transporter_type= -1;

  SocketInputStream s_input(sockfd);
  char buf[256];
  if (s_input.gets(buf, 256) == 0) {
    DBUG_PRINT("error", ("Could not get node id from client"));
    DBUG_RETURN(false);
  }
  int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
  switch (r) {
  case 2:
    break;
  case 1:
    // we're running version prior to 4.1.9
    // ok, but with no checks on transporter configuration compatability
    break;
  default:
    DBUG_PRINT("error", ("Error in node id from client"));
    DBUG_RETURN(false);
  }

  DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
                      nodeId,remote_transporter_type));

  //check that nodeid is valid and that there is an allocated transporter
  if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
    DBUG_PRINT("error", ("Node id out of range from client"));
    DBUG_RETURN(false);
  }
  if (theTransporters[nodeId] == 0) {
      DBUG_PRINT("error", ("No transporter for this node id from client"));
      DBUG_RETURN(false);
  }

  //check that the transporter should be connected
  if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
    DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
    DBUG_RETURN(false);
  }

  Transporter *t= theTransporters[nodeId];

  // send info about own id (just as response to acknowledge connection)
  // send info on own transporter type
  SocketOutputStream s_output(sockfd);
  s_output.println("%d %d", t->getLocalNodeId(), t->m_type);

  if (remote_transporter_type != -1)
  {
    if (remote_transporter_type != t->m_type)
    {
      DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
                           t->m_type, remote_transporter_type));
      g_eventLogger.error("Incompatible configuration: Transporter type "
                          "mismatch with node %d", nodeId);

      // wait for socket close for 1 second to let message arrive at client
      {
        fd_set a_set;
        FD_ZERO(&a_set);
        FD_SET(sockfd, &a_set);
        struct timeval timeout;
        timeout.tv_sec  = 1; timeout.tv_usec = 0;
        select(sockfd+1, &a_set, 0, 0, &timeout);
      }
      DBUG_RETURN(false);
    }
  }
  else if (t->m_type == tt_SHM_TRANSPORTER)
  {
    // Client sent no type (sscanf matched one field only), so the
    // configuration cannot be cross-checked.
    g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
  }

  // setup transporter (transporter responsible for closing sockfd)
  t->connect_server(sockfd);

  DBUG_RETURN(true);
}

/**
 * Create, initialise and register a TCP transporter.
 *
 * Calls init() with the configured local node id if no node id has been
 * specified yet.
 *
 * @param config  transporter configuration; config->remoteNodeId is used
 *                as an index into the per-node arrays without a range check.
 * @return true when the transporter was registered; false when the local
 *         node id does not match, the remote node already has a
 *         transporter, initTransporter() fails, or the build has no
 *         NDB_TCP_TRANSPORTER support.
 */
bool
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER

  if(!nodeIdSpecified){
    init(config->localNodeId);
  }

  if(config->localNodeId != localNodeId)
    return false;

  if(theTransporters[config->remoteNodeId] != NULL)
    return false;

  TCP_Transporter * t = new TCP_Transporter(*this,
                                            config->tcp.sendBufferSize,
                                            config->tcp.maxReceiveSize,
                                            config->localHostName,
                                            config->remoteHostName,
                                            config->s_port,
                                            config->isMgmConnection,
                                            localNodeId,
                                            config->remoteNodeId,
                                            config->serverNodeId,
                                            config->checksum,
                                            config->signalId);
  if (t == NULL)
    return false;
  else if (!t->initTransporter()) {
    delete t;
    return false;
  }

  // Put the transporter in the transporter arrays
  theTCPTransporters[nTCPTransporters]      = t;
  theTransporters[t->getRemoteNodeId()]     = t;
  theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;
  nTransporters++;
  nTCPTransporters++;

#if defined NDB_OSE || defined NDB_SOFTOSE
  t->theReceiverPid = theReceiverPid;
#endif

  return true;
#else
  return false;
#endif
}

/**
 * Create, initialise and register an OSE transporter.
 *
 * Calls init() with the configured local node id if no node id has been
 * specified yet, and creates the shared OSE_Receiver on first use.
 *
 * @param conf  transporter configuration; conf->remoteNodeId is used as an
 *              index into the per-node arrays without a range check.
 * @return true when the transporter was registered; false when the local
 *         node id does not match, the remote node already has a
 *         transporter, initTransporter() fails, or the build has no
 *         NDB_OSE_TRANSPORTER support.
 */
bool
TransporterRegistry::createOSETransporter(TransporterConfiguration *conf) {
#ifdef NDB_OSE_TRANSPORTER

  if(!nodeIdSpecified){
    init(conf->localNodeId);
  }

  if(conf->localNodeId != localNodeId)
    return false;

  if(theTransporters[conf->remoteNodeId] != NULL)
    return false;

  if(theOSEReceiver == NULL){
    theOSEReceiver = new OSE_Receiver(this,
                                      10,
                                      localNodeId);
  }

  OSE_Transporter * t = new OSE_Transporter(conf->ose.prioASignalSize,
                                            conf->ose.prioBSignalSize,
                                            localNodeId,
                                            conf->localHostName,
                                            conf->remoteNodeId,
                                            conf->serverNodeId,
                                            conf->remoteHostName,
                                            conf->checksum,
                                            conf->signalId);
  if (t == NULL)
    return false;
  else if (!t->initTransporter()) {
    delete t;
    return false;
  }
  // Put the transporter in the transporter arrays
  theOSETransporters[nOSETransporters]      = t;
  theTransporters[t->getRemoteNodeId()]     = t;
  theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER;
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;

  nTransporters++;
  nOSETransporters++;

  return true;
#else
  return false;
#endif
}

/**
 * Create, initialise and register an SCI transporter.
 *
 * Aborts the process if SCI_Transporter::initSCI() fails. Calls init()
 * with the configured local node id if no node id has been specified yet.
 *
 * @param config  transporter configuration; config->remoteNodeId is used
 *                as an index into the per-node arrays without a range check.
 * @return true when the transporter was registered; false when the local
 *         node id does not match, the remote node already has a
 *         transporter, initTransporter() fails, or the build has no
 *         NDB_SCI_TRANSPORTER support.
 */
bool
TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
#ifdef NDB_SCI_TRANSPORTER

  if(!SCI_Transporter::initSCI())
    abort();

  if(!nodeIdSpecified){
    init(config->localNodeId);
  }

  if(config->localNodeId != localNodeId)
    return false;

  if(theTransporters[config->remoteNodeId] != NULL)
    return false;

  SCI_Transporter * t = new SCI_Transporter(*this,
                                            config->localHostName,
                                            config->remoteHostName,
                                            config->s_port,
                                            config->isMgmConnection,
                                            config->sci.sendLimit,
                                            config->sci.bufferSize,
                                            config->sci.nLocalAdapters,
                                            config->sci.remoteSciNodeId0,
                                            config->sci.remoteSciNodeId1,
                                            localNodeId,
                                            config->remoteNodeId,
                                            config->serverNodeId,
                                            config->checksum,
                                            config->signalId);

  if (t == NULL)
    return false;
  else if (!t->initTransporter()) {
    delete t;
    return false;
  }
  // Put the transporter in the transporter arrays
  theSCITransporters[nSCITransporters]      = t;
  theTransporters[t->getRemoteNodeId()]     = t;
  theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;
  nTransporters++;
  nSCITransporters++;

  return true;
#else
  return false;
#endif
}

/**
 * Create, initialise and register a shared-memory transporter.
 *
 * Calls init() with the configured local node id if no node id has been
 * specified yet. The first SHM transporter fixes the process-wide signal
 * number (g_ndb_shm_signum) and blocks that signal in the calling thread;
 * later transporters must be configured with the same signal number.
 *
 * Because this function opens a DBUG frame with DBUG_ENTER, every exit
 * goes through DBUG_RETURN so the frame is always popped.
 *
 * @param config  transporter configuration; config->remoteNodeId is used
 *                as an index into the per-node arrays without a range check.
 * @return true when the transporter was registered; false when the local
 *         node id or signal number does not match, the remote node already
 *         has a transporter, initTransporter() fails, or the build has no
 *         NDB_SHM_TRANSPORTER support.
 */
bool
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
  DBUG_ENTER("TransporterRegistry::createTransporter SHM");
#ifdef NDB_SHM_TRANSPORTER
  if(!nodeIdSpecified){
    init(config->localNodeId);
  }

  if(config->localNodeId != localNodeId) {
    DBUG_RETURN(false);
  }

  if (!g_ndb_shm_signum) {
    g_ndb_shm_signum= config->shm.signum;
    DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
    /**
     * Make sure to block g_ndb_shm_signum
     *   TransporterRegistry::init is run from "main" thread
     */
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, g_ndb_shm_signum);
    pthread_sigmask(SIG_BLOCK, &mask, 0);
  }

  if(config->shm.signum != g_ndb_shm_signum) {
    DBUG_RETURN(false);
  }

  if(theTransporters[config->remoteNodeId] != NULL) {
    DBUG_RETURN(false);
  }

  SHM_Transporter * t = new SHM_Transporter(*this,
                                            config->localHostName,
                                            config->remoteHostName,
                                            config->s_port,
                                            config->isMgmConnection,
                                            localNodeId,
                                            config->remoteNodeId,
                                            config->serverNodeId,
                                            config->checksum,
                                            config->signalId,
                                            config->shm.shmKey,
                                            config->shm.shmSize
                                            );
  if (t == NULL) {
    DBUG_RETURN(false);
  }
  if (!t->initTransporter()) {
    delete t;
    DBUG_RETURN(false);
  }
  // Put the transporter in the transporter arrays
  theSHMTransporters[nSHMTransporters]      = t;
  theTransporters[t->getRemoteNodeId()]     = t;
  theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;

  nTransporters++;
  nSHMTransporters++;

  DBUG_RETURN(true);
#else
  DBUG_RETURN(false);
#endif
}

/**
 * Disconnect the transporter of a node and take it out of the per-type
 * array by shifting the following entries down one slot.
 *
 * Does nothing when the node has no transporter.
 *
 * NOTE(review): this listing ends partway through this function (inside
 * the tt_OSE_TRANSPORTER case); the remainder lies outside this view and
 * is left exactly as it was.
 *
 * @param nodeId  remote node id; used as an index into the per-node arrays
 *                without a range check.
 */
void
TransporterRegistry::removeTransporter(NodeId nodeId) {

  DEBUG("Removing transporter from " << localNodeId
        << " to " << nodeId);

  if(theTransporters[nodeId] == NULL)
    return;

  theTransporters[nodeId]->doDisconnect();

  const TransporterType type = theTransporterTypes[nodeId];

  int ind = 0;
  switch(type){
  case tt_TCP_TRANSPORTER:
#ifdef NDB_TCP_TRANSPORTER
    for(; ind < nTCPTransporters; ind++)
      if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
        break;
    ind++;
    for(; ind<nTCPTransporters; ind++)
      theTCPTransporters[ind-1] = theTCPTransporters[ind];
    nTCPTransporters --;
#endif
    break;
  case tt_SCI_TRANSPORTER:
#ifdef NDB_SCI_TRANSPORTER
    for(; ind < nSCITransporters; ind++)
      if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
        break;
    ind++;
    for(; ind<nSCITransporters; ind++)
      theSCITransporters[ind-1] = theSCITransporters[ind];
    nSCITransporters --;
#endif
    break;
  case tt_SHM_TRANSPORTER:
#ifdef NDB_SHM_TRANSPORTER
    for(; ind < nSHMTransporters; ind++)
      if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
        break;
    ind++;
    for(; ind<nSHMTransporters; ind++)
      theSHMTransporters[ind-1] = theSHMTransporters[ind];
    nSHMTransporters --;
#endif
    break;
  case tt_OSE_TRANSPORTER:
#ifdef NDB_OSE_TRANSPORTER
    for(; ind < nOSETransporters; ind++)
      if(theOSETransporters[ind]->getRemoteNodeId() == nodeId)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -