📄 kfsops.h
字号:
kfsChunkId_t chunkId; int64_t chunkVersion; off_t offset; /* input */ size_t numBytes; /* input */ ssize_t numBytesIO; /* output: # of bytes actually written */ DiskConnectionPtr diskConnection; /* disk connection used for writing data */ IOBuffer *dataBuf; /* buffer with the data to be written */ off_t chunkSize; /* store the chunk size for logging purposes */ std::vector<uint32_t> checksums; /* store the checksum for logging purposes */ /* * for writes that are smaller than a checksum block, we need to * read the whole block in, compute the new checksum and then write * out data. This buffer holds the data read in from disk. */ ReadOp *rop; /* * The owning write prepare op */ WritePrepareOp *wpop; /* * Should we wait for aio_sync() to finish before replying to * upstream clients? By default, we don't */ bool waitForSyncDone; /* Set if the write was triggered due to re-replication */ bool isFromReReplication; int64_t writeId; // time at which the write was enqueued at the ChunkManager time_t enqueueTime; // for statistics purposes, have a "holder" op that tracks how long it took a write to finish. bool isWriteIdHolder; WriteOp(kfsChunkId_t c, int64_t v) : KfsOp(CMD_WRITE, 0), chunkId(c), chunkVersion(v), dataBuf(NULL), rop(NULL), wpop(NULL), waitForSyncDone(false), isFromReReplication(false), isWriteIdHolder(false) { SET_HANDLER(this, &WriteOp::HandleWriteDone); } WriteOp(kfsSeq_t s, kfsChunkId_t c, int64_t v, off_t o, size_t n, IOBuffer *b, int64_t id) : KfsOp(CMD_WRITE, s), chunkId(c), chunkVersion(v), offset(o), numBytes(n), numBytesIO(0), dataBuf(b), chunkSize(0), rop(NULL), wpop(NULL), waitForSyncDone(false), isFromReReplication(false), writeId(id), isWriteIdHolder(false) { SET_HANDLER(this, &WriteOp::HandleWriteDone); } ~WriteOp(); void Reset() { status = numBytesIO = 0; SET_HANDLER(this, &WriteOp::HandleWriteDone); } void Response(std::ostringstream &os) { }; void Execute(); void Log(std::ofstream &ofs); int HandleWriteDone(int code, void *data); int HandleSyncDone(int code, void *data); int HandleLoggingDone(int code, void *data); std::string Show() const { std::ostringstream os; os << "write: chunkId = " << chunkId << " chunkversion = " << chunkVersion; os << " offset: " << offset << " numBytes: " << numBytes; return os.str(); }};// sent by the client to force data to diskstruct WriteSyncOp : public KfsOp { kfsChunkId_t chunkId; int64_t chunkVersion; int64_t writeId; /* corresponds to the local write */ uint32_t numServers; std::string servers; WriteSyncOp *fwdedOp; WriteOp *writeOp; // the underlying write that needs to be pushed to disk uint32_t numDone; // if we did forwarding, we wait for // local/remote to be done; otherwise, we only // wait for local to be done bool writeMaster; // infer from the server list if we are the "master" for doing the writes WriteSyncOp(kfsSeq_t s, kfsChunkId_t c, int64_t v) : KfsOp(CMD_WRITE_SYNC, s), chunkId(c), chunkVersion(v), writeId(-1), numServers(0), fwdedOp(NULL), writeOp(NULL), numDone(0), writeMaster(false) { SET_HANDLER(this, &WriteSyncOp::HandleDone); } ~WriteSyncOp(); void Request(std::ostringstream &os); void Response(std::ostringstream &os); void Execute(); int ForwardToPeer(const ServerLocation &peer); int HandlePeerReply(int code, void *data); int HandleDone(int code, void *data); std::string Show() const { std::ostringstream os; os << "write-sync: seq = " << seq << " chunkId = " << chunkId << " chunkversion = " << chunkVersion; os << " write-id info: " << servers; return os.str(); }};// OP for reading/writing out the meta-data associated with each chunk. This// is an internally generated op (ops that generate this one are// allocate/write/truncate/change-chunk-vers). struct WriteChunkMetaOp : public KfsOp { kfsChunkId_t chunkId; DiskConnectionPtr diskConnection; /* disk connection used for writing data */ IOBuffer *dataBuf; /* buffer with the data to be written */ WriteChunkMetaOp(kfsChunkId_t c, KfsCallbackObj *o) : KfsOp(CMD_WRITE_CHUNKMETA, 0, o), chunkId(c), dataBuf(NULL) { SET_HANDLER(this, &WriteChunkMetaOp::HandleDone); } ~WriteChunkMetaOp() { delete dataBuf; } void Execute() { } std::string Show() const { std::ostringstream os; os << "write-chunk-meta: chunkid = " << chunkId; return os.str(); } // Notify the op that is waiting for the write to finish that all // is done int HandleDone(int code, void *data) { clnt->HandleEvent(EVENT_CMD_DONE, NULL); delete this; return 0; }};struct ReadChunkMetaOp : public KfsOp { kfsChunkId_t chunkId; DiskConnectionPtr diskConnection; /* disk connection used for reading data */ // others ops that are also waiting for this particular meta-data // read to finish; they'll get notified when the read is done std::list<KfsOp *> waiters; ReadChunkMetaOp(kfsChunkId_t c, KfsCallbackObj *o) : KfsOp(CMD_READ_CHUNKMETA, 0, o), chunkId(c) { SET_HANDLER(this, &ReadChunkMetaOp::HandleDone); } void Execute() { } std::string Show() const { std::ostringstream os; os << "read-chunk-meta: chunkid = " << chunkId; return os.str(); } void AddWaiter(KfsOp *op) { waiters.push_back(op); } // Update internal data structures and then notify the waiting op // that read of meta-data is done. int HandleDone(int code, void *data);};struct ReadOp : public KfsOp { kfsChunkId_t chunkId; int64_t chunkVersion; off_t offset; /* input */ size_t numBytes; /* input */ ssize_t numBytesIO; /* output: # of bytes actually read */ DiskConnectionPtr diskConnection; /* disk connection used for reading data */ IOBuffer *dataBuf; /* buffer with the data read */ std::vector<uint32_t> checksum; /* checksum over the data that is sent back to client */ float diskIOTime; /* how long did the AIOs take */ /* * for writes that require the associated checksum block to be * read in, store the pointer to the associated write op. */ WriteOp *wop; ReadOp(kfsSeq_t s) : KfsOp(CMD_READ, s), numBytesIO(0), dataBuf(NULL), wop(NULL) { SET_HANDLER(this, &ReadOp::HandleDone); } ReadOp(WriteOp *w, off_t o, size_t n) : KfsOp(CMD_READ, w->seq), chunkId(w->chunkId), chunkVersion(w->chunkVersion), offset(o), numBytes(n), numBytesIO(0), dataBuf(NULL), wop(w) { clnt = w; SET_HANDLER(this, &ReadOp::HandleDone); } ~ReadOp() { assert(wop == NULL); if (dataBuf != NULL) { delete dataBuf; } if (diskConnection) diskConnection->Close(); } void Request(std::ostringstream &os); void Response(std::ostringstream &os); void Execute(); int HandleDone(int code, void *data); // handler for reading in the chunk meta-data int HandleChunkMetaReadDone(int code, void *data); // handler for dealing with re-replication events int HandleReplicatorDone(int code, void *data); std::string Show() const { std::ostringstream os; os << "read: chunkId = " << chunkId << " chunkversion = " << chunkVersion; os << " offset: " << offset << " numBytes: " << numBytes; return os.str(); }};// used for retrieving a chunk's sizestruct SizeOp : public KfsOp { kfsChunkId_t chunkId; int64_t chunkVersion; off_t size; /* result */ SizeOp(kfsSeq_t s) : KfsOp(CMD_SIZE, s) { } SizeOp(kfsSeq_t s, kfsChunkId_t c, int64_t v) : KfsOp(CMD_SIZE, s), chunkId(c), chunkVersion(v) { } void Request(std::ostringstream &os); void Response(std::ostringstream &os); void Execute(); std::string Show() const { std::ostringstream os; os << "size: chunkId = " << chunkId << " chunkversion = " << chunkVersion; return os.str(); } int HandleDone(int code, void *data);};struct GetChunkMetadataOp : public KfsOp { kfsChunkId_t chunkId; // input int64_t chunkVersion; // output off_t chunkSize; // output IOBuffer *dataBuf; // buffer with the checksum info size_t numBytesIO; GetChunkMetadataOp(kfsSeq_t s) : KfsOp(CMD_GET_CHUNK_METADATA, s), chunkVersion(0), chunkSize(0), dataBuf(NULL), numBytesIO(0) { } ~GetChunkMetadataOp() { delete dataBuf; } void Execute(); // handler for reading in the chunk meta-data int HandleChunkMetaReadDone(int code, void *data); void Request(std::ostringstream &os); void Response(std::ostringstream &os); std::string Show() const { std::ostringstream os; os << "get-chunk-metadata: " << " chunkid = " << chunkId; return os.str(); } int HandleDone(int code, void *data);};// used for pinging the server and checking livenessstruct PingOp : public KfsOp { int64_t totalSpace; int64_t usedSpace; PingOp(kfsSeq_t s) : KfsOp(CMD_PING, s) { } void Response(std::ostringstream &os); void Execute(); std::string Show() const { return "monitoring ping"; }};// used to dump chunk mapstruct DumpChunkMapOp : public KfsOp { DumpChunkMapOp(kfsSeq_t s) : KfsOp(CMD_DUMP_CHUNKMAP, s) { } void Response(std::ostringstream &os); void Execute(); std::string Show() const { return "dumping chunk map"; }};// used to extract out all the counters we havestruct StatsOp : public KfsOp { std::string stats; // result StatsOp(kfsSeq_t s) : KfsOp(CMD_STATS, s) { } void Response(std::ostringstream &os); void Execute(); std::string Show() const { return "monitoring stats"; }};/// Checkpoint op is a means of communication between the main thread/// and the logger thread. The main thread sends this op to the logger/// thread and the logger threads gets rid of it after taking a/// checkpoint.// XXX: We may want to allow users to submit checkpoint requests. At// that point code will need to come in for ops and such.struct CheckpointOp : public KfsOp { std::ostringstream data; // the data that needs to be checkpointed CheckpointOp(kfsSeq_t s) : KfsOp(CMD_CHECKPOINT, s) { } void Response(std::ostringstream &os) { }; void Execute() { }; std::string Show() const { return "internal: checkpoint"; }};struct LeaseRenewOp : public KfsOp { kfsChunkId_t chunkId; int64_t leaseId; std::string leaseType; LeaseRenewOp(kfsSeq_t s, kfsChunkId_t c, int64_t l, std::string t) : KfsOp(CMD_LEASE_RENEW, s), chunkId(c), leaseId(l), leaseType(t) { SET_HANDLER(this, &LeaseRenewOp::HandleDone); } void Request(std::ostringstream &os); // To be called whenever we get a reply from the server int HandleDone(int code, void *data); void Execute() { }; std::string Show() const { std::ostringstream os; os << "lease-renew: " << " chunkid = " << chunkId; os << " leaseId: " << leaseId << " type: " << leaseType; return os.str(); }};// Whenever we want to give up a lease early, we notify the metaserver// using this op.struct LeaseRelinquishOp : public KfsOp { kfsChunkId_t chunkId; int64_t leaseId; std::string leaseType; LeaseRelinquishOp(kfsSeq_t s, kfsChunkId_t c, int64_t l, std::string t) : KfsOp(CMD_LEASE_RELINQUISH, s), chunkId(c), leaseId(l), leaseType(t) { SET_HANDLER(this, &LeaseRelinquishOp::HandleDone); } void Request(std::ostringstream &os); // To be called whenever we get a reply from the server int HandleDone(int code, void *data); void Execute() { }; std::string Show() const { std::ostringstream os; os << "lease-relinquish: " << " chunkid = " << chunkId; os << " leaseId: " << leaseId << " type: " << leaseType; return os.str(); }};// This is just a helper op for building a hello request to the metaserver.struct HelloMetaOp : public KfsOp { ServerLocation myLocation; std::string clusterKey; std::string md5sum; int rackId; int64_t totalSpace; int64_t usedSpace; std::vector<ChunkInfo_t> chunks; HelloMetaOp(kfsSeq_t s, ServerLocation &l, std::string &k, std::string &m, int r) : KfsOp(CMD_META_HELLO, s), myLocation(l), clusterKey(k), md5sum(m), rackId(r) { } void Execute(); void Request(std::ostringstream &os); std::string Show() const { std::ostringstream os; os << "meta-hello: " << " mylocation = " << myLocation.ToString(); os << "cluster key: " << clusterKey; return os.str(); }};struct CorruptChunkOp : public KfsOp { kfsFileId_t fid; // input: fid whose chunk is bad kfsChunkId_t chunkId; // input: chunkid of the corrupted chunk CorruptChunkOp(kfsSeq_t s, kfsFileId_t f, kfsChunkId_t c) : KfsOp(CMD_CORRUPT_CHUNK, s), fid(f), chunkId(c) { SET_HANDLER(this, &CorruptChunkOp::HandleDone); } void Request(std::ostringstream &os); // To be called whenever we get a reply from the server int HandleDone(int code, void *data); void Execute() { }; std::string Show() const { std::ostringstream os; os << "corrupt chunk: " << " fileid = " << fid << " chunkid = " << chunkId; return os.str(); }};struct TimeoutOp : public KfsOp { TimeoutOp(kfsSeq_t s) : KfsOp(CMD_TIMEOUT, s) { } void Request(std::ostringstream &os) { } void Execute(); std::string Show() const { return "timeout"; }};struct KillRemoteSyncOp : public KfsOp { // pass in the remote sync SM that needs to be nuked KillRemoteSyncOp(kfsSeq_t s, KfsCallbackObj *owner) : KfsOp(CMD_KILL_REMOTE_SYNC, s, owner) { } void Request(std::ostringstream &os) { } void Execute(); std::string Show() const { return "kill remote sync"; }};// Helper functor that matches ops based on seq #class OpMatcher { kfsSeq_t seqNum;public: OpMatcher(kfsSeq_t s) : seqNum(s) { }; bool operator() (KfsOp *op) { return op->seq == seqNum; }};extern void InitParseHandlers();extern void RegisterCounters();extern int ParseCommand(char *cmdBuf, int cmdLen, KfsOp **res);extern void SubmitOp(KfsOp *op);extern void SubmitOpResponse(KfsOp *op);}#endif // CHUNKSERVER_KFSOPS_H
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -