📄 transporterregistry.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_pthread.h>

#include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp"

#include "Transporter.hpp"
#include <SocketAuthenticator.hpp>

#ifdef NDB_TCP_TRANSPORTER
#include "TCP_Transporter.hpp"
#endif

#ifdef NDB_OSE_TRANSPORTER
#include "OSE_Receiver.hpp"
#include "OSE_Transporter.hpp"
#endif

#ifdef NDB_SCI_TRANSPORTER
#include "SCI_Transporter.hpp"
#endif

#ifdef NDB_SHM_TRANSPORTER
#include "SHM_Transporter.hpp"
extern int g_ndb_shm_signum;
#endif

#include "TransporterCallback.hpp"
#include "NdbOut.hpp"
#include <NdbSleep.h>
#include <NdbTick.h>
#include <InputStream.hpp>
#include <OutputStream.hpp>

#include <mgmapi/mgmapi.h>
#include <mgmapi_internal.h>
#include <mgmapi/mgmapi_debug.h>

#include <EventLogger.hpp>
extern EventLogger g_eventLogger;

/**
 * Return the connect address stored on the transporter for node_id.
 *
 * Precondition: node_id must index an allocated transporter; the slot is
 * dereferenced without a range or NULL check.
 */
struct in_addr
TransporterRegistry::get_connect_address(NodeId node_id) const
{
  return theTransporters[node_id]->m_connect_address;
}

/**
 * Handle a freshly accepted socket.
 *
 * Authenticates the peer (when an authenticator is configured) and then
 * hands the socket to TransporterRegistry::connect_server().  On either
 * failure the socket is closed here.
 *
 * @return always 0; no Session object is created on any path.  On success
 *         the socket has been passed on to the transporter (see the
 *         ownership comment in TransporterRegistry::connect_server).
 */
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{
  DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
  if (m_auth && !m_auth->server_authenticate(sockfd)){
    NDB_CLOSE_SOCKET(sockfd);
    DBUG_RETURN(0);
  }

  if (!m_transporter_registry->connect_server(sockfd))
  {
    NDB_CLOSE_SOCKET(sockfd);
    DBUG_RETURN(0);
  }

  DBUG_RETURN(0);
}

/**
 * Construct an empty registry with room for _maxTransporters nodes.
 *
 * Allocates every per-node and per-type array with _maxTransporters
 * entries, clears the transporter pointers and marks each node
 * DISCONNECTED / NoHalt.
 *
 * @param callback               stored in callbackObj
 * @param _maxTransporters       size of all per-node arrays
 * @param sizeOfLongSignalMemory not used in this constructor body
 */
TransporterRegistry::TransporterRegistry(void * callback,
                                         unsigned _maxTransporters,
                                         unsigned sizeOfLongSignalMemory)
{
  DBUG_ENTER("TransporterRegistry::TransporterRegistry");

  nodeIdSpecified = false;
  maxTransporters = _maxTransporters;
  sendCounter = 1;
  m_mgm_handle= 0;

  callbackObj=callback;

  theTCPTransporters  = new TCP_Transporter * [maxTransporters];
  theSCITransporters  = new SCI_Transporter * [maxTransporters];
  theSHMTransporters  = new SHM_Transporter * [maxTransporters];
  theOSETransporters  = new OSE_Transporter * [maxTransporters];
  theTransporterTypes = new TransporterType   [maxTransporters];
  theTransporters     = new Transporter     * [maxTransporters];
  performStates       = new PerformState      [maxTransporters];
  ioStates            = new IOState           [maxTransporters];

  // Initialize member variables
  nTransporters    = 0;
  nTCPTransporters = 0;
  nSCITransporters = 0;
  nSHMTransporters = 0;
  nOSETransporters = 0;

  // Initialize the transporter arrays
  for (unsigned i=0; i<maxTransporters; i++) {
    theTCPTransporters[i] = NULL;
    theSCITransporters[i] = NULL;
    theSHMTransporters[i] = NULL;
    theOSETransporters[i] = NULL;
    theTransporters[i]    = NULL;
    performStates[i]      = DISCONNECTED;
    ioStates[i]           = NoHalt;
  }
  theOSEReceiver = 0;
  theOSEJunkSocketSend = 0;
  theOSEJunkSocketRecv = 0;

  DBUG_VOID_RETURN;
}

/**
 * Replace the management handle held by the registry.
 *
 * Any previously held handle is destroyed first.  Passing the handle that
 * is already held is a no-op for the destroy step; without that guard the
 * handle would be destroyed and then stored again as a dangling pointer.
 *
 * @param h new handle, may be NULL
 */
void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
{
  DBUG_ENTER("TransporterRegistry::set_mgm_handle");
  if (m_mgm_handle && m_mgm_handle != h)
    ndb_mgm_destroy_handle(&m_mgm_handle);
  m_mgm_handle= h;
#ifndef DBUG_OFF
  if (h)
  {
    char buf[256];
    DBUG_PRINT("info",("handle set with connectstring: %s",
                       ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
  }
  else
  {
    DBUG_PRINT("info",("handle set to NULL"));
  }
#endif
  DBUG_VOID_RETURN;
}

/**
 * Remove every transporter, release all arrays allocated by the
 * constructor, tear down the OSE receiver (when built with OSE support)
 * and destroy the management handle if one is held.
 */
TransporterRegistry::~TransporterRegistry()
{
  DBUG_ENTER("TransporterRegistry::~TransporterRegistry");

  removeAll();

  delete[] theTCPTransporters;
  delete[] theSCITransporters;
  delete[] theSHMTransporters;
  delete[] theOSETransporters;
  delete[] theTransporterTypes;
  delete[] theTransporters;
  delete[] performStates;
  delete[] ioStates;

#ifdef NDB_OSE_TRANSPORTER
  if(theOSEReceiver != NULL){
    theOSEReceiver->destroyPhantom();
    delete theOSEReceiver;
    theOSEReceiver = 0;
  }
#endif
  if (m_mgm_handle)
    ndb_mgm_destroy_handle(&m_mgm_handle);

  DBUG_VOID_RETURN;
}

/**
 * Call removeTransporter() for every allocated slot in theTransporters.
 */
void
TransporterRegistry::removeAll(){
  for(unsigned i = 0; i<maxTransporters; i++){
    if(theTransporters[i] != NULL)
      removeTransporter(theTransporters[i]->getRemoteNodeId());
  }
}

/**
 * Call doDisconnect() on every allocated transporter.
 */
void
TransporterRegistry::disconnectAll(){
  for(unsigned i = 0; i<maxTransporters; i++){
    if(theTransporters[i] != NULL)
      theTransporters[i]->doDisconnect();
  }
}

/**
 * Record the local node id and mark it as specified.
 *
 * @param nodeId id stored in localNodeId
 * @return always true
 */
bool
TransporterRegistry::init(NodeId nodeId) {
  DBUG_ENTER("TransporterRegistry::init");
  nodeIdSpecified = true;
  localNodeId = nodeId;

  DEBUG("TransporterRegistry started node: " << localNodeId);

  DBUG_RETURN(true);
}

/**
 * Server side of the transporter handshake on an accepted socket.
 *
 * Reads one line "<nodeId> [<transporterType>]" from the client, validates
 * the node id (range, allocated transporter, state CONNECTING), replies
 * with "<localNodeId> <transporterType>" and, if the client announced a
 * type, verifies that it matches the local transporter type.
 *
 * @param sockfd accepted socket
 * @return false if the handshake is rejected (the caller still owns
 *         sockfd); true once the socket has been handed to the transporter
 *         via Transporter::connect_server().
 */
bool
TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
{
  DBUG_ENTER("TransporterRegistry::connect_server");

  // read node id from client
  // read transporter type
  int nodeId, remote_transporter_type= -1;
  SocketInputStream s_input(sockfd);
  char buf[256];
  if (s_input.gets(buf, 256) == 0) {
    DBUG_PRINT("error", ("Could not get node id from client"));
    DBUG_RETURN(false);
  }
  int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
  switch (r) {
  case 2:
    break;
  case 1:
    // we're running version prior to 4.1.9
    // ok, but with no checks on transporter configuration compatibility
    break;
  default:
    DBUG_PRINT("error", ("Error in node id from client"));
    DBUG_RETURN(false);
  }

  DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
                      nodeId,remote_transporter_type));

  //check that nodeid is valid and that there is an allocated transporter
  if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
    DBUG_PRINT("error", ("Node id out of range from client"));
    DBUG_RETURN(false);
  }
  if (theTransporters[nodeId] == 0) {
    DBUG_PRINT("error", ("No transporter for this node id from client"));
    DBUG_RETURN(false);
  }

  //check that the transporter should be connected
  if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
    DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
    DBUG_RETURN(false);
  }

  Transporter *t= theTransporters[nodeId];

  // send info about own id (just as response to acknowledge connection)
  // send info on own transporter type
  SocketOutputStream s_output(sockfd);
  s_output.println("%d %d", t->getLocalNodeId(), t->m_type);

  if (remote_transporter_type != -1)
  {
    if (remote_transporter_type != t->m_type)
    {
      DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
                           t->m_type, remote_transporter_type));
      g_eventLogger.error("Incompatible configuration: Transporter type "
                          "mismatch with node %d", nodeId);

      // wait for socket close for 1 second to let message arrive at client
      {
        fd_set a_set;
        FD_ZERO(&a_set);
        FD_SET(sockfd, &a_set);
        struct timeval timeout;
        timeout.tv_sec  = 1; timeout.tv_usec = 0;
        select(sockfd+1, &a_set, 0, 0, &timeout);
      }
      DBUG_RETURN(false);
    }
  }
  else if (t->m_type == tt_SHM_TRANSPORTER)
  {
    g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
  }

  // setup transporter (transporter responsible for closing sockfd)
  t->connect_server(sockfd);

  DBUG_RETURN(true);
}

/**
 * Create and register a TCP transporter described by config.
 *
 * @return false when TCP support is compiled out, the local node id does
 *         not match, the remote node id is outside the registry's arrays,
 *         a transporter already exists for the remote node, or the new
 *         transporter fails to initialise; true otherwise.
 */
bool
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER

  if(!nodeIdSpecified){
    init(config->localNodeId);
  }

  if(config->localNodeId != localNodeId)
    return false;

  // The per-node arrays hold maxTransporters entries; refuse ids that
  // would index past them.
  if(config->remoteNodeId >= maxTransporters)
    return false;

  if(theTransporters[config->remoteNodeId] != NULL)
    return false;

  TCP_Transporter * t = new TCP_Transporter(*this,
                                            config->tcp.sendBufferSize,
                                            config->tcp.maxReceiveSize,
                                            config->localHostName,
                                            config->remoteHostName,
                                            config->s_port,
                                            config->isMgmConnection,
                                            localNodeId,
                                            config->remoteNodeId,
                                            config->serverNodeId,
                                            config->checksum,
                                            config->signalId);
  if (t == NULL)
    return false;
  else if (!t->initTransporter()) {
    delete t;
    return false;
  }

  // Put the transporter in the transporter arrays
  theTCPTransporters[nTCPTransporters]      = t;
  theTransporters[t->getRemoteNodeId()]     = t;
  theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;
  nTransporters++;
  nTCPTransporters++;

#if defined NDB_OSE || defined NDB_SOFTOSE
  t->theReceiverPid = theReceiverPid;
#endif

  return true;
#else
  return false;
#endif
}

/**
 * Create and register an OSE transporter described by conf.  The shared
 * OSE receiver is created on first use.
 *
 * @return false when OSE support is compiled out, the local node id does
 *         not match, the remote node id is outside the registry's arrays,
 *         a transporter already exists for the remote node, or the new
 *         transporter fails to initialise; true otherwise.
 */
bool
TransporterRegistry::createOSETransporter(TransporterConfiguration *conf) {
#ifdef NDB_OSE_TRANSPORTER

  if(!nodeIdSpecified){
    init(conf->localNodeId);
  }

  if(conf->localNodeId != localNodeId)
    return false;

  // The per-node arrays hold maxTransporters entries; refuse ids that
  // would index past them.
  if(conf->remoteNodeId >= maxTransporters)
    return false;

  if(theTransporters[conf->remoteNodeId] != NULL)
    return false;

  if(theOSEReceiver == NULL){
    theOSEReceiver = new OSE_Receiver(this,
                                      10,
                                      localNodeId);
  }

  OSE_Transporter * t = new OSE_Transporter(conf->ose.prioASignalSize,
                                            conf->ose.prioBSignalSize,
                                            localNodeId,
                                            conf->localHostName,
                                            conf->remoteNodeId,
                                            conf->serverNodeId,
                                            conf->remoteHostName,
                                            conf->checksum,
                                            conf->signalId);
  if (t == NULL)
    return false;
  else if (!t->initTransporter()) {
    delete t;
    return false;
  }

  // Put the transporter in the transporter arrays
  theOSETransporters[nOSETransporters]      = t;
  theTransporters[t->getRemoteNodeId()]     = t;
  theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER;
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;

  nTransporters++;
  nOSETransporters++;

  return true;
#else
  return false;
#endif
}

/**
 * Create and register an SCI transporter described by config.  Aborts the
 * process if SCI_Transporter::initSCI() fails.
 *
 * @return false when SCI support is compiled out, the local node id does
 *         not match, the remote node id is outside the registry's arrays,
 *         a transporter already exists for the remote node, or the new
 *         transporter fails to initialise; true otherwise.
 */
bool
TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
#ifdef NDB_SCI_TRANSPORTER

  if(!SCI_Transporter::initSCI())
    abort();

  if(!nodeIdSpecified){
    init(config->localNodeId);
  }

  if(config->localNodeId != localNodeId)
    return false;

  // The per-node arrays hold maxTransporters entries; refuse ids that
  // would index past them.
  if(config->remoteNodeId >= maxTransporters)
    return false;

  if(theTransporters[config->remoteNodeId] != NULL)
    return false;

  SCI_Transporter * t = new SCI_Transporter(*this,
                                            config->localHostName,
                                            config->remoteHostName,
                                            config->s_port,
                                            config->isMgmConnection,
                                            config->sci.sendLimit,
                                            config->sci.bufferSize,
                                            config->sci.nLocalAdapters,
                                            config->sci.remoteSciNodeId0,
                                            config->sci.remoteSciNodeId1,
                                            localNodeId,
                                            config->remoteNodeId,
                                            config->serverNodeId,
                                            config->checksum,
                                            config->signalId);

  if (t == NULL)
    return false;
  else if (!t->initTransporter()) {
    delete t;
    return false;
  }

  // Put the transporter in the transporter arrays
  theSCITransporters[nSCITransporters]      = t;
  theTransporters[t->getRemoteNodeId()]     = t;
  theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;
  nTransporters++;
  nSCITransporters++;

  return true;
#else
  return false;
#endif
}

/**
 * Create and register a shared-memory transporter described by config.
 *
 * The first call records config->shm.signum in g_ndb_shm_signum and blocks
 * that signal in the calling thread; later calls must use the same signum.
 *
 * Every exit goes through DBUG_RETURN so that the DBUG_ENTER at the top is
 * always balanced.
 *
 * @return false when SHM support is compiled out, the local node id or
 *         signum does not match, the remote node id is outside the
 *         registry's arrays, a transporter already exists for the remote
 *         node, or the new transporter fails to initialise; true otherwise.
 */
bool
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
  DBUG_ENTER("TransporterRegistry::createTransporter SHM");
#ifdef NDB_SHM_TRANSPORTER
  if(!nodeIdSpecified){
    init(config->localNodeId);
  }

  if(config->localNodeId != localNodeId)
    DBUG_RETURN(false);

  if (!g_ndb_shm_signum) {
    g_ndb_shm_signum= config->shm.signum;
    DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
    /**
     * Make sure to block g_ndb_shm_signum
     *   TransporterRegistry::init is run from "main" thread
     */
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, g_ndb_shm_signum);
    pthread_sigmask(SIG_BLOCK, &mask, 0);
  }

  if(config->shm.signum != g_ndb_shm_signum)
    DBUG_RETURN(false);

  // The per-node arrays hold maxTransporters entries; refuse ids that
  // would index past them.
  if(config->remoteNodeId >= maxTransporters)
    DBUG_RETURN(false);

  if(theTransporters[config->remoteNodeId] != NULL)
    DBUG_RETURN(false);

  SHM_Transporter * t = new SHM_Transporter(*this,
                                            config->localHostName,
                                            config->remoteHostName,
                                            config->s_port,
                                            config->isMgmConnection,
                                            localNodeId,
                                            config->remoteNodeId,
                                            config->serverNodeId,
                                            config->checksum,
                                            config->signalId,
                                            config->shm.shmKey,
                                            config->shm.shmSize
                                            );
  if (t == NULL)
    DBUG_RETURN(false);
  else if (!t->initTransporter()) {
    delete t;
    DBUG_RETURN(false);
  }
  // Put the transporter in the transporter arrays
  theSHMTransporters[nSHMTransporters]      = t;
  theTransporters[t->getRemoteNodeId()]     = t;
  theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;

  nTransporters++;
  nSHMTransporters++;

  DBUG_RETURN(true);
#else
  DBUG_RETURN(false);
#endif
}

/*
 * NOTE(review): the definition below is cut off in the visible source.
 * The visible statements are reproduced unchanged (only re-wrapped onto
 * separate lines); the remainder of the function is not shown here.
 */
void
TransporterRegistry::removeTransporter(NodeId nodeId) {

  DEBUG("Removing transporter from " << localNodeId
        << " to " << nodeId);

  if(theTransporters[nodeId] == NULL)
    return;

  theTransporters[nodeId]->doDisconnect();

  const TransporterType type = theTransporterTypes[nodeId];

  int ind = 0;
  switch(type){
  case tt_TCP_TRANSPORTER:
#ifdef NDB_TCP_TRANSPORTER
    for(; ind < nTCPTransporters; ind++)
      if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
        break;
    ind++;
    for(; ind<nTCPTransporters; ind++)
      theTCPTransporters[ind-1] = theTCPTransporters[ind];
    nTCPTransporters --;
#endif
    break;
  case tt_SCI_TRANSPORTER:
#ifdef NDB_SCI_TRANSPORTER
    for(; ind < nSCITransporters; ind++)
      if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
        break;
    ind++;
    for(; ind<nSCITransporters; ind++)
      theSCITransporters[ind-1] = theSCITransporters[ind];
    nSCITransporters --;
#endif
    break;
  case tt_SHM_TRANSPORTER:
#ifdef NDB_SHM_TRANSPORTER
    for(; ind < nSHMTransporters; ind++)
      if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
        break;
    ind++;
    for(; ind<nSHMTransporters; ind++)
      theSHMTransporters[ind-1] = theSHMTransporters[ind];
    nSHMTransporters --;
#endif
    break;
  case tt_OSE_TRANSPORTER:
#ifdef NDB_OSE_TRANSPORTER
    for(; ind < nOSETransporters; ind++)
      if(theOSETransporters[ind]->getRemoteNodeId() == nodeId)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -