⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfsops.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 4 页
字号:
voidDumpChunkMapOp::Execute(){   // Dump chunk map   gChunkManager.DumpChunkMap();   status = 0;   gLogger.Submit(this);}voidStatsOp::Execute(){    ostringstream os;    os << "Num aios: " << globals().diskManager.NumDiskIOOutstanding() << "\r\n";    os << "Num ops: " << gChunkServer.GetNumOps() << "\r\n";    globals().counterManager.Show(os);    stats = os.str();    status = 0;    // clnt->HandleEvent(EVENT_CMD_DONE, this);    gLogger.Submit(this);}intAllocChunkOp::HandleChunkMetaWriteDone(int code, void *data){    gLogger.Submit(this);    return 0;}intTruncateChunkOp::HandleChunkMetaWriteDone(int code, void *data){    gLogger.Submit(this);    return 0;}intChangeChunkVersOp::HandleChunkMetaWriteDone(int code, void *data){    gLogger.Submit(this);    return 0;}////// Generate response for an op based on the KFS protocol.///voidKfsOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Status: " << status << "\r\n\r\n";}voidSizeOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    if (status < 0) {        os << "Status: " << status << "\r\n\r\n";        return;    }    os << "Status: " << status << "\r\n";        os << "Size: " << size << "\r\n\r\n";}voidGetChunkMetadataOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    if (status < 0) {        os << "Status: " << status << "\r\n\r\n";        return;    }    os << "Status: " << status << "\r\n";        os << "Chunk-handle: " << chunkId << "\r\n";    os << "Chunk-version: " << chunkVersion << "\r\n";    os << "Size: " << chunkSize << "\r\n";    os << "Content-length: " << numBytesIO << "\r\n\r\n";}voidReadOp::Response(ostringstream &os){    // DecrementCounter(CMD_READ);    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    if (status < 0) {        os << "Status: " << status << "\r\n\r\n";        return;    }    os << "Status: " << status << "\r\n";    os << "DiskIOtime: " << diskIOTime << "\r\n";    os << "Checksum-entries: " << checksum.size() << "\r\n";    if (checksum.size() == 0) {        os << "Checksums: " << 0 << "\r\n";    } else {        os << "Checksums: ";        for (uint32_t i = 0; i < checksum.size(); i++)            os << checksum[i] << ' ';        os << "\r\n";    }    os << "Content-length: " << numBytesIO << "\r\n\r\n";}voidWriteIdAllocOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    if (status < 0) {        os << "Status: " << status << "\r\n\r\n";            return;    }    os << "Status: " << status << "\r\n";    // os << "Write-id: " << writeId << "\r\n\r\n";    os << "Write-id: " << writeIdStr << "\r\n\r\n";}voidWritePrepareOp::Response(ostringstream &os){    // no reply for a prepare...the reply is covered by sync    if (1)        return;    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    if (status < 0) {        os << "Status: " << status << "\r\n\r\n";            return;    }    os << "Status: " << status << "\r\n\r\n";}voidSizeOp::Request(ostringstream &os){    os << "SIZE \r\n";    os << "Cseq: " << seq << "\r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Chunk-handle: " << chunkId << "\r\n";    os << "Chunk-version: " << chunkVersion << "\r\n\r\n";}voidGetChunkMetadataOp::Request(ostringstream &os){    os << "GET_CHUNK_METADATA \r\n";    os << "Cseq: " << seq << "\r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Chunk-handle: " << chunkId << "\r\n\r\n";}voidReadOp::Request(ostringstream &os){    os << "READ \r\n";    os << "Cseq: " << seq << "\r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Chunk-handle: " << chunkId << "\r\n";    os << "Chunk-version: " << chunkVersion << "\r\n";    os << "Offset: " << offset << "\r\n";    os << "Num-bytes: " << numBytes << "\r\n\r\n";}voidWriteIdAllocOp::Request(ostringstream &os){    os << "WRITE_ID_ALLOC\r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Chunk-handle:" << chunkId << "\r\n";    os << "Chunk-version:" << chunkVersion << "\r\n";    os << "Offset: " << offset << "\r\n";    os << "Num-bytes: " << numBytes << "\r\n";    os << "Num-servers: " << numServers << "\r\n";    os << "Servers: " << servers << "\r\n\r\n";}voidWritePrepareFwdOp::Request(ostringstream &os){    os << "WRITE_PREPARE\r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Chunk-handle:" << owner->chunkId << "\r\n";    os << "Chunk-version:" << owner->chunkVersion << "\r\n";    os << "Offset: " << owner->offset << "\r\n";    os << "Num-bytes: " << owner->numBytes << "\r\n";    os << "Checksum: " << owner->checksum << "\r\n";    os << "Num-servers: " << owner->numServers << "\r\n";    os << "Servers: " << owner->servers << "\r\n\r\n";}voidWriteSyncOp::Request(ostringstream &os){    os << "WRITE_SYNC\r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Chunk-handle:" << chunkId << "\r\n";    os << "Chunk-version:" << chunkVersion << "\r\n";    os << "Num-servers: " << numServers << "\r\n";    os << "Servers: " << servers << "\r\n\r\n";}voidWriteSyncOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Status: " << status << "\r\n\r\n";    }voidAllocChunkOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Status: " << status << "\r\n\r\n";}voidHeartbeatOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Status: " << status << "\r\n";    os << "Total-space: " << totalSpace << "\r\n";    os << "Used-space: " << usedSpace << "\r\n";    os << "Num-chunks: " << numChunks << "\r\n\r\n";}voidStaleChunksOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Status: " << status << "\r\n\r\n";}voidReplicateChunkOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Status: " << status << "\r\n";    if (status == 0) {        os << "File-handle: " << fid << "\r\n";        os << "Chunk-version: " << chunkVersion << "\r\n";    }    os << "\r\n";}voidPingOp::Response(ostringstream &os){    ServerLocation loc = gMetaServerSM.GetLocation();    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Status: " << status << "\r\n";    os << "Meta-server-host: " << loc.hostname << "\r\n";    os << "Meta-server-port: " << loc.port << "\r\n";    os << "Total-space: " << totalSpace << "\r\n";    os << "Used-space: " << usedSpace << "\r\n\r\n";}voidDumpChunkMapOp::Response(ostringstream &os){    ostringstream v;    gChunkManager.DumpChunkMap(v);    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Status: " << status << "\r\n";    os << "Content-length: " << v.str().length() << "\r\n\r\n";    if (v.str().length() > 0)       os << v.str();}voidStatsOp::Response(ostringstream &os){    os << "OK\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Status: " << status << "\r\n";    os << stats << "\r\n";}////////////////////////////////////////////////// Now the handle done's....////////////////////////////////////////////////intSizeOp::HandleDone(int code, void *data){    // notify the owning object that the op finished    clnt->HandleEvent(EVENT_CMD_DONE, this);    return 0;}intGetChunkMetadataOp::HandleDone(int code, void *data){    // notify the owning object that the op finished    clnt->HandleEvent(EVENT_CMD_DONE, this);    return 0;}intReplicateChunkOp::HandleDone(int code, void *data){    if (data != NULL)        chunkVersion = * (kfsSeq_t *) data;    else        chunkVersion = -1;    gLogger.Submit(this);    return 0;}intWritePrepareFwdOp::HandleDone(int code, void *data){    // data fwding is finished; notify owner    return owner->HandleEvent(EVENT_CMD_DONE, this);}class ReadChunkMetaNotifier {    int res;public:    ReadChunkMetaNotifier(int r) : res(r) { }    void operator()(KfsOp *op) {        op->HandleEvent(EVENT_CMD_DONE, &res);    }};intReadChunkMetaOp::HandleDone(int code, void *data){    int res = -EINVAL;    if (code == EVENT_DISK_ERROR) {        status = -1;        if (data != NULL) {            status = *(int *) data;            KFS_LOG_VA_INFO("Disk error: errno = %d, chunkid = %lld", status, chunkId);            gChunkManager.ChunkIOFailed(chunkId, status);            res = status;        }    }    else if (code == EVENT_DISK_READ) {        IOBuffer *dataBuf = (IOBuffer *) data;            if (dataBuf->BytesConsumable() >= (int) sizeof(DiskChunkInfo_t)) {            DiskChunkInfo_t dci;                        dataBuf->CopyOut((char *) &dci, sizeof(DiskChunkInfo_t));            res = gChunkManager.SetChunkMetadata(dci);        }    }        gChunkManager.ReadChunkMetadataDone(chunkId);    clnt->HandleEvent(EVENT_CMD_DONE, (void *) &res);    for_each(waiters.begin(), waiters.end(), ReadChunkMetaNotifier(res));    delete this;    return 0;}WriteOp::~WriteOp(){    if (isWriteIdHolder) {        // track how long it took for the write to finish up:        // enqueueTime tracks when the last write was done to this        // writeid        struct timeval lastWriteTime;        lastWriteTime.tv_sec = enqueueTime;        lastWriteTime.tv_usec = 0;        float timeSpent = ComputeTimeDiff(startTime, lastWriteTime);        if (timeSpent < 1e-6)            timeSpent = 0.0;        if (timeSpent > 5.0) {            gChunkServer.SendTelemetryReport(CMD_WRITE, timeSpent);        }                // we don't want write id's to pollute stats        gettimeofday(&startTime, NULL);        gCtrWriteDuration.Update(1);        gCtrWriteDuration.Update(timeSpent);    }    if (dataBuf != NULL)        delete dataBuf;    if (rop != NULL) {        rop->wop = NULL;        assert(rop->dataBuf == NULL);        delete rop;    }    if (diskConnection)        diskConnection->Close();}WriteIdAllocOp::~WriteIdAllocOp(){    if (fwdedOp != NULL)        delete fwdedOp;}WritePrepareOp::~WritePrepareOp(){    // on a successful prepare, dataBuf should be moved to a write op.    assert((status != 0) || (dataBuf == NULL));    if (dataBuf != NULL)        delete dataBuf;    if (writeFwdOp != NULL)        delete writeFwdOp;    if (writeOp != NULL)        delete writeOp;}WriteSyncOp::~WriteSyncOp(){    off_t chunkSize = 0;    if (fwdedOp != NULL)        delete fwdedOp;    if (writeOp != NULL)        delete writeOp;    gChunkManager.ChunkSize(chunkId, &chunkSize);    if ((chunkSize > 0) &&         (chunkSize >= (off_t) KFS::CHUNKSIZE)) {        // we are done with all the writing to this chunk; so, close it        gChunkManager.CloseChunk(chunkId);    }}voidLeaseRenewOp::Request(ostringstream &os){    os << "LEASE_RENEW\r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Chunk-handle:" << chunkId << "\r\n";    os << "Lease-id: " << leaseId << "\r\n";    os << "Lease-type: " << leaseType << "\r\n\r\n";}intLeaseRenewOp::HandleDone(int code, void *data){    KfsOp *op = (KfsOp *) data;    assert(op == this);    return op->clnt->HandleEvent(EVENT_CMD_DONE, data);}voidLeaseRelinquishOp::Request(ostringstream &os){    os << "LEASE_RELINQUISH\r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Chunk-handle:" << chunkId << "\r\n";    os << "Lease-id: " << leaseId << "\r\n";    os << "Lease-type: " << leaseType << "\r\n\r\n";}intLeaseRelinquishOp::HandleDone(int code, void *data){    KfsOp *op = (KfsOp *) data;    assert(op == this);    delete this;    return 0;}voidCorruptChunkOp::Request(ostringstream &os){    os << "CORRUPT_CHUNK\r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Cseq: " << seq << "\r\n";    os << "File-handle:" << fid << "\r\n";    os << "Chunk-handle:" << chunkId << "\r\n\r\n";}intCorruptChunkOp::HandleDone(int code, void *data){    // Thank you metaserver for replying :-)    delete this;    return 0;}class PrintChunkInfo {    ostringstream &os;public:    PrintChunkInfo(ostringstream &o) : os(o) { }    void operator() (ChunkInfo_t &c) {        os << c.fileId << ' ';        os << c.chunkId << ' ';        os << c.chunkVersion << ' ';    }};voidHelloMetaOp::Request(ostringstream &os){    ostringstream chunkInfo;    os << "HELLO \r\n";    os << "Version: " << KFS_VERSION_STR << "\r\n";    os << "Cseq: " << seq << "\r\n";    os << "Chunk-server-name: " << myLocation.hostname << "\r\n";    os << "Chunk-server-port: " << myLocation.port << "\r\n";    os << "Cluster-key: " << clusterKey << "\r\n";    os << "MD5Sum: " << md5sum << "\r\n";    os << "Rack-id: " << rackId << "\r\n";    os << "Total-space: " << totalSpace << "\r\n";    os << "Used-space: " << usedSpace << "\r\n";    // now put in the chunk information    os << "Num-chunks: " << chunks.size() << "\r\n";        // figure out the content-length first...    for_each(chunks.begin(), chunks.end(), PrintChunkInfo(chunkInfo));    os << "Content-length: " << chunkInfo.str().length() << "\r\n\r\n";    os << chunkInfo.str().c_str();}// timeout op to the event processor goingvoidTimeoutOp::Execute(){    gChunkManager.Timeout();    gLeaseClerk.Timeout();    // do not delete "this" since it is either a member variable of    // the ChunkManagerTimeoutImpl or a static object.      // bump the seq # so we know how many times it got executed    seq++;}voidKillRemoteSyncOp::Execute(){    RemoteSyncSM *remoteSyncSM = static_cast<RemoteSyncSM *>(clnt);    assert(remoteSyncSM != NULL);    remoteSyncSM->Finish();}voidHelloMetaOp::Execute(){    totalSpace = gChunkManager.GetTotalSpace();    usedSpace = gChunkManager.GetUsedSpace();    gChunkManager.GetHostedChunks(chunks);    status = 0;    gLogger.Submit(this);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -