📄 simulatedblock.cpp
字号:
sh.theLength = length; sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.m_noOfSections = noOfSections; sh.m_fragmentInfo = tFragInfo; /** * Check own node */ if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){#ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], ourProcessor, ptr, noOfSections); }#endif /** * We have to copy the data */ Ptr<SectionSegment> segptr[3]; for(Uint32 i = 0; i<noOfSections; i++){ ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz)); signal->m_sectionPtr[i].i = segptr[i].i; } globalScheduler.execute(signal, jobBuffer, recBlock, gsn); rg.m_nodes.clear((Uint32)0); rg.m_nodes.clear(ourProcessor); } /** * Do the big loop */ Uint32 recNode = 0; while(!rg.m_nodes.isclear()){ recNode = rg.m_nodes.find(recNode + 1); rg.m_nodes.clear(recNode); #ifdef VM_TRACE if(globalData.testOn){ globalSignalLoggers.sendSignal(signal->header, jobBuffer, &signal->theData[0], recNode, ptr, noOfSections); }#endif #ifdef TRACE_DISTRIBUTED ndbout_c("send: %s(%d) to (%s, %d)", getSignalName(gsn), gsn, getBlockName(recBlock), recNode);#endif SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, &signal->theData[0], recNode, ptr); ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED); } signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 0; return;}voidSimulatedBlock::sendSignalWithDelay(BlockReference ref, GlobalSignalNumber gsn, Signal* signal, Uint32 delayInMilliSeconds, Uint32 length) const { BlockNumber bnr = refToBlock(ref); //BlockNumber sendBnr = number(); BlockReference sendBRef = reference(); if (bnr == 0) { bnr_error(); }//if signal->header.theLength = length; signal->header.theSendersSignalId = signal->header.theSignalId; signal->header.theSendersBlockRef = sendBRef; signal->header.theVerId_signalNumber = gsn; signal->header.theReceiversBlockNumber = bnr;#ifdef VM_TRACE { if(globalData.testOn){ globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds, signal->header, 0, &signal->theData[0], globalData.ownId, signal->m_sectionPtr, signal->header.m_noOfSections); } }#endif globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds); signal->header.m_noOfSections = 0; signal->header.m_fragmentInfo = 0; // befor 2nd parameter to globalTimeQueue.insert // (Priority)theSendSig[sigIndex].jobBuffer}voidSimulatedBlock::releaseSections(Signal* signal){ ::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr); signal->header.m_noOfSections = 0;}class SectionSegmentPool& SimulatedBlock::getSectionSegmentPool(){ return g_sectionSegmentPool;}NewVARIABLE *SimulatedBlock::allocateBat(int batSize){ NewVARIABLE* bat = NewVarRef; bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE)); NewVarRef = bat; theBATSize = batSize; return bat;}voidSimulatedBlock::freeBat(){ if(NewVarRef != 0){ free(NewVarRef); NewVarRef = 0; }}const NewVARIABLE *SimulatedBlock::getBat(Uint16 blockNo){ SimulatedBlock * sb = globalData.getBlock(blockNo); if(sb == 0) return 0; return sb->NewVarRef;}Uint16SimulatedBlock::getBatSize(Uint16 blockNo){ SimulatedBlock * sb = globalData.getBlock(blockNo); if(sb == 0) return 0; return sb->theBATSize;}void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear) { void * p = NULL; size_t size = n*s; refresh_watch_dog(); if (size > 0){#ifdef VM_TRACE_MEM ndbout_c("%s::allocRecord(%s, %u, %u) = %u bytes", getBlockName(number()), type, s, n, size);#endif p = ndbd_malloc(size); if (p == NULL){ char buf1[255]; char buf2[255]; BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s", getBlockName(number()), type); BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %u bytes", (Uint32)s, (Uint32)n, (Uint32)size); ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2); } if(clear){ char * ptr = (char*)p; const Uint32 chunk = 128 * 1024; while(size > chunk){ refresh_watch_dog(); memset(ptr, 0, chunk); ptr += chunk; size -= chunk; } refresh_watch_dog(); memset(ptr, 0, size); } } return p;}void SimulatedBlock::deallocRecord(void ** ptr, const char * type, size_t s, size_t n){ (void)type; if(* ptr != 0){ ndbd_free(* ptr, n*s); * ptr = 0; }}voidSimulatedBlock::refresh_watch_dog(){ globalData.incrementWatchDogCounter(1);}voidSimulatedBlock::progError(int line, int err_code, const char* extra) const { jamLine(line); const char *aBlockName = getBlockName(number(), "VM Kernel"); // Pack status of interesting config variables // so that we can print them in error.log int magicStatus = (theConfiguration.stopOnError()<<1) + (theConfiguration.getInitialStart()<<2) + (theConfiguration.getDaemonMode()<<3); /* Add line number to block name */ char buf[100]; BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x", aBlockName, line, magicStatus); ErrorReporter::handleError(err_code, extra, buf);}void SimulatedBlock::infoEvent(const char * msg, ...) const { if(msg == 0) return; Uint32 theData[25]; theData[0] = NDB_LE_InfoEvent; char * buf = (char *)&(theData[1]); va_list ap; va_start(ap, msg); BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4 va_end(ap); int len = strlen(buf) + 1; if(len > 96){ len = 96; buf[95] = 0; } /** * Init and put it into the job buffer */ SignalHeader sh; memset(&sh, 0, sizeof(SignalHeader)); const Signal * signal = globalScheduler.getVMSignals(); Uint32 tTrace = signal->header.theTrace; Uint32 tSignalId = signal->header.theSignalId; sh.theVerId_signalNumber = GSN_EVENT_REP; sh.theReceiversBlockNumber = CMVMI; sh.theSendersBlockRef = reference(); sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.theLength = ((len+3)/4)+1; Uint32 secPtrI[3]; // Dummy globalScheduler.execute(&sh, JBB, theData, secPtrI);}void SimulatedBlock::warningEvent(const char * msg, ...) const { if(msg == 0) return; Uint32 theData[25]; theData[0] = NDB_LE_WarningEvent; char * buf = (char *)&(theData[1]); va_list ap; va_start(ap, msg); BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4 va_end(ap); int len = strlen(buf) + 1; if(len > 96){ len = 96; buf[95] = 0; } /** * Init and put it into the job buffer */ SignalHeader sh; memset(&sh, 0, sizeof(SignalHeader)); const Signal * signal = globalScheduler.getVMSignals(); Uint32 tTrace = signal->header.theTrace; Uint32 tSignalId = signal->header.theSignalId; sh.theVerId_signalNumber = GSN_EVENT_REP; sh.theReceiversBlockNumber = CMVMI; sh.theSendersBlockRef = reference(); sh.theTrace = tTrace; sh.theSignalId = tSignalId; sh.theLength = ((len+3)/4)+1; Uint32 secPtrI[3]; // Dummy globalScheduler.execute(&sh, JBB, theData, secPtrI);}voidSimulatedBlock::execNODE_STATE_REP(Signal* signal){ const NodeStateRep * const rep = (NodeStateRep *)&signal->theData[0]; this->theNodeState = rep->nodeState;}voidSimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){ const ChangeNodeStateReq * const req = (ChangeNodeStateReq *)&signal->theData[0]; this->theNodeState = req->nodeState; const Uint32 senderData = req->senderData; const BlockReference senderRef = req->senderRef; /** * Pack return signal */ ChangeNodeStateConf * const conf = (ChangeNodeStateConf *)&signal->theData[0]; conf->senderData = senderData; sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal, ChangeNodeStateConf::SignalLength, JBB);}voidSimulatedBlock::execNDB_TAMPER(Signal * signal){ SET_ERROR_INSERT_VALUE(signal->theData[0]);}voidSimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){ char msg[64]; const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0]; snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()), rep->originalGsn, rep->originalLength,rep->originalSectionCount); ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY, msg, __FILE__, NST_ErrorHandler);}voidSimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){ ljamEntry(); Ptr<FragmentSendInfo> fragPtr; c_segmentedFragmentSendList.first(fragPtr); for(; !fragPtr.isNull();){ ljam(); Ptr<FragmentSendInfo> copyPtr = fragPtr; c_segmentedFragmentSendList.next(fragPtr); sendNextSegmentedFragment(signal, * copyPtr.p); if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){ ljam(); if(copyPtr.p->m_callback.m_callbackFunction != 0) { ljam(); execute(signal, copyPtr.p->m_callback, 0); }//if c_segmentedFragmentSendList.release(copyPtr); } } c_linearFragmentSendList.first(fragPtr); for(; !fragPtr.isNull();){ ljam(); Ptr<FragmentSendInfo> copyPtr = fragPtr; c_linearFragmentSendList.next(fragPtr); sendNextLinearFragment(signal, * copyPtr.p); if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){ ljam(); if(copyPtr.p->m_callback.m_callbackFunction != 0) { ljam(); execute(signal, copyPtr.p->m_callback, 0); }//if c_linearFragmentSendList.release(copyPtr); } } if(c_segmentedFragmentSendList.isEmpty() && c_linearFragmentSendList.isEmpty()){ ljam(); c_fragSenderRunning = false; return; } ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); sig->line = __LINE__; sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);}#ifdef VM_TRACE_TIMEvoidSimulatedBlock::clearTimes() { for(Uint32 i = 0; i <= MAX_GSN; i++){ m_timeTrace[i].cnt = 0; m_timeTrace[i].sum = 0; m_timeTrace[i].sub = 0; }}voidSimulatedBlock::printTimes(FILE * output){ fprintf(output, "-- %s --\n", getBlockName(number())); Uint64 sum = 0; for(Uint32 i = 0; i <= MAX_GSN; i++){ Uint32 n = m_timeTrace[i].cnt; if(n != 0){ double dn = n; double avg = m_timeTrace[i].sum; double avg2 = avg - m_timeTrace[i].sub; avg /= dn; avg2 /= dn; fprintf(output, //name ; cnt ; loc ; acc "%s ; #%d ; %dus ; %dus ; %dms\n", getSignalName(i), n, (Uint32)avg, (Uint32)avg2, (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000)); sum += (m_timeTrace[i].sum - m_timeTrace[i].sub); } } sum = (sum + 500)/ 1000; fprintf(output, "-- %s : %d --\n", getBlockName(number()), sum); fprintf(output, "\n"); fflush(output);}#endifvoid release(SegmentedSectionPtr & ptr);SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){ m_fragmentId = fragId; m_senderRef = sender; m_sectionPtrI[0] = RNIL; m_sectionPtrI[1] = RNIL; m_sectionPtrI[2] = RNIL;}SimulatedBlock::FragmentSendInfo::FragmentSendInfo(){}boolSimulatedBlock::assembleFragments(Signal * signal){ Uint32 sigLen = signal->length() - 1; Uint32 fragId = signal->theData[sigLen]; Uint32 fragInfo = signal->header.m_fragmentInfo; Uint32 senderRef = signal->getSendersBlockRef(); if(fragInfo == 0){ return true; } const Uint32 secs = signal->header.m_noOfSections; const Uint32 * const secNos = &signal->theData[sigLen - secs]; if(fragInfo == 1){ /** * First in train */ Ptr<FragmentInfo> fragPtr;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -