⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sci_transporter.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>

#include "SCI_Transporter.hpp"
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <NdbTick.h>
#include <NdbTick.h>

#include "TransporterInternalDefinitions.hpp"
#include <TransporterCallback.hpp>

#include <InputStream.hpp>
#include <OutputStream.hpp>

/* Flags value passed to the SISCI calls in this file. */
#define FLAGS 0
/* Enables the per-size send statistics counters (i1024 ... i4097). */
#define DEBUG_TRANSPORTER

/**
 * Construct an SCI transporter.
 *
 * Stores the configuration in the member fields, allocates the per-adapter
 * bookkeeping arrays and resets the debug statistics counters.  No SCI
 * driver call is made here.
 *
 * @param packetSize        packet size in bytes; stored rounded up to words
 * @param bufferSize        stored as m_BufferSize
 * @param nAdapters         number of local adapters to use
 * @param remoteSciNodeId0  first remote SCI node id
 * @param remoteSciNodeId1  second remote SCI node id; if either id is 0 the
 *                          transporter records a single remote node
 * @param reportFreq        stored as m_reportFreq
 *
 * The remaining arguments are forwarded unchanged to the Transporter base.
 */
SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
                                 const char *lHostName,
                                 const char *rHostName,
                                 int r_port,
                                 bool isMgmConnection,
                                 Uint32 packetSize,
                                 Uint32 bufferSize,
                                 Uint32 nAdapters,
                                 Uint16 remoteSciNodeId0,
                                 Uint16 remoteSciNodeId1,
                                 NodeId _localNodeId,
                                 NodeId _remoteNodeId,
                                 NodeId serverNodeId,
                                 bool chksm,
                                 bool signalId,
                                 Uint32 reportFreq) :
  Transporter(t_reg, tt_SCI_TRANSPORTER,
              lHostName, rHostName, r_port, isMgmConnection, _localNodeId,
              _remoteNodeId, serverNodeId, 0, false, chksm, signalId)
{
  DBUG_ENTER("SCI_Transporter::SCI_Transporter");
  m_PacketSize = (packetSize + 3)/4 ;
  m_BufferSize = bufferSize;
  m_sendBuffer.m_buffer = NULL;
  m_RemoteSciNodeId = remoteSciNodeId0;

  if(remoteSciNodeId0 == 0 || remoteSciNodeId1 == 0)
    m_numberOfRemoteNodes=1;
  else
    m_numberOfRemoteNodes=2;

  m_RemoteSciNodeId1 = remoteSciNodeId1;

  m_initLocal=false;
  m_swapCounter=0;
  m_failCounter=0;
  m_remoteNodes[0]=remoteSciNodeId0;
  m_remoteNodes[1]=remoteSciNodeId1;
  m_adapters = nAdapters;
  // The maximum number of times to try and create,
  // start and destroy a sequence
  m_ActiveAdapterId=0;
  m_StandbyAdapterId=1;

  m_mapped = false;
  m_sciinit=false;

  // NOTE(review): each array is sized nAdapters * sizeof(element) elements,
  // which looks like a malloc-style byte count used as an element count.
  // Left unchanged because code outside this view may index past nAdapters
  // (e.g. the standby slot when nAdapters == 1) -- confirm before shrinking.
  // The NULL checks below are empty in the original and are kept as-is.
  sciAdapters= new SciAdapter[nAdapters* (sizeof (SciAdapter))];
  if(sciAdapters==NULL) {
  }
  m_SourceSegm= new sourceSegm[nAdapters* (sizeof (sourceSegm))];
  if(m_SourceSegm==NULL) {
  }
  m_TargetSegm= new targetSegm[nAdapters* (sizeof (targetSegm))];
  if(m_TargetSegm==NULL) {
  }
  m_reportFreq= reportFreq;

  //reset all statistic counters.
#ifdef DEBUG_TRANSPORTER
  i1024=0;
  i2048=0;
  i2049=0;
  i10242048=0;
  i20484096=0;
  i4096=0;
  i4097=0;
#endif
  DBUG_VOID_RETURN;
}


/**
 * Tear down the connection.
 *
 * If segments are mapped, marks the transporter disconnected and disconnects
 * the remote and local segments.  Always empties the send buffer, clears the
 * init/mapped flags and, if the SCI descriptors were opened, closes one
 * descriptor per adapter.  With DEBUG_TRANSPORTER the send-size histogram is
 * printed to ndbout.
 */
void SCI_Transporter::disconnectImpl()
{
  DBUG_ENTER("SCI_Transporter::disconnectImpl");
  sci_error_t err;
  if(m_mapped){
    setDisconnect();
    DBUG_PRINT("info", ("connect status = %d, remote node = %d",
    (int)getConnectionStatus(), remoteNodeId));
    disconnectRemote();
    disconnectLocal();
  }

  // Empty send buffer
  m_sendBuffer.m_dataSize = 0;

  m_initLocal=false;
  m_mapped = false;

  if(m_sciinit) {
    for(Uint32 i=0; i<m_adapters ; i++) {
      SCIClose(sciAdapters[i].scidesc, FLAGS, &err);

      if(err != SCI_ERR_OK)  {
        // A failed close is reported but does not stop the loop.
        report_error(TE_SCI_UNABLE_TO_CLOSE_CHANNEL);
        DBUG_PRINT("error", ("Cannot close channel to the driver. Error code 0x%x",
                    err));
      }
    }
  }
  m_sciinit=false;

#ifdef DEBUG_TRANSPORTER
  // The total covers every bucket (the original sum left out the
  // 2049-4095, ==4096 and >4096 buckets).
  ndbout << "total: "
         << i1024 + i10242048 + i2048 + i2049 + i20484096 + i4096 + i4097
         << endl;
  ndbout << "<1024: " << i1024 << endl;
  ndbout << "1024-2047: " << i10242048 << endl;
  ndbout << "==2048: " << i2048 << endl;
  ndbout << "2049-4096: " << i20484096 << endl;
  ndbout << "==4096: " << i4096 << endl;
  ndbout << ">4096: " << i4097 << endl;
#endif
  DBUG_VOID_RETURN;
}


/**
 * Allocate the send buffer and verify that the adapter links are up.
 *
 * Raises m_BufferSize to at least 2 * MAX_MESSAGE_SIZE + 4096, allocates
 * m_sendBuffer.m_buffer, then checks the active adapter's link (and the
 * standby adapter's link when more than one adapter is configured).
 *
 * @return true on success; false (after report_error(TE_SCI_LINK_ERROR))
 *         if a required link is not operational.
 */
bool SCI_Transporter::initTransporter() {
  DBUG_ENTER("SCI_Transporter::initTransporter");
  if(m_BufferSize < (2*MAX_MESSAGE_SIZE + 4096)){
    m_BufferSize = 2 * MAX_MESSAGE_SIZE + 4096;
  }

  // Allocate buffers for sending, send buffer size plus 2048 bytes for avoiding
  // the need to send twice when a large message comes around. Send buffer size is
  // measured in words.
  Uint32 sz = 4 * m_PacketSize + MAX_MESSAGE_SIZE;

  // Round the byte size up to a multiple of 4; the buffer is an array of words.
  m_sendBuffer.m_sendBufferSize = 4 * ((sz + 3) / 4);
  m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_sendBufferSize / 4];
  m_sendBuffer.m_dataSize = 0;

  DBUG_PRINT("info", ("Created SCI Send Buffer with buffer size %d and packet size %d",
              m_sendBuffer.m_sendBufferSize, m_PacketSize * 4));
  if(!getLinkStatus(m_ActiveAdapterId) ||
     (m_adapters > 1 &&
     !getLinkStatus(m_StandbyAdapterId))) {
    DBUG_PRINT("error", ("The link is not fully operational. Check the cables and the switches"));
    //reportDisconnect(remoteNodeId, 0);
    //doDisconnect();
    //NDB should terminate
    report_error(TE_SCI_LINK_ERROR);
    DBUG_RETURN(false);
  }

  DBUG_RETURN(true);
} // initTransporter()


/**
 * Query the SCI node id of a local adapter.
 *
 * @param adapterNo  local adapter number to query
 * @return the node id written by SCIQuery, or 0 if the query reports an error
 */
Uint32 SCI_Transporter::getLocalNodeId(Uint32 adapterNo)
{
  sci_query_adapter_t queryAdapter;
  sci_error_t  error;
  Uint32 _localNodeId;

  queryAdapter.subcommand = SCI_Q_ADAPTER_NODEID;
  queryAdapter.localAdapterNo = adapterNo;
  queryAdapter.data = &_localNodeId;

  SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),FLAGS,&error);

  if(error != SCI_ERR_OK)
    return 0;
  return _localNodeId;
}


/**
 * Query whether the link of a local adapter is operational.
 *
 * @param adapterNo  local adapter number to query
 * @return true if the query succeeds and reports a status greater than 0;
 *         false otherwise
 */
bool SCI_Transporter::getLinkStatus(Uint32 adapterNo)
{
  sci_query_adapter_t queryAdapter;
  sci_error_t  error;
  int linkstatus;
  queryAdapter.subcommand = SCI_Q_ADAPTER_LINK_OPERATIONAL;

  queryAdapter.localAdapterNo = adapterNo;
  queryAdapter.data = &linkstatus;

  SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),FLAGS,&error);

  if(error != SCI_ERR_OK) {
    DBUG_PRINT("error", ("error %d querying adapter", error));
    return false;
  }
  if(linkstatus<=0)
    return false;
  return true;
}


/**
 * Create, prepare, map and publish the local segment.
 *
 * On first use a descriptor is opened for every adapter.  The segment is
 * then created on adapter 0, prepared for each adapter, mapped into local
 * memory, made available on each adapter, and finally setupLocalSegment()
 * is called.
 *
 * @return SCI_ERR_OK on success, otherwise the error code of the first
 *         SISCI call that failed.
 */
sci_error_t SCI_Transporter::initLocalSegment() {
  DBUG_ENTER("SCI_Transporter::initLocalSegment");
  Uint32 segmentSize = m_BufferSize;
  Uint32 offset  = 0;
  sci_error_t err;
  if(!m_sciinit) {
    for(Uint32 i=0; i<m_adapters ; i++) {
      SCIOpen(&(sciAdapters[i].scidesc), FLAGS, &err);
      // Check the open result before the descriptor's adapter is used.
      if(err != SCI_ERR_OK) {
        DBUG_PRINT("error", ("Cannot open an SCI virtual device. Error code 0x%x",
                   err));
        DBUG_RETURN(err);
      }
      sciAdapters[i].localSciNodeId=getLocalNodeId(i);
      DBUG_PRINT("info", ("SCInode iD %d  adapter %d\n",
                 sciAdapters[i].localSciNodeId, i));
    }
  }

  m_sciinit=true;

  SCICreateSegment(sciAdapters[0].scidesc,
                   &(m_SourceSegm[0].localHandle),
                   hostSegmentId(localNodeId, remoteNodeId),
                   segmentSize,
                   0,
                   0,
                   0,
                   &err);

  if(err != SCI_ERR_OK) {
    DBUG_PRINT("error", ("Error creating segment, err = 0x%x", err));
    DBUG_RETURN(err);
  } else {
    DBUG_PRINT("info", ("created segment id : %d",
               hostSegmentId(localNodeId, remoteNodeId)));
  }

  /** Prepare the segment*/
  for(Uint32 i=0; i < m_adapters; i++) {
    SCIPrepareSegment((m_SourceSegm[0].localHandle),
                      i,
                      FLAGS,
                      &err);

    if(err != SCI_ERR_OK) {
      DBUG_PRINT("error", ("Local Segment is not accessible by an SCI adapter. Error code 0x%x\n",
                 err));
      DBUG_RETURN(err);
    }
  }

  m_SourceSegm[0].mappedMemory =
    SCIMapLocalSegment((m_SourceSegm[0].localHandle),
                       &(m_SourceSegm[0].lhm[0].map),
                       offset,
                       segmentSize,
                       NULL,
                       FLAGS,
                       &err);

  if(err != SCI_ERR_OK) {
    DBUG_PRINT("error", ("Cannot map area of size %d. Error code 0x%x",
                segmentSize,err));
    doDisconnect();
    DBUG_RETURN(err);
  }

  /** Make the local segment available*/
  for(Uint32 i=0; i < m_adapters; i++) {
    SCISetSegmentAvailable((m_SourceSegm[0].localHandle),
                           i,
                           FLAGS,
                           &err);

    if(err != SCI_ERR_OK) {
      DBUG_PRINT("error", ("Local Segment is not available for remote connections. Error code 0x%x\n",
                 err));
      DBUG_RETURN(err);
    }
  }

  setupLocalSegment();

  DBUG_RETURN(err);

} // initLocalSegment()


/**
 * Copy the pending send buffer into the remote segment.
 *
 * Does nothing (and returns true) when the send buffer is empty.  Otherwise
 * starts a sequence on the active adapter and attempts the copy up to three
 * times.  Range/alignment errors are simply retried.  On a transfer failure
 * the copy is retried if the active link still reports operational;
 * otherwise, with more than one adapter, the transporter fails over between
 * the active and standby adapters (direction chosen by m_swapCounter) and
 * retries.
 *
 * On a successful copy the writer's write pointer is advanced, the send
 * buffer is emptied and m_forceSendLimit is set to the space remaining in
 * the writer's buffer.
 *
 * @return false if the sequence cannot be started, the retries are
 *         exhausted, fail-over is impossible or fails, or the remote
 *         segment has no room; true otherwise.
 */
bool SCI_Transporter::doSend() {
  sci_error_t             err;
  Uint32 retry=0;

  const char * const sendPtr = (char*)m_sendBuffer.m_buffer;
  const Uint32 sizeToSend    = 4 * m_sendBuffer.m_dataSize; //Convert to number of bytes

  if (sizeToSend > 0){
#ifdef DEBUG_TRANSPORTER
    // Histogram of send sizes; every size lands in exactly one bucket.
    if(sizeToSend < 1024)
      i1024++;
    else if(sizeToSend < 2048)
      i10242048++;
    else if(sizeToSend == 2048)
      i2048++;
    else if(sizeToSend < 4096)
      i20484096++;
    else if(sizeToSend == 4096)
      i4096++;
    else
      i4097++;
#endif

    if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
      DBUG_PRINT("error", ("Start sequence failed"));
      report_error(TE_SCI_UNABLE_TO_START_SEQUENCE);
      return false;
    }

  tryagain:
    // Every path that jumps back here counts against the retry budget.
    retry++;
    if (retry > 3) {
      DBUG_PRINT("error", ("SCI Transfer failed"));
      report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
      return false;
    }
    Uint32 * insertPtr = (Uint32 *)
      (m_TargetSegm[m_ActiveAdapterId].writer)->getWritePtr(sizeToSend);

    if(insertPtr != 0) {

      // Byte offset of the insert position from the start of the mapping.
      const Uint32 remoteOffset=(Uint32)
        ((char*)insertPtr -
         (char*)(m_TargetSegm[m_ActiveAdapterId].mappedMemory));

      SCIMemCpy(m_TargetSegm[m_ActiveAdapterId].sequence,
                (void*)sendPtr,
                m_TargetSegm[m_ActiveAdapterId].rhm[m_ActiveAdapterId].map,
                remoteOffset,
                sizeToSend,
                SCI_FLAG_ERROR_CHECK,
                &err);

      if (err != SCI_ERR_OK) {
        if(err == SCI_ERR_OUT_OF_RANGE) {
          DBUG_PRINT("error", ("Data transfer : out of range error"));
          goto tryagain;
        }
        if(err == SCI_ERR_SIZE_ALIGNMENT) {
          DBUG_PRINT("error", ("Data transfer : alignment error"));
          DBUG_PRINT("info", ("sendPtr 0x%lx, sizeToSend = %d",
                     (long) sendPtr, sizeToSend));
          goto tryagain;
        }
        if(err == SCI_ERR_OFFSET_ALIGNMENT) {
          DBUG_PRINT("error", ("Data transfer : offset alignment"));
          goto tryagain;
        }
        if(err == SCI_ERR_TRANSFER_FAILED) {
          //(m_TargetSegm[m_StandbyAdapterId].writer)->heavyLock();
          if(getLinkStatus(m_ActiveAdapterId)) {
            goto tryagain;
          }
          if (m_adapters == 1) {
            // No standby adapter to fail over to.
            DBUG_PRINT("error", ("SCI Transfer failed"));
            report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
            return false;
          }
          m_failCounter++;
          Uint32 temp=m_ActiveAdapterId;
          switch(m_swapCounter) {
          case 0:
            /**swap from active (0) to standby (1)*/
            if(getLinkStatus(m_StandbyAdapterId)) {
              DBUG_PRINT("error", ("Swapping from adapter 0 to 1"));
              failoverShmWriter();
              SCIStoreBarrier(m_TargetSegm[m_StandbyAdapterId].sequence,0);
              m_ActiveAdapterId=m_StandbyAdapterId;
              m_StandbyAdapterId=temp;
              // After the swap m_StandbyAdapterId names the failed adapter.
              SCIRemoveSequence((m_TargetSegm[m_StandbyAdapterId].sequence),
                                FLAGS,
                                &err);
              if(err!=SCI_ERR_OK) {
                report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE);
                DBUG_PRINT("error", ("Unable to remove sequence"));
                return false;
              }
              if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
                DBUG_PRINT("error", ("Start sequence failed"));
                report_error(TE_SCI_UNABLE_TO_START_SEQUENCE);
                return false;
              }
              m_swapCounter++;
              DBUG_PRINT("info", ("failover complete"));
              goto tryagain;
            }  else {
              report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
              DBUG_PRINT("error", ("SCI Transfer failed"));
              return false;
            }
            break;
          case 1:
            /** swap back from 1 to 0
                must check that the link is up */

            if(getLinkStatus(m_StandbyAdapterId)) {
              failoverShmWriter();
              m_ActiveAdapterId=m_StandbyAdapterId;
              m_StandbyAdapterId=temp;
              DBUG_PRINT("info", ("Swapping from 1 to 0"));
              if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
                DBUG_PRINT("error", ("Unable to create sequence"));
                report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
                return false;
              }
              if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
                DBUG_PRINT("error", ("startSequence failed... disconnecting"));
                report_error(TE_SCI_UNABLE_TO_START_SEQUENCE);
                return false;
              }

              SCIRemoveSequence((m_TargetSegm[m_StandbyAdapterId].sequence)
                                , FLAGS,
                                &err);
              if(err!=SCI_ERR_OK) {
                DBUG_PRINT("error", ("Unable to remove sequence"));
                report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE);
                return false;
              }

              if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) {
                DBUG_PRINT("error", ("Unable to create sequence on standby"));
                report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
                return false;
              }

              m_swapCounter=0;

              DBUG_PRINT("info", ("failover complete.."));
              goto tryagain;

            } else {
              DBUG_PRINT("error", ("Unrecoverable data transfer error"));
              report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
              return false;
            }

            break;
          default:
            DBUG_PRINT("error", ("Unrecoverable data transfer error"));
            report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
            return false;
            break;
          }
        }
        // Any other error code falls through: the data stays in the send
        // buffer and the function returns true.
      } else {
        SHM_Writer * writer = (m_TargetSegm[m_ActiveAdapterId].writer);
        writer->updateWritePtr(sizeToSend);

        Uint32 sendLimit = writer->getBufferSize();
        sendLimit -= writer->getWriteIndex();

        m_sendBuffer.m_dataSize = 0;
        m_sendBuffer.m_forceSendLimit = sendLimit;
      }

    } else {
      /**
       * If we end up here, the SCI segment is full.
       */
      DBUG_PRINT("error", ("the segment is full for some reason"));
      return false;
    } //if
  }
  return true;
} // doSend()


void SCI_Transporter::failoverShmWriter() {
#if 0
  (m_TargetSegm[m_StandbyAdapterId].writer)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -