📄 ck_checkpoint.cxx
字号:
/* Reset all migration cursors and enter the initial migration state.
 * needStabilize selects whether a checkpoint stabilization pass must
 * run first (mg_StartCheckpoint) or migration can begin directly
 * (mg_StartMigration). */
void
Checkpoint::StartMigration(bool needStabilize)
{
  /* Rewind every per-pass cursor so each phase below starts from the
   * beginning when the state machine reaches it: */
  mg_nextCorePage = 0;
  mg_nextCoreNode = 0;
  mg_nextDirent = coreGeneration[last_ckpt].FirstDirEntry();
  mg_nextThreadDirPg = 0;
  mg_nextReserveDirPg = 0;
  mg_objectPotOID = 0;
  mg_tagPotOID = 0;
#ifndef NDEBUG
  mg_SanityChecked = false;
#endif
  migrationStatus = needStabilize ? mg_StartCheckpoint : mg_StartMigration;

  DEBUG(mig)
    MsgLog::printf("Starting migration in state %s\n", MigStateName());
}

/* Write every dirty checkpointed node frame to the log.  Incremental:
 * returns early (without advancing the state machine) whenever the
 * current thread has used up its fair share of page writes, so other
 * threads can make progress; the mg_nextCoreNode cursor preserves
 * position across calls.  On completion advances to mg_StabilizePages. */
void
Checkpoint::StabilizeNodes()
{
  assert (Thread::Current());

  while (mg_nextCoreNode < ObjectCache::TotalNodes()) {
    /* Fairness throttle: yield the pass back to the caller once this
     * thread has issued its quota of writes. */
    if (Thread::Current()->pageWriteCount >= Thread::FairMigrateWrites) {
      Thread::Current()->pageWriteCount = 0;
      return;
    }

    Node *pNode = ObjectCache::GetCoreNodeFrame(mg_nextCoreNode);

    assert (pNode->GetFlags(OFLG_IO) == 0);

    if (pNode->GetFlags(OFLG_CKPT) && pNode->IsDirty()) {
      assert (pNode->IsFree() == false);
      WriteNodeToLog(pNode);
    }

    mg_nextCoreNode++;
  }

  /* The loop may have finished exactly at the write quota; check once
   * more before doing the completion work. */
  if (Thread::Current()->pageWriteCount >= Thread::FairMigrateWrites) {
    Thread::Current()->pageWriteCount = 0;
    return;
  }

  /* At this point, we know that everything is marked dirty that is
   * going to be: */
  coreGeneration[last_ckpt].FinishActiveNodeFrame();

  mg_nextCoreNode = 0;
  migrationStatus = mg_StabilizePages;
}

/* Note that StabilizePages also writes the log frames that were
 * modified during node stabilization! */
/* Write every dirty log page and checkpointed data page to disk.
 * Incremental in the same fashion as StabilizeNodes(), using the
 * mg_nextCorePage cursor.  On completion advances to mg_WriteDir. */
void
Checkpoint::StabilizePages()
{
  while (mg_nextCorePage < ObjectCache::TotalPages()) {
    if (Thread::Current()->pageWriteCount >= Thread::FairMigrateWrites) {
      Thread::Current()->pageWriteCount = 0;
      return;
    }

    ObjectHeader *pObj = ObjectCache::GetCorePageFrame(mg_nextCorePage);

    if (pObj->IsDirty()) {
      /* Log pages are written unconditionally; data pages only if they
       * belong to the checkpoint being stabilized: */
      if (pObj->obType == ObType::PtLogPage ||
          (pObj->GetFlags(OFLG_CKPT) && pObj->obType == ObType::PtDataPage)) {
        assert (pObj->IsFree() == false);

        if (DBCOND(stabilize)) {
          MsgLog::printf("stabilizing pObj=0x%08x oid=0x%08x%08x cur: %c chk: %c drt: %c%c\n",
                         pObj,
                         (uint32_t) (pObj->ob.oid >> 32),
                         (uint32_t) pObj->ob.oid,
                         pObj->GetFlags(OFLG_CURRENT) ? 'y' : 'n',
                         pObj->GetFlags(OFLG_CKPT) ? 'y' : 'n',
                         pObj->GetFlags(OFLG_DIRTY) ? 'y' : 'n',
                         pObj->GetFlags(OFLG_REDIRTY) ? 'y' : 'n' );
          MsgLog::printf(" pObj=0x%08x calccheck=0x%08x\n",
                         pObj,
#ifdef OPTION_OB_MOD_CHECK
                         pObj->CalcCheck()
#else
                         0
#endif
                         );
        }

        /* A page must not be mapped writable while its contents go to
         * disk, or the on-disk image could be torn: */
        assert ( PTE::ObIsNotWritable(pObj) );

        Persist::WritePage(pObj);
      }
    }

    mg_nextCorePage++;
  }

  mg_nextCorePage = 0;
  migrationStatus = mg_WriteDir;
}

/* Flush the single in-core checkpoint directory-entry page to the log
 * (allocating a log location for it on first write), or reset it if it
 * holds nothing dirty.  Blocks (sleep + yield) if the page already has
 * I/O in flight. */
void
Checkpoint::DrainDirentPage()
{
  if ( !nextDirPageHdr->IsDirty() ) {
    /* Nothing to write: just recycle the page for reuse. */
    nextDirPage->nDirent = 0;
    nextDirPageHdr->ob.oid = UNDEF_LID;
    return;
  }

  /* This needs to block, because we only posess a single in-core
   * directory entry page. A regrettable consequence is that writing
   * the directory is a sequential operation across all threads
   * desiring to dirty pages. I should look into doing this in lazy
   * fashion -- pre-reserving the directory page may simply be
   * unnecessary; I did it to ensure that I wouldn't deadlock stuck
   * while writing out directory pages because I didn't have time to
   * think through the relevant possible bugs. */

  if (nextDirPageHdr->GetFlags(OFLG_IO) == 0) {
    /* Add an entry in the master header for this directory page,
     * and write it to the disk. */
    lid_t lid = nextDirPageHdr->ob.oid;
    if (lid == UNDEF_LID) {
      /* First flush of this page: allocate a log frame and record it
       * in the next checkpoint header's directory-page table (after
       * the reserve and thread directory pages). */
      lid = coreGeneration[last_ckpt].AllocateLid();
      coreGeneration[last_ckpt].alloc.nDirFrame++;
      nextDirPageHdr->ob.oid = lid;

      uint32_t firstDirEntry = nextCkptHdr->nRsrvPage + nextCkptHdr->nThreadPage;
      uint32_t dirEntry = firstDirEntry + nextCkptHdr->nDirPage++;

      /* NOTE(review): this assignment duplicates the one above —
       * appears redundant; confirm before removing. */
      nextDirPageHdr->ob.oid = lid;
      nextCkptHdr->dirPage[dirEntry] = lid;
      nextCkptObHdr->SetDirtyFlag();
    }

    Persist::WritePageToLog(nextDirPageHdr, lid, true);
  }
  else {
    /* I/O already in flight on this page; wait for it to complete. */
    Thread::Current()->SleepOn(nextDirPageHdr->ObjectSleepQueue());
    Thread::Current()->Yield();
  }
}

/* Copy one core directory entry into the in-core checkpoint directory
 * page, draining the page to disk first if it is full. */
void
Checkpoint::AppendDiskDirent(CoreDirent* cde)
{
  if (nextDirPage->nDirent == CkptDirPage::maxDirEnt) {
    DEBUG(stabilize)
      MsgLog::dprintf(true, "Draining full disk dirent page\n");
    DrainDirentPage();
  }

#ifdef PARANOID_CKPT
  if (nextDirPage->nDirent >= CkptDirPage::maxDirEnt)
    Debugger();
#endif

  CkptDirent &cd = nextDirPage->entry[nextDirPage->nDirent];

  cd.oid = cde->oid;
  cd.count = cde->count;
  cd.lid = cde->lid;
  cd.type = cde->type;

  nextDirPageHdr->SetDirtyFlag();
  nextDirPage->nDirent++;
}

/* Write the full checkpoint directory: all object dirents, then the
 * reserve directory pages, then the thread directory pages.
 * Incremental via the mg_nextDirent / mg_nextReserveDirPg /
 * mg_nextThreadDirPg cursors and the fair-write throttle.  On
 * completion resets the cursors and advances to mg_DrainCheckpoint. */
void
Checkpoint::WriteCkptDirectory()
{
#ifndef NDEBUG
  /* Sanity check: If all pages and nodes have gone out, there should
   * exist no entries in the respective core directories with an
   * UNDEF_LOGLOC entry. */
  if (mg_SanityChecked == false) {
    while (mg_nextDirent != CkNIL) {
      assert ( mg_nextDirent->lid != UNDEF_LID );
      assert ( mg_nextDirent->lid != DEAD_LID );
      mg_nextDirent = mg_nextDirent->Successor();
    }
    mg_nextDirent = coreGeneration[last_ckpt].FirstDirEntry();
    mg_SanityChecked = true;
  }
#endif

  while (mg_nextDirent != CkNIL) {
    if (Thread::Current()->pageWriteCount >= Thread::FairMigrateWrites) {
      Thread::Current()->pageWriteCount = 0;
      return;
    }
    AppendDiskDirent(mg_nextDirent);
    mg_nextDirent = mg_nextDirent->Successor();
  }

  if (Thread::Current()->pageWriteCount >= Thread::FairMigrateWrites) {
    Thread::Current()->pageWriteCount = 0;
    return;
  }

  /* This must not be conditionalized on dirtyness, or the nDirent
   * field will not be reset properly! */
  DrainDirentPage();

  DEBUG(wrckdir)
    MsgLog::dprintf(true, "Last dirent page had %d entries\n",
                    nextDirPage->nDirent);

  /* Ceiling division: how many containers of capacity y hold x items. */
#define NUM_CONTAINERS(x, y) (((x) + (y) - 1) / (y))

  uint32_t nThreadPage =
    NUM_CONTAINERS(coreGeneration[last_ckpt].nThread, ThreadDirPage::maxDirEnt);
  uint32_t nRsrvPage =
    NUM_CONTAINERS(coreGeneration[last_ckpt].nReserve, ReserveDirPage::maxDirEnt);

  assert (nRsrvPage == nextCkptHdr->nRsrvPage);
  assert (nThreadPage == nextCkptHdr->nThreadPage);

  while (mg_nextReserveDirPg < nRsrvPage) {
    if (Thread::Current()->pageWriteCount >= Thread::FairMigrateWrites) {
      Thread::Current()->pageWriteCount = 0;
      return;
    }

    DEBUG(wrckdir) {
      MsgLog::dprintf(true, "Writing rsrv dir pg %d to 0x%08x\n",
                      mg_nextReserveDirPg,
                      nextReserveDirLid[mg_nextReserveDirPg]);
      if (reserveDirHdr[mg_nextReserveDirPg]->IsDirty() == false) {
        MsgLog::dprintf(true, "Reserve dir pg %d not dirty!\n",
                        mg_nextReserveDirPg);
        reserveDirHdr[mg_nextReserveDirPg]->SetDirtyFlag();
      }
    }

    Persist::WritePageToLog(reserveDirHdr[mg_nextReserveDirPg],
                            nextReserveDirLid[mg_nextReserveDirPg]);
    mg_nextReserveDirPg++;
  }

  while (mg_nextThreadDirPg < nThreadPage) {
    if (Thread::Current()->pageWriteCount >= Thread::FairMigrateWrites) {
      Thread::Current()->pageWriteCount = 0;
      return;
    }

    DEBUG(wrckdir) {
      MsgLog::dprintf(true, "Writing thread dir pg %d to 0x%08x\n",
                      mg_nextThreadDirPg,
                      nextThreadDirLid[mg_nextThreadDirPg]);
      if (threadDirHdr[mg_nextThreadDirPg]->IsDirty() == false) {
        MsgLog::dprintf(true, "Thread dir pg %d not dirty!\n",
                        mg_nextThreadDirPg);
        threadDirHdr[mg_nextThreadDirPg]->SetDirtyFlag();
      }
    }

    Persist::WritePageToLog(threadDirHdr[mg_nextThreadDirPg],
                            nextThreadDirLid[mg_nextThreadDirPg]);
    mg_nextThreadDirPg++;
  }

  /* All directory pages are out: reset cursors for the next pass. */
  mg_nextDirent = coreGeneration[last_ckpt].FirstDirEntry();
  mg_nextThreadDirPg = 0;
  mg_nextReserveDirPg = 0;
#ifndef NDEBUG
  mg_SanityChecked = false;
#endif

  /* Mark the checkpoint header dirty so WriteCkptHeader() writes it. */
  nextCkptObHdr->SetDirtyFlag();

  migrationStatus = mg_DrainCheckpoint;
}

/* Write the checkpoint header to the log, committing the checkpoint.
 * Called repeatedly: while the header page is still dirty it issues
 * (or waits on) the write; once the write has completed (header no
 * longer dirty) it releases the old checkpoint's directory storage,
 * swaps the header/thread-directory roles, and advances to
 * mg_MigrateObjects. */
void
Checkpoint::WriteCkptHeader()
{
  if (!nextCkptObHdr->IsDirty()) {
    /* Up to this point, we have been writing down the 'next'
     * checkpoint. Once the checkpoint header has been written, the
     * 'next' checkpoint becomes the 'last' checkpoint.
     *
     * At this point in the logic, we also logically discard all of the
     * remaining directory pages of the OLD last checkpoint. In
     * practice, the on-disk object directories were discarded when
     * migration completed, and the thread and reserve directory pages
     * will be recycled when the time comes to write the NEW next
     * checkpoint. */

    DEBUG2(ckage,mig)
      MsgLog::dprintf(true, "About to release Aged Storage\n");

    CoreGeneration &fm = coreGeneration[first_migrated];

    /* Release the storage associated with the checkpoint directory in
     * what used to be the last checkpoint generation: */
    uint32_t entry = lastCkptHdr->nRsrvPage + lastCkptHdr->nThreadPage;
    for (uint32_t i = 0; i < lastCkptHdr->nDirPage; i++, entry++) {
      lid_t dirLoc = lastCkptHdr->dirPage[entry];
      Checkpoint::DeallocateLid(dirLoc);
      fm.release.nDirFrame++;
      fm.release.nFrames++;
    }

    assert (fm.alloc.nDirFrame == fm.release.nDirFrame);

    lastCkptHdr->nDirPage = 0;

    /* Now that a new checkpoint has been stabilized, we can permit
     * storage from the previous checkpoint to be reclaimed, and we
     * can free the directory pages associated with that checkpoint. */
    fm.canReclaim = true;

    SwapCheckpointHeaders();
    SwapThreadDirs();

    InitNextCkptHdr();		/* probably unnecessary - done in */
				/* TakeCheckpoint() */

    nCheckpointsCompleted++;

    migrationStatus = mg_MigrateObjects;

#ifdef TESTING_CKPT
    Debugger();
#endif
    return;
  }

  /* This needs to block, because we only posess a single in-core
   * directory entry page. A regrettable consequence is that writing
   * the directory is a sequential operation across all threads
   * desiring to dirty pages. I should look into doing this in lazy
   * fashion -- pre-reserving the directory page may simply be
   * unnecessary; I did it to ensure that I wouldn't deadlock stuck
   * while writing out directory pages because I didn't have time to
   * think through the relevant possible bugs. */

  if (nextCkptObHdr->GetFlags(OFLG_IO) == 0) {
    nextCkptObHdr->ob.oid = nextCkptHdrLid;

    DEBUG(ckdir)
      MsgLog::dprintf(true, "Writing CkHdr seq=0x%08x to 0x%08x, dirpg %d\n",
                      (uint32_t) nextCkptHdr->sequenceNumber,
                      nextCkptHdrLid, nextCkptHdr->nDirPage);

    Persist::WritePageToLog(nextCkptObHdr, nextCkptHdrLid, true);
  }
  else {
    /* Header write already in flight; wait for it to finish. */
    Thread::Current()->SleepOn(nextCkptObHdr->ObjectSleepQueue());
    Thread::Current()->Yield();
  }
}

/* If the most recently used object pot is still in core and dirty,
 * start it toward the disk. */
void
Checkpoint::DrainLastObjectPot()
{
  ObjectHeader *pPot = ObjectHeader::Lookup(ObType::PtObjectPot, mg_objectPotOID);
  if (pPot && (pPot->IsDirty()))
    Persist::WritePage(pPot);
}

/* Fetch (and transaction-lock) the object pot covering oid from its
 * home location, draining the previously used pot to disk first if we
 * have switched pots.  Fatal if oid is not covered by any range. */
ObjectHeader *
Checkpoint::GetObjectPot(OID oid)
{
  FrameInfo fi(oid);
  if (fi.cd == 0)
    MsgLog::fatal("Migrating to unknown range!\n");

  /* Bring in the object pot from the home location: */
  ObjectHeader *pObjectPot = Persist::GetObjectPot(fi);
  pObjectPot->TransLock();

  /* If this object is going to a new pot, and the old pot is still in
   * core and is dirty, start the old pot to the disk: */
  if (pObjectPot->ob.oid != mg_objectPotOID) {
    DrainLastObjectPot();
  }

  mg_objectPotOID = pObjectPot->ob.oid;

  return pObjectPot;
}

/* Returns true iff tag changed. */
/* Update the tag pot entry for the frame holding cd->oid.
 * (Definition continues beyond this excerpt.) */
bool
Checkpoint::UpdateTagPot(CoreDirent *cd, ObCount count)
{
  bool result = false;

  FrameInfo fi(cd->oid);
  if (fi.cd == 0)
    MsgLog::fatal("Bad oid 0x%08x%08x to UpdateTagPot()\n",
                  (cd->oid >> 32), cd->oid);

  ObjectHeader *pTagHdr = Persist::GetTagPot(fi);
  pTagHdr->TransLock();

  /* If this object is going to a new tag pot, and the old tag pot is
   * still in core and is dirty, start it to the disk: */
  if (pTagHdr->ob.oid != mg_tagPotOID)
    DrainLastTagPot();

  mg_tagPotOID = pTagHdr->ob.oid;

#if 0
  MsgLog::printf("pTagHdr 0x%08x entry %d\n", pTagHdr, fi.tagEntry);
#endif

  PagePot* pPagePot = (PagePot *) ObjectCache::ObHdrToPage(pTagHdr);

  /* If the tag is incorrect, we may need to set all the allocation
   * counts on the objects themselves. This is a bloody great mess.
   * We don't need to do it for objects that occupy the full frame, as
   * in such cases we have the correct answer in hand already.
   *
   * Note that we are careful to ensure always that the tag pot
   * allocation count is ALWAYS the max of the member counts. This
   * eliminates the need to make an additional type-dependent pass
   * over the frame here. */

  /* ZNODE frames are tagged as plain NODE frames in the pot: */
  uint8_t tagType = cd->type;
  if (cd->type == FRM_TYPE_ZNODE)
    tagType = FRM_TYPE_NODE;

  if (pPagePot->type[fi.tagEntry] != tagType) {
#ifdef DBG_WILD_PTR
    if (dbg_wild_ptr)
      Check::Consistency("In retag()");
#endif
    pPagePot->type[fi.tagEntry] = tagType;
    pTagHdr->SetDirtyFlag();
#ifdef DBG_WILD_PTR
    if (dbg_wild_ptr)
      Check::Consistency("Post-In retag()");
#endif
    result = true;
  }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -