📄 chunkmanager.cc
字号:
voidChunkManager::ReadChunkDone(ReadOp *op){ ChunkInfoHandle_t *cih = NULL; if ((GetChunkInfoHandle(op->chunkId, &cih) < 0) || (op->chunkVersion != cih->chunkInfo.chunkVersion)) { AdjustDataRead(op); if (cih) { KFS_LOG_VA_INFO("Version # mismatch(have=%u vs asked=%ld...", cih->chunkInfo.chunkVersion, op->chunkVersion); } op->status = -KFS::EBADVERS; return; } ZeroPad(op->dataBuf); assert(op->dataBuf->BytesConsumable() >= (int) CHECKSUM_BLOCKSIZE); // either nothing to verify or it better match bool mismatch = false; // figure out the block we are starting from and grab all the checksums vector<uint32_t>::size_type i, checksumBlock = OffsetToChecksumBlockStart(op->offset); vector<uint32_t> checksums = ComputeChecksums(op->dataBuf, op->dataBuf->BytesConsumable()); // the checksums should be loaded... if (!cih->chunkInfo.AreChecksumsLoaded()) { // the read took too long; the checksums got paged out. ask the client to retry KFS_LOG_VA_INFO("Checksums for chunk %lld got paged out; returning EAGAIN to client", cih->chunkInfo.chunkId); op->status = -EAGAIN; return; } cih->chunkInfo.VerifyChecksumsLoaded(); for (i = 0; i < checksums.size() && checksumBlock < MAX_CHUNK_CHECKSUM_BLOCKS; checksumBlock++, i++) { if ((cih->chunkInfo.chunkBlockChecksum[checksumBlock] == 0) || (checksums[i] == cih->chunkInfo.chunkBlockChecksum[checksumBlock])) { continue; } mismatch = true; break; } if (!mismatch) { // for checksums to verify, we did reads in multiples of // checksum block sizes. so, get rid of the extra AdjustDataRead(op); return; } // die ("checksum mismatch"); KFS_LOG_VA_ERROR("Checksum mismatch for chunk=%ld, offset=%ld, bytes = %ld: expect: %u, computed: %u ", op->chunkId, op->offset, op->numBytesIO, cih->chunkInfo.chunkBlockChecksum[checksumBlock], checksums[i]); op->status = -KFS::EBADCKSUM; // Notify the metaserver that the chunk we have is "bad"; the // metaserver will re-replicate this chunk. NotifyMetaCorruptedChunk(op->chunkId); // Take out the chunk from our side StaleChunk(op->chunkId);}voidChunkManager::NotifyMetaCorruptedChunk(kfsChunkId_t chunkId){ ChunkInfoHandle_t *cih; if (GetChunkInfoHandle(chunkId, &cih) < 0) { KFS_LOG_VA_ERROR("Unable to notify metaserver of corrupt chunk: %lld", chunkId); return; } KFS_LOG_VA_INFO("Notifying metaserver of corrupt chunk (%ld) in file %lld", cih->chunkInfo.fileId, chunkId); // This op will get deleted when we get an ack from the metaserver CorruptChunkOp *ccop = new CorruptChunkOp(0, cih->chunkInfo.fileId, chunkId); gMetaServerSM.EnqueueOp(ccop);}//// directory with dirname is unaccessable; maybe drive failed. so,// notify metaserver of lost blocks. the metaserver will then// re-replicate.//voidChunkManager::NotifyMetaChunksLost(const string &dirname){ ChunkInfoHandle_t *cih; CMI iter = mChunkTable.begin(); while (iter != mChunkTable.end()) { cih = iter->second; if (cih->chunkInfo.GetDirname() != dirname) { ++iter; continue; } KFS_LOG_VA_INFO("Notifying metaserver of lost chunk (%ld) in file %lld in dir %s", cih->chunkInfo.fileId, cih->chunkInfo.chunkId, dirname.c_str()); // This op will get deleted when we get an ack from the metaserver CorruptChunkOp *ccop = new CorruptChunkOp(0, cih->chunkInfo.fileId, cih->chunkInfo.chunkId); gMetaServerSM.EnqueueOp(ccop); // get rid of chunkid from our list CMI prev = iter; ++iter; mChunkTable.erase(cih->chunkInfo.chunkId); delete cih; } }voidChunkManager::ZeroPad(IOBuffer *buffer){ int bytesFilled = buffer->BytesConsumable(); if ((bytesFilled % CHECKSUM_BLOCKSIZE) == 0) return; int numToZero = CHECKSUM_BLOCKSIZE - (bytesFilled % CHECKSUM_BLOCKSIZE); if (numToZero > 0) { // pad with 0's buffer->ZeroFill(numToZero); }}voidChunkManager::AdjustDataRead(ReadOp *op){ size_t extraRead = op->offset - OffsetToChecksumBlockStart(op->offset); if (extraRead > 0) op->dataBuf->Consume(extraRead); if (op->dataBuf->BytesConsumable() > op->numBytesIO) op->dataBuf->Trim(op->numBytesIO);}uint32_t ChunkManager::GetChecksum(kfsChunkId_t chunkId, off_t offset){ uint32_t checksumBlock = OffsetToChecksumBlockNum(offset); ChunkInfoHandle_t *cih; if (GetChunkInfoHandle(chunkId, &cih) < 0) return 0; // the checksums should be loaded... cih->chunkInfo.VerifyChecksumsLoaded(); assert(checksumBlock <= MAX_CHUNK_CHECKSUM_BLOCKS); return cih->chunkInfo.chunkBlockChecksum[checksumBlock];}DiskConnection *ChunkManager::SetupDiskConnection(kfsChunkId_t chunkId, KfsOp *op){ ChunkInfoHandle_t *cih; DiskConnection *diskConnection; CMI tableEntry = mChunkTable.find(chunkId); if (tableEntry == mChunkTable.end()) { return NULL; } cih = tableEntry->second; if ((!cih->dataFH) || (cih->dataFH->mFd < 0)) { CleanupInactiveFds(); if (OpenChunk(chunkId, O_RDWR) < 0) return NULL; } cih->lastIOTime = time(0); diskConnection = new DiskConnection(cih->dataFH, op); return diskConnection;} voidChunkManager::CancelChunkOp(KfsCallbackObj *cont, kfsChunkId_t chunkId){ // Cancel the chunk operations scheduled by KfsCallbackObj on chunkId. // XXX: Fill it...}//// dump out the contents of the chunkTable to disk//voidChunkManager::Checkpoint(){ CheckpointOp *cop; // on the macs, i can't declare CMI iter; CMI iter = mChunkTable.begin(); mLastCheckpointTime = time(NULL); if (!mIsChunkTableDirty) return; // KFS_LOG_VA_DEBUG("Checkpointing state"); cop = new CheckpointOp(1); #if 0 // there are no more checkpoints on the chunkserver...this will all go // we are using this to rotate logs... ChunkInfoHandle_t *cih; for (iter = mChunkTable.begin(); iter != mChunkTable.end(); ++iter) { cih = iter->second; // If a chunk is being replicated, then it is not yet a part // of the namespace. When replication is done, it becomes a // part of the namespace. This model keeps recovery simple: // if we die in the midst of replicating a chunk, on restart, // we will the chunk as an orphan and throw it away. if (cih->isBeingReplicated) continue; cop->data << cih->chunkInfo.fileId << ' '; cop->data << cih->chunkInfo.chunkId << ' '; cop->data << cih->chunkInfo.chunkSize << ' '; cop->data << cih->chunkInfo.chunkVersion << ' '; cop->data << MAX_CHUNK_CHECKSUM_BLOCKS << ' '; for (uint32_t i = 0; i < MAX_CHUNK_CHECKSUM_BLOCKS; ++i) { cop->data << cih->chunkInfo.chunkBlockChecksum[i] << ' '; } cop->data << endl; }#endif gLogger.Submit(cop); // Now, everything is clean... mIsChunkTableDirty = false;}//// Get all the chunk directory entries from all the places we can// store the chunks into a single array.//intChunkManager::GetChunkDirsEntries(struct dirent ***namelist){ struct dirent **entries; vector<struct dirent **> dirEntries; vector<int> dirEntriesCount; int res, numChunkFiles = 0; uint32_t i; *namelist = NULL; for (i = 0; i < mChunkDirs.size(); i++) { res = scandir(mChunkDirs[i].dirname.c_str(), &entries, 0, alphasort); if (res < 0) { KFS_LOG_VA_INFO("Unable to open %s", mChunkDirs[i].dirname.c_str()); for (i = 0; i < dirEntries.size(); i++) { entries = dirEntries[i]; for (int j = 0; j < dirEntriesCount[i]; j++) free(entries[j]); free(entries); } dirEntries.clear(); return -1; } dirEntries.push_back(entries); dirEntriesCount.push_back(res); numChunkFiles += res; } // Get all the directory entries into one giganto array *namelist = (struct dirent **) malloc(sizeof(struct dirent **) * numChunkFiles); numChunkFiles = 0; for (i = 0; i < dirEntries.size(); i++) { int count = dirEntriesCount[i]; entries = dirEntries[i]; memcpy((*namelist) + numChunkFiles, entries, count * sizeof(struct dirent **)); numChunkFiles += count; } return numChunkFiles;}voidChunkManager::GetChunkPathEntries(vector<string> &pathnames){ uint32_t i; struct dirent **entries = NULL; int res; for (i = 0; i < mChunkDirs.size(); i++) { res = scandir(mChunkDirs[i].dirname.c_str(), &entries, 0, alphasort); if (res < 0) { KFS_LOG_VA_INFO("Unable to open %s", mChunkDirs[i].dirname.c_str()); continue; } for (int j = 0; j < res; j++) { string s = mChunkDirs[i].dirname + "/" + entries[j]->d_name; pathnames.push_back(s); free(entries[j]); } free(entries); }}voidChunkManager::Restart(){ int version; version = gLogger.GetVersionFromCkpt(); if (version == gLogger.GetLoggerVersionNum()) { Restore(); } else { std::cout << "Unsupported version...copy out the data and copy it back in...." << std::endl; exit(-1); } // Write out a new checkpoint file with just version and set it at 2 gLogger.Checkpoint(NULL);}voidChunkManager::Restore(){ // sort all the chunk names alphabetically in each of the // directories vector<string> chunkPathnames; struct stat buf; int res; uint32_t i, numChunkFiles; GetChunkPathEntries(chunkPathnames); numChunkFiles = chunkPathnames.size(); // each chunk file is of the form: <fileid>.<chunkid>.<chunkversion> // parse the filename to extract out the chunk info for (i = 0; i < numChunkFiles; ++i) { string s = chunkPathnames[i]; ChunkInfoHandle_t *cih; res = stat(s.c_str(), &buf); if ((res < 0) || (!S_ISREG(buf.st_mode))) continue; MakeChunkInfoFromPathname(s, buf.st_size, &cih); if (cih != NULL) AddMapping(cih); else { KFS_LOG_VA_INFO("Deleting possibly duplicate file %s", s.c_str()); unlink(s.c_str()); } }}voidChunkManager::AddMapping(ChunkInfoHandle_t *cih){ mNumChunks++; mChunkTable[cih->chunkInfo.chunkId] = cih; mUsedSpace += cih->chunkInfo.chunkSize; UpdateDirSpace(cih, cih->chunkInfo.chunkSize);}voidChunkManager::ReplayAllocChunk(kfsFileId_t fileId, kfsChunkId_t chunkId, int64_t chunkVersion){ ChunkInfoHandle_t *cih; mIsChunkTableDirty = true; if (GetChunkInfoHandle(chunkId, &cih) == 0) { // If the entry exists, just update the version cih->chunkInfo.chunkVersion = chunkVersion; mChunkTable[chunkId] = cih; return; } mNumChunks++; // after replay is done, when we verify entries in the table, we // stat the file and fix up the sizes then. so, no need to do // anything here. cih = new ChunkInfoHandle_t(); cih->chunkInfo.fileId = fileId; cih->chunkInfo.chunkId = chunkId; cih->chunkInfo.chunkVersion = chunkVersion; mChunkTable[chunkId] = cih;}voidChunkManager::ReplayChangeChunkVers(kfsFileId_t fileId, kfsChunkId_t chunkId, int64_t chunkVersion){ ChunkInfoHandle_t *cih; if (GetChunkInfoHandle(chunkId, &cih) != 0) return; KFS_LOG_VA_DEBUG("Chunk %ld already exists; changing version # to %ld", chunkId, chunkVersion); // Update the version # cih->chunkInfo.chunkVersion = chunkVersion; mChunkTable[chunkId] = cih; mIsChunkTableDirty = true;}voidChunkManager::ReplayDeleteChunk(kfsChunkId_t chunkId){ ChunkInfoHandle_t *cih; CMI tableEntry = mChunkTable.find(chunkId); mIsChunkTableDirty = true; if (tableEntry != mChunkTable.end()) { cih = tableEntry->second; mUsedSpace -= cih->chunkInfo.chunkSize; mChunkTable.erase(chunkId); delete cih; mNumChunks--; assert(mNumChunks >= 0); if (mNumChunks < 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -