⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transporterfacade.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_pthread.h>
#include <ndb_limits.h>
#include "TransporterFacade.hpp"
#include "ClusterMgr.hpp"
#include <IPCConfig.hpp>
#include <TransporterCallback.hpp>
#include <TransporterRegistry.hpp>
#include "NdbApiSignal.hpp"
#include <NdbOut.hpp>
#include <NdbEnv.h>
#include <NdbSleep.h>

#include "API.hpp"
#include <ConfigRetriever.hpp>
#include <mgmapi_config_parameters.h>
#include <mgmapi_configuration.hpp>
#include <NdbConfig.h>
#include <ndb_version.h>
#include <SignalLoggerManager.hpp>
#include <kernel/ndb_limits.h>
#include <signaldata/AlterTable.hpp>

//#define REPORT_TRANSPORTER
//#define API_TRACE;

/**
 * Convert an API block number into a zero-based index
 * (offset from MIN_API_BLOCK_NO).
 */
static int numberToIndex(int number)
{
  return number - MIN_API_BLOCK_NO;
}

/**
 * Inverse of numberToIndex(): convert a zero-based index back into
 * an API block number.
 */
static int indexToNumber(int index)
{
  return index + MIN_API_BLOCK_NO;
}

/**
 * TRP_DEBUG(t) prints "<file>:<line>:<t>" to ndbout when
 * DEBUG_TRANSPORTER is defined and expands to nothing otherwise.
 */
#if defined DEBUG_TRANSPORTER
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
#else
#define TRP_DEBUG(t)
#endif

TransporterFacade* TransporterFacade::theFacadeInstance = NULL;

/*****************************************************************************
 * Call back functions
 *****************************************************************************/

/**
 * Report a transporter error.
 *
 * When the error code carries the TE_DO_DISCONNECT bit the error is
 * printed and the facade is asked to disconnect from the node.
 *
 * @param callbackObj  the TransporterFacade that registered the callback
 * @param nodeId       node the error relates to
 * @param errorCode    transporter error code (bit mask)
 * @param info         optional descriptive text, may be NULL
 */
void
reportError(void * callbackObj, NodeId nodeId,
            TransporterError errorCode, const char *info)
{
#ifdef REPORT_TRANSPORTER
  ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s", 
           (int)nodeId, (int)errorCode, info ? info : "");
#endif

  if(errorCode & TE_DO_DISCONNECT) {
    ndbout_c("reportError (%d, %d) %s", (int)nodeId, (int)errorCode,
             info ? info : "");
    ((TransporterFacade*)(callbackObj))->doDisconnect(nodeId);
  }
}

/**
 * Report average send length in bytes (4096 last sends)
 *
 * Only produces output when REPORT_TRANSPORTER is defined.
 */
void
reportSendLen(void * callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes)
{
#ifdef REPORT_TRANSPORTER
  // Guard the average against count == 0 to avoid a division by zero.
  ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)", 
           (int)nodeId, (Uint32)(count ? bytes/count : 0));
#endif
  (void)callbackObj;
  (void)nodeId;
  (void)count;
  (void)bytes;
}

/** 
 * Report average receive length in bytes (4096 last receives)
 *
 * Only produces output when REPORT_TRANSPORTER is defined.
 */
void
reportReceiveLen(void * callbackObj, 
                 NodeId nodeId, Uint32 count, Uint64 bytes)
{
#ifdef REPORT_TRANSPORTER
  // Guard the average against count == 0 to avoid a division by zero.
  ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)", 
           (int)nodeId, (Uint32)(count ? bytes/count : 0));
#endif
  (void)callbackObj;
  (void)nodeId;
  (void)count;
  (void)bytes;
}

/**
 * Report connection established
 *
 * Forwards the event to TransporterFacade::reportConnected().
 */
void
reportConnect(void * callbackObj, NodeId nodeId)
{
#ifdef REPORT_TRANSPORTER
  ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
#endif
  ((TransporterFacade*)(callbackObj))->reportConnected(nodeId);
  //  TransporterFacade::instance()->reportConnected(nodeId);
}

/**
 * Report connection broken
 *
 * Forwards the event to TransporterFacade::reportDisconnected().
 * The error code is not used here.
 */
void
reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 error)
{
#ifdef REPORT_TRANSPORTER
  ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
#endif
  (void)error;
  ((TransporterFacade*)(callbackObj))->reportDisconnected(nodeId);
  //TransporterFacade::instance()->reportDisconnected(nodeId);
}

/**
 * Called when something has been received from a node; forwarded to
 * TransporterFacade::hb_received().
 */
void
transporter_recv_from(void * callbackObj, NodeId nodeId)
{
  ((TransporterFacade*)(callbackObj))->hb_received(nodeId);
}

/****************************************************************************
 * 
 *****************************************************************************/

/**
 * Job buffer check callback. This implementation unconditionally
 * returns 0.
 * NOTE(review): presumably 0 means "job buffer not full" to the
 * transporter layer - confirm against TransporterCallback.hpp.
 */
int
checkJobBuffer()
{
  return 0;
}

#ifdef API_TRACE
static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
static const char * apiSignalLog   = 0;
static SignalLoggerManager signalLogger;

/**
 * (Re)configure the signal logger from the API_SIGNAL_LOG environment
 * variable.
 *
 * The value "-" selects stdout, "+" selects DBUG_FILE (only when DBUG is
 * compiled in) and any other value is used as a file name opened for
 * writing. The last seen value is remembered in apiSignalLog so the
 * output stream is only changed when the value changes.
 *
 * @return true if signal logging is enabled after the call
 */
static
inline
bool
setSignalLog()
{
  signalLogger.flushSignalLog();

  const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
  if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
    // Unchanged since the last call: logging stays on.
    return true;
  } else if(tmp == 0 && apiSignalLog == 0){
    // Never configured: logging stays off.
    return false;
  } else if(tmp == 0 && apiSignalLog != 0){
    // Variable was removed: turn logging off.
    signalLogger.setOutputStream(0);
    apiSignalLog = tmp;
    return false;
  } else if(tmp !=0){
    // New or changed value: select the matching output stream.
    if (strcmp(tmp, "-") == 0)
        signalLogger.setOutputStream(stdout);
#ifndef DBUG_OFF
    else if (strcmp(tmp, "+") == 0)
        signalLogger.setOutputStream(DBUG_FILE);
#endif
    else
        // NOTE(review): the fopen() result is not checked and a previously
        // opened stream is not closed here - confirm how
        // SignalLoggerManager::setOutputStream() treats NULL and the old
        // stream.
        signalLogger.setOutputStream(fopen(tmp, "w"));
    apiSignalLog = tmp;
    return true;
  }
  return false;
}

#ifdef TRACE_APIREGREQ
#define TRACE_GSN(gsn) true
#else
#define TRACE_GSN(gsn) (gsn != GSN_API_REGREQ && gsn != GSN_API_REGCONF)
#endif
#endif

/**
 * The execute function : Handle received signal
 *
 * Dispatches on the receiver's block number:
 *  - >= MIN_API_BLOCK_NO : delivered to the object registered in
 *                          theFacade->m_threads for that block
 *  - API_PACKED          : unpacked into its sub-signals, each delivered
 *                          as above (without sections)
 *  - API_CLUSTERMGR      : handled by the cluster manager, the arbitration
 *                          manager or the global dictionary cache
 *  - anything else       : aborts, unless the signal number is 3
 *
 * @param callbackObj  the TransporterFacade that registered the callback
 * @param header       signal header; theLength and theReceiversBlockNumber
 *                     are overwritten while unpacking API_PACKED signals
 * @param prio         signal priority (only used for trace logging)
 * @param theData      signal data words
 * @param ptr          the signal's (up to 3) linear sections
 */
void
execute(void * callbackObj, SignalHeader * const header, 
        Uint8 prio, Uint32 * const theData,
        LinearSectionPtr ptr[3])
{
  TransporterFacade * theFacade = (TransporterFacade*)callbackObj;
  TransporterFacade::ThreadData::Object_Execute oe; 
  Uint32 tRecBlockNo = header->theReceiversBlockNumber;
  
#ifdef API_TRACE
  if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
    signalLogger.executeSignal(* header, 
                               prio,
                               theData,
                               theFacade->ownId(), 
                               ptr, header->m_noOfSections);
    signalLogger.flushSignalLog();
  }
#endif  

  if (tRecBlockNo >= MIN_API_BLOCK_NO) {
    oe = theFacade->m_threads.get(tRecBlockNo);
    if (oe.m_object != 0 && oe.m_executeFunction != 0) {
      /**
       * Handle received signal immediately to avoid any unnecessary
       * copying of data, allocation of memory and other things. Copying
       * of data could be interesting to support several priority levels
       * and to support a special memory structure when executing the
       * signals. Neither of those are interesting when receiving data
       * in the NDBAPI. The NDBAPI will thus read signal data directly as
       * it was written by the sender (SCI sender is other node, Shared
       * memory sender is other process and TCP/IP sender is the OS that
       * writes the TCP/IP message into a message buffer).
       */
      NdbApiSignal tmpSignal(*header);
      NdbApiSignal * tSignal = &tmpSignal;
      tSignal->setDataPtr(theData);
      (* oe.m_executeFunction) (oe.m_object, tSignal, ptr);
    }//if
  } else if (tRecBlockNo == API_PACKED) {
    /**
     * Block number == 2047 is used to signal a signal that consists of
     * multiple instances of the same signal. This is an effort to
     * package the signals so as to avoid unnecessary communication
     * overhead since TCP/IP has a great performance impact.
     */
    Uint32 Tlength = header->theLength;
    Uint32 Tsent = 0;
    /**
     * Walk the packed sub-signals. Each one starts with a header word:
     * the low 5 bits + 3 give the packet length, the upper 16 bits give
     * the receiver's block number. The data is not copied; every
     * sub-signal points straight into theData.
     */
    while (Tsent < Tlength) {
      Uint32 Theader = theData[Tsent];
      Tsent++;
      Uint32 TpacketLen = (Theader & 0x1F) + 3;
      tRecBlockNo = Theader >> 16;
      if (TpacketLen <= 25) {
        if ((TpacketLen + Tsent) <= Tlength) {
          /**
           * Set the data length of the signal and the receivers block
           * reference and then call the API.
           */
          header->theLength = TpacketLen;
          header->theReceiversBlockNumber = tRecBlockNo;
          Uint32* tDataPtr = &theData[Tsent];
          Tsent += TpacketLen;
          if (tRecBlockNo >= MIN_API_BLOCK_NO) {
            oe = theFacade->m_threads.get(tRecBlockNo);
            if(oe.m_object != 0 && oe.m_executeFunction != 0){
              NdbApiSignal tmpSignal(*header);
              NdbApiSignal * tSignal = &tmpSignal;
              tSignal->setDataPtr(tDataPtr);
              (*oe.m_executeFunction)(oe.m_object, tSignal, 0);
            }
          }
        }
      }
    }
    return;
  } else if (tRecBlockNo == API_CLUSTERMGR) {
     /**
      * The signal was aimed for the Cluster Manager. 
      * We handle it immediately here.
      */     
     ClusterMgr * clusterMgr = theFacade->theClusterMgr;
     const Uint32 gsn = header->theVerId_signalNumber;

     switch (gsn){
     case GSN_API_REGREQ:
       clusterMgr->execAPI_REGREQ(theData);
       break;

     case GSN_API_REGCONF:
       clusterMgr->execAPI_REGCONF(theData);
       break;
     
     case GSN_API_REGREF:
       clusterMgr->execAPI_REGREF(theData);
       break;

     case GSN_NODE_FAILREP:
       clusterMgr->execNODE_FAILREP(theData);
       break;
       
     case GSN_NF_COMPLETEREP:
       clusterMgr->execNF_COMPLETEREP(theData);
       break;

     case GSN_ARBIT_STARTREQ:
       if (theFacade->theArbitMgr != NULL)
         theFacade->theArbitMgr->doStart(theData);
       break;
       
     case GSN_ARBIT_CHOOSEREQ:
       if (theFacade->theArbitMgr != NULL)
         theFacade->theArbitMgr->doChoose(theData);
       break;
       
     case GSN_ARBIT_STOPORD:
       if(theFacade->theArbitMgr != NULL)
         theFacade->theArbitMgr->doStop(theData);
       break;

     case GSN_ALTER_TABLE_REP:
     {
       // Update the global dictionary cache under its own lock; the
       // first section is passed as the table name string.
       const AlterTableRep* rep = (const AlterTableRep*)theData;
       theFacade->m_globalDictCache.lock();
       theFacade->m_globalDictCache.
         alter_table_rep((const char*)ptr[0].p, 
                         rep->tableId,
                         rep->tableVersion,
                         rep->changeType == AlterTableRep::CT_ALTERED);
       theFacade->m_globalDictCache.unlock();
       // Explicit break: the original fell through into default.
       break;
     }

     default:
       break;
       
     }
     return;
  } else {
    // Signals to any other block number are unexpected: abort, except
    // for signal number 3 which is silently ignored.
    // NOTE(review): the meaning of signal number 3 is not visible in this
    // file - confirm before replacing the literal with a named constant.
    if(header->theVerId_signalNumber!=3) {
      TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
      ndbout << "BLOCK NO: "  << tRecBlockNo << " sig " 
             << header->theVerId_signalNumber  << endl;
      abort();
    }
  }
}

// These symbols are needed, but not used in the API
void 
SignalLoggerManager::printSegmentedSection(FILE *, const SignalHeader &,
                                           const SegmentedSectionPtr ptr[3],
                                           unsigned i)
{
  abort();
}

void 
copy(Uint32 * & insertPtr, 
     class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr)
{
  abort();
}

/**
 * Initialise the facade singleton for the given node id and
 * configuration, and ignore SIGPIPE on platforms that have it.
 *
 * Note that this function needs no locking since it is
 * only called from the constructor of Ndb (the NdbObject),
 * which is protected by a mutex.
 *
 * @param nodeId  own node id, passed on to init()
 * @param props   cluster configuration, passed on to init()
 * @return 0 on success, -1 if no facade instance exists or init() fails
 */
int
TransporterFacade::start_instance(int nodeId, 
                                  const ndb_mgm_configuration* props)
{
  // theFacadeInstance starts out as NULL; fail cleanly instead of
  // dereferencing it if no instance has been created yet.
  if (theFacadeInstance == NULL || ! theFacadeInstance->init(nodeId, props)) {
    return -1;
  }
  
  /**
   * Install signal handler for SIGPIPE
   *
   * This due to the fact that a socket connection might have
   * been closed in between a select and a corresponding send
   */
#if !defined NDB_OSE && !defined NDB_SOFTOSE && !defined NDB_WIN32
  signal(SIGPIPE, SIG_IGN);
#endif

  return 0;
}

/**

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -