📄 transporterfacade.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h>
#include "TransporterFacade.hpp"
#include "ClusterMgr.hpp"
#include <IPCConfig.hpp>
#include <TransporterCallback.hpp>
#include <TransporterRegistry.hpp>
#include "NdbApiSignal.hpp"
#include <NdbOut.hpp>
#include <NdbEnv.h>
#include <NdbSleep.h>

#include "API.hpp"
#include <ConfigRetriever.hpp>
#include <mgmapi_config_parameters.h>
#include <mgmapi_configuration.hpp>
#include <NdbConfig.h>
#include <ndb_version.h>
#include <SignalLoggerManager.hpp>
#include <kernel/ndb_limits.h>
#include <signaldata/AlterTable.hpp>

//#define REPORT_TRANSPORTER
//#define API_TRACE;

/**
 * Convert an API block number into a zero-based index
 * (offset from MIN_API_BLOCK_NO).
 */
static int numberToIndex(int number)
{
  return number - MIN_API_BLOCK_NO;
}

/**
 * Inverse of numberToIndex(): convert a zero-based index back into
 * an API block number.
 */
static int indexToNumber(int index)
{
  return index + MIN_API_BLOCK_NO;
}

#if defined DEBUG_TRANSPORTER
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
#else
#define TRP_DEBUG(t)
#endif

TransporterFacade* TransporterFacade::theFacadeInstance = NULL;

/*****************************************************************************
 * Call back functions
 *****************************************************************************/

/**
 * Report a transporter error.
 *
 * @param callbackObj  the TransporterFacade the callback belongs to
 * @param nodeId       node the error relates to
 * @param errorCode    transporter error code; when the TE_DO_DISCONNECT
 *                     bit is set the error is printed and the node is
 *                     disconnected
 * @param info         optional descriptive text, may be NULL
 */
void
reportError(void * callbackObj, NodeId nodeId,
            TransporterError errorCode, const char *info)
{
#ifdef REPORT_TRANSPORTER
  ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s",
           (int)nodeId, (int)errorCode, info ? info : "");
#endif
  if (errorCode & TE_DO_DISCONNECT) {
    ndbout_c("reportError (%d, %d) %s", (int)nodeId, (int)errorCode,
             info ? info : "");
    static_cast<TransporterFacade*>(callbackObj)->doDisconnect(nodeId);
  }
}

/**
 * Report average send length in bytes (4096 last sends)
 *
 * Only produces output when REPORT_TRANSPORTER is defined; otherwise
 * all arguments are ignored.
 */
void
reportSendLen(void * callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes)
{
#ifdef REPORT_TRANSPORTER
  // Guard against count == 0 so the average never divides by zero.
  ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)",
           (int)nodeId, (Uint32)(count != 0 ? bytes/count : 0));
#endif
  (void)callbackObj;
  (void)nodeId;
  (void)count;
  (void)bytes;
}

/**
 * Report average receive length in bytes (4096 last receives)
 *
 * Only produces output when REPORT_TRANSPORTER is defined; otherwise
 * all arguments are ignored.
 */
void
reportReceiveLen(void * callbackObj,
                 NodeId nodeId, Uint32 count, Uint64 bytes)
{
#ifdef REPORT_TRANSPORTER
  // Guard against count == 0 so the average never divides by zero.
  ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)",
           (int)nodeId, (Uint32)(count != 0 ? bytes/count : 0));
#endif
  (void)callbackObj;
  (void)nodeId;
  (void)count;
  (void)bytes;
}

/**
 * Report connection established
 *
 * Forwards to TransporterFacade::reportConnected() on callbackObj.
 */
void
reportConnect(void * callbackObj, NodeId nodeId)
{
#ifdef REPORT_TRANSPORTER
  ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
#endif
  static_cast<TransporterFacade*>(callbackObj)->reportConnected(nodeId);
  //  TransporterFacade::instance()->reportConnected(nodeId);
}

/**
 * Report connection broken
 *
 * Forwards to TransporterFacade::reportDisconnected() on callbackObj.
 * The error argument is not used here.
 */
void
reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 error)
{
#ifdef REPORT_TRANSPORTER
  ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
#endif
  (void)error;
  static_cast<TransporterFacade*>(callbackObj)->reportDisconnected(nodeId);
  //TransporterFacade::instance()->reportDisconnected(nodeId);
}

/**
 * Called when data has been received from nodeId; forwards to
 * TransporterFacade::hb_received() on callbackObj.
 */
void
transporter_recv_from(void * callbackObj, NodeId nodeId)
{
  static_cast<TransporterFacade*>(callbackObj)->hb_received(nodeId);
}

/****************************************************************************
 * 
 *****************************************************************************/

/**
 * Job buffer check. This implementation unconditionally returns 0.
 */
int checkJobBuffer() {
  return 0;
}

#ifdef API_TRACE
static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
static const char * apiSignalLog   = 0;
static SignalLoggerManager signalLogger;

/**
 * Re-evaluate the API_SIGNAL_LOG environment variable and (re)direct the
 * signal logger accordingly.
 *
 *  - unset              : logging off
 *  - "-"                : log to stdout
 *  - "+" (DBUG builds)  : log to DBUG_FILE
 *  - anything else      : treated as a file name opened with mode "w"
 *
 * @return true when signals should be logged, false otherwise
 */
static
inline
bool
setSignalLog(){
  signalLogger.flushSignalLog();

  const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
  if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
    // Same destination as last time: nothing to change.
    return true;
  } else if(tmp == 0 && apiSignalLog == 0){
    // Logging was off and still is.
    return false;
  } else if(tmp == 0 && apiSignalLog != 0){
    // Variable was removed: switch logging off.
    signalLogger.setOutputStream(0);
    apiSignalLog = tmp;
    return false;
  } else if(tmp !=0){
    if (strcmp(tmp, "-") == 0)
      signalLogger.setOutputStream(stdout);
#ifndef DBUG_OFF
    else if (strcmp(tmp, "+") == 0)
      signalLogger.setOutputStream(DBUG_FILE);
#endif
    else
      // NOTE(review): the fopen() result is not checked and any previously
      // opened stream is not closed here -- confirm how
      // SignalLoggerManager::setOutputStream treats these cases.
      signalLogger.setOutputStream(fopen(tmp, "w"));
    apiSignalLog = tmp;
    return true;
  }
  return false;
}
#ifdef TRACE_APIREGREQ
#define TRACE_GSN(gsn) true
#else
#define TRACE_GSN(gsn) (gsn != GSN_API_REGREQ && gsn != GSN_API_REGCONF)
#endif
#endif

/**
 * The execute function : Handle received signal
 *
 * Dispatches on the receiver's block number:
 *  - >= MIN_API_BLOCK_NO : delivered to the object registered in
 *                          m_threads for that block (if any)
 *  - API_PACKED          : unpacked into its sub-signals, each delivered
 *                          as above (without sections)
 *  - API_CLUSTERMGR      : handled by the ClusterMgr / ArbitMgr / global
 *                          dictionary cache depending on signal number
 *  - anything else       : aborts, unless the signal number is 3
 *
 * @param callbackObj  the TransporterFacade the callback belongs to
 * @param header       signal header; modified in place while unpacking
 *                     an API_PACKED signal
 * @param prio         signal priority (only used for trace logging)
 * @param theData      signal data words
 * @param ptr          the signal's linear sections
 */
void
execute(void * callbackObj, SignalHeader * const header,
        Uint8 prio, Uint32 * const theData,
        LinearSectionPtr ptr[3]){

  TransporterFacade * theFacade = static_cast<TransporterFacade*>(callbackObj);
  TransporterFacade::ThreadData::Object_Execute oe;
  Uint32 tRecBlockNo = header->theReceiversBlockNumber;

#ifdef API_TRACE
  if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
    signalLogger.executeSignal(* header,
                               prio,
                               theData,
                               theFacade->ownId(),
                               ptr, header->m_noOfSections);
    signalLogger.flushSignalLog();
  }
#endif

  if (tRecBlockNo >= MIN_API_BLOCK_NO) {
    oe = theFacade->m_threads.get(tRecBlockNo);
    if (oe.m_object != 0 && oe.m_executeFunction != 0) {
      /**
       * Handle received signal immediately to avoid any unnecessary
       * copying of data, allocation of memory and other things. Copying
       * of data could be interesting to support several priority levels
       * and to support a special memory structure when executing the
       * signals. Neither of those are interesting when receiving data
       * in the NDBAPI. The NDBAPI will thus read signal data directly as
       * it was written by the sender (SCI sender is other node, Shared
       * memory sender is other process and TCP/IP sender is the OS that
       * writes the TCP/IP message into a message buffer).
       */
      NdbApiSignal tmpSignal(*header);
      NdbApiSignal * tSignal = &tmpSignal;
      tSignal->setDataPtr(theData);
      (* oe.m_executeFunction) (oe.m_object, tSignal, ptr);
    }//if
  } else if (tRecBlockNo == API_PACKED) {
    /**
     * Block number == 2047 is used to signal a signal that consists of
     * multiple instances of the same signal. This is an effort to
     * package the signals so as to avoid unnecessary communication
     * overhead since TCP/IP has a great performance impact.
     */
    Uint32 Tlength = header->theLength;
    Uint32 Tsent = 0;
    /**
     * Each sub-signal starts with a header word (low 5 bits: length - 3,
     * high 16 bits: receiver block number) followed by its data words.
     * The data is read in place from theData; nothing is copied.
     *
     * NOTE(review): a sub-signal that is longer than 25 words or that
     * would run past Tlength is not dispatched, and scanning resumes at
     * the very next word (only the header word is skipped) -- confirm
     * this is the intended handling of malformed packed signals.
     */
    while (Tsent < Tlength) {
      Uint32 Theader = theData[Tsent];
      Tsent++;
      Uint32 TpacketLen = (Theader & 0x1F) + 3;
      tRecBlockNo = Theader >> 16;
      if (TpacketLen <= 25) {
        if ((TpacketLen + Tsent) <= Tlength) {
          /**
           * Set the data length of the signal and the receivers block
           * reference and then call the API.
           */
          header->theLength = TpacketLen;
          header->theReceiversBlockNumber = tRecBlockNo;
          Uint32* tDataPtr = &theData[Tsent];
          Tsent += TpacketLen;
          if (tRecBlockNo >= MIN_API_BLOCK_NO) {
            oe = theFacade->m_threads.get(tRecBlockNo);
            if(oe.m_object != 0 && oe.m_executeFunction != 0){
              NdbApiSignal tmpSignal(*header);
              NdbApiSignal * tSignal = &tmpSignal;
              tSignal->setDataPtr(tDataPtr);
              (*oe.m_executeFunction)(oe.m_object, tSignal, 0);
            }
          }
        }
      }
    }
    return;
  } else if (tRecBlockNo == API_CLUSTERMGR) {
    /**
     * The signal was aimed for the Cluster Manager.
     * We handle it immediately here.
     */
    ClusterMgr * clusterMgr = theFacade->theClusterMgr;
    const Uint32 gsn = header->theVerId_signalNumber;

    switch (gsn){
    case GSN_API_REGREQ:
      clusterMgr->execAPI_REGREQ(theData);
      break;

    case GSN_API_REGCONF:
      clusterMgr->execAPI_REGCONF(theData);
      break;

    case GSN_API_REGREF:
      clusterMgr->execAPI_REGREF(theData);
      break;

    case GSN_NODE_FAILREP:
      clusterMgr->execNODE_FAILREP(theData);
      break;

    case GSN_NF_COMPLETEREP:
      clusterMgr->execNF_COMPLETEREP(theData);
      break;

    case GSN_ARBIT_STARTREQ:
      if (theFacade->theArbitMgr != NULL)
        theFacade->theArbitMgr->doStart(theData);
      break;

    case GSN_ARBIT_CHOOSEREQ:
      if (theFacade->theArbitMgr != NULL)
        theFacade->theArbitMgr->doChoose(theData);
      break;

    case GSN_ARBIT_STOPORD:
      if(theFacade->theArbitMgr != NULL)
        theFacade->theArbitMgr->doStop(theData);
      break;

    case GSN_ALTER_TABLE_REP:
    {
      // The table name is taken from the first section of the signal.
      const AlterTableRep* rep = (const AlterTableRep*)theData;
      theFacade->m_globalDictCache.lock();
      theFacade->m_globalDictCache.
        alter_table_rep((const char*)ptr[0].p,
                        rep->tableId,
                        rep->tableVersion,
                        rep->changeType == AlterTableRep::CT_ALTERED);
      theFacade->m_globalDictCache.unlock();
      // Explicit break: do not rely on falling through into default.
      break;
    }

    default:
      break;

    }
    return;
  } else {
    ; // Ignore all other block numbers.
    // NOTE(review): signal number 3 is silently tolerated here; every
    // other signal to an unknown block aborts the process -- confirm
    // which signal the literal 3 stands for.
    if(header->theVerId_signalNumber!=3) {
      TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
      ndbout << "BLOCK NO: "  << tRecBlockNo << " sig "
             << header->theVerId_signalNumber  << endl;
      abort();
    }
  }
}

// These symbols are needed, but not used in the API
/**
 * Stub: aborts if it is ever called.
 */
void
SignalLoggerManager::printSegmentedSection(FILE *, const SignalHeader &,
                                           const SegmentedSectionPtr ptr[3],
                                           unsigned i){
  abort();
}

/**
 * Stub: aborts if it is ever called.
 */
void
copy(Uint32 * & insertPtr,
     class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
  abort();
}

/**
 * Initialise the facade instance for the given node id and configuration.
 *
 * Note that this function needs no locking since it is
 *   only called from the constructor of Ndb (the NdbObject)
 *
 * Which is protected by a mutex
 *
 * @param nodeId  node id passed on to init()
 * @param props   management configuration passed on to init()
 * @return 0 on success, -1 if there is no facade instance or if
 *         init() fails
 */
int
TransporterFacade::start_instance(int nodeId,
                                  const ndb_mgm_configuration* props)
{
  // theFacadeInstance starts out as NULL; never dereference it unchecked.
  if (theFacadeInstance == NULL) {
    return -1;
  }

  if (! theFacadeInstance->init(nodeId, props)) {
    return -1;
  }

  /**
   * Install signal handler for SIGPIPE
   *
   * This due to the fact that a socket connection might have
   * been closed in between a select and a corresponding send
   */
#if !defined NDB_OSE && !defined NDB_SOFTOSE && !defined NDB_WIN32
  signal(SIGPIPE, SIG_IGN);
#endif

  return 0;
}

/**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -