📄 layoutmanager.h
字号:
/// Update the mapping from chunkId -> server. /// @param[in] chunkId chunkId that has been stored /// on server c /// @param[in] c server that stores chunk chunkId. /// @retval 0 if update is successful; -1 otherwise /// Update will fail if chunkId is not present in the /// chunkId -> server mapping table. int UpdateChunkToServerMapping(chunkId_t chunkId, ChunkServer *c); /// Get the mapping from chunkId -> server. /// @param[in] chunkId chunkId that has been stored /// on some server(s) /// @param[out] c server(s) that stores chunk chunkId /// @retval 0 if a mapping was found; -1 otherwise /// int GetChunkToServerMapping(chunkId_t chunkId, std::vector<ChunkServerPtr> &c); CSMap GetChunkToServerMap() { return mChunkToServerMap; } /// Dump out the chunk location map to a file. void DumpChunkToServerMap(); /// Dump out the chunk location map to a string stream. void DumpChunkToServerMap(ostringstream &os); /// Ask each of the chunkserver's to dispatch pending RPCs void Dispatch(); /// For monitoring purposes, dump out state of all the /// connected chunk servers. /// @param[out] systemInfo A string that describes system status /// such as, the amount of space in cluster /// @param[out] upServers The string containing the /// state of the up chunk servers. /// @param[out] downServers The string containing the /// state of the down chunk servers. /// @param[out] retiringServers The string containing the /// state of the chunk servers that are being retired for /// maintenance. void Ping(string &systemInfo, string &upServers, string &downServers, string &retiringServers); /// Return a list of alive chunk servers void UpServers(ostringstream &os); /// Periodically, walk the table of chunk -> [location, lease] /// and remove out dead leases. 
void LeaseCleanup(); /// Cleanup the lease for a particular chunk /// @param[in] chunkId the chunk for which leases need to be cleaned up /// @param[in] v the placement/lease info for the chunk void LeaseCleanup(chunkId_t chunkId, ChunkPlacementInfo &v); /// Handler that loops thru the chunk->location map and determines /// if there are sufficient copies of each chunk. Those chunks with /// fewer copies are (re) replicated. void ChunkReplicationChecker(); /// A set of nodes have been put in hibernation by an admin. /// This is done for scheduled downtime. During this period, we /// don't want to pro-actively replicate data on the down nodes; /// if the node doesn't come back as promised, we then start /// re-replication. Periodically, check the status of /// hibernating nodes. void CheckHibernatingServersStatus(); /// A chunk replication operation finished. If the op was successful, /// then, we update the chunk->location map to record the presence /// of a new replica. /// @param[in] req The op that we sent to a chunk server asking /// it to do the replication. void ChunkReplicationDone(MetaChunkReplicate *req); /// Degree of replication for chunk has changed. When the replication /// checker runs, have it check the status for this chunk. /// @param[in] chunkId chunk whose replication level needs checking /// void ChangeChunkReplication(chunkId_t chunkId); /// Get all the fid's for which there is an open lease (read/write). /// This is useful for reporting purposes. /// @param[out] openForRead, openForWrite: the pathnames of files /// that are open for reading/writing respectively void GetOpenFiles(std::string &openForRead, std::string &openForWrite); void InitRecoveryStartTime() { mRecoveryStartTime = time(0); } void SetMinChunkserversToExitRecovery(uint32_t n) { mMinChunkserversToExitRecovery = n; } void ToggleRebalancing(bool v) { mIsRebalancingEnabled = v; } /// Methods for doing "planned" rebalancing of data. 
/// Read in the file that lays out the plan /// Return 0 if we can open the file; -1 otherwise int LoadRebalancePlan(const std::string &planFn); /// Execute the plan for all servers void ExecuteRebalancePlan(); /// Execute planned rebalance for server c void ExecuteRebalancePlan(ChunkServerPtr &c); protected: /// A rolling counter for tracking leases that are issued to /// to clients/chunkservers for reading/writing chunks int64_t mLeaseId; /// A counter to track the # of ongoing chunk replications int mNumOngoingReplications; /// A switch to toggle rebalancing: if the system is under load, /// we'd like to turn off rebalancing. We can enable it a /// suitable time. bool mIsRebalancingEnabled; /// Set when a rebalancing plan is being excuted. bool mIsExecutingRebalancePlan; /// On each iteration, we try to rebalance some # of blocks; /// this counter tracks the last chunk we checked kfsChunkId_t mLastChunkRebalanced; /// When a server goes down or needs retiring, we start /// replicating blocks. Whenever a replication finishes, we /// find the next candidate. We need to track "where" we left off /// on a previous iteration, so that we can start from there and /// run with it. kfsChunkId_t mLastChunkReplicated; /// After a crash, track the recovery start time. For a timer /// period that equals the length of lease interval, we only grant /// lease renews and new leases to new chunks. We however, /// disallow granting new leases to existing chunks. This is /// because during the time period that corresponds to a lease interval, /// we may learn about leases that we had handed out before crashing. time_t mRecoveryStartTime; /// Periodically clean out dead leases LeaseCleaner mLeaseCleaner; /// Similar to the lease cleaner: periodically check if there are /// sufficient copies of each chunk. ChunkReplicator mChunkReplicator; uint32_t mMinChunkserversToExitRecovery; /// List of connected chunk servers. 
std::vector <ChunkServerPtr> mChunkServers;

/// Whenever the list of chunkservers has to be modified, this
/// lock is used to serialize access
pthread_mutex_t mChunkServersMutex;

/// List of servers that are hibernating; if they don't wake up
/// by the time the hibernation period ends, the blocks on those
/// nodes need to be re-replicated.  This provides us the ability
/// to take a node down for maintenance and bring it back up
/// without incurring re-replication overheads.
std::vector <HibernatingServerInfo_t> mHibernatingServers;

/// Track when servers went down so we can report it
std::ostringstream mDownServers;

/// State about each rack (such as servers, space, etc.)
std::vector<RackInfo> mRacks;

/// Mapping from a chunk to its location(s).
CSMap mChunkToServerMap;

/// Candidate set of chunks whose replication needs checking
CRCandidateSet mChunkReplicationCandidates;

/// Counters to track chunk replications
Counter *mOngoingReplicationStats;
Counter *mTotalReplicationStats;

/// How much is left to do before we are all done (estimate of the size
/// of the chunk-replication candidates set).
Counter *mReplicationTodoStats;

/// Track the # of replication ops that failed
Counter *mFailedReplicationStats;

/// Track the # of stale chunks we have seen so far
Counter *mStaleChunkCount;

/// Find a set of racks to place a chunk on; the racks are
/// ordered by space.
/// @param[out] result  the candidate racks
void FindCandidateRacks(std::vector<int> &result);

/// Find a set of racks to place a chunk on; the racks are
/// ordered by space.  The set excludes defines the set of racks
/// that should be excluded from consideration.
/// @param[out] result  the candidate racks
/// @param[in] excludes  racks to leave out of the result
void FindCandidateRacks(std::vector<int> &result, const std::set<int> &excludes);

/// Helper function to generate candidate servers
/// for hosting a chunk.  The list of servers returned is
/// ordered in decreasing space availability.
/// @param[out] result  The set of available servers
/// @param[in] excludes  The set of servers to exclude from
/// candidate generation.
/// @param[in] rackId The rack to restrict the candidate /// selection to; if rackId = -1, then all servers are fair game void FindCandidateServers(std::vector<ChunkServerPtr> &result, const std::vector<ChunkServerPtr> &excludes, int rackId = -1); /// Helper function to generate candidate servers from /// the specified set of sources for hosting a chunk. /// The list of servers returned is /// ordered in decreasing space availability. /// @param[out] result The set of available servers /// @param[in] sources The set of possible source servers /// @param[in] excludes The set of servers to exclude from /// @param[in] rackId The rack to restrict the candidate /// selection to; if rackId = -1, then all servers are fair game void FindCandidateServers(std::vector<ChunkServerPtr> &result, const std::vector<ChunkServerPtr> &sources, const std::vector<ChunkServerPtr> &excludes, int rackId = -1); /// Helper function that takes a set of servers and sorts /// them by space utilization. The list of servers returned is /// ordered on increasing space utilization (i.e., decreasing /// space availability). /// @param[in/out] servers The set of servers we want sorted void SortServersByUtilization(vector<ChunkServerPtr> &servers); /// Check the # of copies for the chunk and return true if the /// # of copies is less than targeted amount. We also don't replicate a chunk /// if it is currently being written to (i.e., if a write lease /// has been issued). /// @param[in] chunkId The id of the chunk which we are checking /// @param[in] clli The lease/location information about the chunk. /// @param[out] extraReplicas The target # of additional replicas for the chunk /// @retval true if the chunk is to be replicated; false otherwise bool CanReplicateChunkNow(chunkId_t chunkId, ChunkPlacementInfo &clli, int &extraReplicas); /// Replicate a chunk. 
/// This involves finding a new location for
/// the chunk that is different from the existing set of replicas
/// and asking the chunkserver to get a copy.
/// @param[in] chunkId  The id of the chunk which we are checking
/// @param[in] clli  The lease/location information about the chunk.
/// @param[in] extraReplicas  The target # of additional replicas for the chunk
/// @param[in] candidates  The set of servers on which the additional replicas
/// should be stored (four-argument overload only)
/// @retval The # of actual replications triggered
int ReplicateChunk(chunkId_t chunkId, const ChunkPlacementInfo &clli,
                   uint32_t extraReplicas);
int ReplicateChunk(chunkId_t chunkId, const ChunkPlacementInfo &clli,
                   uint32_t extraReplicas,
                   const std::vector<ChunkServerPtr> &candidates);

/// The server has finished re-replicating a chunk.  If there is more
/// re-replication to be done, send it the server's way.
/// @param[in] server  The server to which re-replication work should be sent
/// @param[in] chunkReplicated  The chunkid that the server says
/// it finished replication.
void FindReplicationWorkForServer(ChunkServerPtr &server, chunkId_t chunkReplicated);

/// There are more replicas of a chunk than the requested amount.  So,
/// delete the extra replicas and reclaim space.  When deleting the additional
/// copies, find the servers that are low on space and delete from there.
/// As part of deletion, we update our mapping of where the chunk is stored.
/// @param[in] chunkId  The id of the chunk which we are checking
/// @param[in] clli  The lease/location information about the chunk.
/// @param[in] extraReplicas  The # of replicas that need to be deleted
void DeleteAddlChunkReplicas(chunkId_t chunkId, ChunkPlacementInfo &clli,
                             uint32_t extraReplicas);

/// Helper function to check set membership.
/// @param[in] hosters  Set of servers hosting a chunk
/// @param[in] server  The server we want to check for membership in hosters.
/// @retval true if server is a member of the set of hosters; /// false otherwise bool IsChunkHostedOnServer(const vector<ChunkServerPtr> &hosters, const ChunkServerPtr &server); /// Periodically, update our estimate of how much space is /// used/available in each rack. void UpdateRackSpaceUsageCounts(); /// Periodically, rebalance servers by moving chunks around from /// "over utilized" servers to "under utilized" servers. /// @retval # of blocks that were moved around int RebalanceServers(); void FindIntraRackRebalanceCandidates(vector<ChunkServerPtr> &candidates, const vector<ChunkServerPtr> &nonloadedServers, const ChunkPlacementInfo &clli); void FindInterRackRebalanceCandidate(ChunkServerPtr &candidate, const vector<ChunkServerPtr> &nonloadedServers, const ChunkPlacementInfo &clli); /// Helper method to replicate a chunk to given set of /// candidates. /// Returns the # of copies that were triggered. int ReplicateChunkToServers(chunkId_t chunkId, ChunkPlacementInfo &clli, uint32_t numCopies, std::vector<ChunkServerPtr> &candidates); /// Return true if c is a server in mChunkServers[]. bool ValidServer(ChunkServer *c); /// For a time period that corresponds to the length of a lease interval, /// we are in recovery after a restart. /// Also, if the # of chunkservers that are connected to us is /// less than some threshold, we are in recovery mode. bool InRecovery() { if (mChunkServers.size() < mMinChunkserversToExitRecovery) return true; time_t now = time(0); return now - mRecoveryStartTime <= KFS::LEASE_INTERVAL_SECS; } }; // When the rebalance planner it works out a plan that specifies // which chunk has to be moved from src->dst struct RebalancePlanInfo_t { static const int hostnamelen = 256; chunkId_t chunkId; char dst[hostnamelen]; char src[hostnamelen]; }; extern LayoutManager gLayoutManager;}#endif // META_LAYOUTMANAGER_H
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -