⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chunkmanager.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 4 页
字号:
voidChunkManager::ReadChunkDone(ReadOp *op){    ChunkInfoHandle_t *cih = NULL;        if ((GetChunkInfoHandle(op->chunkId, &cih) < 0) ||        (op->chunkVersion != cih->chunkInfo.chunkVersion)) {        AdjustDataRead(op);        if (cih) {            KFS_LOG_VA_INFO("Version # mismatch(have=%u vs asked=%ld...",                             cih->chunkInfo.chunkVersion, op->chunkVersion);        }        op->status = -KFS::EBADVERS;        return;    }    ZeroPad(op->dataBuf);    assert(op->dataBuf->BytesConsumable() >= (int) CHECKSUM_BLOCKSIZE);    // either nothing to verify or it better match    bool mismatch = false;    // figure out the block we are starting from and grab all the checksums    vector<uint32_t>::size_type i, checksumBlock = OffsetToChecksumBlockStart(op->offset);    vector<uint32_t> checksums = ComputeChecksums(op->dataBuf, op->dataBuf->BytesConsumable());    // the checksums should be loaded...    if (!cih->chunkInfo.AreChecksumsLoaded()) {        // the read took too long; the checksums got paged out.  ask the client to retry        KFS_LOG_VA_INFO("Checksums for chunk %lld got paged out; returning EAGAIN to client",                        cih->chunkInfo.chunkId);        op->status = -EAGAIN;        return;    }    cih->chunkInfo.VerifyChecksumsLoaded();    for (i = 0; i < checksums.size() &&             checksumBlock < MAX_CHUNK_CHECKSUM_BLOCKS;         checksumBlock++, i++) {        if ((cih->chunkInfo.chunkBlockChecksum[checksumBlock] == 0) ||            (checksums[i] == cih->chunkInfo.chunkBlockChecksum[checksumBlock])) {            continue;        }        mismatch = true;        break;    }    if (!mismatch) {        // for checksums to verify, we did reads in multiples of        // checksum block sizes.  so, get rid of the extra        AdjustDataRead(op);        return;    }    // die ("checksum mismatch");    KFS_LOG_VA_ERROR("Checksum mismatch for chunk=%ld, offset=%ld, bytes = %ld: expect: %u, computed: %u ",                  op->chunkId, op->offset, op->numBytesIO,                  cih->chunkInfo.chunkBlockChecksum[checksumBlock],                  checksums[i]);    op->status = -KFS::EBADCKSUM;    // Notify the metaserver that the chunk we have is "bad"; the    // metaserver will re-replicate this chunk.    NotifyMetaCorruptedChunk(op->chunkId);        // Take out the chunk from our side    StaleChunk(op->chunkId);}voidChunkManager::NotifyMetaCorruptedChunk(kfsChunkId_t chunkId){    ChunkInfoHandle_t *cih;    if (GetChunkInfoHandle(chunkId, &cih) < 0) {        KFS_LOG_VA_ERROR("Unable to notify metaserver of corrupt chunk: %lld",                      chunkId);        return;    }    KFS_LOG_VA_INFO("Notifying metaserver of corrupt chunk (%ld) in file %lld",                 cih->chunkInfo.fileId, chunkId);    // This op will get deleted when we get an ack from the metaserver    CorruptChunkOp *ccop = new CorruptChunkOp(0, cih->chunkInfo.fileId,                                               chunkId);    gMetaServerSM.EnqueueOp(ccop);}//// directory with dirname is unaccessable; maybe drive failed.  so,// notify metaserver of lost blocks.  the metaserver will then// re-replicate.//voidChunkManager::NotifyMetaChunksLost(const string &dirname){    ChunkInfoHandle_t *cih;    CMI iter = mChunkTable.begin();        while (iter != mChunkTable.end()) {        cih = iter->second;        if (cih->chunkInfo.GetDirname() != dirname) {            ++iter;            continue;        }        KFS_LOG_VA_INFO("Notifying metaserver of lost chunk (%ld) in file %lld in dir %s",                        cih->chunkInfo.fileId, cih->chunkInfo.chunkId, dirname.c_str());        // This op will get deleted when we get an ack from the metaserver        CorruptChunkOp *ccop = new CorruptChunkOp(0, cih->chunkInfo.fileId,                                                   cih->chunkInfo.chunkId);        gMetaServerSM.EnqueueOp(ccop);        // get rid of chunkid from our list        CMI prev = iter;        ++iter;        mChunkTable.erase(cih->chunkInfo.chunkId);        delete cih;    }    }voidChunkManager::ZeroPad(IOBuffer *buffer){    int bytesFilled = buffer->BytesConsumable();    if ((bytesFilled % CHECKSUM_BLOCKSIZE) == 0)        return;    int numToZero = CHECKSUM_BLOCKSIZE - (bytesFilled % CHECKSUM_BLOCKSIZE);    if (numToZero > 0) {        // pad with 0's        buffer->ZeroFill(numToZero);    }}voidChunkManager::AdjustDataRead(ReadOp *op){    size_t extraRead = op->offset - OffsetToChecksumBlockStart(op->offset);    if (extraRead > 0)        op->dataBuf->Consume(extraRead);    if (op->dataBuf->BytesConsumable() > op->numBytesIO)        op->dataBuf->Trim(op->numBytesIO);}uint32_t ChunkManager::GetChecksum(kfsChunkId_t chunkId, off_t offset){    uint32_t checksumBlock = OffsetToChecksumBlockNum(offset);    ChunkInfoHandle_t *cih;    if (GetChunkInfoHandle(chunkId, &cih) < 0)        return 0;    // the checksums should be loaded...    cih->chunkInfo.VerifyChecksumsLoaded();    assert(checksumBlock <= MAX_CHUNK_CHECKSUM_BLOCKS);    return cih->chunkInfo.chunkBlockChecksum[checksumBlock];}DiskConnection *ChunkManager::SetupDiskConnection(kfsChunkId_t chunkId, KfsOp *op){    ChunkInfoHandle_t *cih;    DiskConnection *diskConnection;    CMI tableEntry = mChunkTable.find(chunkId);    if (tableEntry == mChunkTable.end()) {        return NULL;    }    cih = tableEntry->second;    if ((!cih->dataFH) || (cih->dataFH->mFd < 0)) {        CleanupInactiveFds();        if (OpenChunk(chunkId, O_RDWR) < 0)             return NULL;    }     cih->lastIOTime = time(0);    diskConnection = new DiskConnection(cih->dataFH, op);    return diskConnection;}    voidChunkManager::CancelChunkOp(KfsCallbackObj *cont, kfsChunkId_t chunkId){    // Cancel the chunk operations scheduled by KfsCallbackObj on chunkId.    // XXX: Fill it...}//// dump out the contents of the chunkTable to disk//voidChunkManager::Checkpoint(){    CheckpointOp *cop;    // on the macs, i can't declare CMI iter;    CMI iter = mChunkTable.begin();    mLastCheckpointTime = time(NULL);    if (!mIsChunkTableDirty)        return;    // KFS_LOG_VA_DEBUG("Checkpointing state");    cop = new CheckpointOp(1);    #if 0    // there are no more checkpoints on the chunkserver...this will all go    // we are using this to rotate logs...    ChunkInfoHandle_t *cih;    for (iter = mChunkTable.begin(); iter != mChunkTable.end(); ++iter) {        cih = iter->second;        // If a chunk is being replicated, then it is not yet a part        // of the namespace.  When replication is done, it becomes a        // part of the namespace.  This model keeps recovery simple:        // if we die in the midst of replicating a chunk, on restart,        // we will the chunk as an orphan and throw it away.        if (cih->isBeingReplicated)            continue;        cop->data << cih->chunkInfo.fileId << ' ';        cop->data << cih->chunkInfo.chunkId << ' ';        cop->data << cih->chunkInfo.chunkSize << ' ';        cop->data << cih->chunkInfo.chunkVersion << ' ';        cop->data << MAX_CHUNK_CHECKSUM_BLOCKS << ' ';        for (uint32_t i = 0; i < MAX_CHUNK_CHECKSUM_BLOCKS; ++i) {            cop->data << cih->chunkInfo.chunkBlockChecksum[i] << ' ';        }        cop->data << endl;    }#endif        gLogger.Submit(cop);    // Now, everything is clean...    mIsChunkTableDirty = false;}//// Get all the chunk directory entries from all the places we can// store the chunks into a single array.//intChunkManager::GetChunkDirsEntries(struct dirent ***namelist){    struct dirent **entries;    vector<struct dirent **> dirEntries;    vector<int> dirEntriesCount;    int res, numChunkFiles = 0;    uint32_t i;    *namelist = NULL;    for (i = 0; i < mChunkDirs.size(); i++) {        res = scandir(mChunkDirs[i].dirname.c_str(), &entries, 0, alphasort);        if (res < 0) {            KFS_LOG_VA_INFO("Unable to open %s", mChunkDirs[i].dirname.c_str());            for (i = 0; i < dirEntries.size(); i++) {                entries = dirEntries[i];                for (int j = 0; j < dirEntriesCount[i]; j++)                    free(entries[j]);                free(entries);            }            dirEntries.clear();            return -1;        }        dirEntries.push_back(entries);        dirEntriesCount.push_back(res);        numChunkFiles += res;    }        // Get all the directory entries into one giganto array    *namelist = (struct dirent **) malloc(sizeof(struct dirent **) * numChunkFiles);    numChunkFiles = 0;    for (i = 0; i < dirEntries.size(); i++) {        int count = dirEntriesCount[i];        entries = dirEntries[i];        memcpy((*namelist) + numChunkFiles, entries, count * sizeof(struct dirent **));        numChunkFiles += count;    }    return numChunkFiles;}voidChunkManager::GetChunkPathEntries(vector<string> &pathnames){    uint32_t i;    struct dirent **entries = NULL;    int res;    for (i = 0; i < mChunkDirs.size(); i++) {        res = scandir(mChunkDirs[i].dirname.c_str(), &entries, 0, alphasort);        if (res < 0) {            KFS_LOG_VA_INFO("Unable to open %s", mChunkDirs[i].dirname.c_str());            continue;        }        for (int j = 0; j < res; j++) {            string s = mChunkDirs[i].dirname + "/" + entries[j]->d_name;            pathnames.push_back(s);            free(entries[j]);        }        free(entries);    }}voidChunkManager::Restart(){    int version;    version = gLogger.GetVersionFromCkpt();    if (version == gLogger.GetLoggerVersionNum()) {        Restore();    } else {        std::cout << "Unsupported version...copy out the data and copy it back in...." << std::endl;        exit(-1);    }    // Write out a new checkpoint file with just version and set it at 2    gLogger.Checkpoint(NULL);}voidChunkManager::Restore(){    // sort all the chunk names alphabetically in each of the    // directories    vector<string> chunkPathnames;    struct stat buf;    int res;    uint32_t i, numChunkFiles;    GetChunkPathEntries(chunkPathnames);    numChunkFiles = chunkPathnames.size();    // each chunk file is of the form: <fileid>.<chunkid>.<chunkversion>      // parse the filename to extract out the chunk info    for (i = 0; i < numChunkFiles; ++i) {        string s = chunkPathnames[i];        ChunkInfoHandle_t *cih;        res = stat(s.c_str(), &buf);        if ((res < 0) || (!S_ISREG(buf.st_mode)))            continue;        MakeChunkInfoFromPathname(s, buf.st_size, &cih);        if (cih != NULL)            AddMapping(cih);        else {            KFS_LOG_VA_INFO("Deleting possibly duplicate file %s", s.c_str());            unlink(s.c_str());        }    }}voidChunkManager::AddMapping(ChunkInfoHandle_t *cih){    mNumChunks++;    mChunkTable[cih->chunkInfo.chunkId] = cih;    mUsedSpace += cih->chunkInfo.chunkSize;    UpdateDirSpace(cih, cih->chunkInfo.chunkSize);}voidChunkManager::ReplayAllocChunk(kfsFileId_t fileId, kfsChunkId_t chunkId,                               int64_t chunkVersion){    ChunkInfoHandle_t *cih;    mIsChunkTableDirty = true;    if (GetChunkInfoHandle(chunkId, &cih) == 0) {        // If the entry exists, just update the version        cih->chunkInfo.chunkVersion = chunkVersion;        mChunkTable[chunkId] = cih;        return;    }    mNumChunks++;    // after replay is done, when we verify entries in the table, we    // stat the file and fix up the sizes then.  so, no need to do    // anything here.    cih = new ChunkInfoHandle_t();    cih->chunkInfo.fileId = fileId;    cih->chunkInfo.chunkId = chunkId;    cih->chunkInfo.chunkVersion = chunkVersion;    mChunkTable[chunkId] = cih;}voidChunkManager::ReplayChangeChunkVers(kfsFileId_t fileId, kfsChunkId_t chunkId,                                    int64_t chunkVersion){    ChunkInfoHandle_t *cih;    if (GetChunkInfoHandle(chunkId, &cih) != 0)         return;    KFS_LOG_VA_DEBUG("Chunk %ld already exists; changing version # to %ld",                     chunkId, chunkVersion);        // Update the version #    cih->chunkInfo.chunkVersion = chunkVersion;    mChunkTable[chunkId] = cih;    mIsChunkTableDirty = true;}voidChunkManager::ReplayDeleteChunk(kfsChunkId_t chunkId){    ChunkInfoHandle_t *cih;    CMI tableEntry = mChunkTable.find(chunkId);    mIsChunkTableDirty = true;    if (tableEntry != mChunkTable.end()) {        cih = tableEntry->second;        mUsedSpace -= cih->chunkInfo.chunkSize;        mChunkTable.erase(chunkId);        delete cih;                mNumChunks--;        assert(mNumChunks >= 0);        if (mNumChunks < 0)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -