📄 chunkmanager.cc
字号:
// Tail of a definition whose beginning is above this view: stores the handle
// and reports success to the caller.
mChunkTable[chunkId] = cih; return 0;}

// Called when re-replication of a chunk has finished: clears the
// isBeingReplicated flag on the chunk's handle and marks the chunk table
// dirty so the change is persisted. A chunkId not present in the table is
// silently ignored.
void
ChunkManager::ReplicationDone(kfsChunkId_t chunkId)
{
    ChunkInfoHandle_t *cih;
    CMI tableEntry = mChunkTable.find(chunkId);

    if (tableEntry == mChunkTable.end()) {
        return;
    }
    cih = tableEntry->second;
#ifdef DEBUG
    string chunkPathname = MakeChunkPathname(cih);
    KFS_LOG_VA_DEBUG("Replication for chunk %s is complete...", chunkPathname.c_str());
#endif
    mIsChunkTableDirty = true;
    cih->isBeingReplicated = false;
    // Re-store the (same) pointer; flag change above is what matters.
    mChunkTable[chunkId] = cih;
}

// Hook the chunk manager's periodic timeout handler into the global network
// event loop.
void
ChunkManager::Start()
{
    globals().netManager.RegisterTimeoutHandler(mChunkManagerTimeoutImpl);
}

// Adjust the used-space accounting (by nbytes, which may be negative) for
// the chunk directory that owns cih's chunk; usedSpace is clamped at 0.
void
ChunkManager::UpdateDirSpace(ChunkInfoHandle_t *cih, off_t nbytes)
{
    for (uint32_t i = 0; i < mChunkDirs.size(); i++) {
        if (mChunkDirs[i].dirname == cih->chunkInfo.GetDirname()) {
            mChunkDirs[i].usedSpace += nbytes;
            if (mChunkDirs[i].usedSpace < 0)
                mChunkDirs[i].usedSpace = 0;
        }
    }
}

// Choose the directory in which to place a new chunk.  With a single
// configured dir there is no choice; otherwise the dirs are scanned
// round-robin starting after the last one chosen, and the first dir with at
// least CHUNKSIZE bytes of headroom (availableSpace - usedSpace) is taken.
// A dir whose availableSpace is 0 is treated as unusable.  Returns "" when
// no dir has room.
string
ChunkManager::GetDirForChunk()
{
    if (mChunkDirs.size() == 1)
        return mChunkDirs[0].dirname;

    // round robin over the drives, picking one that has space; this
    // has the effect of spreading the load over all the drives.
    int32_t dirToUse;
    bool found = false;

    for (uint32_t i = 0; i < mChunkDirs.size(); i++) {
        dirToUse = (mLastDriveChosen + i + 1) % mChunkDirs.size();
        if ((mChunkDirs[dirToUse].availableSpace == 0) ||
            ((mChunkDirs[dirToUse].availableSpace - mChunkDirs[dirToUse].usedSpace) < (off_t) CHUNKSIZE)) {
            continue;
        } else {
            found = true;
            break;
        }
    }
    if (!found)
        return "";
    mLastDriveChosen = dirToUse;
    return mChunkDirs[dirToUse].dirname;
}

// Build the on-disk pathname for a chunk from its handle:
// <dirname>/<fileId>.<chunkId>.<chunkVersion>
string
ChunkManager::MakeChunkPathname(ChunkInfoHandle_t *cih)
{
    ostringstream os;

    os << cih->chunkInfo.GetDirname() << '/' << cih->chunkInfo.fileId
       << '.' << cih->chunkInfo.chunkId << '.' << cih->chunkInfo.chunkVersion;
    return os.str();
}

// Same path layout as above, but built from the individual components
// rather than a handle.
string
ChunkManager::MakeChunkPathname(const string &chunkdir, kfsFileId_t fid, kfsChunkId_t chunkId, kfsSeq_t chunkVersion)
{
    ostringstream os;

    os << chunkdir << '/' << fid << '.' << chunkId << '.'
       << chunkVersion;
    return os.str();
}

// Pathname a chunk gets after being declared stale: the same
// <fileId>.<chunkId>.<chunkVersion> name, but under the stale-chunks
// directory derived from the chunk's current dir.
string
ChunkManager::MakeStaleChunkPathname(ChunkInfoHandle_t *cih)
{
    ostringstream os;
    string staleChunkDir = GetStaleChunkPath(cih->chunkInfo.GetDirname());

    os << staleChunkDir << '/' << cih->chunkInfo.fileId
       << '.' << cih->chunkInfo.chunkId << '.' << cih->chunkInfo.chunkVersion;
    return os.str();
}

// Inverse of MakeChunkPathname: reconstruct a ChunkInfoHandle_t from a
// chunk file's pathname (expected form <dir>/<fileId>.<chunkId>.<version>)
// and its on-disk size.  *result is set to NULL when the path has no '/',
// or when a handle for the parsed chunkId already exists (duplicate);
// otherwise a newly allocated handle is returned through *result.
// NOTE(review): the filename format is only enforced by the
// assert(component.size() == 3) — in a non-debug build a malformed name
// would fall through; confirm callers pre-filter directory entries.
void
ChunkManager::MakeChunkInfoFromPathname(const string &pathname, off_t filesz, ChunkInfoHandle_t **result)
{
    string::size_type slash = pathname.rfind('/');
    ChunkInfoHandle_t *cih;

    if (slash == string::npos) {
        *result = NULL;
        return;
    }

    string chunkFn, dirname;
    vector<string> component;

    dirname.assign(pathname, 0, slash);
    chunkFn.assign(pathname, slash + 1, string::npos);
    split(component, chunkFn, '.');
    assert(component.size() == 3);

    chunkId_t chunkId = atoll(component[1].c_str());
    // GetChunkInfoHandle() == 0 means the chunk is already known.
    if (GetChunkInfoHandle(chunkId, &cih) == 0) {
        KFS_LOG_VA_INFO("Duplicate chunk (%lld) with path: %s", chunkId, pathname.c_str());
        *result = NULL;
        return;
    }

    cih = new ChunkInfoHandle_t();
    cih->chunkInfo.fileId = atoll(component[0].c_str());
    cih->chunkInfo.chunkId = atoll(component[1].c_str());
    cih->chunkInfo.chunkVersion = atoll(component[2].c_str());
    // Logical chunk size excludes the fixed on-disk header.
    if (filesz >= (off_t) KFS_CHUNK_HEADER_SIZE)
        cih->chunkInfo.chunkSize = filesz - KFS_CHUNK_HEADER_SIZE;
    cih->chunkInfo.SetDirname(dirname);
    *result = cih;
    /*
    KFS_LOG_VA_DEBUG("From %s restored: %d, %d, %d", chunkFn.c_str(),
                     cih->chunkInfo.fileId, cih->chunkInfo.chunkId,
                     cih->chunkInfo.chunkVersion);
    */
}

// Open the file backing chunkId with the given open(2) flags, if it is not
// already open; an already-open chunk reuses the existing descriptor.
// Returns 0 on success, -EBADF if the chunk is unknown or open() fails.
int
ChunkManager::OpenChunk(kfsChunkId_t chunkId, int openFlags)
{
    string fn;
    int fd;
    ChunkInfoHandle_t *cih;
    CMI tableEntry = mChunkTable.find(chunkId);

    if (tableEntry == mChunkTable.end()) {
        // NOTE(review): fn is still empty here, so this logs an empty
        // string; logging chunkId instead would be more informative.
        KFS_LOG_VA_DEBUG("No such chunk: %s", fn.c_str());
        return -EBADF;
    }
    cih = tableEntry->second;
    fn = MakeChunkPathname(cih);
    if ((!cih->dataFH) || (cih->dataFH->mFd < 0)) {
        fd = open(fn.c_str(), openFlags, S_IRUSR|S_IWUSR);
        if (fd < 0) {
            perror("open: ");
            return -EBADF;
        }
        globals().ctrOpenDiskFds.Update(1);
        // the checksums will be loaded async
        cih->Init(fd);
    } else {
        fd = cih->dataFH->mFd;
    }
    return 0;
}

// Close the file backing chunkId when no one else is using it.
void
ChunkManager::CloseChunk(kfsChunkId_t chunkId)
{
    ChunkInfoHandle_t *cih;
    CMI tableEntry = mChunkTable.find(chunkId);

    if (tableEntry == mChunkTable.end()) {
        return;
    }
    cih = tableEntry->second;
    // If there are at most 2 references to the handle---a reference
    // from mChunkTable and a reference from cih->chunkHandle, then
    // we can safely close the fileid.
    if (cih->dataFH.use_count() <= 2) {
        if ((!cih->dataFH) || (cih->dataFH->mFd < 0))
            return;
        cih->Release();
    }
}

// Report the logical size of chunkId through *chunkSize.
// Returns 0 on success, -EBADF if the chunk is unknown.
int
ChunkManager::ChunkSize(kfsChunkId_t chunkId, off_t *chunkSize)
{
    ChunkInfoHandle_t *cih;

    if (GetChunkInfoHandle(chunkId, &cih) < 0)
        return -EBADF;
    *chunkSize = cih->chunkInfo.chunkSize;
    return 0;
}

// Schedule an asynchronous disk read for a ReadOp.  The requested range is
// clipped to the chunk size, then widened to whole CHECKSUM_BLOCKSIZE
// blocks (and re-clipped to EOF) so checksum verification can work; the
// fixed chunk header is skipped by offsetting the disk read.
// Returns 0 when the read was scheduled; -EBADF for an unknown chunk,
// -KFS::ESERVERBUSY when no disk connection is available, -KFS::EBADVERS
// on a chunk-version mismatch, -EIO for an empty range or scheduling
// failure.
int
ChunkManager::ReadChunk(ReadOp *op)
{
    ssize_t res;
    DiskConnection *d;
    ChunkInfoHandle_t *cih;
    off_t offset;
    size_t numBytesIO;

    if (GetChunkInfoHandle(op->chunkId, &cih) < 0)
        return -EBADF;

    d = SetupDiskConnection(op->chunkId, op);
    if (d == NULL)
        return -KFS::ESERVERBUSY;

    // the checksums should be loaded...
    cih->chunkInfo.VerifyChecksumsLoaded();

    if (op->chunkVersion != cih->chunkInfo.chunkVersion) {
        KFS_LOG_VA_INFO("Version # mismatch(have=%u vs asked=%ld...failing a read", cih->chunkInfo.chunkVersion, op->chunkVersion);
        // NOTE(review): d has not been handed to op->diskConnection yet at
        // this return — verify SetupDiskConnection's ownership rules to
        // rule out a leak on this path.
        return -KFS::EBADVERS;
    }
    op->diskConnection.reset(d);

    // schedule a read based on the chunk size
    if (op->offset >= cih->chunkInfo.chunkSize) {
        op->numBytesIO = 0;
    } else if ((off_t) (op->offset + op->numBytes) > cih->chunkInfo.chunkSize) {
        op->numBytesIO = cih->chunkInfo.chunkSize - op->offset;
    } else {
        op->numBytesIO = op->numBytes;
    }
    if (op->numBytesIO == 0)
        return -EIO;

    // for checksumming to work right, reads should be in terms of
    // checksum-blocks.
    offset = OffsetToChecksumBlockStart(op->offset);
    numBytesIO = (op->numBytesIO / CHECKSUM_BLOCKSIZE) * CHECKSUM_BLOCKSIZE;
    if (op->numBytesIO % CHECKSUM_BLOCKSIZE)
        numBytesIO += CHECKSUM_BLOCKSIZE;

    // Make sure we don't try to read past EOF; the checksumming will
    // do the necessary zero-padding.
    if ((off_t) (offset + numBytesIO) > cih->chunkInfo.chunkSize)
        numBytesIO = cih->chunkInfo.chunkSize - offset;

    if ((res = op->diskConnection->Read(offset + KFS_CHUNK_HEADER_SIZE, numBytesIO)) < 0)
        return -EIO;

    // read was successfully scheduled
    return 0;
}

// Schedule an asynchronous disk write for a WriteOp and compute/verify the
// per-block checksums for the range being written.  Two paths:
//  1. Block-aligned write of >= one full CHECKSUM_BLOCKSIZE: checksums were
//     already computed on receipt (or are computed here when the write comes
//     from re-replication).
//  2. Sub-block write: the enclosing checksum block is obtained either by
//     zero-filling (when it lies past end-of-chunk) or by issuing a read
//     (ReadOp) and re-entering this function once it completes; the new
//     data is spliced in and the block checksum recomputed.
// Returns >= 0 when the write was scheduled (checksum bookkeeping is done
// in UpdateChecksums on success); negative errno/KFS error otherwise.
int
ChunkManager::WriteChunk(WriteOp *op)
{
    ChunkInfoHandle_t *cih;
    int res;

    if (GetChunkInfoHandle(op->chunkId, &cih) < 0)
        return -EBADF;

    // the checksums should be loaded...
    cih->chunkInfo.VerifyChecksumsLoaded();

    // schedule a write based on the chunk size. Make sure that a
    // write doesn't overflow the size of a chunk.
    op->numBytesIO = min((size_t) (KFS::CHUNKSIZE - op->offset), op->numBytes);
    if (op->numBytesIO == 0)
        return -EINVAL;

#if defined(__APPLE__)
    size_t addedBytes = max((long long) 0, op->offset + op->numBytesIO - cih->chunkInfo.chunkSize);
#else
    size_t addedBytes = max((size_t) 0, (size_t) (op->offset + op->numBytesIO - cih->chunkInfo.chunkSize));
#endif
    if ((off_t) (mUsedSpace + addedBytes) >= mTotalSpace)
        return -ENOSPC;

    if ((OffsetToChecksumBlockStart(op->offset) == op->offset) &&
        ((size_t) op->numBytesIO >= (size_t) CHECKSUM_BLOCKSIZE)) {
        // Aligned, whole-block write.
        assert(op->numBytesIO % CHECKSUM_BLOCKSIZE == 0);
        if (op->numBytesIO % CHECKSUM_BLOCKSIZE != 0) {
            return -EINVAL;
        }
#if 0
        // checksum was computed when we got data from client..so, skip
        // Hopefully, common case: write covers an entire block and
        // so, we just compute checksum and get on with the write.
        op->checksums = ComputeChecksums(op->dataBuf, op->numBytesIO);
#endif
        if (!op->isFromReReplication) {
            assert(op->checksums[0] == op->wpop->checksum);
        } else {
            op->checksums = ComputeChecksums(op->dataBuf, op->numBytesIO);
        }
    } else {
        // Sub-block write: rebuild the single enclosing checksum block.
        assert((size_t) op->numBytesIO < (size_t) CHECKSUM_BLOCKSIZE);
        op->checksums.clear();

        // The checksum block we are after is beyond the current
        // end-of-chunk. So, treat that as a 0-block and splice in.
        if (OffsetToChecksumBlockStart(op->offset) >= cih->chunkInfo.chunkSize) {
            IOBuffer *data = new IOBuffer();

            data->ZeroFill(CHECKSUM_BLOCKSIZE);
            data->Splice(op->dataBuf, op->offset % CHECKSUM_BLOCKSIZE, op->numBytesIO);
            delete op->dataBuf;
            op->dataBuf = data;
            goto do_checksum;
        }

        // Need to read the data block over which the checksum is
        // computed.
        if (op->rop == NULL) {
            // issue a read
            ReadOp *rop = new ReadOp(op, OffsetToChecksumBlockStart(op->offset), CHECKSUM_BLOCKSIZE);

            KFS_LOG_VA_DEBUG("Write triggered a read for offset = %ld", op->offset);
            op->rop = rop;
            rop->Execute();
            if (rop->status < 0) {
                int res = rop->status;

                op->rop = NULL;
                rop->wop = NULL;
                delete rop;
                return res;
            }
            // Read scheduled; this WriteOp resumes when it completes.
            return 0;
        }

        // If the read failed, cleanup and bail
        if (op->rop->status < 0) {
            op->status = op->rop->status;
            op->rop->wop = NULL;
            delete op->rop;
            op->rop = NULL;
            return op->HandleDone(EVENT_DISK_ERROR, NULL);
        }

        // All is good. So, get on with checksumming
        op->rop->dataBuf->Splice(op->dataBuf, op->offset % CHECKSUM_BLOCKSIZE, op->numBytesIO);
        delete op->dataBuf;
        op->dataBuf = op->rop->dataBuf;
        op->rop->dataBuf = NULL;

        // If the buffer doesn't have a full CHECKSUM_BLOCKSIZE worth
        // of data, zero-pad the end. We don't need to zero-pad the
        // front because the underlying filesystem will zero-fill when
        // we read a hole.
        ZeroPad(op->dataBuf);

    do_checksum:
        assert(op->dataBuf->BytesConsumable() == (int) CHECKSUM_BLOCKSIZE);

        uint32_t cksum = ComputeBlockChecksum(op->dataBuf, CHECKSUM_BLOCKSIZE);
        op->checksums.push_back(cksum);

        // eat away the stuff at the beginning, so that we write out
        // exactly where we were asked from.
        off_t extra = op->offset - OffsetToChecksumBlockStart(op->offset);
        if (extra > 0)
            op->dataBuf->Consume(extra);
    }

    DiskConnection *d = SetupDiskConnection(op->chunkId, op);
    if (d == NULL)
        return -KFS::ESERVERBUSY;
    op->diskConnection.reset(d);

    /*
    KFS_LOG_VA_DEBUG("Checksum for chunk: %ld, offset = %ld, bytes = %ld, # of cksums = %u",
                     op->chunkId, op->offset, op->numBytesIO, op->checksums.size());
    */
    res = op->diskConnection->Write(op->offset + KFS_CHUNK_HEADER_SIZE, op->numBytesIO, op->dataBuf);
    if (res >= 0)
        UpdateChecksums(cih, op);
    return res;
}

// After a successful write was scheduled: record the per-block checksums
// from op into the chunk's checksum table and, when the write extends the
// chunk, grow chunkSize plus the directory and global used-space counters.
void
ChunkManager::UpdateChecksums(ChunkInfoHandle_t *cih, WriteOp *op)
{
    mIsChunkTableDirty = true;

    off_t endOffset = op->offset + op->numBytesIO;

    // the checksums should be loaded...
    cih->chunkInfo.VerifyChecksumsLoaded();

    for (vector<uint32_t>::size_type i = 0; i < op->checksums.size(); i++) {
        off_t offset = op->offset + i * CHECKSUM_BLOCKSIZE;
        uint32_t checksumBlock = OffsetToChecksumBlockNum(offset);

        cih->chunkInfo.chunkBlockChecksum[checksumBlock] = op->checksums[i];
    }

    if (cih->chunkInfo.chunkSize < endOffset) {
        UpdateDirSpace(cih, endOffset - cih->chunkInfo.chunkSize);
        mUsedSpace += endOffset - cih->chunkInfo.chunkSize;
        cih->chunkInfo.chunkSize = endOffset;
    }
    assert(0 <= mUsedSpace && mUsedSpace <= mTotalSpace);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -