⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chunkmanager.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 4 页
字号:
            mNumChunks = 0;    }}voidChunkManager::ReplayWriteDone(kfsChunkId_t chunkId, off_t chunkSize,                              off_t offset, vector<uint32_t> checksums){    ChunkInfoHandle_t *cih;    int res;    res = GetChunkInfoHandle(chunkId, &cih);    if (res < 0)        return;    mIsChunkTableDirty = true;    mUsedSpace -= cih->chunkInfo.chunkSize;        cih->chunkInfo.chunkSize = chunkSize;    mUsedSpace += cih->chunkInfo.chunkSize;        for (vector<uint32_t>::size_type i = 0; i < checksums.size(); i++) {        off_t currOffset = offset + i * CHECKSUM_BLOCKSIZE;        size_t checksumBlock = OffsetToChecksumBlockNum(currOffset);                cih->chunkInfo.chunkBlockChecksum[checksumBlock] = checksums[i];    }}voidChunkManager::ReplayTruncateDone(kfsChunkId_t chunkId, off_t chunkSize){    ChunkInfoHandle_t *cih;    int res;    off_t lastChecksumBlock;    res = GetChunkInfoHandle(chunkId, &cih);    if (res < 0)        return;    mIsChunkTableDirty = true;    mUsedSpace -= cih->chunkInfo.chunkSize;        cih->chunkInfo.chunkSize = chunkSize;    mUsedSpace += cih->chunkInfo.chunkSize;    lastChecksumBlock = OffsetToChecksumBlockNum(chunkSize);    cih->chunkInfo.chunkBlockChecksum[lastChecksumBlock] = 0;}voidChunkManager::GetHostedChunks(vector<ChunkInfo_t> &result){    ChunkInfoHandle_t *cih;    // walk thru the table and pick up the chunk-ids    for (CMI iter = mChunkTable.begin(); iter != mChunkTable.end(); ++iter) {        cih = iter->second;        result.push_back(cih->chunkInfo);    }}intChunkManager::GetChunkInfoHandle(kfsChunkId_t chunkId, ChunkInfoHandle_t **cih){    CMI iter = mChunkTable.find(chunkId);    if (iter == mChunkTable.end()) {        *cih = NULL;        return -EBADF;    }    *cih = iter->second;    return 0;}intChunkManager::AllocateWriteId(WriteIdAllocOp *wi){    WriteOp *op;    ChunkInfoHandle_t *cih;    if (GetChunkInfoHandle(wi->chunkId, &cih) < 0)        return -EBADF;    if (wi->chunkVersion != cih->chunkInfo.chunkVersion) {        KFS_LOG_VA_INFO("Version # mismatch(have=%d vs asked=%lu...failing a write",                        cih->chunkInfo.chunkVersion, wi->chunkVersion);        return -EINVAL;    }    mWriteId++;    op = new WriteOp(wi->seq, wi->chunkId, wi->chunkVersion,                     wi->offset, wi->numBytes, NULL, mWriteId);    op->enqueueTime = time(NULL);    wi->writeId = mWriteId;    op->isWriteIdHolder = true;    mPendingWrites.push_back(op);    return 0;}intChunkManager::EnqueueWrite(WritePrepareOp *wp){    WriteOp *op;    ChunkInfoHandle_t *cih;    if (GetChunkInfoHandle(wp->chunkId, &cih) < 0)        return -EBADF;    if (wp->chunkVersion != cih->chunkInfo.chunkVersion) {        KFS_LOG_VA_INFO("Version # mismatch(have=%d vs asked=%lu...failing a write",                         cih->chunkInfo.chunkVersion, wp->chunkVersion);        return -EINVAL;    }    op = GetWriteOp(wp->writeId);    if (op->dataBuf == NULL)        op->dataBuf = wp->dataBuf;    else        op->dataBuf->Append(wp->dataBuf);    wp->dataBuf = NULL;    mPendingWrites.push_back(op);    return 0;}// Helper functor that matches pending writes by chunkid'sclass ChunkIdMatcher {    kfsChunkId_t myid;public:    ChunkIdMatcher(kfsChunkId_t s) : myid(s) { }    bool operator() (const WriteOp *r) {        return (r->chunkId == myid);    }};boolChunkManager::IsWritePending(kfsChunkId_t chunkId){    list<WriteOp *>::iterator i;    i = find_if(mPendingWrites.begin(), mPendingWrites.end(),                 ChunkIdMatcher(chunkId));    if (i == mPendingWrites.end())        return false;    return true;}int64_tChunkManager::GetChunkVersion(kfsChunkId_t c){    ChunkInfoHandle_t *cih;    if (GetChunkInfoHandle(c, &cih) < 0)        return -1;    return cih->chunkInfo.chunkVersion;}// Helper functor that matches write id's by sequence #'sclass WriteIdMatcher {    int64_t myid;public:    WriteIdMatcher(int64_t s) : myid(s) { }    bool operator() (const WriteOp *r) {        return (r->writeId == myid);    }};WriteOp *ChunkManager::GetWriteOp(int64_t writeId){    list<WriteOp *>::iterator i;    WriteOp *op;    i = find_if(mPendingWrites.begin(), mPendingWrites.end(),                 WriteIdMatcher(writeId));    if (i == mPendingWrites.end())        return NULL;    op = *i;    mPendingWrites.erase(i);    return op;}WriteOp *ChunkManager::CloneWriteOp(int64_t writeId){    list<WriteOp *>::iterator i;    WriteOp *op, *other;    i = find_if(mPendingWrites.begin(), mPendingWrites.end(),                 WriteIdMatcher(writeId));    if (i == mPendingWrites.end())        return NULL;    other = *i;    if (other->status < 0)        // if the write is "bad" already, don't add more data to it        return NULL;    // Since we are cloning, "touch" the time    other->enqueueTime = time(NULL);    // offset/size/buffer are to be filled in    op = new WriteOp(other->seq, other->chunkId, other->chunkVersion,                      0, 0, NULL, other->writeId);    return op;}voidChunkManager::SetWriteStatus(int64_t writeId, int status){    list<WriteOp *>::iterator i;    WriteOp *op;    i = find_if(mPendingWrites.begin(), mPendingWrites.end(),                 WriteIdMatcher(writeId));    if (i == mPendingWrites.end())        return;    op = *i;    op->status = status;    KFS_LOG_VA_INFO("Setting the status of writeid: %d to %d", writeId, status);}boolChunkManager::IsValidWriteId(int64_t writeId){    list<WriteOp *>::iterator i;    i = find_if(mPendingWrites.begin(), mPendingWrites.end(),                 WriteIdMatcher(writeId));    // valid if we don't hit the end of the list    return (i != mPendingWrites.end());}voidChunkManager::Timeout(){    time_t now = time(NULL);#ifdef DEBUG    verifyExecutingOnEventProcessor();#endif    if (now - mLastCheckpointTime > CKPT_TIME_INTERVAL) {        Checkpoint();        // if any writes have been around for "too" long, remove them        // and reclaim memory        ScavengePendingWrites();        // cleanup inactive fd's and thereby free up fd's        CleanupInactiveFds(true);    }}voidChunkManager::ScavengePendingWrites(){    list<WriteOp *>::iterator i;    WriteOp *op;    time_t now = time(NULL);    ChunkInfoHandle_t *cih;    i = mPendingWrites.begin();    while (i != mPendingWrites.end()) {        op = *i;        // The list is sorted by enqueue time        if (now - op->enqueueTime < MAX_PENDING_WRITE_LRU_SECS) {            break;        }        // if it exceeds 5 mins, retire the op        KFS_LOG_VA_DEBUG("Retiring write with id=%ld as it has been too long",                         op->writeId);        mPendingWrites.pop_front();        if ((GetChunkInfoHandle(op->chunkId, &cih) == 0) &&            (now - cih->lastIOTime >= INACTIVE_FDS_CLEANUP_INTERVAL_SECS)) {            // close the chunk only if it is inactive            CloseChunk(op->chunkId);        }        delete op;        i = mPendingWrites.begin();    }}intChunkManager::Sync(WriteOp *op){    if (!op->diskConnection) {        return -1;    }    return op->diskConnection->Sync(op->waitForSyncDone);}class InactiveFdCleaner {    time_t now;public:    InactiveFdCleaner(time_t n) : now(n) { }    void operator() (const std::tr1::unordered_map<kfsChunkId_t, ChunkInfoHandle_t *>::value_type v) {        ChunkInfoHandle_t *cih  = v.second;                if ((!cih->dataFH) || (cih->dataFH->mFd < 0) ||            (gLeaseClerk.IsLeaseValid(cih->chunkInfo.chunkId)) ||            (now - cih->lastIOTime < INACTIVE_FDS_CLEANUP_INTERVAL_SECS) ||            (cih->isBeingReplicated))            return;        // we have a valid file-id and it has been over 5 mins since we last did I/O on it.        KFS_LOG_VA_DEBUG("cleanup: closing fileid = %d, for chunk = %ld",                         cih->dataFH->mFd,                         cih->chunkInfo.chunkId);        cih->Release();    }};voidChunkManager::CleanupInactiveFds(bool periodic){    static time_t lastCleanupTime = time(0);    time_t now = time(0);    if (OPEN_FDS_LOW_WATERMARK == 0) {        struct rlimit rlim;        int res;        res = getrlimit(RLIMIT_NOFILE, &rlim);        if (res == 0) {            OPEN_FDS_LOW_WATERMARK = rlim.rlim_cur / 2;            // bump the soft limit to the hard limit            rlim.rlim_cur = rlim.rlim_max;            if (setrlimit(RLIMIT_NOFILE, &rlim) == 0) {                KFS_LOG_VA_DEBUG("Setting # of open files to: %ld",                                 rlim.rlim_cur);                OPEN_FDS_LOW_WATERMARK = rlim.rlim_cur / 2;            }        }    }    // not enough time has elapsed    if (periodic && (now - lastCleanupTime < INACTIVE_FDS_CLEANUP_INTERVAL_SECS))        return;    int totalOpenFds = globals().ctrOpenDiskFds.GetValue() +        globals().ctrOpenNetFds.GetValue();    // if we haven't cleaned up in 5 mins or if we too many fd's that    // are open, clean up.    if ((!periodic) && (totalOpenFds < OPEN_FDS_LOW_WATERMARK)) {        return;    }    // either we are periodic cleaning or we have too many FDs open    lastCleanupTime = time(0);    for_each(mChunkTable.begin(), mChunkTable.end(), InactiveFdCleaner(now));}stringKFS::GetStaleChunkPath(const string &partition){    return partition + "/lost+found/";}int64_tChunkManager::GetTotalSpace() {#if defined(__APPLE__)    for (uint32_t i = 0; i < mChunkDirs.size(); i++) {        mChunkDirs[i].availableSpace = mTotalSpace;    }    return mTotalSpace;#endif    int64_t availableSpace = 0;    set<unsigned long> seenDrives;    for (uint32_t i = 0; i < mChunkDirs.size(); i++) {        // report the space based on availability#if defined(__APPLE__) || defined(__sun__) || (!defined(__i386__))        struct statvfs result;        if (statvfs(mChunkDirs[i].dirname.c_str(), &result) < 0) {            int err = errno;            KFS_LOG_VA_INFO("statvfs failed on %s with error: %d", mChunkDirs[i].dirname.c_str(), err);            mChunkDirs[i].availableSpace = 0;            if (err == EIO) {                // We can't stat the directory.                // Notify metaserver that all blocks on this                // drive are lost                NotifyMetaChunksLost(mChunkDirs[i].dirname);            }            continue;        }#else        // we are on i386 on linux        struct statvfs64 result;        if (statvfs64(mChunkDirs[i].dirname.c_str(), &result) < 0) {            int err = errno;            KFS_LOG_VA_INFO("statvfs failed on %s with error: %d", mChunkDirs[i].dirname.c_str(), err);            mChunkDirs[i].availableSpace = 0;            if (err == EIO) {                // We can't stat the directory.                // Notify metaserver that all blocks on this                // drive are lost                NotifyMetaChunksLost(mChunkDirs[i].dirname);            }            continue;        }#endif        if (seenDrives.find(result.f_fsid) != seenDrives.end()) {            // if we have seen the drive where this directory is, then            // we have already accounted for how much is free on the drive            availableSpace += mChunkDirs[i].usedSpace;            mChunkDirs[i].availableSpace = result.f_bavail * result.f_frsize + mChunkDirs[i].usedSpace;        } else {            // result.* is how much is available on disk; mUsedSpace is how            // much we used up with chunks; so, the total storage available on            // the drive is the sum of the two.  if we don't add mUsedSpace,            // then all the chunks we write will get to use the space on disk and            // won't get acounted for in terms of drive space.            mChunkDirs[i].availableSpace = result.f_bavail * result.f_frsize + mChunkDirs[i].usedSpace;            availableSpace += result.f_bavail * result.f_frsize + mChunkDirs[i].usedSpace;            seenDrives.insert(result.f_fsid);        }        KFS_LOG_VA_DEBUG("Dir: %s has space %ld", mChunkDirs[i].dirname.c_str(), mChunkDirs[i].availableSpace);    }    // we got all the info...so report true value    return min(availableSpace, mTotalSpace);}voidChunkManagerTimeoutImpl::Timeout(){    SubmitOp(&mTimeoutOp);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -