📄 chunkserver.h
字号:
return mLocation == loc; } /// Setter method to set the host name/port void SetServerLocation(const ServerLocation &loc) { mLocation = loc; } /// Setter method to set space void SetSpace(uint64_t total, uint64_t used, uint64_t alloc) { mTotalSpace = total; mUsedSpace = used; mAllocSpace = alloc; } const char *GetServerName() { return mLocation.hostname.c_str(); } void SetRack(int rackId) { mRackId = rackId; } /// Return the unique identifier for the rack on which the /// server is located. int GetRack() const { return mRackId; } /// Available space is defined as the difference /// between the total storage space available /// on the server and the amount of space that /// has been parceled out for outstanding writes /// by the meta server. THat is, alloc space is tied /// to the chunks that have been write-leased. This /// has the effect of keeping alloc space tied closely /// to used space. uint64_t GetAvailSpace() { mAllocSpace = mUsedSpace + (mNumChunkWriteReplications + mNumChunkWrites) * CHUNKSIZE; if (mAllocSpace >= mTotalSpace) return 0; else return mTotalSpace - mAllocSpace; } /// An estimate of the # of writes that are currently /// happening at this server. inline void UpdateNumChunkWrites(int amount) { mNumChunkWrites += amount; if (mNumChunkWrites < 0) mNumChunkWrites = 0; } /// Accessor to that returns an estimate of the # of /// concurrent writes that are being handled by this server inline int GetNumChunkWrites() const { return mNumChunkWrites; } uint64_t GetTotalSpace() const { return mTotalSpace; } uint64_t GetUsedSpace() { return mUsedSpace; } int GetNumChunks () const { return mNumChunks; } /// Return an estimate of disk space utilization on this server. /// The estimate is between [0..1] float GetSpaceUtilization() { if (mTotalSpace == 0) return 0.0; return (float) mUsedSpace / (float) mTotalSpace; } bool IsDown() { return mDown; } /// /// The chunk server went down. So, fail all the /// outstanding ops. /// virtual void FailPendingOps(); /// For monitoring purposes, dump out state as a string. /// @param [out] result The state of this server /// void Ping(string &result); seq_t NextSeq() { return mSeqNo++; } protected: /// A sequence # associated with each RPC we send to /// chunk server. This variable tracks the seq # that /// we should use in the next RPC. seq_t mSeqNo; /// A handle to the network connection NetConnectionPtr mNetConnection; /// Periodically heartbeat the chunk server ChunkServerTimeoutImpl *mTimer; /// Are we thru with processing HELLO message bool mHelloDone; /// Boolean that tracks whether this server is down bool mDown; /// Is there a heartbeat message for which we haven't /// recieved a reply yet? If yes, dont' send one more bool mHeartbeatSent; /// did we skip the sending of a heartbeat message? bool mHeartbeatSkipped; /// is the server being retired bool mIsRetiring; /// when we did we get the retire request time_t mRetireStartTime; /// when did we get the last heartbeat reply time_t mLastHeard; /// Set of chunks on this server that need to be evacuated /// whenever this node is to be retired; when evacuation set is /// empty, the server can be retired. std::set<chunkId_t> mEvacuatingChunks; /// Set of chunks that need to be moved to this server. /// This set was previously computed by the rebalance planner. std::set<chunkId_t> mChunksToMove; /// Location of the server at which clients can /// connect to ServerLocation mLocation; /// A unique id to denote the rack on which the server is located. /// -1 signifies that we don't what rack the server is on and by /// implication, all servers are on same rack int mRackId; /// Keep a count of how many corrupt chunks we are seeing on /// this node; an indicator of the node in trouble? int mNumCorruptChunks; /// total space available on this server uint64_t mTotalSpace; /// space that has been used by chunks on this server uint64_t mUsedSpace; /// space that has been allocated for chunks: this /// corresponds to the allocations that have been /// made, but not all of the allocated space is used. /// For instance, when we have partially filled /// chunks, there is space is allocated for a chunk /// but that space hasn't been fully used up. uint64_t mAllocSpace; /// # of chunks hosted on this server; useful for /// reporting purposes long mNumChunks; /// An estimate of the # of writes that are being handled /// by this server. We use this value to update mAllocSpace /// The problem we have is that, we can end up with lots of /// partial chunks and over time such drift can significantly /// reduce the available space on the server (space is held /// down for by the partial chunks that may never be written to). /// Since writes can occur only when someone gets a valid write lease, /// we track the # of write leases that are issued and where the /// writes are occurring. So, whenever we get a heartbeat, we /// can update alloc space as a sum of the used space and the # of /// writes that are currently being handled by this server. int mNumChunkWrites; /// Track the # of chunk replications (write/read) that are going on this server int mNumChunkWriteReplications; int mNumChunkReadReplications; /// list of RPCs that need to be sent to this chunk /// server. This list is shared between the main /// event processing loop and the network thread. MetaQueue <MetaRequest> mPendingReqs; /// list of RPCs that we have sent to this chunk /// server. This list is operated by the network /// thread. std::list <MetaRequest *> mDispatchedReqs; /// /// We have received a message from the chunk /// server. Do something with it. /// @param[in] iobuf An IO buffer stream with message /// received from the chunk server. /// @param[in] msgLen Length in bytes of the message. /// @retval 0 if message was processed successfully; /// -1 if there was an error /// int HandleMsg(IOBuffer *iobuf, int msgLen); /// Handlers for the 3 types of messages we could get: /// 1. Hello message from a chunkserver /// 2. An RPC from a chunkserver /// 3. A reply to an RPC that we have sent previously. int HandleHelloMsg(IOBuffer *iobuf, int msgLen); int HandleCmd(IOBuffer *iobuf, int msgLen); int HandleReply(IOBuffer *iobuf, int msgLen); /// Send a response message to the MetaRequest we got. void SendResponse(MetaRequest *op); /// /// Given a response from a chunkserver, find the /// associated request that we previously sent. /// Request/responses are matched based on sequence /// numbers in the messages. /// /// @param[in] cseq The sequence # of the op we are /// looking for. /// @retval The matching request if one exists; NULL /// otherwise /// MetaRequest *FindMatchingRequest(seq_t cseq); /// /// The response sent by a chunkserver is of the form: /// OK \r\n /// Cseq: <seq #>\r\n /// Status: <status> \r\n\r\n /// Extract out Cseq, Status /// /// @param[in] buf Buffer containing the response /// @param[in] bufLen length of buf /// @param[out] prop Properties object with the response header/values /// void ParseResponse(char *buf, int bufLen, Properties &prop); /// /// The chunk server went down. So, stop the network timer event; /// also, fail all the dispatched ops. /// void StopTimer(); void FailDispatchedOps(); }; class ChunkServerTimeoutImpl: public ITimeout { public: ChunkServerTimeoutImpl(ChunkServer *c) { mChunkServer = c; // send heartbeat once every min SetTimeoutInterval(60 * 1000); }; ~ChunkServerTimeoutImpl() { mChunkServer = NULL; }; // On a timeout send a heartbeat RPC void Timeout() { mChunkServer->Heartbeat(); }; private: ChunkServer *mChunkServer; //!< pointer to the owner (chunk server) }; class ChunkServerMatcher { const ChunkServer *target; public: ChunkServerMatcher(const ChunkServer *t): target(t) { }; bool operator() (ChunkServerPtr &c) { return c.get() == target; } };}#endif // META_CHUNKSERVER_H
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -