layoutmanager.cc
		i = remove_if(c.chunkLeases.begin(), c.chunkLeases.end(),
				LeaseExpired(now));
		for_each(i, c.chunkLeases.end(),
				DecChunkWriteCount(c.fid, chunkId));
		// trim the list
		c.chunkLeases.erase(i, c.chunkLeases.end());
		cmap[p.first] = c;
	}
};

void
LayoutManager::LeaseCleanup()
{
	time_t now = time(0);

	for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(),
		LeaseExpirer(mChunkToServerMap, now));
}

// Cleanup the leases for a particular chunk
void
LayoutManager::LeaseCleanup(chunkId_t chunkId, ChunkPlacementInfo &v)
{
	for_each(v.chunkLeases.begin(), v.chunkLeases.end(),
			DecChunkWriteCount(v.fid, chunkId));
	v.chunkLeases.clear();
}

class RetiringServerPred {
public:
	RetiringServerPred() { }
	bool operator()(const ChunkServerPtr &c) {
		return c->IsRetiring();
	}
};

class ReplicationDoneNotifier {
	chunkId_t cid;
public:
	ReplicationDoneNotifier(chunkId_t c) : cid(c) { }
	void operator()(ChunkServerPtr &s) {
		s->EvacuateChunkDone(cid);
	}
};

class RackSetter {
	set<int> &racks;
	bool excludeRetiringServers;
public:
	RackSetter(set<int> &r, bool excludeRetiring = false) :
		racks(r), excludeRetiringServers(excludeRetiring) { }
	void operator()(const ChunkServerPtr &s) {
		if (excludeRetiringServers && s->IsRetiring())
			return;
		racks.insert(s->GetRack());
	}
};

int
LayoutManager::ReplicateChunk(chunkId_t chunkId, const ChunkPlacementInfo &clli,
				uint32_t extraReplicas)
{
	vector<int> racks;
	set<int> excludeRacks;
	// find a place
	vector<ChunkServerPtr> candidates;

	// Two steps here: first, exclude the racks on which chunks are already
	// placed; if we can't find a unique rack, then put it wherever.
	// For accounting purposes, ignore the rack(s) which contain a retiring
	// chunkserver; we'd like to move the block within that rack if
	// possible.
	for_each(clli.chunkServers.begin(), clli.chunkServers.end(),
			RackSetter(excludeRacks, true));

	FindCandidateRacks(racks, excludeRacks);
	if (racks.size() == 0) {
		// no new rack is available to put the chunk;
		// take what we got
		FindCandidateRacks(racks);
		if (racks.size() == 0)
			// no rack is available
			return 0;
	}

	uint32_t numServersPerRack = extraReplicas / racks.size();
	if (extraReplicas % racks.size())
		numServersPerRack++;

	for (uint32_t idx = 0; idx < racks.size(); idx++) {
		if (candidates.size() >= extraReplicas)
			break;
		vector<ChunkServerPtr> servers;

		// find candidates other than those that are already hosting
		// the chunk
		FindCandidateServers(servers, clli.chunkServers, racks[idx]);

		// take as many as we can from this rack
		for (uint32_t i = 0; i < servers.size() && i < numServersPerRack; i++) {
			if (candidates.size() >= extraReplicas)
				break;
			candidates.push_back(servers[i]);
		}
	}

	if (candidates.size() == 0)
		return 0;

	return ReplicateChunk(chunkId, clli, extraReplicas, candidates);
}
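// Worked example of the per-rack quota computed above: with extraReplicas = 5
// and racks.size() = 3, numServersPerRack = ceil(5 / 3) = 2.  The first two
// racks then contribute two candidates each, and the candidates.size() >=
// extraReplicas check caps the third rack at one, for five candidates total.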
int
LayoutManager::ReplicateChunk(chunkId_t chunkId, const ChunkPlacementInfo &clli,
				uint32_t extraReplicas,
				const vector<ChunkServerPtr> &candidates)
{
	ChunkServerPtr c, dataServer;
	vector<MetaChunkInfo *> v;
	vector<MetaChunkInfo *>::iterator chunk;
	fid_t fid = clli.fid;
	int numDone = 0;

	/*
	metatree.getalloc(fid, v);
	chunk = find_if(v.begin(), v.end(), ChunkIdMatcher(chunkId));
	if (chunk == v.end()) {
		panic("missing chunk", true);
	}
	MetaChunkInfo *mci = *chunk;
	*/

	for (uint32_t i = 0; i < candidates.size() && i < extraReplicas; i++) {
		vector<ChunkServerPtr>::const_iterator iter;

		c = candidates[i];
		// Don't send too many replications to a server
		if (c->GetNumChunkReplications() >
			MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE)
			continue;
#ifdef DEBUG
		// verify that we got good candidates
		iter = find(clli.chunkServers.begin(), clli.chunkServers.end(), c);
		if (iter != clli.chunkServers.end()) {
			assert(!"Not possible...");
		}
#endif
		// prefer a server that is being retired to the other nodes as
		// the source of the chunk replication
		iter = find_if(clli.chunkServers.begin(), clli.chunkServers.end(),
				RetiringServerPred());

		const char *reason;
		if (iter != clli.chunkServers.end()) {
			reason = " evacuating chunk ";
			if (((*iter)->GetReplicationReadLoad() <
				MAX_CONCURRENT_READ_REPLICATIONS_PER_NODE) &&
				(*iter)->IsResponsiveServer())
				dataServer = *iter;
		} else {
			reason = " re-replication ";
		}

		// if we can't find a retiring server, pick a server that has
		// read b/w available
		for (uint32_t j = 0; (!dataServer) &&
				(j < clli.chunkServers.size()); j++) {
			if ((clli.chunkServers[j]->GetReplicationReadLoad() >=
				MAX_CONCURRENT_READ_REPLICATIONS_PER_NODE) ||
				(!(clli.chunkServers[j]->IsResponsiveServer())))
				continue;
			dataServer = clli.chunkServers[j];
		}

		if (dataServer) {
			ServerLocation srcLocation = dataServer->GetServerLocation();
			ServerLocation dstLocation = c->GetServerLocation();

			KFS_LOG_VA_INFO("Starting re-replication for chunk %lld (from: %s to %s) reason = %s",
					chunkId,
					srcLocation.ToString().c_str(),
					dstLocation.ToString().c_str(),
					reason);
			dataServer->UpdateReplicationReadLoad(1);
			/*
			c->ReplicateChunk(fid, chunkId, mci->chunkVersion,
					dataServer->GetServerLocation());
			*/
			// have the chunkserver get the version
			c->ReplicateChunk(fid, chunkId, -1,
					dataServer->GetServerLocation());
			numDone++;
		}
		dataServer.reset();
	}

	if (numDone > 0) {
		mTotalReplicationStats->Update(1);
		mOngoingReplicationStats->Update(numDone);
	}
	return numDone;
}
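// ChunkIdMatcher is used above (in the commented-out version lookup) and in
// CanReplicateChunkNow() below, but its definition does not appear in this
// excerpt.  A minimal sketch of the predicate it presumably implements,
// assuming MetaChunkInfo exposes a chunkId member (hypothetical; the real
// definition lives elsewhere in this file):
//
//	class ChunkIdMatcher {
//		chunkId_t myid;
//	public:
//		ChunkIdMatcher(chunkId_t c) : myid(c) { }
//		bool operator()(MetaChunkInfo *c) const {
//			return c->chunkId == myid;
//		}
//	};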
bool
LayoutManager::CanReplicateChunkNow(chunkId_t chunkId,
				ChunkPlacementInfo &c,
				int &extraReplicas)
{
	vector<LeaseInfo>::iterator l;

	// Don't replicate chunks for which a write lease
	// has been issued.
	l = find_if(c.chunkLeases.begin(), c.chunkLeases.end(),
			ptr_fun(LeaseInfo::IsValidWriteLease));
	if (l != c.chunkLeases.end())
		return false;

	extraReplicas = 0;
	// Can't re-replicate a chunk if we don't have a copy!  So,
	// take out this chunk from the candidate set.
	if (c.chunkServers.size() == 0)
		return true;

	MetaFattr *fa = metatree.getFattr(c.fid);
	if (fa == NULL)
		// No file attr.  So, take out this chunk
		// from the candidate set.
		return true;

	// check if the chunk still exists
	vector<MetaChunkInfo *> v;
	vector<MetaChunkInfo *>::iterator chunk;
	metatree.getalloc(c.fid, v);
	chunk = find_if(v.begin(), v.end(), ChunkIdMatcher(chunkId));
	if (chunk == v.end()) {
		// This chunk doesn't exist in this file anymore.
		// So, take out this chunk from the candidate set.
		return true;
	}

	// If any of the chunkservers are retiring, we need to make copies.
	// So, first determine how many copies we need, given that one of the
	// servers hosting the chunk is going down.
	int numRetiringServers = count_if(c.chunkServers.begin(),
					c.chunkServers.end(),
					RetiringServerPred());
	// now, determine if we have sufficient copies
	if (numRetiringServers > 0) {
		if (c.chunkServers.size() - numRetiringServers <
			(uint32_t) fa->numReplicas) {
			// we need to make this many copies: # of servers
			// that are retiring plus the # by which this chunk
			// is under-replicated
			extraReplicas = numRetiringServers +
				(fa->numReplicas - c.chunkServers.size());
		} else {
			// we have sufficient copies even after accounting for
			// the retiring servers; so, take out this chunk from
			// the replication candidates set.
			extraReplicas = 0;
		}
		return true;
	}

	// May need to re-replicate this chunk:
	//  - extraReplicas > 0 means make extra copies;
	//  - extraReplicas == 0 means take out this chunkid from the candidate set;
	//  - extraReplicas < 0 means we have too many copies; delete some.
	extraReplicas = fa->numReplicas - c.chunkServers.size();

	if (extraReplicas < 0) {
		//
		// We need to delete additional copies; however, if
		// there is a valid (read) lease issued on the chunk,
		// then leave the chunk alone for now; we'll look at
		// deleting it when the lease has expired.  This is for
		// safety: if a client was reading from the copy of the
		// chunk that we are trying to delete, the client would
		// see the deletion and have to fail over; avoid
		// unnecessary failovers.
		//
		l = find_if(c.chunkLeases.begin(), c.chunkLeases.end(),
				ptr_fun(LeaseInfo::IsValidLease));
		if (l != c.chunkLeases.end())
			return false;
	}

	return true;
}

class EvacuateChunkChecker {
	CRCandidateSet &candidates;
	CSMap &chunkToServerMap;
public:
	EvacuateChunkChecker(CRCandidateSet &c, CSMap &m) :
		candidates(c), chunkToServerMap(m) { }
	void operator()(ChunkServerPtr c) {
		if (!c->IsRetiring())
			return;
		CRCandidateSet leftover = c->GetEvacuatingChunks();
		for (CRCandidateSetIter citer = leftover.begin();
				citer != leftover.end(); ++citer) {
			chunkId_t chunkId = *citer;
			CSMapIter iter = chunkToServerMap.find(chunkId);

			if (iter == chunkToServerMap.end()) {
				c->EvacuateChunkDone(chunkId);
				KFS_LOG_VA_INFO("%s has bogus block %ld",
					c->GetServerLocation().ToString().c_str(),
					chunkId);
			} else {
				// XXX: if we don't think this chunk is on
				// this server, then we should update the
				// view...
				candidates.insert(chunkId);
				KFS_LOG_VA_INFO("%s has block %ld that wasn't in replication candidates",
					c->GetServerLocation().ToString().c_str(),
					chunkId);
			}
		}
	}
};

void
LayoutManager::CheckHibernatingServersStatus()
{
	time_t now = time(0);

	vector<HibernatingServerInfo_t>::iterator iter =
		mHibernatingServers.begin();
	vector<ChunkServerPtr>::iterator i;

	while (iter != mHibernatingServers.end()) {
		i = find_if(mChunkServers.begin(), mChunkServers.end(),
			MatchingServer(iter->location));
		if ((i == mChunkServers.end()) && (now < iter->sleepEndTime)) {
			// within the time window where the server is
			// sleeping; so, move on
			iter++;
			continue;
		}
		if (i != mChunkServers.end()) {
			KFS_LOG_VA_INFO("Hibernated server (%s) is back as promised...",
				iter->location.ToString().c_str());
		} else {
			// server hasn't come back as promised...so, check
			// re-replication for the blocks that were on that node
			KFS_LOG_VA_INFO("Hibernated server (%s) is not back as promised...",
				iter->location.ToString().c_str());
			mChunkReplicationCandidates.insert(iter->blocks.begin(),
				iter->blocks.end());
		}
		mHibernatingServers.erase(iter);
		iter = mHibernatingServers.begin();
	}
}
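// ComputeTimeDiff() is used in ChunkReplicationChecker() below but is not
// defined in this excerpt.  Presumably it returns the elapsed time between
// two timevals in seconds, along the lines of this sketch (hypothetical;
// the actual implementation may differ):
//
//	static float ComputeTimeDiff(const struct timeval &start,
//					const struct timeval &end)
//	{
//		return (end.tv_sec - start.tv_sec) +
//			(end.tv_usec - start.tv_usec) * 1e-6;
//	}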
void
LayoutManager::ChunkReplicationChecker()
{
	if (InRecovery()) {
		return;
	}

	CheckHibernatingServersStatus();

	// There is a set of chunks that are affected: their server went down
	// or there is a change in their degree of replication.  In either
	// case, walk this set of chunkids and work on their replication
	// amount.

	chunkId_t chunkId;
	CRCandidateSet delset;
	int extraReplicas, numOngoing;
	uint32_t numIterations = 0;
	struct timeval start;

	gettimeofday(&start, NULL);

	for (CRCandidateSetIter citer = mChunkReplicationCandidates.begin();
			citer != mChunkReplicationCandidates.end(); ++citer) {
		chunkId = *citer;

		struct timeval now;
		gettimeofday(&now, NULL);

		if (ComputeTimeDiff(start, now) > 5.0)
			// if we have spent more than 5 seconds here, stop and
			// serve other requests
			break;

		CSMapIter iter = mChunkToServerMap.find(chunkId);
		if (iter == mChunkToServerMap.end()) {
			delset.insert(chunkId);
			continue;
		}

		if (iter->second.ongoingReplications > 0)
			// this chunk is being re-replicated; we'll check later
			continue;

		if (!CanReplicateChunkNow(iter->first, iter->second, extraReplicas))
			continue;

		if (extraReplicas > 0) {
			numOngoing = ReplicateChunk(iter->first, iter->second,
						extraReplicas);
			iter->second.ongoingReplications += numOngoing;
			if (numOngoing > 0) {
				mNumOngoingReplications++;
				mLastChunkReplicated = chunkId;
				numIterations++;
			}
		} else if (extraReplicas == 0) {
			delset.insert(chunkId);
		} else {
			DeleteAddlChunkReplicas(iter->first, iter->second,
						-extraReplicas);
			delset.insert(chunkId);
		}

		if (numIterations > mChunkServers.size() *
			MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE)
			// throttle: we have handed out enough work
			break;
	}

	if (delset.size() > 0) {
		for (CRCandidateSetIter citer = delset.begin();
				citer != delset.end(); ++citer) {
			// Notify the retiring servers if any of their chunks
			// have been evacuated---for instance, if there were
			// too many copies of those chunks, we are done
			// evacuating them.
			chunkId = *citer;
			CSMapIter iter = mChunkToServerMap.find(chunkId);
			if (iter != mChunkToServerMap.end())
				for_each(iter->second.chunkServers.begin(),
					iter->second.chunkServers.end(),
					ReplicationDoneNotifier(chunkId));
			mChunkReplicationCandidates.erase(*citer);
		}
	}

	if (mChunkReplicationCandidates.size() == 0) {
		// If there are any retiring servers, make sure that they
		// don't think there is still a block to be replicated.
		// If there is any such block, get it into the set of
		// candidates...need to know why this happens.
		for_each(mChunkServers.begin(), mChunkServers.end(),
			EvacuateChunkChecker(mChunkReplicationCandidates,
						mChunkToServerMap));
	}

	RebalanceServers();

	mReplicationTodoStats->Set(mChunkReplicationCandidates.size());
}

void
LayoutManager::FindReplicationWorkForServer(ChunkServerPtr &server,
					chunkId_t chunkReplicated)
{
	chunkId_t chunkId;