⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfsops.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 4 页
字号:
{    kfsSeq_t seq;    PingOp *po;    parseCommon(prop, seq);    po = new PingOp(seq);    *c = po;    return 0;}intparseHandlerDumpChunkMap(Properties &prop, KfsOp **c){    kfsSeq_t seq;    DumpChunkMapOp *po;    parseCommon(prop, seq);    po = new DumpChunkMapOp(seq);    *c = po;    return 0;}intparseHandlerStats(Properties &prop, KfsOp **c){    kfsSeq_t seq;    StatsOp *so;    parseCommon(prop, seq);    so = new StatsOp(seq);    *c = so;    return 0;}////// Generic event handler for tracking completion of an event/// execution.  Push the op to the logger and the net thread will pick/// it up and dispatch it.///intKfsOp::HandleDone(int code, void *data){    gLogger.Submit(this);    return 0;}////// A read op finished.  Set the status and the # of bytes read/// alongwith the data and notify the client.///intReadOp::HandleDone(int code, void *data){    IOBuffer *b;    off_t chunkSize = 0;    // DecrementCounter(CMD_READ);#ifdef DEBUG    verifyExecutingOnEventProcessor();#endif    if (code == EVENT_DISK_ERROR) {        status = -1;        if (data != NULL) {            status = *(int *) data;            KFS_LOG_VA_INFO("Disk error: errno = %d, chunkid = %lld", status, chunkId);        }        gChunkManager.ChunkIOFailed(chunkId, status);    }    else if (code == EVENT_DISK_READ) {        if (dataBuf == NULL) {            dataBuf = new IOBuffer();        }        b = (IOBuffer *) data;        // Order matters...when we append b, we take the data from b        // and put it into our buffer.        dataBuf->Append(b);        // verify checksum        gChunkManager.ReadChunkDone(this);        numBytesIO = dataBuf->BytesConsumable();        if (status == 0)            // checksum verified            status = numBytesIO;    }    if (status >= 0) {        if (numBytesIO < (int) CHECKSUM_BLOCKSIZE) {            uint32_t cks;            cks = ComputeBlockChecksum(dataBuf, numBytesIO);            checksum.push_back(cks);        } else {            checksum = ComputeChecksums(dataBuf, numBytesIO);        }        // send the disk IO time back to client for telemetry reporting        struct timeval timeNow;        gettimeofday(&timeNow, NULL);        diskIOTime = ComputeTimeDiff(startTime, timeNow);    }    gChunkManager.ChunkSize(chunkId, &chunkSize);    if (wop != NULL) {        // if the read was triggered by a write, then resume execution of write        wop->Execute();        return 0;    }    if ((chunkSize > 0 && (offset + numBytesIO >= (off_t) chunkSize)) &&        (!gLeaseClerk.IsLeaseValid(chunkId))) {        // If we have read the full chunk, close out the fd.  The        // observation is that reads are sequential and when we        // finished a chunk, the client will move to the next one.        gChunkManager.CloseChunk(chunkId);    }    gLogger.Submit(this);    return 0;}intReadOp::HandleReplicatorDone(int code, void *data){    // notify the replicator object that the read it had submitted to    // the peer has finished.     return clnt->HandleEvent(code, data);}intWriteOp::HandleWriteDone(int code, void *data){    // DecrementCounter(CMD_WRITE);    if (isFromReReplication) {        if (code == EVENT_DISK_WROTE) {            status = *(int *) data;            numBytesIO = status;        }        else {            status = -1;        }        return clnt->HandleEvent(code, this);    }    assert(wpop != NULL);    if (code == EVENT_DISK_ERROR) {        // eat up everything that was sent        dataBuf->Consume(numBytes);        status = -1;        if (data != NULL) {            status = *(int *) data;            KFS_LOG_VA_INFO("Disk error: errno = %d, chunkid = %lld", status, chunkId);        }        gChunkManager.ChunkIOFailed(chunkId, status);        wpop->HandleEvent(EVENT_CMD_DONE, this);        return 0;    }    else if (code == EVENT_DISK_WROTE) {        status = *(int *) data;        numBytesIO = status;        SET_HANDLER(this, &WriteOp::HandleSyncDone);        // queue the sync op only if we are all done with writing to        // this chunk:        if ((size_t) numBytesIO != (size_t) numBytes) {            // write didn't do everything that was asked; we need to retry            KFS_LOG_VA_INFO("Write on chunk did less: asked = %ld, did = %ld; asking clnt to retry",                            numBytes, numBytesIO);            status = -EAGAIN;        }        waitForSyncDone = false;        if (offset + numBytesIO >= (off_t) KFS::CHUNKSIZE) {            // If we have written till the end of the chunk, close out the            // fd.  The observation is that writes are sequential and when            // we finished a chunk, the client will move to the next            // one.#if 0            if (gChunkManager.Sync(this) < 0) {                KFS_LOG_DEBUG("Sync failed...");                // eat up everything that was sent                dataBuf->Consume(numBytes);                // Sync failed                status = -1;                // clnt->HandleEvent(EVENT_CMD_DONE, this);                return wsop->HandleEvent(EVENT_CMD_DONE, this);            }#endif        }        if (!waitForSyncDone) {            // KFS_LOG_DEBUG("Queued sync; not waiting for sync to finish...");            // sync is queued; no need to wait for it to finish            return HandleSyncDone(EVENT_SYNC_DONE, 0);        }    }    return 0;}////// A write op finished.  Set the status and the # of bytes written/// and notify the owning write commit op.///intWriteOp::HandleSyncDone(int code, void *data){    // eat up everything that was sent    dataBuf->Consume(numBytes);    if (code != EVENT_SYNC_DONE) {        status = -1;    }    if (status >= 0) {        gChunkManager.ChunkSize(chunkId, &chunkSize);        SET_HANDLER(this, &WriteOp::HandleLoggingDone);        gLogger.Submit(this);    }    else {        wpop->HandleEvent(EVENT_CMD_DONE, this);    }        return 0;}intWriteOp::HandleLoggingDone(int code, void *data){    assert(wpop != NULL);    return wpop->HandleEvent(EVENT_CMD_DONE, this);}////// Handlers for ops that need logging.  This method is invoked by the/// logger thread.  So, don't access any globals here---otherwise, we/// need to add locking.///voidAllocChunkOp::Log(ofstream &ofs){    ofs << "ALLOCATE " << chunkId << ' ' << fileId << ' ';    ofs << chunkVersion << "\n";    assert(!ofs.fail());}/// Resetting a chunk's version # is equivalent to doing an allocation/// of an existing chunk.voidChangeChunkVersOp::Log(ofstream &ofs){    ofs << "CHANGE_CHUNK_VERS " << chunkId << ' ' << fileId << ' ';    ofs << chunkVersion << "\n";    assert(!ofs.fail());}voidDeleteChunkOp::Log(ofstream &ofs){    ofs << "DELETE " << chunkId << "\n";    assert(!ofs.fail());}voidWriteOp::Log(ofstream &ofs){    ofs << "WRITE " << chunkId << ' ' << chunkSize << ' ';    ofs << offset << ' ';    ofs << checksums.size();    for (vector<uint32_t>::size_type i = 0; i < checksums.size(); i++) {        ofs << ' ' << checksums[i];    }    ofs << "\n";    assert(!ofs.fail());}voidTruncateChunkOp::Log(ofstream &ofs){    ofs << "TRUNCATE " << chunkId << ' ' << chunkSize << "\n";    assert(!ofs.fail());}// For replicating a chunk, we log nothing.  We don't write out info// about the chunk in checkpoint files until replication is complete.// This way, if we ever crash during chunk-replication, we'll simply// nuke out the chunk on startup.voidReplicateChunkOp::Log(ofstream &ofs){}////// Handlers for executing the various ops.  If the op execution is/// "in-line", that is the op doesn't block, then when the execution/// is finished, the op is handed off to the logger; the net thread/// will drain the logger and then notify the client.  Otherwise, the op is queued/// for execution and the client gets notified whenever the op/// finishes execution.///voidOpenOp::Execute(){    status = gChunkManager.OpenChunk(chunkId, openFlags);    UpdateCounter(CMD_OPEN);    //clnt->HandleEvent(EVENT_CMD_DONE, this);    gLogger.Submit(this);}voidCloseOp::Execute(){    UpdateCounter(CMD_CLOSE);    gChunkManager.CloseChunk(chunkId);    status = 0;    // clnt->HandleEvent(EVENT_CMD_DONE, this);    gLogger.Submit(this);}voidAllocChunkOp::Execute(){    UpdateCounter(CMD_ALLOC_CHUNK);    int res;    // page in the chunk meta-data if needed    if (!gChunkManager.NeedToReadChunkMetadata(chunkId)) {        HandleChunkMetaReadDone(0, NULL);        return;    }        SET_HANDLER(this, &AllocChunkOp::HandleChunkMetaReadDone);    if ((res = gChunkManager.ReadChunkMetadata(chunkId, this)) < 0) {        KFS_LOG_VA_INFO("Unable read chunk metadata for chunk: %lld; error = %d", chunkId, res);        status = -EINVAL;        gLogger.Submit(this);        return;    }}intAllocChunkOp::HandleChunkMetaReadDone(int code, void *data){    status = gChunkManager.AllocChunk(fileId, chunkId, chunkVersion);    // if (status < 0) {    // failed; nothing to log    // clnt->HandleEvent(EVENT_CMD_DONE, this);    // return;    // }    if (leaseId >= 0) {        gCtrWriteMaster.Update(1);        gLeaseClerk.RegisterLease(chunkId, leaseId);    }    if (status < 0)        gLogger.Submit(this);    // Submit the op and wait to be notified    SET_HANDLER(this, &AllocChunkOp::HandleChunkMetaWriteDone);    status = gChunkManager.WriteChunkMetadata(chunkId, this);    if (status < 0)        gLogger.Submit(this);    return 0;}voidDeleteChunkOp::Execute(){    UpdateCounter(CMD_DELETE_CHUNK);    status = gChunkManager.DeleteChunk(chunkId);    //     if (status < 0)    //         // failed; nothing to log    //         clnt->HandleEvent(EVENT_CMD_DONE, this);    //     else    gLogger.Submit(this);}voidTruncateChunkOp::Execute(){    UpdateCounter(CMD_TRUNCATE_CHUNK);    SET_HANDLER(this, &TruncateChunkOp::HandleChunkMetaReadDone);    if (gChunkManager.ReadChunkMetadata(chunkId, this) < 0) {        status = -EINVAL;        gLogger.Submit(this);    }}intTruncateChunkOp::HandleChunkMetaReadDone(int code, void *data){    status = *(int *) data;    if (status < 0) {        gLogger.Submit(this);        return 0;    }    status = gChunkManager.TruncateChunk(chunkId, chunkSize);    if (status < 0) {        gLogger.Submit(this);        return 0;    }    SET_HANDLER(this, &TruncateChunkOp::HandleChunkMetaWriteDone);    status = gChunkManager.WriteChunkMetadata(chunkId, this);    if (status < 0)        gLogger.Submit(this);    return 0;}voidReplicateChunkOp::Execute(){    RemoteSyncSMPtr peer = gChunkServer.FindServer(location);    UpdateCounter(CMD_REPLICATE_CHUNK);#ifdef DEBUG    string s = location.ToString();    KFS_LOG_VA_DEBUG("Replicating chunk: %ld from %s",                     chunkId, s.c_str());#endif    if (!peer) {        string s = location.ToString();        KFS_LOG_VA_INFO("Unable to find peer: %s", s.c_str());        status = -1;        gLogger.Submit(this);        return;    }    replicator.reset(new Replicator(this));    // Get the animation going...    SET_HANDLER(this, &ReplicateChunkOp::HandleDone);    replicator->Start(peer);}voidChangeChunkVersOp::Execute(){    UpdateCounter(CMD_CHANGE_CHUNK_VERS);    SET_HANDLER(this, &ChangeChunkVersOp::HandleChunkMetaReadDone);    if (gChunkManager.ReadChunkMetadata(chunkId, this) < 0) {        status = -EINVAL;        gLogger.Submit(this);    }}intChangeChunkVersOp::HandleChunkMetaReadDone(int code, void *data){    status = *(int *) data;    if (status < 0) {        gLogger.Submit(this);        return 0;    }    status = gChunkManager.ChangeChunkVers(fileId, chunkId,                                            chunkVersion);    if (status < 0) {        gLogger.Submit(this);        return 0;    }            SET_HANDLER(this, &ChangeChunkVersOp::HandleChunkMetaWriteDone);    status = gChunkManager.WriteChunkMetadata(chunkId, this);    if (status < 0)        gLogger.Submit(this);    return 0;}// This is the heartbeat sent by the meta servervoidHeartbeatOp::Execute(){    UpdateCounter(CMD_HEARTBEAT);    totalSpace = gChunkManager.GetTotalSpace();    usedSpace = gChunkManager.GetUsedSpace();    numChunks = gChunkManager.GetNumChunks();    status = 0;    // clnt->HandleEvent(EVENT_CMD_DONE, this);    gLogger.Submit(this);}voidRetireOp::Execute(){    // we are told to retire...so, bow out    KFS_LOG_INFO("We have been asked to retire...bye");    StopNetProcessor(0);}voidStaleChunksOp::Execute(){    vector<kfsChunkId_t>::size_type i;    for (i = 0; i < staleChunkIds.size(); ++i) {        gChunkManager.StaleChunk(staleChunkIds[i]);    }    status = 0;    // clnt->HandleEvent(EVENT_CMD_DONE, this);    gLogger.Submit(this);}voidReadOp::Execute(){    UpdateCounter(CMD_READ);    SET_HANDLER(this, &ReadOp::HandleChunkMetaReadDone);    if (gChunkManager.ReadChunkMetadata(chunkId, this) < 0) {        status = -EINVAL;        gLogger.Submit(this);    }}intReadOp::HandleChunkMetaReadDone(int code, void *data){    status = *(int *) data;    if (status < 0) {        gLogger.Submit(this);        return 0;    }    SET_HANDLER(this, &ReadOp::HandleDone);        status = gChunkManager.ReadChunk(this);    if (status < 0) {        // clnt->HandleEvent(EVENT_CMD_DONE, this);        if (wop == NULL) {            // we are done with this op; this needs draining            gLogger.Submit(this);        }        else {            // resume execution of write            wop->Execute();        }    }    return 0;}//// Handling of writes is done in multiple steps:// 1. The client allocates a chunk from the metaserver; the metaserver// picks a set of hosting chunkservers and nominates one of the// server's as the "master" for the transaction.// 2. The client pushes data for a write via a WritePrepareOp to each// of the hosting chunkservers (in any order).// 3. The chunkserver in turn enqueues the write with the ChunkManager// object.  The ChunkManager assigns an id to the write.   NOTE:// nothing is written out to disk at this point.// 4. After the client has pushed out data to replica chunk-servers// and gotten write-id's, the client does a WriteSync to the master.// 5. The master retrieves the write corresponding to the write-id and// commits the write to disk.// 6. The master then sends out a WriteCommit to each of the replica// chunkservers asking them to commit the write; this commit message// is sent concurrently to all the replicas.// 7. After the replicas reply, the master replies to the client with// status from individual servers and how much got written on each.//

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -