⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transporterregistry.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
📖 第 1 页 / 共 3 页
字号:
	break;    ind++;    for(; ind<nOSETransporters; ind++)      theOSETransporters[ind-1] = theOSETransporters[ind];    nOSETransporters --;#endif    break;  }    nTransporters--;  // Delete the transporter and remove it from theTransporters array  delete theTransporters[nodeId];  theTransporters[nodeId] = NULL;        }Uint32TransporterRegistry::get_free_buffer(Uint32 node) const{  Transporter *t;  if(likely((t = theTransporters[node]) != 0))  {    return t->get_free_buffer();  }  return 0;}SendStatusTransporterRegistry::prepareSend(const SignalHeader * const signalHeader, 				 Uint8 prio,				 const Uint32 * const signalData,				 NodeId nodeId, 				 const LinearSectionPtr ptr[3]){  Transporter *t = theTransporters[nodeId];  if(t != NULL &&      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||       ((signalHeader->theReceiversBlockNumber == 252) ||       (signalHeader->theReceiversBlockNumber == 4002)))) {	     if(t->isConnected()){      Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);      if(lenBytes <= MAX_MESSAGE_SIZE){	Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);	if(insertPtr != 0){	  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);	  t->updateWritePtr(lenBytes, prio);	  return SEND_OK;	}	int sleepTime = 2;		/**	 * @note: on linux/i386 the granularity is 10ms	 *        so sleepTime = 2 generates a 10 ms sleep.	 */	for(int i = 0; i<50; i++){	  if((nSHMTransporters+nSCITransporters) == 0)	    NdbSleep_MilliSleep(sleepTime); 	  insertPtr = t->getWritePtr(lenBytes, prio);	  if(insertPtr != 0){	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);	    t->updateWritePtr(lenBytes, prio);	    break;	  }	}		if(insertPtr != 0){	  /**	   * Send buffer full, but resend works	   */	  reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);	  return SEND_OK;	}		WARNING("Signal to " << nodeId << " lost(buffer)");	reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);	return SEND_BUFFER_FULL;      } else {	return SEND_MESSAGE_TOO_BIG;      }    } else {      DEBUG("Signal to " << nodeId << " lost(disconnect) ");      return SEND_DISCONNECTED;    }  } else {    DEBUG("Discarding message to block: " 	  << signalHeader->theReceiversBlockNumber 	  << " node: " << nodeId);        if(t == NULL)      return SEND_UNKNOWN_NODE;        return SEND_BLOCKED;  }}SendStatusTransporterRegistry::prepareSend(const SignalHeader * const signalHeader, 				 Uint8 prio,				 const Uint32 * const signalData,				 NodeId nodeId, 				 class SectionSegmentPool & thePool,				 const SegmentedSectionPtr ptr[3]){    Transporter *t = theTransporters[nodeId];  if(t != NULL &&      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||       ((signalHeader->theReceiversBlockNumber == 252)||        (signalHeader->theReceiversBlockNumber == 4002)))) {        if(t->isConnected()){      Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);      if(lenBytes <= MAX_MESSAGE_SIZE){	Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);	if(insertPtr != 0){	  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);	  t->updateWritePtr(lenBytes, prio);	  return SEND_OK;	}			/**	 * @note: on linux/i386 the granularity is 10ms	 *        so sleepTime = 2 generates a 10 ms sleep.	 */	int sleepTime = 2;	for(int i = 0; i<50; i++){	  if((nSHMTransporters+nSCITransporters) == 0)	    NdbSleep_MilliSleep(sleepTime); 	  insertPtr = t->getWritePtr(lenBytes, prio);	  if(insertPtr != 0){	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);	    t->updateWritePtr(lenBytes, prio);	    break;	  }	}		if(insertPtr != 0){	  /**	   * Send buffer full, but resend works	   */	  reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);	  return SEND_OK;	}		WARNING("Signal to " << nodeId << " lost(buffer)");	reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);	return SEND_BUFFER_FULL;      } else {	return SEND_MESSAGE_TOO_BIG;      }    } else {      DEBUG("Signal to " << nodeId << " lost(disconnect) ");      return SEND_DISCONNECTED;    }  } else {    DEBUG("Discarding message to block: " 	  << signalHeader->theReceiversBlockNumber 	  << " node: " << nodeId);        if(t == NULL)      return SEND_UNKNOWN_NODE;        return SEND_BLOCKED;  }}voidTransporterRegistry::external_IO(Uint32 timeOutMillis) {  //-----------------------------------------------------------  // Most of the time we will send the buffers here and then wait  // for new signals. Thus we start by sending without timeout  // followed by the receive part where we expect to sleep for  // a while.  //-----------------------------------------------------------  if(pollReceive(timeOutMillis)){    performReceive();  }  performSend();}Uint32TransporterRegistry::pollReceive(Uint32 timeOutMillis){  Uint32 retVal = 0;#ifdef NDB_OSE_TRANSPORTER  retVal |= poll_OSE(timeOutMillis);  retVal |= poll_TCP(0);  return retVal;#endif    if((nSCITransporters) > 0)  {    timeOutMillis=0;  }#ifdef NDB_SHM_TRANSPORTER  if(nSHMTransporters > 0)  {    Uint32 res = poll_SHM(0);    if(res)    {      retVal |= res;      timeOutMillis = 0;    }  }#endif#ifdef NDB_TCP_TRANSPORTER  if(nTCPTransporters > 0 || retVal == 0)  {    retVal |= poll_TCP(timeOutMillis);  }  else    tcpReadSelectReply = 0;#endif#ifdef NDB_SCI_TRANSPORTER  if(nSCITransporters > 0)    retVal |= poll_SCI(timeOutMillis);#endif#ifdef NDB_SHM_TRANSPORTER  if(nSHMTransporters > 0 && retVal == 0)  {    int res = poll_SHM(0);    retVal |= res;  }#endif  return retVal;}#ifdef NDB_SCI_TRANSPORTERUint32TransporterRegistry::poll_SCI(Uint32 timeOutMillis){  for (int i=0; i<nSCITransporters; i++) {    SCI_Transporter * t = theSCITransporters[i];    if (t->isConnected()) {      if(t->hasDataToRead())	return 1;    }  }  return 0;}#endif#ifdef NDB_SHM_TRANSPORTERstatic int g_shm_counter = 0;Uint32TransporterRegistry::poll_SHM(Uint32 timeOutMillis){    for(int j=0; j < 100; j++)  {    for (int i=0; i<nSHMTransporters; i++) {      SHM_Transporter * t = theSHMTransporters[i];      if (t->isConnected()) {	if(t->hasDataToRead()) {	  return 1;	}      }    }  }  return 0;}#endif#ifdef NDB_OSE_TRANSPORTERUint32TransporterRegistry::poll_OSE(Uint32 timeOutMillis){  if(theOSEReceiver != NULL){    return theOSEReceiver->doReceive(timeOutMillis);  }  NdbSleep_MilliSleep(timeOutMillis);  return 0;}#endif#ifdef NDB_TCP_TRANSPORTERUint32 TransporterRegistry::poll_TCP(Uint32 timeOutMillis){  if (false && nTCPTransporters == 0)  {    tcpReadSelectReply = 0;    return 0;  }    struct timeval timeout;#ifdef NDB_OSE  // Return directly if there are no TCP transporters configured    if(timeOutMillis <= 1){    timeout.tv_sec  = 0;    timeout.tv_usec = 1025;  } else {    timeout.tv_sec  = timeOutMillis / 1000;    timeout.tv_usec = (timeOutMillis % 1000) * 1000;  }#else    timeout.tv_sec  = timeOutMillis / 1000;  timeout.tv_usec = (timeOutMillis % 1000) * 1000;#endif  NDB_SOCKET_TYPE maxSocketValue = -1;    // Needed for TCP/IP connections  // The read- and writeset are used by select    FD_ZERO(&tcpReadset);  // Prepare for sending and receiving  for (int i = 0; i < nTCPTransporters; i++) {    TCP_Transporter * t = theTCPTransporters[i];        // If the transporter is connected    if (t->isConnected()) {            const NDB_SOCKET_TYPE socket = t->getSocket();      // Find the highest socket value. It will be used by select      if (socket > maxSocketValue)	maxSocketValue = socket;            // Put the connected transporters in the socket read-set       FD_SET(socket, &tcpReadset);    }  }    // The highest socket value plus one  maxSocketValue++;     tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);    if(false && tcpReadSelectReply == -1 && errno == EINTR)    ndbout_c("woke-up by signal");#ifdef NDB_WIN32  if(tcpReadSelectReply == SOCKET_ERROR)  {    NdbSleep_MilliSleep(timeOutMillis);  }#endif    return tcpReadSelectReply;}#endifvoidTransporterRegistry::performReceive(){#ifdef NDB_OSE_TRANSPORTER  if(theOSEReceiver != 0)  {    while(theOSEReceiver->hasData())    {      NodeId remoteNodeId;      Uint32 * readPtr;      Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr);      transporter_recv_from(callbackObj, remoteNodeId);      Uint32 szUsed = unpack(readPtr,			     sz,			     remoteNodeId,			     ioStates[remoteNodeId]);#ifdef DEBUG_TRANSPORTER      /**       * OSE transporter can handle executions of       *   half signals       */      assert(sz == szUsed);#endif      theOSEReceiver->updateReceiveDataPtr(szUsed);      theOSEReceiver->doReceive(0);      //      checkJobBuffer();    }  }#endif#ifdef NDB_TCP_TRANSPORTER  if(tcpReadSelectReply > 0)  {    for (int i=0; i<nTCPTransporters; i++)     {      checkJobBuffer();      TCP_Transporter *t = theTCPTransporters[i];      const NodeId nodeId = t->getRemoteNodeId();      const NDB_SOCKET_TYPE socket    = t->getSocket();      if(is_connected(nodeId)){	if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) 	{	  const int receiveSize = t->doReceive();	  if(receiveSize > 0)	  {	    Uint32 * ptr;	    Uint32 sz = t->getReceiveData(&ptr);	    transporter_recv_from(callbackObj, nodeId);	    Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);	    t->updateReceiveDataPtr(szUsed);          }	}      }     }  }#endif  #ifdef NDB_SCI_TRANSPORTER  //performReceive  //do prepareReceive on the SCI transporters  (prepareReceive(t,,,,))  for (int i=0; i<nSCITransporters; i++)   {    checkJobBuffer();    SCI_Transporter  *t = theSCITransporters[i];    const NodeId nodeId = t->getRemoteNodeId();    if(is_connected(nodeId))    {      if(t->isConnected() && t->checkConnected())      {	Uint32 * readPtr, * eodPtr;	t->getReceivePtr(&readPtr, &eodPtr);	transporter_recv_from(callbackObj, nodeId);	Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);	t->updateReceivePtr(newPtr);      }    }   }#endif#ifdef NDB_SHM_TRANSPORTER  for (int i=0; i<nSHMTransporters; i++)   {    checkJobBuffer();    SHM_Transporter *t = theSHMTransporters[i];    const NodeId nodeId = t->getRemoteNodeId();    if(is_connected(nodeId)){      if(t->isConnected() && t->checkConnected())      {	Uint32 * readPtr, * eodPtr;	t->getReceivePtr(&readPtr, &eodPtr);	transporter_recv_from(callbackObj, nodeId);	Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);	t->updateReceivePtr(newPtr);      }    }   }#endif}static int x = 0;voidTransporterRegistry::performSend(){  int i;   sendCounter = 1;  #ifdef NDB_OSE_TRANSPORTER  for (int i = 0; i < nOSETransporters; i++)  {    OSE_Transporter *t = theOSETransporters[i];     if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected()))    {      t->doSend();    }//if  }//for#endif  #ifdef NDB_TCP_TRANSPORTER#ifdef NDB_OSE  {    int maxSocketValue = 0;        // Needed for TCP/IP connections    // The writeset are used by select    fd_set writeset;    FD_ZERO(&writeset);        // Prepare for sending and receiving    for (i = 0; i < nTCPTransporters; i++) {      TCP_Transporter * t = theTCPTransporters[i];            // If the transporter is connected      if ((t->hasDataToSend()) && (t->isConnected())) {	const int socket = t->getSocket();	// Find the highest socket value. It will be used by select	if (socket > maxSocketValue) {	  maxSocketValue = socket;	}//if	FD_SET(socket, &writeset);      }//if    }//for        // The highest socket value plus one    if(maxSocketValue == 0)      return;        maxSocketValue++;     struct timeval timeout = { 0, 1025 };    Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout);        if (tmp == 0)     {      return;    }//if    for (i = 0; i < nTCPTransporters; i++) {      TCP_Transporter *t = theTCPTransporters[i];      const NodeId nodeId = t->getRemoteNodeId();      const int socket    = t->getSocket();      if(is_connected(nodeId)){	  if(t->isConnected() && FD_ISSET(socket, &writeset)) {	    t->doSend();	  }//if	}//if      }//for    }#endif#ifdef NDB_TCP_TRANSPORTER  for (i = x; i < nTCPTransporters; i++)   {    TCP_Transporter *t = theTCPTransporters[i];    if (t && t->hasDataToSend() && t->isConnected() &&	is_connected(t->getRemoteNodeId()))     {      t->doSend();    }  }  for (i = 0; i < x && i < nTCPTransporters; i++)   {    TCP_Transporter *t = theTCPTransporters[i];    if (t && t->hasDataToSend() && t->isConnected() &&	is_connected(t->getRemoteNodeId()))     {      t->doSend();    }  }  x++;  if (x == nTCPTransporters) x = 0;#endif#endif#ifdef NDB_SCI_TRANSPORTER  //scroll through the SCI transporters,   // get each transporter, check if connected, send data  for (i=0; i<nSCITransporters; i++) {    SCI_Transporter  *t = theSCITransporters[i];    const NodeId nodeId = t->getRemoteNodeId();        if(is_connected(nodeId))    {      if(t->isConnected() && t->hasDataToSend()) {	t->doSend();      } //if    } //if  }#endif  #ifdef NDB_SHM_TRANSPORTER  for (i=0; i<nSHMTransporters; i++)   {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -