📄 transporterfacade.cpp
字号:
}//-------------------------------------------------// Improving API performance//-------------------------------------------------voidTransporterFacade::checkForceSend(Uint32 block_number) { m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE; //------------------------------------------------- // This code is an adaptive algorithm to discover when // the API should actually send its buffers. The reason // is that the performance is highly dependent on the // size of the writes over the communication network. // Thus we try to ensure that the send size is as big // as possible. At the same time we don't want response // time to increase so therefore we have to keep track of // how the users are performing adaptively. //------------------------------------------------- if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) { sendPerformedLastInterval = 1; } checkCounter--; if (checkCounter < 0) { calculateSendLimit(); }}/****************************************************************************** * SEND SIGNAL METHODS *****************************************************************************/intTransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){ Uint32* tDataPtr = aSignal->getDataPtrSend(); Uint32 Tlen = aSignal->theLength; Uint32 TBno = aSignal->theReceiversBlockNumber; if(getIsNodeSendable(aNode) == true){#ifdef API_TRACE if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){ Uint32 tmp = aSignal->theSendersBlockRef; aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId); LinearSectionPtr ptr[3]; signalLogger.sendSignal(* aSignal, 1, tDataPtr, aNode, ptr, 0); signalLogger.flushSignalLog(); aSignal->theSendersBlockRef = tmp; }#endif if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) { SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 1, // JBB tDataPtr, aNode, 0); //if (ss != SEND_OK) ndbout << ss << endl; return (ss == SEND_OK ? 0 : -1); } else { ndbout << "ERR: SigLen = " << Tlen << " BlockRec = " << TBno; ndbout << " SignalNo = " << aSignal->theVerId_signalNumber << endl; assert(0); }//if } //const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(aNode); //const Uint32 startLevel = node.m_state.startLevel; return -1; // Node Dead}intTransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ Uint32* tDataPtr = aSignal->getDataPtrSend();#ifdef API_TRACE if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){ Uint32 tmp = aSignal->theSendersBlockRef; aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId); LinearSectionPtr ptr[3]; signalLogger.sendSignal(* aSignal, 0, tDataPtr, aNode, ptr, 0); signalLogger.flushSignalLog(); aSignal->theSendersBlockRef = tmp; }#endif assert((aSignal->theLength != 0) && (aSignal->theLength <= 25) && (aSignal->theReceiversBlockNumber != 0)); SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 0, tDataPtr, aNode, 0); return (ss == SEND_OK ? 0 : -1);}#define CHUNK_SZ NDB_SECTION_SEGMENT_SZ*64 // related to MAX_MESSAGE_SIZEintTransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, LinearSectionPtr ptr[3], Uint32 secs){ if(getIsNodeSendable(aNode) != true) return -1;#ifdef API_TRACE if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){ Uint32 tmp = aSignal->theSendersBlockRef; aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId); signalLogger.sendSignal(* aSignal, 1, aSignal->getDataPtrSend(), aNode, ptr, secs); aSignal->theSendersBlockRef = tmp; }#endif NdbApiSignal tmp_signal(*(SignalHeader*)aSignal); LinearSectionPtr tmp_ptr[3]; Uint32 unique_id= m_fragmented_signal_id++; // next unique id unsigned i; for (i= 0; i < secs; i++) tmp_ptr[i]= ptr[i]; unsigned start_i= 0; unsigned chunk_sz= 0; unsigned fragment_info= 0; Uint32 *tmp_data= tmp_signal.getDataPtrSend(); for (i= 0; i < secs;) { unsigned save_sz= tmp_ptr[i].sz; tmp_data[i-start_i]= i; if (chunk_sz + save_sz > CHUNK_SZ) { // truncate unsigned send_sz= CHUNK_SZ - chunk_sz; if (i != start_i) // first piece of a new section has to be a multiple of NDB_SECTION_SEGMENT_SZ { send_sz= NDB_SECTION_SEGMENT_SZ *(send_sz+NDB_SECTION_SEGMENT_SZ-1) /NDB_SECTION_SEGMENT_SZ; if (send_sz > save_sz) send_sz= save_sz; } tmp_ptr[i].sz= send_sz; if (fragment_info < 2) // 1 = first fragment, 2 = middle fragments fragment_info++; // send tmp_signal tmp_data[i-start_i+1]= unique_id; tmp_signal.setLength(i-start_i+2); tmp_signal.m_fragmentInfo= fragment_info; tmp_signal.m_noOfSections= i-start_i+1; // do prepare send { SendStatus ss = theTransporterRegistry->prepareSend (&tmp_signal, 1, /*JBB*/ tmp_data, aNode, &tmp_ptr[start_i]); assert(ss != SEND_MESSAGE_TOO_BIG); if (ss != SEND_OK) return -1; } // setup variables for next signal start_i= i; chunk_sz= 0; tmp_ptr[i].sz= save_sz-send_sz; tmp_ptr[i].p+= send_sz; if (tmp_ptr[i].sz == 0) i++; } else { chunk_sz+=save_sz; i++; } } unsigned a_sz= aSignal->getLength(); if (fragment_info > 0) { // update the original signal to include section info Uint32 *a_data= aSignal->getDataPtrSend(); unsigned tmp_sz= i-start_i; memcpy(a_data+a_sz, tmp_data, tmp_sz*sizeof(Uint32)); a_data[a_sz+tmp_sz]= unique_id; aSignal->setLength(a_sz+tmp_sz+1); // send last fragment aSignal->m_fragmentInfo= 3; // 3 = last fragment aSignal->m_noOfSections= i-start_i; } else { aSignal->m_noOfSections= secs; } // send aSignal int ret; { SendStatus ss = theTransporterRegistry->prepareSend (aSignal, 1/*JBB*/, aSignal->getDataPtrSend(), aNode, &tmp_ptr[start_i]); assert(ss != SEND_MESSAGE_TOO_BIG); ret = (ss == SEND_OK ? 0 : -1); } aSignal->m_noOfSections = 0; aSignal->m_fragmentInfo = 0; aSignal->setLength(a_sz); return ret;}intTransporterFacade::sendSignal(NdbApiSignal* aSignal, NodeId aNode, LinearSectionPtr ptr[3], Uint32 secs){ aSignal->m_noOfSections = secs; if(getIsNodeSendable(aNode) == true){#ifdef API_TRACE if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){ Uint32 tmp = aSignal->theSendersBlockRef; aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId); signalLogger.sendSignal(* aSignal, 1, aSignal->getDataPtrSend(), aNode, ptr, secs); signalLogger.flushSignalLog(); aSignal->theSendersBlockRef = tmp; }#endif SendStatus ss = theTransporterRegistry->prepareSend (aSignal, 1, // JBB aSignal->getDataPtrSend(), aNode, ptr); assert(ss != SEND_MESSAGE_TOO_BIG); aSignal->m_noOfSections = 0; return (ss == SEND_OK ? 0 : -1); } aSignal->m_noOfSections = 0; return -1;}/****************************************************************************** * CONNECTION METHODS Etc ******************************************************************************/voidTransporterFacade::doConnect(int aNodeId){ theTransporterRegistry->setIOState(aNodeId, NoHalt); theTransporterRegistry->do_connect(aNodeId);}voidTransporterFacade::doDisconnect(int aNodeId){ theTransporterRegistry->do_disconnect(aNodeId);}voidTransporterFacade::reportConnected(int aNodeId){ theClusterMgr->reportConnected(aNodeId); return;}voidTransporterFacade::reportDisconnected(int aNodeId){ theClusterMgr->reportDisconnected(aNodeId); return;}NodeIdTransporterFacade::ownId() const{ return theOwnId;}boolTransporterFacade::isConnected(NodeId aNodeId){ return theTransporterRegistry->is_connected(aNodeId);}NodeIdTransporterFacade::get_an_alive_node(){ DBUG_ENTER("TransporterFacade::get_an_alive_node"); DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));#ifdef VM_TRACE const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0); if (p != 0 && *p != 0) return atoi(p);#endif NodeId i; for (i = theStartNodeId; i < MAX_NDB_NODES; i++) { if (get_node_alive(i)){ DBUG_PRINT("info", ("Node %d is alive", i)); theStartNodeId = ((i + 1) % MAX_NDB_NODES); DBUG_RETURN(i); } } for (i = 1; i < theStartNodeId; i++) { if (get_node_alive(i)){ DBUG_PRINT("info", ("Node %d is alive", i)); theStartNodeId = ((i + 1) % MAX_NDB_NODES); DBUG_RETURN(i); } } DBUG_RETURN((NodeId)0);}TransporterFacade::ThreadData::ThreadData(Uint32 size){ m_firstFree = END_OF_LIST; expand(size);}voidTransporterFacade::ThreadData::expand(Uint32 size){ Object_Execute oe = { 0 ,0 }; NodeStatusFunction fun = 0; const Uint32 sz = m_statusNext.size(); m_objectExecute.fill(sz + size, oe); m_statusFunction.fill(sz + size, fun); for(Uint32 i = 0; i<size; i++){ m_statusNext.push_back(sz + i + 1); } m_statusNext.back() = m_firstFree; m_firstFree = m_statusNext.size() - size;}intTransporterFacade::ThreadData::open(void* objRef, ExecuteFunction fun, NodeStatusFunction fun2){ Uint32 nextFree = m_firstFree; if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){ return -1; } if(nextFree == END_OF_LIST){ expand(10); nextFree = m_firstFree; } m_firstFree = m_statusNext[nextFree]; Object_Execute oe = { objRef , fun }; m_statusNext[nextFree] = INACTIVE; m_objectExecute[nextFree] = oe; m_statusFunction[nextFree] = fun2; return indexToNumber(nextFree);}intTransporterFacade::ThreadData::close(int number){ number= numberToIndex(number); assert(getInUse(number)); m_statusNext[number] = m_firstFree; m_firstFree = number; Object_Execute oe = { 0, 0 }; m_objectExecute[number] = oe; m_statusFunction[number] = 0; return 0;}template class Vector<NodeStatusFunction>;template class Vector<TransporterFacade::ThreadData::Object_Execute>;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -