📄 transporterregistry.cpp
字号:
break; ind++; for(; ind<nOSETransporters; ind++) theOSETransporters[ind-1] = theOSETransporters[ind]; nOSETransporters --;#endif break; } nTransporters--; // Delete the transporter and remove it from theTransporters array delete theTransporters[nodeId]; theTransporters[nodeId] = NULL; }Uint32TransporterRegistry::get_free_buffer(Uint32 node) const{ Transporter *t; if(likely((t = theTransporters[node]) != 0)) { return t->get_free_buffer(); } return 0;}SendStatusTransporterRegistry::prepareSend(const SignalHeader * const signalHeader, Uint8 prio, const Uint32 * const signalData, NodeId nodeId, const LinearSectionPtr ptr[3]){ Transporter *t = theTransporters[nodeId]; if(t != NULL && (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || ((signalHeader->theReceiversBlockNumber == 252) || (signalHeader->theReceiversBlockNumber == 4002)))) { if(t->isConnected()){ Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr); if(lenBytes <= MAX_MESSAGE_SIZE){ Uint32 * insertPtr = t->getWritePtr(lenBytes, prio); if(insertPtr != 0){ t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr); t->updateWritePtr(lenBytes, prio); return SEND_OK; } int sleepTime = 2; /** * @note: on linux/i386 the granularity is 10ms * so sleepTime = 2 generates a 10 ms sleep. */ for(int i = 0; i<50; i++){ if((nSHMTransporters+nSCITransporters) == 0) NdbSleep_MilliSleep(sleepTime); insertPtr = t->getWritePtr(lenBytes, prio); if(insertPtr != 0){ t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr); t->updateWritePtr(lenBytes, prio); break; } } if(insertPtr != 0){ /** * Send buffer full, but resend works */ reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL); return SEND_OK; } WARNING("Signal to " << nodeId << " lost(buffer)"); reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL); return SEND_BUFFER_FULL; } else { return SEND_MESSAGE_TOO_BIG; } } else { DEBUG("Signal to " << nodeId << " lost(disconnect) "); return SEND_DISCONNECTED; } } else { DEBUG("Discarding message to block: " << signalHeader->theReceiversBlockNumber << " node: " << nodeId); if(t == NULL) return SEND_UNKNOWN_NODE; return SEND_BLOCKED; }}SendStatusTransporterRegistry::prepareSend(const SignalHeader * const signalHeader, Uint8 prio, const Uint32 * const signalData, NodeId nodeId, class SectionSegmentPool & thePool, const SegmentedSectionPtr ptr[3]){ Transporter *t = theTransporters[nodeId]; if(t != NULL && (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || ((signalHeader->theReceiversBlockNumber == 252)|| (signalHeader->theReceiversBlockNumber == 4002)))) { if(t->isConnected()){ Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr); if(lenBytes <= MAX_MESSAGE_SIZE){ Uint32 * insertPtr = t->getWritePtr(lenBytes, prio); if(insertPtr != 0){ t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr); t->updateWritePtr(lenBytes, prio); return SEND_OK; } /** * @note: on linux/i386 the granularity is 10ms * so sleepTime = 2 generates a 10 ms sleep. */ int sleepTime = 2; for(int i = 0; i<50; i++){ if((nSHMTransporters+nSCITransporters) == 0) NdbSleep_MilliSleep(sleepTime); insertPtr = t->getWritePtr(lenBytes, prio); if(insertPtr != 0){ t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr); t->updateWritePtr(lenBytes, prio); break; } } if(insertPtr != 0){ /** * Send buffer full, but resend works */ reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL); return SEND_OK; } WARNING("Signal to " << nodeId << " lost(buffer)"); reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL); return SEND_BUFFER_FULL; } else { return SEND_MESSAGE_TOO_BIG; } } else { DEBUG("Signal to " << nodeId << " lost(disconnect) "); return SEND_DISCONNECTED; } } else { DEBUG("Discarding message to block: " << signalHeader->theReceiversBlockNumber << " node: " << nodeId); if(t == NULL) return SEND_UNKNOWN_NODE; return SEND_BLOCKED; }}voidTransporterRegistry::external_IO(Uint32 timeOutMillis) { //----------------------------------------------------------- // Most of the time we will send the buffers here and then wait // for new signals. Thus we start by sending without timeout // followed by the receive part where we expect to sleep for // a while. //----------------------------------------------------------- if(pollReceive(timeOutMillis)){ performReceive(); } performSend();}Uint32TransporterRegistry::pollReceive(Uint32 timeOutMillis){ Uint32 retVal = 0;#ifdef NDB_OSE_TRANSPORTER retVal |= poll_OSE(timeOutMillis); retVal |= poll_TCP(0); return retVal;#endif if((nSCITransporters) > 0) { timeOutMillis=0; }#ifdef NDB_SHM_TRANSPORTER if(nSHMTransporters > 0) { Uint32 res = poll_SHM(0); if(res) { retVal |= res; timeOutMillis = 0; } }#endif#ifdef NDB_TCP_TRANSPORTER if(nTCPTransporters > 0 || retVal == 0) { retVal |= poll_TCP(timeOutMillis); } else tcpReadSelectReply = 0;#endif#ifdef NDB_SCI_TRANSPORTER if(nSCITransporters > 0) retVal |= poll_SCI(timeOutMillis);#endif#ifdef NDB_SHM_TRANSPORTER if(nSHMTransporters > 0 && retVal == 0) { int res = poll_SHM(0); retVal |= res; }#endif return retVal;}#ifdef NDB_SCI_TRANSPORTERUint32TransporterRegistry::poll_SCI(Uint32 timeOutMillis){ for (int i=0; i<nSCITransporters; i++) { SCI_Transporter * t = theSCITransporters[i]; if (t->isConnected()) { if(t->hasDataToRead()) return 1; } } return 0;}#endif#ifdef NDB_SHM_TRANSPORTERstatic int g_shm_counter = 0;Uint32TransporterRegistry::poll_SHM(Uint32 timeOutMillis){ for(int j=0; j < 100; j++) { for (int i=0; i<nSHMTransporters; i++) { SHM_Transporter * t = theSHMTransporters[i]; if (t->isConnected()) { if(t->hasDataToRead()) { return 1; } } } } return 0;}#endif#ifdef NDB_OSE_TRANSPORTERUint32TransporterRegistry::poll_OSE(Uint32 timeOutMillis){ if(theOSEReceiver != NULL){ return theOSEReceiver->doReceive(timeOutMillis); } NdbSleep_MilliSleep(timeOutMillis); return 0;}#endif#ifdef NDB_TCP_TRANSPORTERUint32 TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ if (false && nTCPTransporters == 0) { tcpReadSelectReply = 0; return 0; } struct timeval timeout;#ifdef NDB_OSE // Return directly if there are no TCP transporters configured if(timeOutMillis <= 1){ timeout.tv_sec = 0; timeout.tv_usec = 1025; } else { timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; }#else timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000;#endif NDB_SOCKET_TYPE maxSocketValue = -1; // Needed for TCP/IP connections // The read- and writeset are used by select FD_ZERO(&tcpReadset); // Prepare for sending and receiving for (int i = 0; i < nTCPTransporters; i++) { TCP_Transporter * t = theTCPTransporters[i]; // If the transporter is connected if (t->isConnected()) { const NDB_SOCKET_TYPE socket = t->getSocket(); // Find the highest socket value. It will be used by select if (socket > maxSocketValue) maxSocketValue = socket; // Put the connected transporters in the socket read-set FD_SET(socket, &tcpReadset); } } // The highest socket value plus one maxSocketValue++; tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout); if(false && tcpReadSelectReply == -1 && errno == EINTR) ndbout_c("woke-up by signal");#ifdef NDB_WIN32 if(tcpReadSelectReply == SOCKET_ERROR) { NdbSleep_MilliSleep(timeOutMillis); }#endif return tcpReadSelectReply;}#endifvoidTransporterRegistry::performReceive(){#ifdef NDB_OSE_TRANSPORTER if(theOSEReceiver != 0) { while(theOSEReceiver->hasData()) { NodeId remoteNodeId; Uint32 * readPtr; Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr); transporter_recv_from(callbackObj, remoteNodeId); Uint32 szUsed = unpack(readPtr, sz, remoteNodeId, ioStates[remoteNodeId]);#ifdef DEBUG_TRANSPORTER /** * OSE transporter can handle executions of * half signals */ assert(sz == szUsed);#endif theOSEReceiver->updateReceiveDataPtr(szUsed); theOSEReceiver->doReceive(0); // checkJobBuffer(); } }#endif#ifdef NDB_TCP_TRANSPORTER if(tcpReadSelectReply > 0) { for (int i=0; i<nTCPTransporters; i++) { checkJobBuffer(); TCP_Transporter *t = theTCPTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); const NDB_SOCKET_TYPE socket = t->getSocket(); if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) { const int receiveSize = t->doReceive(); if(receiveSize > 0) { Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); transporter_recv_from(callbackObj, nodeId); Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]); t->updateReceiveDataPtr(szUsed); } } } } }#endif #ifdef NDB_SCI_TRANSPORTER //performReceive //do prepareReceive on the SCI transporters (prepareReceive(t,,,,)) for (int i=0; i<nSCITransporters; i++) { checkJobBuffer(); SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)) { if(t->isConnected() && t->checkConnected()) { Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); transporter_recv_from(callbackObj, nodeId); Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); t->updateReceivePtr(newPtr); } } }#endif#ifdef NDB_SHM_TRANSPORTER for (int i=0; i<nSHMTransporters; i++) { checkJobBuffer(); SHM_Transporter *t = theSHMTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()) { Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); transporter_recv_from(callbackObj, nodeId); Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); t->updateReceivePtr(newPtr); } } }#endif}static int x = 0;voidTransporterRegistry::performSend(){ int i; sendCounter = 1; #ifdef NDB_OSE_TRANSPORTER for (int i = 0; i < nOSETransporters; i++) { OSE_Transporter *t = theOSETransporters[i]; if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected())) { t->doSend(); }//if }//for#endif #ifdef NDB_TCP_TRANSPORTER#ifdef NDB_OSE { int maxSocketValue = 0; // Needed for TCP/IP connections // The writeset are used by select fd_set writeset; FD_ZERO(&writeset); // Prepare for sending and receiving for (i = 0; i < nTCPTransporters; i++) { TCP_Transporter * t = theTCPTransporters[i]; // If the transporter is connected if ((t->hasDataToSend()) && (t->isConnected())) { const int socket = t->getSocket(); // Find the highest socket value. It will be used by select if (socket > maxSocketValue) { maxSocketValue = socket; }//if FD_SET(socket, &writeset); }//if }//for // The highest socket value plus one if(maxSocketValue == 0) return; maxSocketValue++; struct timeval timeout = { 0, 1025 }; Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout); if (tmp == 0) { return; }//if for (i = 0; i < nTCPTransporters; i++) { TCP_Transporter *t = theTCPTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); const int socket = t->getSocket(); if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &writeset)) { t->doSend(); }//if }//if }//for }#endif#ifdef NDB_TCP_TRANSPORTER for (i = x; i < nTCPTransporters; i++) { TCP_Transporter *t = theTCPTransporters[i]; if (t && t->hasDataToSend() && t->isConnected() && is_connected(t->getRemoteNodeId())) { t->doSend(); } } for (i = 0; i < x && i < nTCPTransporters; i++) { TCP_Transporter *t = theTCPTransporters[i]; if (t && t->hasDataToSend() && t->isConnected() && is_connected(t->getRemoteNodeId())) { t->doSend(); } } x++; if (x == nTCPTransporters) x = 0;#endif#endif#ifdef NDB_SCI_TRANSPORTER //scroll through the SCI transporters, // get each transporter, check if connected, send data for (i=0; i<nSCITransporters; i++) { SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)) { if(t->isConnected() && t->hasDataToSend()) { t->doSend(); } //if } //if }#endif #ifdef NDB_SHM_TRANSPORTER for (i=0; i<nSHMTransporters; i++) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -