📄 simulatedblock.cpp
字号:
if(!c_fragmentInfoHash.seize(fragPtr)){ ndbrequire(false); return false; } new (fragPtr.p)FragmentInfo(fragId, senderRef); c_fragmentInfoHash.add(fragPtr); for(Uint32 i = 0; i<secs; i++){ Uint32 sectionNo = secNos[i]; ndbassert(sectionNo < 3); fragPtr.p->m_sectionPtrI[sectionNo] = signal->m_sectionPtr[i].i; } /** * Don't release allocated segments */ signal->header.m_noOfSections = 0; return false; } FragmentInfo key(fragId, senderRef); Ptr<FragmentInfo> fragPtr; if(c_fragmentInfoHash.find(fragPtr, key)){ /** * FragInfo == 2 or 3 */ Uint32 i; for(i = 0; i<secs; i++){ Uint32 sectionNo = secNos[i]; ndbassert(sectionNo < 3); Uint32 sectionPtrI = signal->m_sectionPtr[i].i; if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){ linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI); } else { fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI; } } /** * fragInfo = 2 */ if(fragInfo == 2){ signal->header.m_noOfSections = 0; return false; } /** * fragInfo = 3 */ for(i = 0; i<3; i++){ Uint32 ptrI = fragPtr.p->m_sectionPtrI[i]; if(ptrI != RNIL){ signal->m_sectionPtr[i].i = ptrI; } else { break; } } signal->setLength(sigLen - i); signal->header.m_noOfSections = i; signal->header.m_fragmentInfo = 0; getSections(i, signal->m_sectionPtr); c_fragmentInfoHash.release(fragPtr); return true; } /** * Unable to find fragment */ ndbrequire(false); return false;}boolSimulatedBlock::sendFirstFragment(FragmentSendInfo & info, NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jbuf, Uint32 messageSize){ info.m_sectionPtr[0].m_segmented.i = RNIL; info.m_sectionPtr[1].m_segmented.i = RNIL; info.m_sectionPtr[2].m_segmented.i = RNIL; Uint32 totalSize = 0; SectionSegment * p; switch(signal->header.m_noOfSections){ case 3: p = signal->m_sectionPtr[2].p; info.m_sectionPtr[2].m_segmented.p = p; info.m_sectionPtr[2].m_segmented.i = signal->m_sectionPtr[2].i; totalSize += p->m_sz; case 2: p = signal->m_sectionPtr[1].p; info.m_sectionPtr[1].m_segmented.p = p; info.m_sectionPtr[1].m_segmented.i = signal->m_sectionPtr[1].i; totalSize += p->m_sz; case 1: p = signal->m_sectionPtr[0].p; info.m_sectionPtr[0].m_segmented.p = p; info.m_sectionPtr[0].m_segmented.i = signal->m_sectionPtr[0].i; totalSize += p->m_sz; } if(totalSize <= messageSize + SectionSegment::DataLength){ /** * Send signal directly */ sendSignal(rg, gsn, signal, length, jbuf); info.m_status = FragmentSendInfo::SendComplete; return true; } /** * Consume sections */ signal->header.m_noOfSections = 0; /** * Setup info object */ info.m_status = FragmentSendInfo::SendNotComplete; info.m_prio = (Uint8)jbuf; info.m_gsn = gsn; info.m_fragInfo = 1; info.m_messageSize = messageSize; info.m_fragmentId = c_fragmentIdCounter++; info.m_nodeReceiverGroup = rg; info.m_callback.m_callbackFunction = 0; Ptr<SectionSegment> tmp; if(!import(tmp, &signal->theData[0], length)){ ndbrequire(false); return false; } info.m_theDataSection.p = &tmp.p->theData[0]; info.m_theDataSection.sz = length; tmp.p->theData[length] = tmp.i; sendNextSegmentedFragment(signal, info); if(c_fragmentIdCounter == 0){ /** * Fragment id 0 is invalid */ c_fragmentIdCounter = 1; } return true;}#if 0#define lsout(x) x#else#define lsout(x)#endifvoidSimulatedBlock::sendNextSegmentedFragment(Signal* signal, FragmentSendInfo & info){ /** * Store "theData" */ const Uint32 sigLen = info.m_theDataSection.sz; memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen); Uint32 sz = 0; Uint32 maxSz = info.m_messageSize; Int32 secNo = 2; Uint32 secCount = 0; Uint32 * secNos = &signal->theData[sigLen]; enum { Unknown = 0, Full = 1 } loop = Unknown; for(; secNo >= 0 && secCount < 3; secNo--){ Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i; if(ptrI == RNIL) continue; info.m_sectionPtr[secNo].m_segmented.i = RNIL; SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p; const Uint32 size = ptrP->m_sz; signal->m_sectionPtr[secCount].i = ptrI; signal->m_sectionPtr[secCount].p = ptrP; signal->m_sectionPtr[secCount].sz = size; secNos[secCount] = secNo; secCount++; const Uint32 sizeLeft = maxSz - sz; if(size <= sizeLeft){ /** * The section fits */ sz += size; lsout(ndbout_c("section %d saved as %d", secNo, secCount-1)); continue; } const Uint32 overflow = size - sizeLeft; // > 0 if(overflow <= SectionSegment::DataLength){ /** * Only one segment left to send * send even if sizeLeft <= size */ lsout(ndbout_c("section %d saved as %d but full over: %d", secNo, secCount-1, overflow)); secNo--; break; } // size >= 61 if(sizeLeft < SectionSegment::DataLength){ /** * Less than one segment left (space) * dont bother sending */ secCount--; info.m_sectionPtr[secNo].m_segmented.i = ptrI; loop = Full; lsout(ndbout_c("section %d not saved", secNo)); break; } /** * Split list * 1) Find place to split * 2) Rewrite header (the part that will be sent) * 3) Write new header (for remaining part) * 4) Store new header on FragmentSendInfo - record */ // size >= 61 && sizeLeft >= 60 Uint32 sum = SectionSegment::DataLength; Uint32 prevPtrI = ptrI; ptrI = ptrP->m_nextSegment; const Uint32 fill = sizeLeft - SectionSegment::DataLength; while(sum < fill){ prevPtrI = ptrI; ptrP = g_sectionSegmentPool.getPtr(ptrI); ptrI = ptrP->m_nextSegment; sum += SectionSegment::DataLength; } /** * Rewrite header w.r.t size and last */ Uint32 prev = secCount - 1; const Uint32 last = signal->m_sectionPtr[prev].p->m_lastSegment; signal->m_sectionPtr[prev].p->m_lastSegment = prevPtrI; signal->m_sectionPtr[prev].p->m_sz = sum; signal->m_sectionPtr[prev].sz = sum; /** * Write "new" list header */ ptrP = g_sectionSegmentPool.getPtr(ptrI); ptrP->m_lastSegment = last; ptrP->m_sz = size - sum; /** * And store it on info-record */ info.m_sectionPtr[secNo].m_segmented.i = ptrI; info.m_sectionPtr[secNo].m_segmented.p = ptrP; loop = Full; lsout(ndbout_c("section %d split into %d", secNo, prev)); break; } lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d", loop, secNo, secCount, sz)); /** * Store fragment id */ secNos[secCount] = info.m_fragmentId; Uint32 fragInfo = info.m_fragInfo; info.m_fragInfo = 2; switch(loop){ case Unknown: if(secNo >= 0){ lsout(ndbout_c("Unknown - Full")); /** * Not finished */ break; } // Fall through lsout(ndbout_c("Unknown - Done")); info.m_status = FragmentSendInfo::SendComplete; ndbassert(fragInfo == 2); fragInfo = 3; case Full: break; } signal->header.m_fragmentInfo = fragInfo; signal->header.m_noOfSections = secCount; sendSignal(info.m_nodeReceiverGroup, info.m_gsn, signal, sigLen + secCount + 1, (JobBufferLevel)info.m_prio); if(fragInfo == 3){ /** * This is the last signal */ g_sectionSegmentPool.release(info.m_theDataSection.p[sigLen]); }}boolSimulatedBlock::sendFirstFragment(FragmentSendInfo & info, NodeReceiverGroup rg, GlobalSignalNumber gsn, Signal* signal, Uint32 length, JobBufferLevel jbuf, LinearSectionPtr ptr[3], Uint32 noOfSections, Uint32 messageSize){ ::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr); signal->header.m_noOfSections = 0; info.m_sectionPtr[0].m_linear.p = NULL; info.m_sectionPtr[1].m_linear.p = NULL; info.m_sectionPtr[2].m_linear.p = NULL; Uint32 totalSize = 0; switch(noOfSections){ case 3: info.m_sectionPtr[2].m_linear = ptr[2]; totalSize += ptr[2].sz; case 2: info.m_sectionPtr[1].m_linear = ptr[1]; totalSize += ptr[1].sz; case 1: info.m_sectionPtr[0].m_linear = ptr[0]; totalSize += ptr[0].sz; } if(totalSize <= messageSize + SectionSegment::DataLength){ /** * Send signal directly */ sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections); info.m_status = FragmentSendInfo::SendComplete; /** * Indicate to sendLinearSignalFragment * that we'r already done */ return true; } /** * Setup info object */ info.m_status = FragmentSendInfo::SendNotComplete; info.m_prio = (Uint8)jbuf; info.m_gsn = gsn; info.m_messageSize = messageSize; info.m_fragInfo = 1; info.m_fragmentId = c_fragmentIdCounter++; info.m_nodeReceiverGroup = rg; info.m_callback.m_callbackFunction = 0; Ptr<SectionSegment> tmp; if(!import(tmp, &signal->theData[0], length)){ ndbrequire(false); return false; } info.m_theDataSection.p = &tmp.p->theData[0]; info.m_theDataSection.sz = length; tmp.p->theData[length] = tmp.i; sendNextLinearFragment(signal, info); if(c_fragmentIdCounter == 0){ /** * Fragment id 0 is invalid */ c_fragmentIdCounter = 1; } return true;}voidSimulatedBlock::sendNextLinearFragment(Signal* signal, FragmentSendInfo & info){ /** * Store "theData" */ const Uint32 sigLen = info.m_theDataSection.sz; memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen); Uint32 sz = 0; Uint32 maxSz = info.m_messageSize; Int32 secNo = 2; Uint32 secCount = 0; Uint32 * secNos = &signal->theData[sigLen]; LinearSectionPtr signalPtr[3]; enum { Unknown = 0, Full = 2 } loop = Unknown; for(; secNo >= 0 && secCount < 3; secNo--){ Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p; if(ptrP == NULL) continue; info.m_sectionPtr[secNo].m_linear.p = NULL; const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz; signalPtr[secCount].p = ptrP; signalPtr[secCount].sz = size; secNos[secCount] = secNo; secCount++; const Uint32 sizeLeft = maxSz - sz; if(size <= sizeLeft){ /** * The section fits */ sz += size; lsout(ndbout_c("section %d saved as %d", secNo, secCount-1)); continue; } const Uint32 overflow = size - sizeLeft; // > 0 if(overflow <= SectionSegment::DataLength){ /** * Only one segment left to send * send even if sizeLeft <= size */ lsout(ndbout_c("section %d saved as %d but full over: %d", secNo, secCount-1, overflow)); secNo--; break; } // size >= 61 if(sizeLeft < SectionSegment::DataLength){ /** * Less than one segment left (space) * dont bother sending */ secCount--; info.m_sectionPtr[secNo].m_linear.p = ptrP; loop = Full; lsout(ndbout_c("section %d not saved", secNo)); break; } /** * Split list * 1) Find place to split * 2) Rewrite header (the part that will be sent) * 3) Write new header (for remaining part) * 4) Store new header on FragmentSendInfo - record */ Uint32 sum = sizeLeft; sum /= SectionSegment::DataLength; sum *= SectionSegment::DataLength; /** * Rewrite header w.r.t size */ Uint32 prev = secCount - 1; signalPtr[prev].sz = sum; /** * Write/store "new" header */ info.m_sectionPtr[secNo].m_linear.p = ptrP + sum; info.m_sectionPtr[secNo].m_linear.sz = size - sum; loop = Full; lsout(ndbout_c("section %d split into %d", secNo, prev)); break; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -