📄 sci_transporter.cpp
字号:
/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */#include <ndb_global.h> #include "SCI_Transporter.hpp" #include <NdbOut.hpp> #include <NdbSleep.h> #include <NdbTick.h> #include <NdbTick.h> #include "TransporterInternalDefinitions.hpp" #include <TransporterCallback.hpp> #include <InputStream.hpp>#include <OutputStream.hpp> #define FLAGS 0 #define DEBUG_TRANSPORTER SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg, const char *lHostName, const char *rHostName, int r_port, bool isMgmConnection, Uint32 packetSize, Uint32 bufferSize, Uint32 nAdapters, Uint16 remoteSciNodeId0, Uint16 remoteSciNodeId1, NodeId _localNodeId, NodeId _remoteNodeId, NodeId serverNodeId, bool chksm, bool signalId, Uint32 reportFreq) : Transporter(t_reg, tt_SCI_TRANSPORTER, lHostName, rHostName, r_port, isMgmConnection, _localNodeId, _remoteNodeId, serverNodeId, 0, false, chksm, signalId) { DBUG_ENTER("SCI_Transporter::SCI_Transporter"); m_PacketSize = (packetSize + 3)/4 ; m_BufferSize = bufferSize; m_sendBuffer.m_buffer = NULL; m_RemoteSciNodeId = remoteSciNodeId0; if(remoteSciNodeId0 == 0 || remoteSciNodeId1 == 0) m_numberOfRemoteNodes=1; else m_numberOfRemoteNodes=2; m_RemoteSciNodeId1 = remoteSciNodeId1; m_initLocal=false; m_swapCounter=0; m_failCounter=0; m_remoteNodes[0]=remoteSciNodeId0; m_remoteNodes[1]=remoteSciNodeId1; m_adapters = nAdapters; // The maximum number of times to try and create, // start and destroy a sequence m_ActiveAdapterId=0; m_StandbyAdapterId=1; m_mapped = false; m_sciinit=false; sciAdapters= new SciAdapter[nAdapters* (sizeof (SciAdapter))]; if(sciAdapters==NULL) { } m_SourceSegm= new sourceSegm[nAdapters* (sizeof (sourceSegm))]; if(m_SourceSegm==NULL) { } m_TargetSegm= new targetSegm[nAdapters* (sizeof (targetSegm))]; if(m_TargetSegm==NULL) { } m_reportFreq= reportFreq; //reset all statistic counters. #ifdef DEBUG_TRANSPORTER i1024=0; i2048=0; i2049=0; i10242048=0; i20484096=0; i4096=0; i4097=0; #endif DBUG_VOID_RETURN;} void SCI_Transporter::disconnectImpl() { DBUG_ENTER("SCI_Transporter::disconnectImpl"); sci_error_t err; if(m_mapped){ setDisconnect(); DBUG_PRINT("info", ("connect status = %d, remote node = %d", (int)getConnectionStatus(), remoteNodeId)); disconnectRemote(); disconnectLocal(); } // Empty send buffer m_sendBuffer.m_dataSize = 0; m_initLocal=false; m_mapped = false; if(m_sciinit) { for(Uint32 i=0; i<m_adapters ; i++) { SCIClose(sciAdapters[i].scidesc, FLAGS, &err); if(err != SCI_ERR_OK) { report_error(TE_SCI_UNABLE_TO_CLOSE_CHANNEL); DBUG_PRINT("error", ("Cannot close channel to the driver. Error code 0x%x", err)); } } } m_sciinit=false; #ifdef DEBUG_TRANSPORTER ndbout << "total: " << i1024+ i10242048 + i2048+i2049 << endl; ndbout << "<1024: " << i1024 << endl; ndbout << "1024-2047: " << i10242048 << endl; ndbout << "==2048: " << i2048 << endl; ndbout << "2049-4096: " << i20484096 << endl; ndbout << "==4096: " << i4096 << endl; ndbout << ">4096: " << i4097 << endl; #endif DBUG_VOID_RETURN; } bool SCI_Transporter::initTransporter() { DBUG_ENTER("SCI_Transporter::initTransporter"); if(m_BufferSize < (2*MAX_MESSAGE_SIZE + 4096)){ m_BufferSize = 2 * MAX_MESSAGE_SIZE + 4096; } // Allocate buffers for sending, send buffer size plus 2048 bytes for avoiding // the need to send twice when a large message comes around. Send buffer size is // measured in words. Uint32 sz = 4 * m_PacketSize + MAX_MESSAGE_SIZE;; m_sendBuffer.m_sendBufferSize = 4 * ((sz + 3) / 4); m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_sendBufferSize / 4]; m_sendBuffer.m_dataSize = 0; DBUG_PRINT("info", ("Created SCI Send Buffer with buffer size %d and packet size %d", m_sendBuffer.m_sendBufferSize, m_PacketSize * 4)); if(!getLinkStatus(m_ActiveAdapterId) || (m_adapters > 1 && !getLinkStatus(m_StandbyAdapterId))) { DBUG_PRINT("error", ("The link is not fully operational. Check the cables and the switches")); //reportDisconnect(remoteNodeId, 0); //doDisconnect(); //NDB should terminate report_error(TE_SCI_LINK_ERROR); DBUG_RETURN(false); } DBUG_RETURN(true); } // initTransporter() Uint32 SCI_Transporter::getLocalNodeId(Uint32 adapterNo) { sci_query_adapter_t queryAdapter; sci_error_t error; Uint32 _localNodeId; queryAdapter.subcommand = SCI_Q_ADAPTER_NODEID; queryAdapter.localAdapterNo = adapterNo; queryAdapter.data = &_localNodeId; SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error); if(error != SCI_ERR_OK) return 0; return _localNodeId; } bool SCI_Transporter::getLinkStatus(Uint32 adapterNo) { sci_query_adapter_t queryAdapter; sci_error_t error; int linkstatus; queryAdapter.subcommand = SCI_Q_ADAPTER_LINK_OPERATIONAL; queryAdapter.localAdapterNo = adapterNo; queryAdapter.data = &linkstatus; SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error); if(error != SCI_ERR_OK) { DBUG_PRINT("error", ("error %d querying adapter", error)); return false; } if(linkstatus<=0) return false; return true; } sci_error_t SCI_Transporter::initLocalSegment() { DBUG_ENTER("SCI_Transporter::initLocalSegment"); Uint32 segmentSize = m_BufferSize; Uint32 offset = 0; sci_error_t err; if(!m_sciinit) { for(Uint32 i=0; i<m_adapters ; i++) { SCIOpen(&(sciAdapters[i].scidesc), FLAGS, &err); sciAdapters[i].localSciNodeId=getLocalNodeId(i); DBUG_PRINT("info", ("SCInode iD %d adapter %d\n", sciAdapters[i].localSciNodeId, i)); if(err != SCI_ERR_OK) { DBUG_PRINT("error", ("Cannot open an SCI virtual device. Error code 0x%x", err)); DBUG_RETURN(err); } } } m_sciinit=true; SCICreateSegment(sciAdapters[0].scidesc, &(m_SourceSegm[0].localHandle), hostSegmentId(localNodeId, remoteNodeId), segmentSize, 0, 0, 0, &err); if(err != SCI_ERR_OK) { DBUG_PRINT("error", ("Error creating segment, err = 0x%x", err)); DBUG_RETURN(err); } else { DBUG_PRINT("info", ("created segment id : %d", hostSegmentId(localNodeId, remoteNodeId))); } /** Prepare the segment*/ for(Uint32 i=0; i < m_adapters; i++) { SCIPrepareSegment((m_SourceSegm[0].localHandle), i, FLAGS, &err); if(err != SCI_ERR_OK) { DBUG_PRINT("error", ("Local Segment is not accessible by an SCI adapter. Error code 0x%x\n", err)); DBUG_RETURN(err); } } m_SourceSegm[0].mappedMemory = SCIMapLocalSegment((m_SourceSegm[0].localHandle), &(m_SourceSegm[0].lhm[0].map), offset, segmentSize, NULL, FLAGS, &err); if(err != SCI_ERR_OK) { DBUG_PRINT("error", ("Cannot map area of size %d. Error code 0x%x", segmentSize,err)); doDisconnect(); DBUG_RETURN(err); } /** Make the local segment available*/ for(Uint32 i=0; i < m_adapters; i++) { SCISetSegmentAvailable((m_SourceSegm[0].localHandle), i, FLAGS, &err); if(err != SCI_ERR_OK) { DBUG_PRINT("error", ("Local Segment is not available for remote connections. Error code 0x%x\n", err)); DBUG_RETURN(err); } } setupLocalSegment(); DBUG_RETURN(err); } // initLocalSegment() bool SCI_Transporter::doSend() { #ifdef DEBUG_TRANSPORTER NDB_TICKS startSec=0, stopSec=0; Uint32 startMicro=0, stopMicro=0, totalMicro=0; #endif sci_error_t err; Uint32 retry=0; const char * const sendPtr = (char*)m_sendBuffer.m_buffer; const Uint32 sizeToSend = 4 * m_sendBuffer.m_dataSize; //Convert to number of bytes if (sizeToSend > 0){#ifdef DEBUG_TRANSPORTER if(sizeToSend < 1024 ) i1024++; if(sizeToSend > 1024 && sizeToSend < 2048 ) i10242048++; if(sizeToSend==2048) i2048++; if(sizeToSend>2048 && sizeToSend < 4096) i20484096++; if(sizeToSend==4096) i4096++; if(sizeToSend==4097) i4097++; #endif if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { DBUG_PRINT("error", ("Start sequence failed")); report_error(TE_SCI_UNABLE_TO_START_SEQUENCE); return false; } tryagain: retry++; if (retry > 3) { DBUG_PRINT("error", ("SCI Transfer failed")); report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); return false; } Uint32 * insertPtr = (Uint32 *) (m_TargetSegm[m_ActiveAdapterId].writer)->getWritePtr(sizeToSend); if(insertPtr != 0) { const Uint32 remoteOffset=(Uint32) ((char*)insertPtr - (char*)(m_TargetSegm[m_ActiveAdapterId].mappedMemory)); SCIMemCpy(m_TargetSegm[m_ActiveAdapterId].sequence, (void*)sendPtr, m_TargetSegm[m_ActiveAdapterId].rhm[m_ActiveAdapterId].map, remoteOffset, sizeToSend, SCI_FLAG_ERROR_CHECK, &err); if (err != SCI_ERR_OK) { if(err == SCI_ERR_OUT_OF_RANGE) { DBUG_PRINT("error", ("Data transfer : out of range error")); goto tryagain; } if(err == SCI_ERR_SIZE_ALIGNMENT) { DBUG_PRINT("error", ("Data transfer : alignment error")); DBUG_PRINT("info", ("sendPtr 0x%x, sizeToSend = %d", sendPtr, sizeToSend)); goto tryagain; } if(err == SCI_ERR_OFFSET_ALIGNMENT) { DBUG_PRINT("error", ("Data transfer : offset alignment")); goto tryagain; } if(err == SCI_ERR_TRANSFER_FAILED) { //(m_TargetSegm[m_StandbyAdapterId].writer)->heavyLock(); if(getLinkStatus(m_ActiveAdapterId)) { goto tryagain; } if (m_adapters == 1) { DBUG_PRINT("error", ("SCI Transfer failed")); report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); return false; } m_failCounter++; Uint32 temp=m_ActiveAdapterId; switch(m_swapCounter) { case 0: /**swap from active (0) to standby (1)*/ if(getLinkStatus(m_StandbyAdapterId)) { DBUG_PRINT("error", ("Swapping from adapter 0 to 1")); failoverShmWriter(); SCIStoreBarrier(m_TargetSegm[m_StandbyAdapterId].sequence,0); m_ActiveAdapterId=m_StandbyAdapterId; m_StandbyAdapterId=temp; SCIRemoveSequence((m_TargetSegm[m_StandbyAdapterId].sequence), FLAGS, &err); if(err!=SCI_ERR_OK) { report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE); DBUG_PRINT("error", ("Unable to remove sequence")); return false; } if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { DBUG_PRINT("error", ("Start sequence failed")); report_error(TE_SCI_UNABLE_TO_START_SEQUENCE); return false; } m_swapCounter++; DBUG_PRINT("info", ("failover complete")); goto tryagain; } else { report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); DBUG_PRINT("error", ("SCI Transfer failed")); return false; } return false; break; case 1: /** swap back from 1 to 0 must check that the link is up */ if(getLinkStatus(m_StandbyAdapterId)) { failoverShmWriter(); m_ActiveAdapterId=m_StandbyAdapterId; m_StandbyAdapterId=temp; DBUG_PRINT("info", ("Swapping from 1 to 0")); if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { DBUG_PRINT("error", ("Unable to create sequence")); report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); return false; } if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { DBUG_PRINT("error", ("startSequence failed... disconnecting")); report_error(TE_SCI_UNABLE_TO_START_SEQUENCE); return false; } SCIRemoveSequence((m_TargetSegm[m_StandbyAdapterId].sequence) , FLAGS, &err); if(err!=SCI_ERR_OK) { DBUG_PRINT("error", ("Unable to remove sequence")); report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE); return false; } if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) { DBUG_PRINT("error", ("Unable to create sequence on standby")); report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); return false; } m_swapCounter=0; DBUG_PRINT("info", ("failover complete..")); goto tryagain; } else { DBUG_PRINT("error", ("Unrecoverable data transfer error")); report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); return false; } break; default: DBUG_PRINT("error", ("Unrecoverable data transfer error")); report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); return false; break; } } } else { SHM_Writer * writer = (m_TargetSegm[m_ActiveAdapterId].writer); writer->updateWritePtr(sizeToSend); Uint32 sendLimit = writer->getBufferSize(); sendLimit -= writer->getWriteIndex(); m_sendBuffer.m_dataSize = 0; m_sendBuffer.m_forceSendLimit = sendLimit; } } else { /** * If we end up here, the SCI segment is full. */ DBUG_PRINT("error", ("the segment is full for some reason")); return false; } //if } return true; } // doSend() void SCI_Transporter::failoverShmWriter() { #if 0 (m_TargetSegm[m_StandbyAdapterId].writer)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -