📄 sci_transporter.cpp
字号:
->copyIndexes((m_TargetSegm[m_StandbyAdapterId].writer));#endif} //failoverShm void SCI_Transporter::setupLocalSegment() { DBUG_ENTER("SCI_Transporter::setupLocalSegment"); Uint32 sharedSize = 0; sharedSize =4096; //start of the buffer is page aligend Uint32 sizeOfBuffer = m_BufferSize; sizeOfBuffer -= sharedSize; Uint32 * localReadIndex = (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory; Uint32 * localWriteIndex = (Uint32*)(localReadIndex+ 1); m_localStatusFlag = (Uint32*)(localReadIndex + 3); char * localStartOfBuf = (char*) ((char*)m_SourceSegm[m_ActiveAdapterId].mappedMemory+sharedSize); * localReadIndex = 0; * localWriteIndex = 0; const Uint32 slack = MAX_MESSAGE_SIZE; reader = new SHM_Reader(localStartOfBuf, sizeOfBuffer, slack, localReadIndex, localWriteIndex); reader->clear(); DBUG_VOID_RETURN;} //setupLocalSegment void SCI_Transporter::setupRemoteSegment() { DBUG_ENTER("SCI_Transporter::setupRemoteSegment"); Uint32 sharedSize = 0; sharedSize =4096; //start of the buffer is page aligned Uint32 sizeOfBuffer = m_BufferSize; const Uint32 slack = MAX_MESSAGE_SIZE; sizeOfBuffer -= sharedSize; Uint32 *segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ; Uint32 * remoteReadIndex = (Uint32*)segPtr; Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1); m_remoteStatusFlag = (Uint32*)(segPtr + 3); char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize)); writer = new SHM_Writer(remoteStartOfBuf, sizeOfBuffer, slack, remoteReadIndex, remoteWriteIndex); writer->clear(); m_TargetSegm[0].writer=writer; m_sendBuffer.m_forceSendLimit = writer->getBufferSize(); if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); DBUG_PRINT("error", ("Unable to create sequence on active")); doDisconnect(); } if (m_adapters > 1) { segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ; Uint32 * remoteReadIndex2 = (Uint32*)segPtr; Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1); m_remoteStatusFlag2 = (Uint32*)(segPtr + 3); char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize); /** * setup a writer. writer2 is used to mirror the changes of * writer on the standby * segment, so that in the case of a failover, we can switch * to the stdby seg. quickly.* */ writer2 = new SHM_Writer(remoteStartOfBuf2, sizeOfBuffer, slack, remoteReadIndex2, remoteWriteIndex2); * remoteReadIndex = 0; * remoteWriteIndex = 0; writer2->clear(); m_TargetSegm[1].writer=writer2; if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) { report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); DBUG_PRINT("error", ("Unable to create sequence on standby")); doDisconnect(); } } DBUG_VOID_RETURN; } //setupRemoteSegment boolSCI_Transporter::init_local(){ DBUG_ENTER("SCI_Transporter::init_local"); if(!m_initLocal) { if(initLocalSegment()!=SCI_ERR_OK){ NdbSleep_MilliSleep(10); //NDB SHOULD TERMINATE AND COMPUTER REBOOTED! report_error(TE_SCI_CANNOT_INIT_LOCALSEGMENT); DBUG_RETURN(false); } m_initLocal=true; } DBUG_RETURN(true);}boolSCI_Transporter::init_remote(){ DBUG_ENTER("SCI_Transporter::init_remote"); sci_error_t err; Uint32 offset = 0; if(!m_mapped ) { DBUG_PRINT("info", ("Map remote segments")); for(Uint32 i=0; i < m_adapters ; i++) { m_TargetSegm[i].rhm[i].remoteHandle=0; SCIConnectSegment(sciAdapters[i].scidesc, &(m_TargetSegm[i].rhm[i].remoteHandle), m_remoteNodes[i], remoteSegmentId(localNodeId, remoteNodeId), i, 0, 0, 0, 0, &err); if(err != SCI_ERR_OK) { NdbSleep_MilliSleep(10); DBUG_PRINT("error", ("Error connecting segment, err 0x%x", err)); DBUG_RETURN(false); } } // Map the remote memory segment into program space for(Uint32 i=0; i < m_adapters ; i++) { m_TargetSegm[i].mappedMemory = SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle), &(m_TargetSegm[i].rhm[i].map), offset, m_BufferSize, NULL, FLAGS, &err); if(err!= SCI_ERR_OK) { DBUG_PRINT("error", ("Cannot map a segment to the remote node %d. Error code 0x%x",m_RemoteSciNodeId, err)); //NDB SHOULD TERMINATE AND COMPUTER REBOOTED! report_error(TE_SCI_CANNOT_MAP_REMOTESEGMENT); DBUG_RETURN(false); } } m_mapped=true; setupRemoteSegment(); setConnected(); DBUG_PRINT("info", ("connected and mapped to segment, remoteNode: %d", remoteNodeId)); DBUG_PRINT("info", ("remoteSegId: %d", remoteSegmentId(localNodeId, remoteNodeId))); DBUG_RETURN(true); } else { DBUG_RETURN(getConnectionStatus()); }}boolSCI_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd){ SocketInputStream s_input(sockfd); SocketOutputStream s_output(sockfd); char buf[256]; DBUG_ENTER("SCI_Transporter::connect_client_impl"); // Wait for server to create and attach if (s_input.gets(buf, 256) == 0) { DBUG_PRINT("error", ("No initial response from server in SCI")); NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } if (!init_local()) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } // Send ok to server s_output.println("sci client 1 ok"); if (!init_remote()) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } // Wait for ok from server if (s_input.gets(buf, 256) == 0) { DBUG_PRINT("error", ("No second response from server in SCI")); NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } // Send ok to server s_output.println("sci client 2 ok"); NDB_CLOSE_SOCKET(sockfd); DBUG_PRINT("info", ("Successfully connected client to node %d", remoteNodeId)); DBUG_RETURN(true);}boolSCI_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd){ SocketOutputStream s_output(sockfd); SocketInputStream s_input(sockfd); char buf[256]; DBUG_ENTER("SCI_Transporter::connect_server_impl"); if (!init_local()) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } // Send ok to client s_output.println("sci server 1 ok"); // Wait for ok from client if (s_input.gets(buf, 256) == 0) { DBUG_PRINT("error", ("No response from client in SCI")); NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } if (!init_remote()) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } // Send ok to client s_output.println("sci server 2 ok"); // Wait for ok from client if (s_input.gets(buf, 256) == 0) { DBUG_PRINT("error", ("No second response from client in SCI")); NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } NDB_CLOSE_SOCKET(sockfd); DBUG_PRINT("info", ("Successfully connected server to node %d", remoteNodeId)); DBUG_RETURN(true);} sci_error_t SCI_Transporter::createSequence(Uint32 adapterid) { sci_error_t err; SCICreateMapSequence((m_TargetSegm[adapterid].rhm[adapterid].map), &(m_TargetSegm[adapterid].sequence), SCI_FLAG_FAST_BARRIER, &err); return err; } // createSequence() sci_error_t SCI_Transporter::startSequence(Uint32 adapterid) { sci_error_t err; /** Perform preliminary error check on an SCI adapter before starting a * sequence of read and write operations on the mapped segment. */ m_SequenceStatus = SCIStartSequence( (m_TargetSegm[adapterid].sequence), FLAGS, &err); // If there still is an error then data cannot be safely send return err; } // startSequence() bool SCI_Transporter::disconnectLocal() { DBUG_ENTER("SCI_Transporter::disconnectLocal"); sci_error_t err; m_ActiveAdapterId=0; /** Free resources used by a local segment */ SCIUnmapSegment(m_SourceSegm[0].lhm[0].map,0,&err); if(err!=SCI_ERR_OK) { report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT); DBUG_PRINT("error", ("Unable to unmap segment")); DBUG_RETURN(false); } SCIRemoveSegment((m_SourceSegm[m_ActiveAdapterId].localHandle), FLAGS, &err); if(err!=SCI_ERR_OK) { report_error(TE_SCI_UNABLE_TO_REMOVE_SEGMENT); DBUG_PRINT("error", ("Unable to remove segment")); DBUG_RETURN(false); } DBUG_PRINT("info", ("Local memory segment is unmapped and removed")); DBUG_RETURN(true); } // disconnectLocal() bool SCI_Transporter::disconnectRemote() { DBUG_ENTER("SCI_Transporter::disconnectRemote"); sci_error_t err; for(Uint32 i=0; i<m_adapters; i++) { /** * Segment unmapped, disconnect from the remotely connected segment */ SCIUnmapSegment(m_TargetSegm[i].rhm[i].map,0,&err); if(err!=SCI_ERR_OK) { report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT); DBUG_PRINT("error", ("Unable to unmap segment")); DBUG_RETURN(false); } SCIDisconnectSegment(m_TargetSegm[i].rhm[i].remoteHandle, FLAGS, &err); if(err!=SCI_ERR_OK) { report_error(TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT); DBUG_PRINT("error", ("Unable to disconnect segment")); DBUG_RETURN(false); } DBUG_PRINT("info", ("Remote memory segment is unmapped and disconnected")); } DBUG_RETURN(true); } // disconnectRemote() SCI_Transporter::~SCI_Transporter() { DBUG_ENTER("SCI_Transporter::~SCI_Transporter"); // Close channel to the driver doDisconnect(); if(m_sendBuffer.m_buffer != NULL) delete[] m_sendBuffer.m_buffer; DBUG_VOID_RETURN;} // ~SCI_Transporter() void SCI_Transporter::closeSCI() { // Termination of SCI sci_error_t err; DBUG_ENTER("SCI_Transporter::closeSCI"); // Disconnect and remove remote segment disconnectRemote(); // Unmap and remove local segment disconnectLocal(); // Closes an SCI virtual device SCIClose(activeSCIDescriptor, FLAGS, &err); if(err != SCI_ERR_OK) { DBUG_PRINT("error", ("Cannot close SCI channel to the driver. Error code 0x%x", err)); } SCITerminate(); DBUG_VOID_RETURN;} // closeSCI() Uint32 *SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){ Uint32 sci_buffer_remaining = m_sendBuffer.m_forceSendLimit; Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize; Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2; Uint32 new_curr_data_size = curr_data_size + lenBytes; if ((curr_data_size >= send_buf_size) || (curr_data_size >= sci_buffer_remaining)) { /** * The new message will not fit in the send buffer. We need to * send the send buffer before filling it up with the new * signal data. If current data size will spill over buffer edge * we will also send to ensure correct operation. */ if (!doSend()) { /** * We were not successfull sending, report 0 as meaning buffer full and * upper levels handle retries and other recovery matters. */ return 0; } } /** * New signal fits, simply fill it up with more data. */ Uint32 sz = m_sendBuffer.m_dataSize; return &m_sendBuffer.m_buffer[sz];}voidSCI_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){ Uint32 sz = m_sendBuffer.m_dataSize; Uint32 packet_size = m_PacketSize; sz += ((lenBytes + 3) >> 2); m_sendBuffer.m_dataSize = sz; if(sz > packet_size) { /**------------------------------------------------- * Buffer is full and we are ready to send. We will * not wait since the signal is already in the buffer. * Force flag set has the same indication that we * should always send. If it is not possible to send * we will not worry since we will soon be back for * a renewed trial. *------------------------------------------------- */ doSend(); }}enum SciStatus { SCIDISCONNECT = 1, SCICONNECTED = 2}; bool SCI_Transporter::getConnectionStatus() { if(*m_localStatusFlag == SCICONNECTED && (*m_remoteStatusFlag == SCICONNECTED || ((m_adapters > 1) && *m_remoteStatusFlag2 == SCICONNECTED))) return true; else return false; } void SCI_Transporter::setConnected() { *m_remoteStatusFlag = SCICONNECTED; if (m_adapters > 1) { *m_remoteStatusFlag2 = SCICONNECTED; } *m_localStatusFlag = SCICONNECTED; } void SCI_Transporter::setDisconnect() { if(getLinkStatus(m_ActiveAdapterId)) *m_remoteStatusFlag = SCIDISCONNECT; if (m_adapters > 1) { if(getLinkStatus(m_StandbyAdapterId)) *m_remoteStatusFlag2 = SCIDISCONNECT; }} bool SCI_Transporter::checkConnected() { if (*m_localStatusFlag == SCIDISCONNECT) { return false; } else return true; } static bool init = false; bool SCI_Transporter::initSCI() { DBUG_ENTER("SCI_Transporter::initSCI"); if(!init){ sci_error_t error; // Initialize SISCI library SCIInitialize(0, &error); if(error != SCI_ERR_OK) { DBUG_PRINT("error", ("Cannot initialize SISCI library.")); DBUG_PRINT("error", ("Inconsistency between SISCI library and SISCI driver. Error code 0x%x", error)); DBUG_RETURN(false); } init = true; } DBUG_RETURN(true);} Uint32SCI_Transporter::get_free_buffer() const{ return (m_TargetSegm[m_ActiveAdapterId].writer)->get_free_buffer();}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -