⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfsops.h

📁 nandflash文件系统源代码
💻 H
📖 第 1 页 / 共 2 页
字号:
    kfsChunkId_t chunkId;    int64_t	 chunkVersion;    off_t 	 offset;   /* input */    size_t 	 numBytes; /* input */    ssize_t	 numBytesIO; /* output: # of bytes actually written */    DiskConnectionPtr diskConnection; /* disk connection used for writing data */    IOBuffer *dataBuf; /* buffer with the data to be written */    off_t	 chunkSize; /* store the chunk size for logging purposes */    std::vector<uint32_t> checksums; /* store the checksum for logging purposes */    /*      * for writes that are smaller than a checksum block, we need to     * read the whole block in, compute the new checksum and then write     * out data.  This buffer holds the data read in from disk.    */    ReadOp *rop;    /*     * The owning write prepare op     */    WritePrepareOp *wpop;    /*     * Should we wait for aio_sync() to finish before replying to     * upstream clients? By default, we don't     */    bool waitForSyncDone;    /* Set if the write was triggered due to re-replication */    bool isFromReReplication;    int64_t      writeId;    // time at which the write was enqueued at the ChunkManager    time_t	 enqueueTime;    // for statistics purposes, have a "holder" op that tracks how long it took a write to finish.     bool isWriteIdHolder;    WriteOp(kfsChunkId_t c, int64_t v) :        KfsOp(CMD_WRITE, 0), chunkId(c), chunkVersion(v),        dataBuf(NULL), rop(NULL), wpop(NULL), waitForSyncDone(false),        isFromReReplication(false), isWriteIdHolder(false)    {        SET_HANDLER(this, &WriteOp::HandleWriteDone);    }    WriteOp(kfsSeq_t s, kfsChunkId_t c, int64_t v, off_t o, size_t n,             IOBuffer *b, int64_t id) :        KfsOp(CMD_WRITE, s), chunkId(c), chunkVersion(v),        offset(o), numBytes(n), numBytesIO(0),        dataBuf(b), chunkSize(0), rop(NULL), wpop(NULL),         waitForSyncDone(false), isFromReReplication(false),        writeId(id), isWriteIdHolder(false)    {        SET_HANDLER(this, &WriteOp::HandleWriteDone);    }    ~WriteOp();    void Reset() {        status = numBytesIO = 0;        SET_HANDLER(this, &WriteOp::HandleWriteDone);    }    void Response(std::ostringstream &os) { };    void Execute();    void Log(std::ofstream &ofs);    int HandleWriteDone(int code, void *data);        int HandleSyncDone(int code, void *data);    int HandleLoggingDone(int code, void *data);        std::string Show() const {        std::ostringstream os;                os << "write: chunkId = " << chunkId << " chunkversion = " << chunkVersion;        os << " offset: " << offset << " numBytes: " << numBytes;        return os.str();    }};// sent by the client to force data to diskstruct WriteSyncOp : public KfsOp {    kfsChunkId_t chunkId;        int64_t chunkVersion;    int64_t writeId; /* corresponds to the local write */    uint32_t numServers;    std::string servers;    WriteSyncOp *fwdedOp;    WriteOp *writeOp; // the underlying write that needs to be pushed to disk    uint32_t numDone; // if we did forwarding, we wait for                      // local/remote to be done; otherwise, we only                      // wait for local to be done    bool writeMaster; // infer from the server list if we are the "master" for doing the writes    WriteSyncOp(kfsSeq_t s, kfsChunkId_t c, int64_t v) :        KfsOp(CMD_WRITE_SYNC, s), chunkId(c), chunkVersion(v), writeId(-1),        numServers(0), fwdedOp(NULL), writeOp(NULL), numDone(0), writeMaster(false)    {         SET_HANDLER(this, &WriteSyncOp::HandleDone);            }    ~WriteSyncOp();    void Request(std::ostringstream &os);    void Response(std::ostringstream &os);    void Execute();    int ForwardToPeer(const ServerLocation &peer);    int HandlePeerReply(int code, void *data);    int HandleDone(int code, void *data);        std::string Show() const {        std::ostringstream os;                os << "write-sync: seq = " << seq << " chunkId = " << chunkId << " chunkversion = " << chunkVersion;        os << " write-id info: " << servers;        return os.str();    }};// OP for reading/writing out the meta-data associated with each chunk.  This// is an internally generated op (ops that generate this one are// allocate/write/truncate/change-chunk-vers). struct WriteChunkMetaOp : public KfsOp {    kfsChunkId_t chunkId;    DiskConnectionPtr diskConnection; /* disk connection used for writing data */    IOBuffer *dataBuf; /* buffer with the data to be written */    WriteChunkMetaOp(kfsChunkId_t c, KfsCallbackObj *o) :         KfsOp(CMD_WRITE_CHUNKMETA, 0, o), chunkId(c), dataBuf(NULL)      {        SET_HANDLER(this, &WriteChunkMetaOp::HandleDone);    }    ~WriteChunkMetaOp() {        delete dataBuf;    }    void Execute() { }    std::string Show() const {        std::ostringstream os;        os << "write-chunk-meta: chunkid = " << chunkId;        return os.str();    }    // Notify the op that is waiting for the write to finish that all    // is done    int HandleDone(int code, void *data) {        clnt->HandleEvent(EVENT_CMD_DONE, NULL);        delete this;        return 0;    }};struct ReadChunkMetaOp : public KfsOp {    kfsChunkId_t chunkId;    DiskConnectionPtr diskConnection; /* disk connection used for reading data */    // others ops that are also waiting for this particular meta-data    // read to finish; they'll get notified when the read is done    std::list<KfsOp *> waiters;    ReadChunkMetaOp(kfsChunkId_t c, KfsCallbackObj *o) :         KfsOp(CMD_READ_CHUNKMETA, 0, o), chunkId(c)    {        SET_HANDLER(this, &ReadChunkMetaOp::HandleDone);    }    void Execute() { }    std::string Show() const {        std::ostringstream os;        os << "read-chunk-meta: chunkid = " << chunkId;        return os.str();    }    void AddWaiter(KfsOp *op) {        waiters.push_back(op);    }    // Update internal data structures and then notify the waiting op    // that read of meta-data is done.    int HandleDone(int code, void *data);};struct ReadOp : public KfsOp {    kfsChunkId_t chunkId;    int64_t	 chunkVersion;    off_t 	 offset;   /* input */    size_t 	 numBytes; /* input */    ssize_t	 numBytesIO; /* output: # of bytes actually read */    DiskConnectionPtr diskConnection; /* disk connection used for reading data */    IOBuffer *dataBuf; /* buffer with the data read */    std::vector<uint32_t> checksum; /* checksum over the data that is sent back to client */    float diskIOTime; /* how long did the AIOs take */    /*     * for writes that require the associated checksum block to be     * read in, store the pointer to the associated write op.    */    WriteOp *wop;    ReadOp(kfsSeq_t s) :        KfsOp(CMD_READ, s), numBytesIO(0), dataBuf(NULL),        wop(NULL)    {        SET_HANDLER(this, &ReadOp::HandleDone);    }    ReadOp(WriteOp *w, off_t o, size_t n) :        KfsOp(CMD_READ, w->seq), chunkId(w->chunkId),        chunkVersion(w->chunkVersion), offset(o), numBytes(n),        numBytesIO(0), dataBuf(NULL), wop(w)    {        clnt = w;        SET_HANDLER(this, &ReadOp::HandleDone);    }    ~ReadOp() {        assert(wop == NULL);        if (dataBuf != NULL) {            delete dataBuf;        }        if (diskConnection)            diskConnection->Close();    }    void Request(std::ostringstream &os);    void Response(std::ostringstream &os);    void Execute();    int HandleDone(int code, void *data);    // handler for reading in the chunk meta-data    int HandleChunkMetaReadDone(int code, void *data);    // handler for dealing with re-replication events    int HandleReplicatorDone(int code, void *data);    std::string Show() const {        std::ostringstream os;                os << "read: chunkId = " << chunkId << " chunkversion = " << chunkVersion;        os << " offset: " << offset << " numBytes: " << numBytes;        return os.str();    }};// used for retrieving a chunk's sizestruct SizeOp : public KfsOp {    kfsChunkId_t chunkId;    int64_t	 chunkVersion;    off_t     size; /* result */    SizeOp(kfsSeq_t s) :        KfsOp(CMD_SIZE, s) { }    SizeOp(kfsSeq_t s, kfsChunkId_t c, int64_t v) :        KfsOp(CMD_SIZE, s), chunkId(c), chunkVersion(v) { }    void Request(std::ostringstream &os);    void Response(std::ostringstream &os);    void Execute();    std::string Show() const {        std::ostringstream os;                os << "size: chunkId = " << chunkId << " chunkversion = " << chunkVersion;        return os.str();    }    int HandleDone(int code, void *data);};struct GetChunkMetadataOp : public KfsOp {    kfsChunkId_t chunkId;  // input    int64_t chunkVersion; // output    off_t chunkSize; // output    IOBuffer *dataBuf; // buffer with the checksum info    size_t numBytesIO;    GetChunkMetadataOp(kfsSeq_t s) :        KfsOp(CMD_GET_CHUNK_METADATA, s), chunkVersion(0), chunkSize(0), dataBuf(NULL), numBytesIO(0)    {    }    ~GetChunkMetadataOp()     {        delete dataBuf;    }    void Execute();    // handler for reading in the chunk meta-data    int HandleChunkMetaReadDone(int code, void *data);    void Request(std::ostringstream &os);    void Response(std::ostringstream &os);    std::string Show() const {        std::ostringstream os;        os << "get-chunk-metadata: " << " chunkid = " << chunkId;        return os.str();    }    int HandleDone(int code, void *data);};// used for pinging the server and checking livenessstruct PingOp : public KfsOp {    int64_t totalSpace;    int64_t usedSpace;    PingOp(kfsSeq_t s) :        KfsOp(CMD_PING, s) { }    void Response(std::ostringstream &os);    void Execute();    std::string Show() const {        return "monitoring ping";    }};// used to dump chunk mapstruct DumpChunkMapOp : public KfsOp {    DumpChunkMapOp(kfsSeq_t s) :       KfsOp(CMD_DUMP_CHUNKMAP, s) { }    void Response(std::ostringstream &os);    void Execute();    std::string Show() const {       return "dumping chunk map";    }};// used to extract out all the counters we havestruct StatsOp : public KfsOp {    std::string stats; // result    StatsOp(kfsSeq_t s) :        KfsOp(CMD_STATS, s) { }    void Response(std::ostringstream &os);    void Execute();    std::string Show() const {        return "monitoring stats";    }};/// Checkpoint op is a means of communication between the main thread/// and the logger thread.  The main thread sends this op to the logger/// thread and the logger threads gets rid of it after taking a/// checkpoint.// XXX: We may want to allow users to submit checkpoint requests.  At// that point code will need to come in for ops and such.struct CheckpointOp : public KfsOp {    std::ostringstream data; // the data that needs to be checkpointed    CheckpointOp(kfsSeq_t s) :        KfsOp(CMD_CHECKPOINT, s) { }    void Response(std::ostringstream &os) { };    void Execute() { };    std::string Show() const {        return "internal: checkpoint";    }};struct LeaseRenewOp : public KfsOp {    kfsChunkId_t chunkId;    int64_t leaseId;    std::string leaseType;    LeaseRenewOp(kfsSeq_t s, kfsChunkId_t c, int64_t l, std::string t) :        KfsOp(CMD_LEASE_RENEW, s), chunkId(c), leaseId(l), leaseType(t)    {        SET_HANDLER(this, &LeaseRenewOp::HandleDone);    }    void Request(std::ostringstream &os);    // To be called whenever we get a reply from the server    int HandleDone(int code, void *data);    void Execute() { };    std::string Show() const {        std::ostringstream os;        os << "lease-renew: " << " chunkid = " << chunkId;        os << " leaseId: " << leaseId << " type: " << leaseType;        return os.str();    }};// Whenever we want to give up a lease early, we notify the metaserver// using this op.struct LeaseRelinquishOp : public KfsOp {    kfsChunkId_t chunkId;    int64_t leaseId;    std::string leaseType;    LeaseRelinquishOp(kfsSeq_t s, kfsChunkId_t c, int64_t l, std::string t) :        KfsOp(CMD_LEASE_RELINQUISH, s), chunkId(c), leaseId(l), leaseType(t)    {        SET_HANDLER(this, &LeaseRelinquishOp::HandleDone);    }    void Request(std::ostringstream &os);    // To be called whenever we get a reply from the server    int HandleDone(int code, void *data);    void Execute() { };    std::string Show() const {        std::ostringstream os;        os << "lease-relinquish: " << " chunkid = " << chunkId;        os << " leaseId: " << leaseId << " type: " << leaseType;        return os.str();    }};// This is just a helper op for building a hello request to the metaserver.struct HelloMetaOp : public KfsOp {    ServerLocation myLocation;    std::string clusterKey;    std::string md5sum;    int rackId;    int64_t totalSpace;    int64_t usedSpace;    std::vector<ChunkInfo_t> chunks;    HelloMetaOp(kfsSeq_t s, ServerLocation &l, std::string &k, std::string &m, int r) :        KfsOp(CMD_META_HELLO, s), myLocation(l),  clusterKey(k), md5sum(m), rackId(r) {  }    void Execute();    void Request(std::ostringstream &os);    std::string Show() const {        std::ostringstream os;        os << "meta-hello: " << " mylocation = " << myLocation.ToString();        os << "cluster key: " << clusterKey;        return os.str();    }};struct CorruptChunkOp : public KfsOp {    kfsFileId_t fid; // input: fid whose chunk is bad    kfsChunkId_t chunkId; // input: chunkid of the corrupted chunk    CorruptChunkOp(kfsSeq_t s, kfsFileId_t f, kfsChunkId_t c) :        KfsOp(CMD_CORRUPT_CHUNK, s), fid(f), chunkId(c)    {        SET_HANDLER(this, &CorruptChunkOp::HandleDone);    }    void Request(std::ostringstream &os);    // To be called whenever we get a reply from the server    int HandleDone(int code, void *data);    void Execute() { };    std::string Show() const {        std::ostringstream os;        os << "corrupt chunk: " << " fileid = " << fid            << " chunkid = " << chunkId;        return os.str();    }};struct TimeoutOp : public KfsOp {    TimeoutOp(kfsSeq_t s) :        KfsOp(CMD_TIMEOUT, s)    {    }    void Request(std::ostringstream &os) { }    void Execute();    std::string Show() const { return "timeout"; }};struct KillRemoteSyncOp : public KfsOp {    // pass in the remote sync SM that needs to be nuked    KillRemoteSyncOp(kfsSeq_t s, KfsCallbackObj *owner) :        KfsOp(CMD_KILL_REMOTE_SYNC, s, owner)    {    }    void Request(std::ostringstream &os) { }    void Execute();    std::string Show() const { return "kill remote sync"; }};// Helper functor that matches ops based on seq #class OpMatcher {    kfsSeq_t seqNum;public:    OpMatcher(kfsSeq_t s) : seqNum(s) { };    bool operator() (KfsOp *op) {        return op->seq == seqNum;    }};extern void InitParseHandlers();extern void RegisterCounters();extern int ParseCommand(char *cmdBuf, int cmdLen, KfsOp **res);extern void SubmitOp(KfsOp *op);extern void SubmitOpResponse(KfsOp *op);}#endif // CHUNKSERVER_KFSOPS_H

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -