layoutmanager.cc
    int extraReplicas = 0, numOngoing;
    vector<ChunkServerPtr> c;

    if (server->IsRetiring())
        return;

    c.push_back(server);

    // try to start where we were done with this server
    CRCandidateSetIter citer = mChunkReplicationCandidates.find(chunkReplicated + 1);
    if (citer == mChunkReplicationCandidates.end()) {
        // try to start where we left off last time; if that chunk has
        // disappeared, find something "closeby"
        citer = mChunkReplicationCandidates.upper_bound(mLastChunkReplicated);
    }
    if (citer == mChunkReplicationCandidates.end()) {
        mLastChunkReplicated = 1;
        citer = mChunkReplicationCandidates.begin();
    }

    struct timeval start, now;

    gettimeofday(&start, NULL);

    for (; citer != mChunkReplicationCandidates.end(); ++citer) {
        gettimeofday(&now, NULL);
        if (ComputeTimeDiff(start, now) > 0.2)
            // if we have spent more than 200 milliseconds here, stop and
            // go serve other requests
            break;

        chunkId = *citer;

        CSMapIter iter = mChunkToServerMap.find(chunkId);
        if (iter == mChunkToServerMap.end()) {
            continue;
        }
        if (iter->second.ongoingReplications > 0)
            continue;

        // if the chunk is already hosted on this server, the chunk isn't a
        // candidate for work to be sent to this server.
        if (IsChunkHostedOnServer(iter->second.chunkServers, server) ||
            (!CanReplicateChunkNow(iter->first, iter->second, extraReplicas)))
            continue;

        if (extraReplicas > 0) {
            if (mRacks.size() > 1) {
                // when there is more than one rack, since we are
                // re-replicating a chunk, we don't want to put two copies
                // of the chunk on the same rack.  if one of the hosting
                // nodes is retiring, we don't count its rack in this
                // exclusion set---so a new copy may still be placed on
                // that rack.
                set<int> excludeRacks;
                for_each(iter->second.chunkServers.begin(), iter->second.chunkServers.end(),
                         RackSetter(excludeRacks, true));

                if (excludeRacks.find(server->GetRack()) != excludeRacks.end())
                    continue;
            }

            numOngoing = ReplicateChunk(iter->first, iter->second, 1, c);
            iter->second.ongoingReplications += numOngoing;
            if (numOngoing > 0) {
                mNumOngoingReplications++;
                mLastChunkReplicated = chunkId;
            }
        }

        if (server->GetNumChunkReplications() > MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE)
            break;
    }
    // if there is any room left to do more work...
    ExecuteRebalancePlan(server);
}
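
// ---------------------------------------------------------------------------
// ComputeTimeDiff() and RackSetter are used above but are not part of this
// excerpt.  The sketches below are illustrative only: they assume
// ComputeTimeDiff() returns the elapsed time in seconds (so the 0.2 check
// above is a 200 ms budget) and that RackSetter collects each hosting
// server's rack id into a set, optionally skipping retiring servers.
// ---------------------------------------------------------------------------
static float
ComputeTimeDiff(const struct timeval &start, const struct timeval &end)
{
    // elapsed wall-clock time in seconds
    float secs = end.tv_sec - start.tv_sec;
    float usecs = end.tv_usec - start.tv_usec;
    return secs + usecs * 1e-6;
}

class RackSetter {
    set<int> &mRacks;
    bool mSkipRetiringServers;
public:
    RackSetter(set<int> &racks, bool skipRetiringServers = false) :
        mRacks(racks), mSkipRetiringServers(skipRetiringServers) { }
    void operator()(const ChunkServerPtr &s) {
        // a retiring server's copy is going away, so its rack need not be
        // excluded when the flag is set
        if (mSkipRetiringServers && s->IsRetiring())
            return;
        mRacks.insert(s->GetRack());
    }
};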

void
LayoutManager::ChunkReplicationDone(MetaChunkReplicate *req)
{
    vector<ChunkServerPtr>::iterator source;

    mOngoingReplicationStats->Update(-1);

    // Book-keeping....
    CSMapIter iter = mChunkToServerMap.find(req->chunkId);

    if (iter != mChunkToServerMap.end()) {
        iter->second.ongoingReplications--;
        if (iter->second.ongoingReplications == 0)
            // if all the replications for this chunk are done,
            // then update the global counter.
            mNumOngoingReplications--;

        if (iter->second.ongoingReplications < 0)
            // sanity...
            iter->second.ongoingReplications = 0;
    }

    req->server->ReplicateChunkDone(req->chunkId);

    source = find_if(mChunkServers.begin(), mChunkServers.end(),
                     MatchingServer(req->srcLocation));
    if (source != mChunkServers.end()) {
        (*source)->UpdateReplicationReadLoad(-1);
    }

    if (req->status != 0) {
        // Replication failed...we will try again later
        KFS_LOG_VA_INFO("%s: re-replication for chunk %lld failed, code = %d",
                        req->server->GetServerLocation().ToString().c_str(),
                        req->chunkId, req->status);
        mFailedReplicationStats->Update(1);
        return;
    }

    // replication succeeded: book-keeping
    // if any of the hosting servers were being "retired", notify them that
    // re-replication of any chunks hosted on them is finished
    if (iter != mChunkToServerMap.end()) {
        for_each(iter->second.chunkServers.begin(), iter->second.chunkServers.end(),
                 ReplicationDoneNotifier(req->chunkId));
    }

    // validate that the server got the latest copy of the chunk
    vector<MetaChunkInfo *> v;
    vector<MetaChunkInfo *>::iterator chunk;

    metatree.getalloc(req->fid, v);
    chunk = find_if(v.begin(), v.end(), ChunkIdMatcher(req->chunkId));
    if (chunk == v.end()) {
        // Chunk disappeared -> stale; this chunk will get nuked
        KFS_LOG_VA_INFO("Re-replicate: chunk (%lld) disappeared => so, stale",
                        req->chunkId);
        mFailedReplicationStats->Update(1);
        req->server->NotifyStaleChunk(req->chunkId);
        return;
    }

    MetaChunkInfo *mci = *chunk;
    if (mci->chunkVersion != req->chunkVersion) {
        // Version that we replicated has changed...so, stale
        KFS_LOG_VA_INFO("Re-replicate: chunk (%lld) version changed (was=%lld, now=%lld) => so, stale",
                        req->chunkId, req->chunkVersion, mci->chunkVersion);
        mFailedReplicationStats->Update(1);
        req->server->NotifyStaleChunk(req->chunkId);
        return;
    }

    // Yaeee...all good...
    KFS_LOG_VA_DEBUG("%s reports that re-replication for chunk %lld is all done",
                     req->server->GetServerLocation().ToString().c_str(), req->chunkId);
    UpdateChunkToServerMapping(req->chunkId, req->server.get());

    // since this server is now free, send more work its way...
    if (req->server->GetNumChunkReplications() <= 1) {
        FindReplicationWorkForServer(req->server, req->chunkId);
    }
}
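
// ---------------------------------------------------------------------------
// MatchingServer and ChunkIdMatcher are not defined in this excerpt.  Judging
// from their use above, they are simple STL predicates; minimal sketches,
// assuming the ChunkServer::GetServerLocation() accessor and the
// MetaChunkInfo::chunkId field used elsewhere in this file:
// ---------------------------------------------------------------------------
class MatchingServer {
    ServerLocation mLoc;
public:
    MatchingServer(const ServerLocation &loc) : mLoc(loc) { }
    bool operator()(const ChunkServerPtr &s) const {
        // match a chunkserver by its host/port location
        return s->GetServerLocation() == mLoc;
    }
};

class ChunkIdMatcher {
    chunkId_t mChunkId;
public:
    ChunkIdMatcher(chunkId_t c) : mChunkId(c) { }
    bool operator()(MetaChunkInfo *c) const {
        // match a chunk by its id
        return c->chunkId == mChunkId;
    }
};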

//
// To delete additional copies of a chunk, find the servers that have the least
// amount of space and delete the chunk from there.  In addition, also pay
// attention to rack-awareness: if two copies are on the same rack, then we pick
// the server that is the most loaded and delete the copy there.
//
void
LayoutManager::DeleteAddlChunkReplicas(chunkId_t chunkId, ChunkPlacementInfo &clli,
                                       uint32_t extraReplicas)
{
    vector<ChunkServerPtr> servers = clli.chunkServers, copiesToDiscard;
    uint32_t numReplicas = servers.size() - extraReplicas;
    set<int> chosenRacks;

    // if any of the copies are on nodes that are retiring, leave the copies
    // alone; we will reclaim space later if needed
    int numRetiringServers = count_if(servers.begin(), servers.end(),
                                      RetiringServerPred());
    if (numRetiringServers > 0)
        return;

    // We get servers sorted by increasing amount of space utilization; so the
    // candidates we want to delete are at the end
    SortServersByUtilization(servers);

    for_each(servers.begin(), servers.end(), RackSetter(chosenRacks));
    if (chosenRacks.size() == numReplicas) {
        // we need to keep as many copies as there are racks; so, find the
        // extra copies on a given rack and delete them
        clli.chunkServers.clear();
        chosenRacks.clear();
        for (uint32_t i = 0; i < servers.size(); i++) {
            if (chosenRacks.find(servers[i]->GetRack()) == chosenRacks.end()) {
                chosenRacks.insert(servers[i]->GetRack());
                clli.chunkServers.push_back(servers[i]);
            } else {
                // second copy on the same rack
                copiesToDiscard.push_back(servers[i]);
            }
        }
    } else {
        clli.chunkServers = servers;
        // Get rid of the extra stuff from the end
        clli.chunkServers.resize(numReplicas);

        // The first N are what we want to keep; the rest should go.
        copiesToDiscard.insert(copiesToDiscard.end(), servers.begin() + numReplicas,
                               servers.end());
    }
    mChunkToServerMap[chunkId] = clli;

    ostringstream msg;
    msg << "Chunk " << chunkId << " lives on: \n";
    for (uint32_t i = 0; i < clli.chunkServers.size(); i++) {
        msg << clli.chunkServers[i]->GetServerLocation().ToString()
            << ' ' << clli.chunkServers[i]->GetRack() << "; ";
    }
    msg << "\n";

    msg << "Discarding chunk on: ";
    for (uint32_t i = 0; i < copiesToDiscard.size(); i++) {
        msg << copiesToDiscard[i]->GetServerLocation().ToString()
            << ' ' << copiesToDiscard[i]->GetRack() << " ";
    }
    msg << "\n";

    KFS_LOG_VA_INFO("%s", msg.str().c_str());

    for_each(copiesToDiscard.begin(), copiesToDiscard.end(), ChunkDeletor(chunkId));
}

void
LayoutManager::ChangeChunkReplication(chunkId_t chunkId)
{
    mChunkReplicationCandidates.insert(chunkId);
}
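
// ---------------------------------------------------------------------------
// RetiringServerPred and SortServersByUtilization are used in
// DeleteAddlChunkReplicas above but are not defined in this excerpt.  Minimal
// sketches, assuming the ChunkServer::IsRetiring()/GetSpaceUtilization()
// accessors used elsewhere in this file, and assuming that
// SortServersByUtilization orders servers by increasing space utilization,
// which is what the comment above expects:
// ---------------------------------------------------------------------------
class RetiringServerPred {
public:
    RetiringServerPred() { }
    bool operator()(const ChunkServerPtr &s) const {
        return s->IsRetiring();
    }
};

static bool
SpaceUtilizationLess(const ChunkServerPtr &a, const ChunkServerPtr &b)
{
    // least-utilized servers first; the most loaded end up at the tail
    return a->GetSpaceUtilization() < b->GetSpaceUtilization();
}

static void
SortServersByUtilization(vector<ChunkServerPtr> &servers)
{
    sort(servers.begin(), servers.end(), SpaceUtilizationLess);
}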

//
// Check if the server is part of the set of servers hosting the chunk
//
bool
LayoutManager::IsChunkHostedOnServer(const vector<ChunkServerPtr> &hosters,
                                     const ChunkServerPtr &server)
{
    vector<ChunkServerPtr>::const_iterator iter;

    iter = find(hosters.begin(), hosters.end(), server);
    return iter != hosters.end();
}

class LoadedServerPred {
public:
    LoadedServerPred() { }
    bool operator()(const ChunkServerPtr &s) const {
        return s->GetSpaceUtilization() > MAX_SERVER_SPACE_UTIL_THRESHOLD;
    }
};

//
// We are trying to move a chunk between two servers on the same rack.  For a
// given chunk, we try to find as many "migration pairs" (source/destination
// nodes) as we can within the respective racks.
//
void
LayoutManager::FindIntraRackRebalanceCandidates(vector<ChunkServerPtr> &candidates,
                                                const vector<ChunkServerPtr> &nonloadedServers,
                                                const ChunkPlacementInfo &clli)
{
    vector<ChunkServerPtr>::const_iterator iter;

    for (uint32_t i = 0; i < clli.chunkServers.size(); i++) {
        vector<ChunkServerPtr> servers;

        if (clli.chunkServers[i]->GetSpaceUtilization() < MAX_SERVER_SPACE_UTIL_THRESHOLD) {
            continue;
        }
        // we have a loaded server; find another non-loaded
        // server within the same rack (case 1 from above)
        FindCandidateServers(servers, nonloadedServers, clli.chunkServers,
                             clli.chunkServers[i]->GetRack());
        if (servers.size() == 0) {
            // nothing available within the rack to do the move
            continue;
        }
        // make sure that we are not putting 2 copies of a chunk on the
        // same server
        for (uint32_t j = 0; j < servers.size(); j++) {
            iter = find(candidates.begin(), candidates.end(), servers[j]);
            if (iter == candidates.end()) {
                candidates.push_back(servers[j]);
                break;
            }
        }
    }
}

//
// For rebalancing, we could not find a candidate server on the same rack as a
// loaded server.  Hence, we try to move the chunk between two servers on two
// different racks; that is, find a migration pair with the source and
// destination on different racks.
//
void
LayoutManager::FindInterRackRebalanceCandidate(ChunkServerPtr &candidate,
                                               const vector<ChunkServerPtr> &nonloadedServers,
                                               const ChunkPlacementInfo &clli)
{
    vector<ChunkServerPtr> servers;

    FindCandidateServers(servers, nonloadedServers, clli.chunkServers);
    if (servers.size() == 0) {
        return;
    }
    // XXX: in emulation mode, we have 0 racks due to compile issues
    if (mRacks.size() <= 1)
        return;

    // if we had only one rack then the intra-rack move should have found a
    // candidate.
    assert(mRacks.size() > 1);
    if (mRacks.size() <= 1)
        return;

    // For the candidate we pick, we want to enforce the property that all
    // the copies of the chunk are on different racks.
    set<int> excludeRacks;
    for_each(clli.chunkServers.begin(), clli.chunkServers.end(),
             RackSetter(excludeRacks));

    for (uint32_t i = 0; i < servers.size(); i++) {
        set<int>::iterator iter = excludeRacks.find(servers[i]->GetRack());
        if (iter == excludeRacks.end()) {
            candidate = servers[i];
            return;
        }
    }
}
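
// ---------------------------------------------------------------------------
// FindCandidateServers() is not part of this excerpt.  From the call sites
// above, it fills a result vector with servers drawn from a source pool that
// do not already host the chunk, optionally restricted to a given rack.  A
// minimal sketch under those assumptions (named with a "Sketch" suffix to make
// clear it is illustrative; the real implementation may also weigh load and
// available space):
// ---------------------------------------------------------------------------
static void
FindCandidateServersSketch(vector<ChunkServerPtr> &result,
                           const vector<ChunkServerPtr> &sources,
                           const vector<ChunkServerPtr> &excludes,
                           int rack = -1)
{
    for (uint32_t i = 0; i < sources.size(); i++) {
        const ChunkServerPtr &s = sources[i];
        // restrict to a rack when one is specified
        if (rack != -1 && s->GetRack() != rack)
            continue;
        // skip servers that already hold a copy of the chunk
        if (find(excludes.begin(), excludes.end(), s) != excludes.end())
            continue;
        result.push_back(s);
    }
}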

//
// Periodically, if we find that some chunkservers have a LOT of space free
// (> 80%) while others are loaded (i.e., < 30% free space), move chunks
// around.  This helps with keeping better disk space utilization (and maybe
// load).
//
int
LayoutManager::RebalanceServers()
{
    if ((InRecovery()) || (mChunkServers.size() == 0)) {
        return 0;
    }

    // if we are doing rebalancing based on a plan, execute as
    // much of the plan as there is room.
    ExecuteRebalancePlan();

    for_each(mRacks.begin(), mRacks.end(), mem_fun_ref(&RackInfo::computeSpace));

    if (!mIsRebalancingEnabled)
        return 0;

    vector<ChunkServerPtr> servers = mChunkServers;
    vector<ChunkServerPtr> loadedServers, nonloadedServers;
    int extraReplicas, numBlocksMoved = 0;

    for (uint32_t i = 0; i < servers.size(); i++) {
        if (servers[i]->IsRetiring())
            continue;
        if (servers[i]->GetSpaceUtilization() < MIN_SERVER_SPACE_UTIL_THRESHOLD)
            nonloadedServers.push_back(servers[i]);
        else if (servers[i]->GetSpaceUtilization() > MAX_SERVER_SPACE_UTIL_THRESHOLD)
            loadedServers.push_back(servers[i]);
    }

    if ((nonloadedServers.size() == 0) || (loadedServers.size() == 0))
        return 0;

    bool allbusy = false;

    // try to start where we left off last time; if that chunk has
    // disappeared, find something "closeby"
    CSMapIter iter = mChunkToServerMap.find(mLastChunkRebalanced);
    if (iter == mChunkToServerMap.end())
        iter = mChunkToServerMap.upper_bound(mLastChunkRebalanced);

    for (; iter != mChunkToServerMap.end(); iter++) {
        allbusy = true;
        for (uint32_t i = 0; i < nonloadedServers.size(); i++) {
            if (nonloadedServers[i]->GetNumChunkReplications() <
                MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE) {
                allbusy = false;
                break;
            }
        }

        if (allbusy)
            break;

        if (numBlocksMoved > 200)
            break;

        chunkId_t chunkId = iter->first;
        ChunkPlacementInfo &clli = iter->second;
        vector<ChunkServerPtr> candidates;

        // the chunk can be moved around only if it is hosted on a loaded server
        vector<ChunkServerPtr>::const_iterator csp;
        csp = find_if(clli.chunkServers.begin(), clli.chunkServers.end(),
                      LoadedServerPred());
        if (csp == clli.chunkServers.end())
            continue;

        // we have seen this chunkId; next time, we'll start from around here
        mLastChunkRebalanced = chunkId;

        // If this chunk is already being replicated or it is busy, skip
        if ((clli.ongoingReplications > 0) ||
            (!CanReplicateChunkNow(chunkId, clli, extraReplicas)))
            continue;

        // if we got too many copies of this chunk, don't bother
        if (extraReplicas < 0)
            continue;

        // there are two ways nodes can be added:
        //  1. new nodes to an existing rack: in this case, we want to
        //     migrate the chunk within a rack
        //  2. a new rack of nodes gets added: in this case, we want to
        //     migrate chunks to the new rack as long as we don't get more
        //     than one copy onto the same rack
        FindIntraRackRebalanceCandidates(candidates, nonloadedServers, clli);
        if (candidates.size() == 0) {
            ChunkServerPtr cand;

            FindInterRackRebalanceCandidate(cand, nonloadedServers, clli);
            if (cand)
                candidates.push_back(cand);
        }

        if (candidates.size() == 0)
            // no candidates :-(
            continue;

        // get the chunk version
        vector<MetaChunkInfo *> v;
        vector<MetaChunkInfo *>::iterator chunk;

        metatree.getalloc(clli.fid, v);
        chunk = find_if(v.begin(), v.end(), ChunkIdMatcher(chunkId));