⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sci_transporter.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 2 页
字号:
    ->copyIndexes((m_TargetSegm[m_StandbyAdapterId].writer));#endif} //failoverShm   void SCI_Transporter::setupLocalSegment()   {    DBUG_ENTER("SCI_Transporter::setupLocalSegment");    Uint32 sharedSize = 0;    sharedSize =4096;   //start of the buffer is page aligend        Uint32 sizeOfBuffer = m_BufferSize;     sizeOfBuffer -= sharedSize;     Uint32 * localReadIndex =       (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;     Uint32 * localWriteIndex =  (Uint32*)(localReadIndex+ 1);    m_localStatusFlag = (Uint32*)(localReadIndex + 3);     char * localStartOfBuf = (char*)       ((char*)m_SourceSegm[m_ActiveAdapterId].mappedMemory+sharedSize);     * localReadIndex = 0;    * localWriteIndex = 0;    const Uint32 slack = MAX_MESSAGE_SIZE;   reader = new SHM_Reader(localStartOfBuf,  			   sizeOfBuffer, 			   slack,			   localReadIndex, 			   localWriteIndex);       reader->clear();    DBUG_VOID_RETURN;} //setupLocalSegment    void SCI_Transporter::setupRemoteSegment()   {    DBUG_ENTER("SCI_Transporter::setupRemoteSegment");   Uint32 sharedSize = 0;    sharedSize =4096;   //start of the buffer is page aligned      Uint32 sizeOfBuffer = m_BufferSize;    const Uint32 slack = MAX_MESSAGE_SIZE;   sizeOfBuffer -= sharedSize;    Uint32 *segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ;          Uint32 * remoteReadIndex = (Uint32*)segPtr;     Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1);    m_remoteStatusFlag = (Uint32*)(segPtr + 3);       char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize));        writer = new SHM_Writer(remoteStartOfBuf,  			   sizeOfBuffer, 			   slack,			   remoteReadIndex, 			   remoteWriteIndex);      writer->clear();        m_TargetSegm[0].writer=writer;     m_sendBuffer.m_forceSendLimit = writer->getBufferSize();       if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {      report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);      DBUG_PRINT("error", ("Unable to create sequence on active"));     doDisconnect();    }    if (m_adapters > 1) {     segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ;          Uint32 * remoteReadIndex2 = (Uint32*)segPtr;       Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1);      m_remoteStatusFlag2 = (Uint32*)(segPtr + 3);         char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize);          /**       * setup a writer. writer2 is used to mirror the changes of       * writer on the standby       * segment, so that in the case of a failover, we can switch       * to the stdby seg. quickly.*       */      writer2 = new SHM_Writer(remoteStartOfBuf2,                                sizeOfBuffer,                               slack,                              remoteReadIndex2,                               remoteWriteIndex2);     * remoteReadIndex = 0;      * remoteWriteIndex = 0;      writer2->clear();      m_TargetSegm[1].writer=writer2;      if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) {        report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);        DBUG_PRINT("error", ("Unable to create sequence on standby"));       doDisconnect();      }    }   DBUG_VOID_RETURN; } //setupRemoteSegment boolSCI_Transporter::init_local(){  DBUG_ENTER("SCI_Transporter::init_local");  if(!m_initLocal) {     if(initLocalSegment()!=SCI_ERR_OK){       NdbSleep_MilliSleep(10);      //NDB SHOULD TERMINATE AND COMPUTER REBOOTED!       report_error(TE_SCI_CANNOT_INIT_LOCALSEGMENT);      DBUG_RETURN(false);    }     m_initLocal=true;  }   DBUG_RETURN(true);}boolSCI_Transporter::init_remote(){  DBUG_ENTER("SCI_Transporter::init_remote");  sci_error_t err;   Uint32 offset = 0;  if(!m_mapped ) {    DBUG_PRINT("info", ("Map remote segments"));    for(Uint32 i=0; i < m_adapters ; i++) {      m_TargetSegm[i].rhm[i].remoteHandle=0;      SCIConnectSegment(sciAdapters[i].scidesc,                        &(m_TargetSegm[i].rhm[i].remoteHandle),                        m_remoteNodes[i],                        remoteSegmentId(localNodeId, remoteNodeId),                        i,                        0,                        0,                        0,                        0,                        &err);      if(err != SCI_ERR_OK) {        NdbSleep_MilliSleep(10);        DBUG_PRINT("error", ("Error connecting segment, err 0x%x", err));        DBUG_RETURN(false);      }    }    // Map the remote memory segment into program space      for(Uint32 i=0; i < m_adapters ; i++) {      m_TargetSegm[i].mappedMemory =        SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle),                            &(m_TargetSegm[i].rhm[i].map),                            offset,                            m_BufferSize,                            NULL,                            FLAGS,                            &err);        if(err!= SCI_ERR_OK) {          DBUG_PRINT("error", ("Cannot map a segment to the remote node %d. Error code 0x%x",m_RemoteSciNodeId, err));          //NDB SHOULD TERMINATE AND COMPUTER REBOOTED!           report_error(TE_SCI_CANNOT_MAP_REMOTESEGMENT);          DBUG_RETURN(false);        }    }    m_mapped=true;    setupRemoteSegment();    setConnected();    DBUG_PRINT("info", ("connected and mapped to segment, remoteNode: %d",               remoteNodeId));    DBUG_PRINT("info", ("remoteSegId: %d",               remoteSegmentId(localNodeId, remoteNodeId)));    DBUG_RETURN(true);  } else {    DBUG_RETURN(getConnectionStatus());  }}boolSCI_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd){  SocketInputStream s_input(sockfd);  SocketOutputStream s_output(sockfd);  char buf[256];  DBUG_ENTER("SCI_Transporter::connect_client_impl");  // Wait for server to create and attach  if (s_input.gets(buf, 256) == 0) {    DBUG_PRINT("error", ("No initial response from server in SCI"));    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  if (!init_local()) {    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  // Send ok to server  s_output.println("sci client 1 ok");  if (!init_remote()) {    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  // Wait for ok from server  if (s_input.gets(buf, 256) == 0) {    DBUG_PRINT("error", ("No second response from server in SCI"));    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  // Send ok to server  s_output.println("sci client 2 ok");  NDB_CLOSE_SOCKET(sockfd);  DBUG_PRINT("info", ("Successfully connected client to node %d",              remoteNodeId));  DBUG_RETURN(true);}boolSCI_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd){  SocketOutputStream s_output(sockfd);  SocketInputStream s_input(sockfd);  char buf[256];  DBUG_ENTER("SCI_Transporter::connect_server_impl");  if (!init_local()) {    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  // Send ok to client  s_output.println("sci server 1 ok");  // Wait for ok from client  if (s_input.gets(buf, 256) == 0) {    DBUG_PRINT("error", ("No response from client in SCI"));    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  if (!init_remote()) {    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  // Send ok to client  s_output.println("sci server 2 ok");  // Wait for ok from client  if (s_input.gets(buf, 256) == 0) {    DBUG_PRINT("error", ("No second response from client in SCI"));    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  NDB_CLOSE_SOCKET(sockfd);  DBUG_PRINT("info", ("Successfully connected server to node %d",              remoteNodeId));  DBUG_RETURN(true);} sci_error_t SCI_Transporter::createSequence(Uint32 adapterid) {   sci_error_t err;   SCICreateMapSequence((m_TargetSegm[adapterid].rhm[adapterid].map),  		       &(m_TargetSegm[adapterid].sequence),  		       SCI_FLAG_FAST_BARRIER,  		       &err);        return err; } // createSequence()    sci_error_t SCI_Transporter::startSequence(Uint32 adapterid) {     sci_error_t err;   /** Perform preliminary error check on an SCI adapter before starting a    * sequence of read and write operations on the mapped segment.    */   m_SequenceStatus = SCIStartSequence( 				       (m_TargetSegm[adapterid].sequence),  				       FLAGS, &err);         // If there still is an error then data cannot be safely send   return err; } // startSequence()      bool SCI_Transporter::disconnectLocal()  {  DBUG_ENTER("SCI_Transporter::disconnectLocal");   sci_error_t err;   m_ActiveAdapterId=0;    /** Free resources used by a local segment    */    SCIUnmapSegment(m_SourceSegm[0].lhm[0].map,0,&err);   if(err!=SCI_ERR_OK) {     report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);     DBUG_PRINT("error", ("Unable to unmap segment"));    DBUG_RETURN(false);   }    SCIRemoveSegment((m_SourceSegm[m_ActiveAdapterId].localHandle), 		   FLAGS, 		   &err);     if(err!=SCI_ERR_OK) {     report_error(TE_SCI_UNABLE_TO_REMOVE_SEGMENT);     DBUG_PRINT("error", ("Unable to remove segment"));    DBUG_RETURN(false);   }   DBUG_PRINT("info", ("Local memory segment is unmapped and removed"));   DBUG_RETURN(true); } // disconnectLocal()   bool SCI_Transporter::disconnectRemote()  {   DBUG_ENTER("SCI_Transporter::disconnectRemote");  sci_error_t err;   for(Uint32 i=0; i<m_adapters; i++) {     /**      * Segment unmapped, disconnect from the remotely connected segment      */       SCIUnmapSegment(m_TargetSegm[i].rhm[i].map,0,&err);     if(err!=SCI_ERR_OK) {       report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);       DBUG_PRINT("error", ("Unable to unmap segment"));      DBUG_RETURN(false);     } 	     SCIDisconnectSegment(m_TargetSegm[i].rhm[i].remoteHandle, 			 FLAGS, 			 &err);     if(err!=SCI_ERR_OK) {       report_error(TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT);       DBUG_PRINT("error", ("Unable to disconnect segment"));      DBUG_RETURN(false);     }     DBUG_PRINT("info", ("Remote memory segment is unmapped and disconnected"));   }   DBUG_RETURN(true); } // disconnectRemote() SCI_Transporter::~SCI_Transporter() {   DBUG_ENTER("SCI_Transporter::~SCI_Transporter");  // Close channel to the driver   doDisconnect();   if(m_sendBuffer.m_buffer != NULL)    delete[] m_sendBuffer.m_buffer;  DBUG_VOID_RETURN;} // ~SCI_Transporter()     void SCI_Transporter::closeSCI() {   // Termination of SCI   sci_error_t err;   DBUG_ENTER("SCI_Transporter::closeSCI");     // Disconnect and remove remote segment   disconnectRemote();    // Unmap and remove local segment      disconnectLocal();      // Closes an SCI virtual device   SCIClose(activeSCIDescriptor, FLAGS, &err);       if(err != SCI_ERR_OK) {    DBUG_PRINT("error", ("Cannot close SCI channel to the driver. Error code 0x%x",  	        err));   }  SCITerminate();   DBUG_VOID_RETURN;} // closeSCI()  Uint32 *SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){  Uint32 sci_buffer_remaining = m_sendBuffer.m_forceSendLimit;  Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize;  Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2;  Uint32 new_curr_data_size = curr_data_size + lenBytes;  if ((curr_data_size >= send_buf_size) ||      (curr_data_size >= sci_buffer_remaining)) {    /**     * The new message will not fit in the send buffer. We need to     * send the send buffer before filling it up with the new     * signal data. If current data size will spill over buffer edge     * we will also send to ensure correct operation.     */      if (!doSend()) {       /**       * We were not successfull sending, report 0 as meaning buffer full and       * upper levels handle retries and other recovery matters.       */      return 0;    }  }  /**   * New signal fits, simply fill it up with more data.   */  Uint32 sz = m_sendBuffer.m_dataSize;  return &m_sendBuffer.m_buffer[sz];}voidSCI_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){    Uint32 sz = m_sendBuffer.m_dataSize;  Uint32 packet_size = m_PacketSize;  sz += ((lenBytes + 3) >> 2);  m_sendBuffer.m_dataSize = sz;    if(sz > packet_size) {     /**-------------------------------------------------      * Buffer is full and we are ready to send. We will      * not wait since the signal is already in the buffer.      * Force flag set has the same indication that we      * should always send. If it is not possible to send      * we will not worry since we will soon be back for      * a renewed trial.      *-------------------------------------------------      */     doSend();  }}enum SciStatus {  SCIDISCONNECT = 1,  SCICONNECTED  = 2}; bool SCI_Transporter::getConnectionStatus() {   if(*m_localStatusFlag == SCICONNECTED &&       (*m_remoteStatusFlag == SCICONNECTED ||      ((m_adapters > 1) &&      *m_remoteStatusFlag2 == SCICONNECTED)))     return true;   else     return false; }   void  SCI_Transporter::setConnected() {   *m_remoteStatusFlag = SCICONNECTED;   if (m_adapters > 1) {    *m_remoteStatusFlag2 = SCICONNECTED;   }  *m_localStatusFlag = SCICONNECTED; }   void  SCI_Transporter::setDisconnect() {   if(getLinkStatus(m_ActiveAdapterId))     *m_remoteStatusFlag = SCIDISCONNECT;   if (m_adapters > 1) {    if(getLinkStatus(m_StandbyAdapterId))       *m_remoteStatusFlag2 = SCIDISCONNECT;   }}   bool SCI_Transporter::checkConnected() {   if (*m_localStatusFlag == SCIDISCONNECT) {     return false;   }   else     return true; }  static bool init = false;  bool  SCI_Transporter::initSCI() {   DBUG_ENTER("SCI_Transporter::initSCI");  if(!init){     sci_error_t error;     // Initialize SISCI library     SCIInitialize(0, &error);     if(error != SCI_ERR_OK)  {       DBUG_PRINT("error", ("Cannot initialize SISCI library."));      DBUG_PRINT("error", ("Inconsistency between SISCI library and SISCI driver. Error code 0x%x",                 error));       DBUG_RETURN(false);    }     init = true;   }   DBUG_RETURN(true);}  Uint32SCI_Transporter::get_free_buffer() const{  return (m_TargetSegm[m_ActiveAdapterId].writer)->get_free_buffer();}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -