sci_transporter.cpp

来自「这个文件是windows mysql源码」· C++ 代码 · 共 910 行 · 第 1/2 页

CPP
910
字号
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; version 2 of the License.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <ndb_global.h> #include "SCI_Transporter.hpp" #include <NdbOut.hpp> #include <NdbSleep.h> #include <NdbTick.h> #include <NdbTick.h> #include "TransporterInternalDefinitions.hpp" #include <TransporterCallback.hpp> #include <InputStream.hpp>#include <OutputStream.hpp> #define FLAGS 0  #define DEBUG_TRANSPORTER SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,                                 const char *lHostName,                                 const char *rHostName,                                 int r_port,				 bool isMgmConnection,                                 Uint32 packetSize,         				 Uint32 bufferSize,       				 Uint32 nAdapters, 				 Uint16 remoteSciNodeId0,        				 Uint16 remoteSciNodeId1, 				 NodeId _localNodeId,      				 NodeId _remoteNodeId,				 NodeId serverNodeId,				 bool chksm,  				 bool signalId, 				 Uint32 reportFreq) :    Transporter(t_reg, tt_SCI_TRANSPORTER,	      lHostName, rHostName, r_port, isMgmConnection, _localNodeId,              _remoteNodeId, serverNodeId, 0, false, chksm, signalId) {  DBUG_ENTER("SCI_Transporter::SCI_Transporter");  m_PacketSize = (packetSize + 3)/4 ;   m_BufferSize = bufferSize;   m_sendBuffer.m_buffer = NULL;    m_RemoteSciNodeId = remoteSciNodeId0;      if(remoteSciNodeId0 == 0 || remoteSciNodeId1 == 0)     m_numberOfRemoteNodes=1;   else     m_numberOfRemoteNodes=2;    m_RemoteSciNodeId1 = remoteSciNodeId1;       m_initLocal=false;   m_failCounter=0;   m_remoteNodes[0]=remoteSciNodeId0;   m_remoteNodes[1]=remoteSciNodeId1;   m_adapters = nAdapters;     m_ActiveAdapterId=0;   m_StandbyAdapterId=1;     m_mapped = false;   m_sciinit=false;     sciAdapters= new SciAdapter[nAdapters* (sizeof (SciAdapter))];   if(sciAdapters==NULL) {   }   m_SourceSegm= new sourceSegm[nAdapters* (sizeof (sourceSegm))];   if(m_SourceSegm==NULL) {   }   m_TargetSegm= new targetSegm[nAdapters* (sizeof (targetSegm))];   if(m_TargetSegm==NULL) {   }   m_reportFreq= reportFreq;     //reset all statistic counters. #ifdef DEBUG_TRANSPORTER  i1024=0;  i2048=0;  i2049=0;  i10242048=0;  i20484096=0;  i4096=0;  i4097=0; #endif  DBUG_VOID_RETURN;}  void SCI_Transporter::disconnectImpl() {   DBUG_ENTER("SCI_Transporter::disconnectImpl");  sci_error_t err;   if(m_mapped){     setDisconnect();     DBUG_PRINT("info", ("connect status = %d, remote node = %d",    (int)getConnectionStatus(), remoteNodeId));     disconnectRemote();     disconnectLocal();   }     // Empty send buffer   m_sendBuffer.m_dataSize = 0;  m_initLocal=false;   m_mapped = false;     if(m_sciinit) {     for(Uint32 i=0; i<m_adapters ; i++) {             SCIClose(sciAdapters[i].scidesc, FLAGS, &err);              if(err != SCI_ERR_OK)  { 	report_error(TE_SCI_UNABLE_TO_CLOSE_CHANNEL);         DBUG_PRINT("error",        ("Cannot close channel to the driver. Error code 0x%x",  		    err));       }     }   }   m_sciinit=false;    #ifdef DEBUG_TRANSPORTER       ndbout << "total: " <<  i1024+ i10242048 + i2048+i2049 << endl;       ndbout << "<1024: " << i1024 << endl;       ndbout << "1024-2047: " << i10242048 << endl;       ndbout << "==2048: " << i2048 << endl;       ndbout << "2049-4096: " << i20484096 << endl;       ndbout << "==4096: " << i4096 << endl;       ndbout << ">4096: " << i4097 << endl; #endif   DBUG_VOID_RETURN;  }    bool SCI_Transporter::initTransporter() {   DBUG_ENTER("SCI_Transporter::initTransporter");  if(m_BufferSize < (2*MAX_MESSAGE_SIZE + 4096)){     m_BufferSize = 2 * MAX_MESSAGE_SIZE + 4096;   }   // Allocate buffers for sending, send buffer size plus 2048 bytes for avoiding  // the need to send twice when a large message comes around. Send buffer size is  // measured in words.   Uint32 sz = 4 * m_PacketSize + MAX_MESSAGE_SIZE;;    m_sendBuffer.m_sendBufferSize = 4 * ((sz + 3) / 4);   m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_sendBufferSize / 4];  m_sendBuffer.m_dataSize = 0;   DBUG_PRINT("info",  ("Created SCI Send Buffer with buffer size %d and packet size %d",              m_sendBuffer.m_sendBufferSize, m_PacketSize * 4));  if(!getLinkStatus(m_ActiveAdapterId) ||       (m_adapters > 1 &&     !getLinkStatus(m_StandbyAdapterId))) {     DBUG_PRINT("error",    ("The link is not fully operational. Check the cables and the switches"));     //NDB should terminate     report_error(TE_SCI_LINK_ERROR);     DBUG_RETURN(false);   }   DBUG_RETURN(true); } // initTransporter()    Uint32 SCI_Transporter::getLocalNodeId(Uint32 adapterNo) {   sci_query_adapter_t queryAdapter;   sci_error_t  error;   Uint32 _localNodeId;      queryAdapter.subcommand = SCI_Q_ADAPTER_NODEID;   queryAdapter.localAdapterNo = adapterNo;   queryAdapter.data = &_localNodeId;      SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error);      if(error != SCI_ERR_OK)     return 0;   return _localNodeId;  }   bool SCI_Transporter::getLinkStatus(Uint32 adapterNo) {   sci_query_adapter_t queryAdapter;   sci_error_t  error;   int linkstatus;   queryAdapter.subcommand = SCI_Q_ADAPTER_LINK_OPERATIONAL;      queryAdapter.localAdapterNo = adapterNo;   queryAdapter.data = &linkstatus;      SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error);      if(error != SCI_ERR_OK) {     DBUG_PRINT("error", ("error %d querying adapter", error));     return false;   }   if(linkstatus<=0)     return false;   return true; }    sci_error_t SCI_Transporter::initLocalSegment() {   DBUG_ENTER("SCI_Transporter::initLocalSegment");  Uint32 segmentSize = m_BufferSize;   Uint32 offset  = 0;   sci_error_t err;   if(!m_sciinit) {     for(Uint32 i=0; i<m_adapters ; i++) {       SCIOpen(&(sciAdapters[i].scidesc), FLAGS, &err);       sciAdapters[i].localSciNodeId=getLocalNodeId(i);       DBUG_PRINT("info", ("SCInode iD %d  adapter %d\n",  	         sciAdapters[i].localSciNodeId, i));       if(err != SCI_ERR_OK) {         DBUG_PRINT("error",        ("Cannot open an SCI virtual device. Error code 0x%x", 		   err)); 	DBUG_RETURN(err);       }     }   }      m_sciinit=true;    SCICreateSegment(sciAdapters[0].scidesc,            		   &(m_SourceSegm[0].localHandle),  		   hostSegmentId(localNodeId, remoteNodeId),    		   segmentSize,                		   0, 		   0, 		   0,         		   &err);                  if(err != SCI_ERR_OK) {     DBUG_PRINT("error", ("Error creating segment, err = 0x%x", err));    DBUG_RETURN(err);   } else {     DBUG_PRINT("info", ("created segment id : %d",	       hostSegmentId(localNodeId, remoteNodeId)));   }      /** Prepare the segment*/   for(Uint32 i=0; i < m_adapters; i++) {     SCIPrepareSegment((m_SourceSegm[0].localHandle),  		      i, 		      FLAGS, 		      &err);          if(err != SCI_ERR_OK) {       DBUG_PRINT("error",    ("Local Segment is not accessible by an SCI adapter. Error code 0x%x\n",                  err));       DBUG_RETURN(err);     }   }      m_SourceSegm[0].mappedMemory =      SCIMapLocalSegment((m_SourceSegm[0].localHandle), 		       &(m_SourceSegm[0].lhm[0].map), 		       offset, 		       segmentSize, 		       NULL, 		       FLAGS, 		       &err);      if(err != SCI_ERR_OK) {     DBUG_PRINT("error", ("Cannot map area of size %d. Error code 0x%x", 	        segmentSize,err));     doDisconnect();     DBUG_RETURN(err);   }       /** Make the local segment available*/   for(Uint32 i=0; i < m_adapters; i++) {     SCISetSegmentAvailable((m_SourceSegm[0].localHandle),  			     i, 			   FLAGS, 			   &err);          if(err != SCI_ERR_OK) {       DBUG_PRINT("error",   ("Local Segment is not available for remote connections. Error code 0x%x\n",                 err));       DBUG_RETURN(err);     }   }   setupLocalSegment();   DBUG_RETURN(err);    } // initLocalSegment()   bool SCI_Transporter::doSend() { #ifdef DEBUG_TRANSPORTER    NDB_TICKS startSec=0, stopSec=0;   Uint32 startMicro=0, stopMicro=0, totalMicro=0; #endif  sci_error_t             err;   Uint32 retry=0;    const char * const sendPtr = (char*)m_sendBuffer.m_buffer;  const Uint32 sizeToSend    = 4 * m_sendBuffer.m_dataSize; //Convert to number of bytes    if (sizeToSend > 0){#ifdef DEBUG_TRANSPORTER     if(sizeToSend < 1024 )       i1024++;     if(sizeToSend > 1024 && sizeToSend < 2048 )       i10242048++;     if(sizeToSend==2048)       i2048++;     if(sizeToSend>2048 && sizeToSend < 4096)       i20484096++;     if(sizeToSend==4096)       i4096++;     if(sizeToSend==4097)       i4097++; #endif        tryagain:    retry++;    if (retry > 3) {       DBUG_PRINT("error", ("SCI Transfer failed"));      report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);      return false;     }     Uint32 * insertPtr = (Uint32 *)       (m_TargetSegm[m_ActiveAdapterId].writer)->getWritePtr(sizeToSend);         if(insertPtr != 0) {	               const Uint32 remoteOffset=(Uint32) 	((char*)insertPtr -  	 (char*)(m_TargetSegm[m_ActiveAdapterId].mappedMemory));             SCIMemCpy(m_TargetSegm[m_ActiveAdapterId].sequence, 		(void*)sendPtr, 		m_TargetSegm[m_ActiveAdapterId].rhm[m_ActiveAdapterId].map, 		remoteOffset, 		sizeToSend, 		SCI_FLAG_ERROR_CHECK, 		&err);               if (err != SCI_ERR_OK) {         if (err == SCI_ERR_OUT_OF_RANGE ||            err == SCI_ERR_SIZE_ALIGNMENT ||            err == SCI_ERR_OFFSET_ALIGNMENT) {           DBUG_PRINT("error", ("Data transfer error = %d", err));          report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);	  return false;         }         if(err == SCI_ERR_TRANSFER_FAILED) { 	  if(getLinkStatus(m_ActiveAdapterId))	    goto tryagain;           if (m_adapters == 1) {            DBUG_PRINT("error", ("SCI Transfer failed"));            report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);	    return false;           }	  m_failCounter++; 	  Uint32 temp=m_ActiveAdapterId;	    	     	  if (getLinkStatus(m_StandbyAdapterId)) { 	    failoverShmWriter();		 	    SCIStoreBarrier(m_TargetSegm[m_StandbyAdapterId].sequence,0); 	    m_ActiveAdapterId=m_StandbyAdapterId; 	    m_StandbyAdapterId=temp;             DBUG_PRINT("error", ("Swapping from adapter %u to %u",                       m_StandbyAdapterId, m_ActiveAdapterId));	  } else {	    report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);            DBUG_PRINT("error", ("SCI Transfer failed")); 	  }        }      } else { 	SHM_Writer * writer = (m_TargetSegm[m_ActiveAdapterId].writer);	writer->updateWritePtr(sizeToSend); 		Uint32 sendLimit = writer->getBufferSize();	sendLimit -= writer->getWriteIndex();		m_sendBuffer.m_dataSize = 0;	m_sendBuffer.m_forceSendLimit = sendLimit;      }     } else {       /**        * If we end up here, the SCI segment is full.         */       DBUG_PRINT("error", ("the segment is full for some reason"));       return false;     } //if    }   return true; } // doSend()   void SCI_Transporter::failoverShmWriter() { #if 0  (m_TargetSegm[m_StandbyAdapterId].writer)    ->copyIndexes((m_TargetSegm[m_StandbyAdapterId].writer));#endif} //failoverShm   void SCI_Transporter::setupLocalSegment()   {    DBUG_ENTER("SCI_Transporter::setupLocalSegment");    Uint32 sharedSize = 0;    sharedSize =4096;   //start of the buffer is page aligend        Uint32 sizeOfBuffer = m_BufferSize;     sizeOfBuffer -= sharedSize;     Uint32 * localReadIndex =       (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;     Uint32 * localWriteIndex =  (Uint32*)(localReadIndex+ 1);    m_localStatusFlag = (Uint32*)(localReadIndex + 3);     char * localStartOfBuf = (char*)       ((char*)m_SourceSegm[m_ActiveAdapterId].mappedMemory+sharedSize);     * localReadIndex = 0;    * localWriteIndex = 0;    const Uint32 slack = MAX_MESSAGE_SIZE;   reader = new SHM_Reader(localStartOfBuf,  			   sizeOfBuffer, 			   slack,			   localReadIndex, 			   localWriteIndex);    

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?