📄 transporter.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <TransporterRegistry.hpp>#include <TransporterCallback.hpp>#include "Transporter.hpp"#include "TransporterInternalDefinitions.hpp"#include <NdbSleep.h>#include <SocketAuthenticator.hpp>#include <InputStream.hpp>#include <OutputStream.hpp>#include <EventLogger.hpp>extern EventLogger g_eventLogger;Transporter::Transporter(TransporterRegistry &t_reg, TransporterType _type, const char *lHostName, const char *rHostName, int s_port, bool _isMgmConnection, NodeId lNodeId, NodeId rNodeId, NodeId serverNodeId, int _byteorder, bool _compression, bool _checksum, bool _signalId) : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId), isServer(lNodeId==serverNodeId), isMgmConnection(_isMgmConnection), m_packer(_signalId, _checksum), m_type(_type), m_transporter_registry(t_reg){ DBUG_ENTER("Transporter::Transporter"); if (rHostName && strlen(rHostName) > 0){ strncpy(remoteHostName, rHostName, sizeof(remoteHostName)); Ndb_getInAddr(&remoteHostAddress, rHostName); } else { if (!isServer) { ndbout << "Unable to setup transporter. Node " << rNodeId << " must have hostname. Update configuration." << endl; exit(-1); } remoteHostName[0]= 0; } strncpy(localHostName, lHostName, sizeof(localHostName)); if (strlen(lHostName) > 0) Ndb_getInAddr(&localHostAddress, lHostName); DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d", remoteNodeId, localNodeId, isServer, remoteHostName, localHostName, s_port)); byteOrder = _byteorder; compressionUsed = _compression; checksumUsed = _checksum; signalIdUsed = _signalId; m_connected = false; m_timeOutMillis = 1000; m_connect_address.s_addr= 0; if(s_port<0) s_port= -s_port; // was dynamic if (isServer) m_socket_client= 0; else m_socket_client= new SocketClient(remoteHostName, s_port, new SocketAuthSimple("ndbd", "ndbd passwd")); DBUG_VOID_RETURN;}Transporter::~Transporter(){ if (m_socket_client) delete m_socket_client;}boolTransporter::connect_server(NDB_SOCKET_TYPE sockfd) { // all initial negotiation is done in TransporterRegistry::connect_server DBUG_ENTER("Transporter::connect_server"); if(m_connected) { DBUG_RETURN(true); // TODO assert(0); } { struct sockaddr addr; SOCKET_SIZE_TYPE addrlen= sizeof(addr); int r= getpeername(sockfd, &addr, &addrlen); m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr; } bool res = connect_server_impl(sockfd); if(res){ m_connected = true; m_errorCount = 0; } DBUG_RETURN(res);}boolTransporter::connect_client() { NDB_SOCKET_TYPE sockfd; if(m_connected) return true; if(isMgmConnection) sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client); else sockfd= m_socket_client->connect(); return connect_client(sockfd);} boolTransporter::connect_client(NDB_SOCKET_TYPE sockfd) { if(m_connected) return true; if (sockfd == NDB_INVALID_SOCKET) return false; DBUG_ENTER("Transporter::connect_client"); DBUG_PRINT("info",("port %d isMgmConnection=%d",m_s_port,isMgmConnection)); SocketOutputStream s_output(sockfd); SocketInputStream s_input(sockfd); // send info about own id // send info about own transporter type s_output.println("%d %d", localNodeId, m_type); // get remote id int nodeId, remote_transporter_type= -1; char buf[256]; if (s_input.gets(buf, 256) == 0) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type); switch (r) { case 2: break; case 1: // we're running version prior to 4.1.9 // ok, but with no checks on transporter configuration compatability break; default: NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d", nodeId, remote_transporter_type)); if (remote_transporter_type != -1) { if (remote_transporter_type != m_type) { DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d", m_type, remote_transporter_type)); NDB_CLOSE_SOCKET(sockfd); g_eventLogger.error("Incompatible configuration: transporter type " "mismatch with node %d", nodeId); DBUG_RETURN(false); } } else if (m_type == tt_SHM_TRANSPORTER) { g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId); } { struct sockaddr addr; SOCKET_SIZE_TYPE addrlen= sizeof(addr); int r= getpeername(sockfd, &addr, &addrlen); m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr; } bool res = connect_client_impl(sockfd); if(res){ m_connected = true; m_errorCount = 0; } DBUG_RETURN(res);}voidTransporter::doDisconnect() { if(!m_connected) return; //assert(0); TODO will fail m_connected= false; disconnectImpl();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -