📄 dbtuplcp.cpp
字号:
regOpPtr.p->fragId, regFragPtr.p->checkpointVersion); }//if }//if loopCount++;; regOpPtr.i = regOpPtr.p->nextOprecInList; }//while if (regOpPtr.i == RNIL) { ljam(); signal->theData[0] = ciPtr.p->lcpUserptr; sendSignal(ciPtr.p->lcpBlockref, GSN_TUP_LCPSTARTED, signal, 1, JBA); signal->theData[0] = ZCONT_SAVE_DP; signal->theData[1] = ciPtr.i; sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB); } else { ljam(); ciPtr.p->lcpTmpOperPtr = regOpPtr.i; signal->theData[0] = ZCONT_START_SAVE_CL; signal->theData[1] = ciPtr.i; sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB); }//if}//Dbtup::lcpSaveCopyListLab()/* ---------------------------------------------------------------- *//* ------- PERFORM A COPY OF ONE DATAPAGE DURING CHECKPOINT ------- *//* ---------------------------------------------------------------- *//* THE RANGE OF DATA PAGES IS INCLUDED IN THE CHECKPOINT_INFO_PTR *//* LAST_PAGE_TO_BUFFER ELEMENT IS INCREASED UNTIL ALL PAGES ARE *//* COPIED TO THE DISK BUFFER. WHEN A DISK BUFFER SEGMENT IS FULL *//* IT WILL BE WRITTEN TO DISK (TYPICALLY EACH 8:TH PAGE) *//* ---------------------------------------------------------------- */void Dbtup::lcpSaveDataPageLab(Signal* signal, Uint32 ciIndex) { CheckpointInfoPtr ciPtr; DiskBufferSegmentInfoPtr dbsiPtr; FragrecordPtr regFragPtr; LocalLogInfoPtr lliPtr; UndoPagePtr undoCopyPagePtr; PagePtr pagePtr; ciPtr.i = ciIndex; ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo); if (ERROR_INSERTED(4000)){ if (ciPtr.p->lcpTabPtr == c_errorInsert4000TableId) { // Delay writing of data pages during LCP ndbout << "Delay writing of data pages during LCP" << endl; signal->theData[0] = ZCONT_SAVE_DP; signal->theData[1] = ciIndex; sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 1000, 2); return; }//if }//if if (clblPageCounter == 0) { ljam(); signal->theData[0] = ZCONT_SAVE_DP; signal->theData[1] = ciPtr.i; sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 100, 2); return; } else { ljam(); clblPageCounter--; }//if regFragPtr.i = ciPtr.p->lcpFragmentP; ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord); dbsiPtr.i = ciPtr.p->lcpDataBufferSegmentP; ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); pagePtr.i = getRealpid(regFragPtr.p, regFragPtr.p->minPageNotWrittenInCheckpoint); ptrCheckGuard(pagePtr, cnoOfPage, page); ndbrequire(dbsiPtr.p->pdxNumDataPages < 16); undoCopyPagePtr.i = dbsiPtr.p->pdxDataPage[dbsiPtr.p->pdxNumDataPages]; ptrCheckGuard(undoCopyPagePtr, cnoOfUndoPage, undoPage); MEMCOPY_NO_WORDS(&undoCopyPagePtr.p->undoPageWord[0], &pagePtr.p->pageWord[0], ZWORDS_ON_PAGE); regFragPtr.p->minPageNotWrittenInCheckpoint++; dbsiPtr.p->pdxNumDataPages++; if (regFragPtr.p->minPageNotWrittenInCheckpoint == regFragPtr.p->maxPageWrittenInCheckpoint) { /* ---------------------------------------------------------- */ /* ALL PAGES ARE COPIED, TIME TO FINISH THE CHECKPOINT */ /* SAVE THE END POSITIONS OF THE LOG RECORDS SINCE ALL DATA */ /* PAGES ARE NOW SAFE ON DISK AND NO MORE LOGGING WILL APPEAR */ /* ---------------------------------------------------------- */ ljam(); lliPtr.i = ciPtr.p->lcpLocalLogInfoP; ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo); regFragPtr.p->checkpointVersion = RNIL; /* UNDO LOGGING IS SHUT OFF */ lcpWriteListDataPageSegment(signal, dbsiPtr, ciPtr, false); dbsiPtr.p->pdxOperation = CHECKPOINT_DATA_WRITE_LAST; } else if (dbsiPtr.p->pdxNumDataPages == ZDB_SEGMENT_SIZE) { ljam(); lcpWriteListDataPageSegment(signal, dbsiPtr, ciPtr, false); dbsiPtr.p->pdxOperation = CHECKPOINT_DATA_WRITE; } else { ljam(); signal->theData[0] = ZCONT_SAVE_DP; signal->theData[1] = ciPtr.i; sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB); }//if}//Dbtup::lcpSaveDataPageLab()void Dbtup::lcpWriteListDataPageSegment(Signal* signal, DiskBufferSegmentInfoPtr dbsiPtr, CheckpointInfoPtr ciPtr, bool flushFlag) { Uint32 flags = 1; cnoOfDataPagesToDiskWithoutSynch += dbsiPtr.p->pdxNumDataPages; if ((cnoOfDataPagesToDiskWithoutSynch > MAX_PAGES_WITHOUT_SYNCH) || (flushFlag)) { ljam();/* ---------------------------------------------------------------- */// To avoid synching too big chunks at a time we synch after writing// a certain number of data pages. (e.g. 2 MBytes)./* ---------------------------------------------------------------- */ cnoOfDataPagesToDiskWithoutSynch = 0; flags |= 0x10; //Set synch flag unconditionally }//if signal->theData[0] = ciPtr.p->lcpDataFileHandle; signal->theData[1] = cownref; signal->theData[2] = dbsiPtr.i; signal->theData[3] = flags; signal->theData[4] = ZBASE_ADDR_UNDO_WORD; signal->theData[5] = dbsiPtr.p->pdxNumDataPages; signal->theData[6] = dbsiPtr.p->pdxDataPage[0]; signal->theData[7] = dbsiPtr.p->pdxFilePage; sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA); dbsiPtr.p->pdxFilePage += dbsiPtr.p->pdxNumDataPages; dbsiPtr.p->pdxNumDataPages = 0;}//Dbtup::lcpWriteListDataPageSegment()void Dbtup::lcpFlushLogLab(Signal* signal, CheckpointInfoPtr ciPtr) { DiskBufferSegmentInfoPtr oldDbsiPtr; LocalLogInfoPtr lliPtr; UndoPagePtr oldUndoPagePtr; UndoPagePtr newUndoPagePtr; lliPtr.i = ciPtr.p->lcpLocalLogInfoP; ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo); oldDbsiPtr.i = lliPtr.p->lliUndoBufferSegmentP; ptrCheckGuard(oldDbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); oldDbsiPtr.p->pdxNumDataPages++; if (clblPageCounter > 0) { ljam(); clblPageCounter--; }//if oldUndoPagePtr.i = lliPtr.p->lliUndoPage; ptrCheckGuard(oldUndoPagePtr, cnoOfUndoPage, undoPage); lcpWriteUndoSegment(signal, lliPtr.p, true); oldDbsiPtr.p->pdxOperation = CHECKPOINT_UNDO_WRITE_FLUSH; oldDbsiPtr.p->pdxCheckpointInfoP = ciPtr.i;/* ---------------------------------------------------------------- *//* SINCE LAST PAGE SENT TO DISK WAS NOT FULL YET WE COPY IT *//* TO THE NEW LAST PAGE. *//* ---------------------------------------------------------------- */ newUndoPagePtr.i = lliPtr.p->lliUndoPage; ptrCheckGuard(newUndoPagePtr, cnoOfUndoPage, undoPage); ndbrequire(lliPtr.p->lliUndoWord < ZWORDS_ON_PAGE); MEMCOPY_NO_WORDS(&newUndoPagePtr.p->undoPageWord[0], &oldUndoPagePtr.p->undoPageWord[0], lliPtr.p->lliUndoWord);}//Dbtup::lcpFlushLogLab()void Dbtup::lcpFlushRestartInfoLab(Signal* signal, Uint32 ciIndex) { CheckpointInfoPtr ciPtr; DiskBufferSegmentInfoPtr dbsiPtr; LocalLogInfoPtr lliPtr; UndoPagePtr undoCopyPagePtr; ciPtr.i = ciIndex; ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo); lliPtr.i = ciPtr.p->lcpLocalLogInfoP; ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo); dbsiPtr.i = ciPtr.p->lcpDataBufferSegmentP; ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); undoCopyPagePtr.i = dbsiPtr.p->pdxDataPage[0]; /* UNDO INFO STORED AT PAGE 0 */ ptrCheckGuard(undoCopyPagePtr, cnoOfUndoPage, undoPage); ndbrequire(ciPtr.p->lcpNoOfPages > 0); undoCopyPagePtr.p->undoPageWord[ZSRI_NO_OF_FRAG_PAGES_POS] = ciPtr.p->lcpNoOfPages; undoCopyPagePtr.p->undoPageWord[ZSRI_NO_COPY_PAGES_ALLOC] = ciPtr.p->lcpNoCopyPagesAlloc; undoCopyPagePtr.p->undoPageWord[ZSRI_EMPTY_PRIM_PAGE] = ciPtr.p->lcpEmptyPrimPage; undoCopyPagePtr.p->undoPageWord[ZSRI_TH_FREE_FIRST] = ciPtr.p->lcpThFreeFirst; undoCopyPagePtr.p->undoPageWord[ZSRI_TH_FREE_COPY_FIRST] = ciPtr.p->lcpThFreeCopyFirst; undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_LOG_END_REC_ID] = lliPtr.p->lliPrevRecordId; undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_FILE_VER] = cundoFileVersion; if (lliPtr.p->lliUndoWord == ZUNDO_PAGE_HEADER_SIZE) { ljam(); undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_LOG_END_PAGE_ID] = lliPtr.p->lliLogFilePage - 1; } else { ljam(); undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_LOG_END_PAGE_ID] = lliPtr.p->lliLogFilePage; }//if dbsiPtr.p->pdxNumDataPages = 1; dbsiPtr.p->pdxFilePage = 0; if (clblPageCounter > 0) { ljam(); clblPageCounter--; }//if lcpWriteListDataPageSegment(signal, dbsiPtr, ciPtr, true); dbsiPtr.p->pdxOperation = CHECKPOINT_DATA_WRITE_FLUSH; return;}//Dbtup::lcpFlushRestartInfoLab()void Dbtup::lcpCompletedLab(Signal* signal, Uint32 ciIndex) { CheckpointInfoPtr ciPtr; PendingFileOpenInfoPtr pfoiPtr;/* ---------------------------------------------------------------------- *//* INSERT CODE TO CLOSE DATA FILE HERE. DO THIS BEFORE SEND CONF *//* ---------------------------------------------------------------------- */ ciPtr.i = ciIndex; ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo); seizePendingFileOpenInfoRecord(pfoiPtr); pfoiPtr.p->pfoOpenType = LCP_DATA_FILE_CLOSE; pfoiPtr.p->pfoCheckpointInfoP = ciPtr.i; signal->theData[0] = ciPtr.p->lcpDataFileHandle; signal->theData[1] = cownref; signal->theData[2] = pfoiPtr.i; signal->theData[3] = 0; sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA); return;}//Dbtup::lcpCompletedLab()void Dbtup::lcpClosedDataFileLab(Signal* signal, CheckpointInfoPtr ciPtr) { signal->theData[0] = ciPtr.p->lcpUserptr; sendSignal(ciPtr.p->lcpBlockref, GSN_TUP_LCPCONF, signal, 1, JBB); releaseCheckpointInfoRecord(ciPtr); return;}//Dbtup::lcpClosedDataFileLab()/* ---------------------------------------------------------------------- *//* LCP END IS THE LAST STEP IN THE LCP PROCESS IT WILL CLOSE THE LOGFILES *//* AND RELEASE THE ALLOCATED CHECKPOINT_INFO_RECORDS *//* ---------------------------------------------------------------------- */void Dbtup::execEND_LCPREQ(Signal* signal) { DiskBufferSegmentInfoPtr dbsiPtr; LocalLogInfoPtr lliPtr; PendingFileOpenInfoPtr pfoiPtr; ljamEntry(); clqhUserpointer = signal->theData[0]; clqhBlockref = signal->theData[1]; for (lliPtr.i = 0; lliPtr.i < 16; lliPtr.i++) { ljam(); ptrAss(lliPtr, localLogInfo); if (lliPtr.p->lliActiveLcp > 0) { ljam(); dbsiPtr.i = lliPtr.p->lliUndoBufferSegmentP; ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); freeDiskBufferSegmentRecord(signal, dbsiPtr); seizePendingFileOpenInfoRecord(pfoiPtr); /* SEIZE A NEW FILE OPEN INFO */ pfoiPtr.p->pfoOpenType = LCP_UNDO_FILE_CLOSE; pfoiPtr.p->pfoCheckpointInfoP = lliPtr.i; signal->theData[0] = lliPtr.p->lliUndoFileHandle; signal->theData[1] = cownref; signal->theData[2] = pfoiPtr.i; signal->theData[3] = 0; sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA); lliPtr.p->lliActiveLcp = 0; }//if }//for return;}//Dbtup::execEND_LCPREQ()void Dbtup::lcpEndconfLab(Signal* signal) { LocalLogInfoPtr lliPtr; for (lliPtr.i = 0; lliPtr.i < 16; lliPtr.i++) { ljam(); ptrAss(lliPtr, localLogInfo); if (lliPtr.p->lliUndoFileHandle != RNIL) { ljam();/* ---------------------------------------------------------------------- *//* WAIT UNTIL ALL LOG FILES HAVE BEEN CLOSED. *//* ---------------------------------------------------------------------- */ return; }//if }//for signal->theData[0] = clqhUserpointer; sendSignal(clqhBlockref, GSN_END_LCPCONF, signal, 1, JBB); return;}//Dbtup::lcpEndconfLab()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -