📄 chunkmanager.cc
字号:
mNumChunks = 0; }}voidChunkManager::ReplayWriteDone(kfsChunkId_t chunkId, off_t chunkSize, off_t offset, vector<uint32_t> checksums){ ChunkInfoHandle_t *cih; int res; res = GetChunkInfoHandle(chunkId, &cih); if (res < 0) return; mIsChunkTableDirty = true; mUsedSpace -= cih->chunkInfo.chunkSize; cih->chunkInfo.chunkSize = chunkSize; mUsedSpace += cih->chunkInfo.chunkSize; for (vector<uint32_t>::size_type i = 0; i < checksums.size(); i++) { off_t currOffset = offset + i * CHECKSUM_BLOCKSIZE; size_t checksumBlock = OffsetToChecksumBlockNum(currOffset); cih->chunkInfo.chunkBlockChecksum[checksumBlock] = checksums[i]; }}voidChunkManager::ReplayTruncateDone(kfsChunkId_t chunkId, off_t chunkSize){ ChunkInfoHandle_t *cih; int res; off_t lastChecksumBlock; res = GetChunkInfoHandle(chunkId, &cih); if (res < 0) return; mIsChunkTableDirty = true; mUsedSpace -= cih->chunkInfo.chunkSize; cih->chunkInfo.chunkSize = chunkSize; mUsedSpace += cih->chunkInfo.chunkSize; lastChecksumBlock = OffsetToChecksumBlockNum(chunkSize); cih->chunkInfo.chunkBlockChecksum[lastChecksumBlock] = 0;}voidChunkManager::GetHostedChunks(vector<ChunkInfo_t> &result){ ChunkInfoHandle_t *cih; // walk thru the table and pick up the chunk-ids for (CMI iter = mChunkTable.begin(); iter != mChunkTable.end(); ++iter) { cih = iter->second; result.push_back(cih->chunkInfo); }}intChunkManager::GetChunkInfoHandle(kfsChunkId_t chunkId, ChunkInfoHandle_t **cih){ CMI iter = mChunkTable.find(chunkId); if (iter == mChunkTable.end()) { *cih = NULL; return -EBADF; } *cih = iter->second; return 0;}intChunkManager::AllocateWriteId(WriteIdAllocOp *wi){ WriteOp *op; ChunkInfoHandle_t *cih; if (GetChunkInfoHandle(wi->chunkId, &cih) < 0) return -EBADF; if (wi->chunkVersion != cih->chunkInfo.chunkVersion) { KFS_LOG_VA_INFO("Version # mismatch(have=%d vs asked=%lu...failing a write", cih->chunkInfo.chunkVersion, wi->chunkVersion); return -EINVAL; } mWriteId++; op = new WriteOp(wi->seq, wi->chunkId, wi->chunkVersion, wi->offset, wi->numBytes, NULL, mWriteId); op->enqueueTime = time(NULL); wi->writeId = mWriteId; op->isWriteIdHolder = true; mPendingWrites.push_back(op); return 0;}intChunkManager::EnqueueWrite(WritePrepareOp *wp){ WriteOp *op; ChunkInfoHandle_t *cih; if (GetChunkInfoHandle(wp->chunkId, &cih) < 0) return -EBADF; if (wp->chunkVersion != cih->chunkInfo.chunkVersion) { KFS_LOG_VA_INFO("Version # mismatch(have=%d vs asked=%lu...failing a write", cih->chunkInfo.chunkVersion, wp->chunkVersion); return -EINVAL; } op = GetWriteOp(wp->writeId); if (op->dataBuf == NULL) op->dataBuf = wp->dataBuf; else op->dataBuf->Append(wp->dataBuf); wp->dataBuf = NULL; mPendingWrites.push_back(op); return 0;}// Helper functor that matches pending writes by chunkid'sclass ChunkIdMatcher { kfsChunkId_t myid;public: ChunkIdMatcher(kfsChunkId_t s) : myid(s) { } bool operator() (const WriteOp *r) { return (r->chunkId == myid); }};boolChunkManager::IsWritePending(kfsChunkId_t chunkId){ list<WriteOp *>::iterator i; i = find_if(mPendingWrites.begin(), mPendingWrites.end(), ChunkIdMatcher(chunkId)); if (i == mPendingWrites.end()) return false; return true;}int64_tChunkManager::GetChunkVersion(kfsChunkId_t c){ ChunkInfoHandle_t *cih; if (GetChunkInfoHandle(c, &cih) < 0) return -1; return cih->chunkInfo.chunkVersion;}// Helper functor that matches write id's by sequence #'sclass WriteIdMatcher { int64_t myid;public: WriteIdMatcher(int64_t s) : myid(s) { } bool operator() (const WriteOp *r) { return (r->writeId == myid); }};WriteOp *ChunkManager::GetWriteOp(int64_t writeId){ list<WriteOp *>::iterator i; WriteOp *op; i = find_if(mPendingWrites.begin(), mPendingWrites.end(), WriteIdMatcher(writeId)); if (i == mPendingWrites.end()) return NULL; op = *i; mPendingWrites.erase(i); return op;}WriteOp *ChunkManager::CloneWriteOp(int64_t writeId){ list<WriteOp *>::iterator i; WriteOp *op, *other; i = find_if(mPendingWrites.begin(), mPendingWrites.end(), WriteIdMatcher(writeId)); if (i == mPendingWrites.end()) return NULL; other = *i; if (other->status < 0) // if the write is "bad" already, don't add more data to it return NULL; // Since we are cloning, "touch" the time other->enqueueTime = time(NULL); // offset/size/buffer are to be filled in op = new WriteOp(other->seq, other->chunkId, other->chunkVersion, 0, 0, NULL, other->writeId); return op;}voidChunkManager::SetWriteStatus(int64_t writeId, int status){ list<WriteOp *>::iterator i; WriteOp *op; i = find_if(mPendingWrites.begin(), mPendingWrites.end(), WriteIdMatcher(writeId)); if (i == mPendingWrites.end()) return; op = *i; op->status = status; KFS_LOG_VA_INFO("Setting the status of writeid: %d to %d", writeId, status);}boolChunkManager::IsValidWriteId(int64_t writeId){ list<WriteOp *>::iterator i; i = find_if(mPendingWrites.begin(), mPendingWrites.end(), WriteIdMatcher(writeId)); // valid if we don't hit the end of the list return (i != mPendingWrites.end());}voidChunkManager::Timeout(){ time_t now = time(NULL);#ifdef DEBUG verifyExecutingOnEventProcessor();#endif if (now - mLastCheckpointTime > CKPT_TIME_INTERVAL) { Checkpoint(); // if any writes have been around for "too" long, remove them // and reclaim memory ScavengePendingWrites(); // cleanup inactive fd's and thereby free up fd's CleanupInactiveFds(true); }}voidChunkManager::ScavengePendingWrites(){ list<WriteOp *>::iterator i; WriteOp *op; time_t now = time(NULL); ChunkInfoHandle_t *cih; i = mPendingWrites.begin(); while (i != mPendingWrites.end()) { op = *i; // The list is sorted by enqueue time if (now - op->enqueueTime < MAX_PENDING_WRITE_LRU_SECS) { break; } // if it exceeds 5 mins, retire the op KFS_LOG_VA_DEBUG("Retiring write with id=%ld as it has been too long", op->writeId); mPendingWrites.pop_front(); if ((GetChunkInfoHandle(op->chunkId, &cih) == 0) && (now - cih->lastIOTime >= INACTIVE_FDS_CLEANUP_INTERVAL_SECS)) { // close the chunk only if it is inactive CloseChunk(op->chunkId); } delete op; i = mPendingWrites.begin(); }}intChunkManager::Sync(WriteOp *op){ if (!op->diskConnection) { return -1; } return op->diskConnection->Sync(op->waitForSyncDone);}class InactiveFdCleaner { time_t now;public: InactiveFdCleaner(time_t n) : now(n) { } void operator() (const std::tr1::unordered_map<kfsChunkId_t, ChunkInfoHandle_t *>::value_type v) { ChunkInfoHandle_t *cih = v.second; if ((!cih->dataFH) || (cih->dataFH->mFd < 0) || (gLeaseClerk.IsLeaseValid(cih->chunkInfo.chunkId)) || (now - cih->lastIOTime < INACTIVE_FDS_CLEANUP_INTERVAL_SECS) || (cih->isBeingReplicated)) return; // we have a valid file-id and it has been over 5 mins since we last did I/O on it. KFS_LOG_VA_DEBUG("cleanup: closing fileid = %d, for chunk = %ld", cih->dataFH->mFd, cih->chunkInfo.chunkId); cih->Release(); }};voidChunkManager::CleanupInactiveFds(bool periodic){ static time_t lastCleanupTime = time(0); time_t now = time(0); if (OPEN_FDS_LOW_WATERMARK == 0) { struct rlimit rlim; int res; res = getrlimit(RLIMIT_NOFILE, &rlim); if (res == 0) { OPEN_FDS_LOW_WATERMARK = rlim.rlim_cur / 2; // bump the soft limit to the hard limit rlim.rlim_cur = rlim.rlim_max; if (setrlimit(RLIMIT_NOFILE, &rlim) == 0) { KFS_LOG_VA_DEBUG("Setting # of open files to: %ld", rlim.rlim_cur); OPEN_FDS_LOW_WATERMARK = rlim.rlim_cur / 2; } } } // not enough time has elapsed if (periodic && (now - lastCleanupTime < INACTIVE_FDS_CLEANUP_INTERVAL_SECS)) return; int totalOpenFds = globals().ctrOpenDiskFds.GetValue() + globals().ctrOpenNetFds.GetValue(); // if we haven't cleaned up in 5 mins or if we too many fd's that // are open, clean up. if ((!periodic) && (totalOpenFds < OPEN_FDS_LOW_WATERMARK)) { return; } // either we are periodic cleaning or we have too many FDs open lastCleanupTime = time(0); for_each(mChunkTable.begin(), mChunkTable.end(), InactiveFdCleaner(now));}stringKFS::GetStaleChunkPath(const string &partition){ return partition + "/lost+found/";}int64_tChunkManager::GetTotalSpace() {#if defined(__APPLE__) for (uint32_t i = 0; i < mChunkDirs.size(); i++) { mChunkDirs[i].availableSpace = mTotalSpace; } return mTotalSpace;#endif int64_t availableSpace = 0; set<unsigned long> seenDrives; for (uint32_t i = 0; i < mChunkDirs.size(); i++) { // report the space based on availability#if defined(__APPLE__) || defined(__sun__) || (!defined(__i386__)) struct statvfs result; if (statvfs(mChunkDirs[i].dirname.c_str(), &result) < 0) { int err = errno; KFS_LOG_VA_INFO("statvfs failed on %s with error: %d", mChunkDirs[i].dirname.c_str(), err); mChunkDirs[i].availableSpace = 0; if (err == EIO) { // We can't stat the directory. // Notify metaserver that all blocks on this // drive are lost NotifyMetaChunksLost(mChunkDirs[i].dirname); } continue; }#else // we are on i386 on linux struct statvfs64 result; if (statvfs64(mChunkDirs[i].dirname.c_str(), &result) < 0) { int err = errno; KFS_LOG_VA_INFO("statvfs failed on %s with error: %d", mChunkDirs[i].dirname.c_str(), err); mChunkDirs[i].availableSpace = 0; if (err == EIO) { // We can't stat the directory. // Notify metaserver that all blocks on this // drive are lost NotifyMetaChunksLost(mChunkDirs[i].dirname); } continue; }#endif if (seenDrives.find(result.f_fsid) != seenDrives.end()) { // if we have seen the drive where this directory is, then // we have already accounted for how much is free on the drive availableSpace += mChunkDirs[i].usedSpace; mChunkDirs[i].availableSpace = result.f_bavail * result.f_frsize + mChunkDirs[i].usedSpace; } else { // result.* is how much is available on disk; mUsedSpace is how // much we used up with chunks; so, the total storage available on // the drive is the sum of the two. if we don't add mUsedSpace, // then all the chunks we write will get to use the space on disk and // won't get acounted for in terms of drive space. mChunkDirs[i].availableSpace = result.f_bavail * result.f_frsize + mChunkDirs[i].usedSpace; availableSpace += result.f_bavail * result.f_frsize + mChunkDirs[i].usedSpace; seenDrives.insert(result.f_fsid); } KFS_LOG_VA_DEBUG("Dir: %s has space %ld", mChunkDirs[i].dirname.c_str(), mChunkDirs[i].availableSpace); } // we got all the info...so report true value return min(availableSpace, mTotalSpace);}voidChunkManagerTimeoutImpl::Timeout(){ SubmitOp(&mTimeoutOp);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -