📄 layoutmanager.cc
字号:
		retiringServer->Retire();
		return 0;
	}
	// Walk the chunk->server map with a MapRetirer functor; presumably this
	// queues re-replication for every chunk hosted on the retiring server —
	// confirm against the MapRetirer definition (not visible in this chunk).
	MapRetirer retirer(mChunkToServerMap, mChunkReplicationCandidates, retiringServer.get());
	for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(), retirer);
	return 0;
}

/*
 * Chunk-placement algorithm is rack-aware.  At a high-level, the algorithm
 * tries to keep at most 1 copy of a chunk per rack:
 *  - Sort the racks based on space
 *  - From each rack, find one or more candidate servers
 * This approach will work when we are placing a chunk for the first time.
 * Whenever we need to copy/migrate a chunk, the intent is to keep the chunk
 * migration traffic within the same rack.  Specifically:
 * 1. Need to re-replicate a chunk because a node is dead
 *    -- here, we exclude the 2 racks on which the chunk is already placed
 *       and try to find a new spot.  By the time we get to finding a spot,
 *       we have removed the info about where the chunk was.  So, put it on
 *       a unique rack.
 * 2. Need to re-replicate a chunk because a node is retiring
 *    -- here, since we know which node is retiring and the rack it is on,
 *       we can keep the new spot to be on the same rack
 * 3. Need to re-replicate a chunk because we are re-balancing amongst nodes
 *    -- we move data between nodes in the same rack
 *    -- if a new rack got added, we move data between racks; here we need
 *       to be a bit careful: in one iteration, we move data from one rack
 *       to a newly added rack; in the next iteration, we could move another
 *       copy of the chunk to the same rack...fill this in.
 * If we can't place the 3 copies on 3 different racks, we put the moved data
 * wherever we can find a spot.  (At a later time, we'll need to fix the
 * code: if a new rack becomes available, we move the 3rd copy to the new
 * rack and get the copies on different racks).
* *//* * Return an ordered list of candidate racks */voidLayoutManager::FindCandidateRacks(vector<int> &result){ set<int> dummy; FindCandidateRacks(result, dummy);}voidLayoutManager::FindCandidateRacks(vector<int> &result, const set<int> &excludes){ set<int>::const_iterator iter; int32_t numRacksToChoose = mRacks.size() - excludes.size(); int32_t count = 0; int rackId, nodeId; set<int> chosenRacks; result.clear(); if (numRacksToChoose == 0) return; for (uint32_t i = 0; i < mRacks.size(); i++) { // paranoia: each candidate rack better have at least one node if (!excludes.empty()) { iter = excludes.find(mRacks[i].id()); if (iter != excludes.end()) continue; } if (mRacks[i].getServers().size() == 0) { // there are no nodes in this rack numRacksToChoose--; } } if (numRacksToChoose == 0) return; count = 0; // choose a rack proportional to the # of nodes that rack while (count < numRacksToChoose) { nodeId = rand() % mChunkServers.size(); rackId = mChunkServers[nodeId]->GetRack(); if (!excludes.empty()) { iter = excludes.find(rackId); if (iter != excludes.end()) // rack is in the exclude list continue; } iter = chosenRacks.find(rackId); if (iter != chosenRacks.end()) { // we have chosen this rack already continue; } chosenRacks.insert(rackId); result.push_back(rackId); count++; }}struct ServerSpace { uint32_t serverIdx; uint32_t loadEstimate; uint64_t availSpace; uint64_t usedSpace; // sort in decreasing order: Prefer the server with more free // space, or in the case of a tie, the one with less used space. 
// also, prefer servers that are lightly loaded bool operator < (const ServerSpace &other) const { if ((loadEstimate > CONCURRENT_WRITES_PER_NODE_WATERMARK) && (loadEstimate != other.loadEstimate)) { // prefer server that is "lightly" loaded return loadEstimate < other.loadEstimate; } if (availSpace != other.availSpace) return availSpace > other.availSpace; else return usedSpace < other.usedSpace; }};struct ServerSpaceUtil { uint32_t serverIdx; float utilization; // sort in increasing order of space utilization bool operator < (const ServerSpaceUtil &other) const { return utilization < other.utilization; }};voidLayoutManager::FindCandidateServers(vector<ChunkServerPtr> &result, const vector<ChunkServerPtr> &excludes, int rackId){ if (mChunkServers.size() < 1) return; if (rackId > 0) { vector<RackInfo>::iterator rackIter; rackIter = find_if(mRacks.begin(), mRacks.end(), RackMatcher(rackId)); if (rackIter != mRacks.end()) { FindCandidateServers(result, rackIter->getServers(), excludes, rackId); return; } } FindCandidateServers(result, mChunkServers, excludes, rackId);}static boolIsCandidateServer(ChunkServerPtr &c){ if ((c->GetAvailSpace() < ((uint64_t) CHUNKSIZE)) || (!c->IsResponsiveServer()) || (c->IsRetiring())) { // one of: no space, non-responsive, retiring...we leave // the server alone return false; } return true;}voidLayoutManager::FindCandidateServers(vector<ChunkServerPtr> &result, const vector<ChunkServerPtr> &sources, const vector<ChunkServerPtr> &excludes, int rackId){ if (sources.size() < 1) return; vector<ChunkServerPtr> candidates; vector<ChunkServerPtr>::size_type i; vector<ChunkServerPtr>::const_iterator iter; for (i = 0; i < sources.size(); i++) { ChunkServerPtr c = sources[i]; if ((rackId >= 0) && (c->GetRack() != rackId)) continue; if (!IsCandidateServer(c)) continue; if (excludes.size() > 0) { iter = find(excludes.begin(), excludes.end(), c); if (iter != excludes.end()) { continue; } } // XXX: temporary measure: take only under-utilized servers 
// we need to move a model where we give preference to // under-utilized servers if (c->GetSpaceUtilization() > MAX_SERVER_SPACE_UTIL_THRESHOLD) continue; candidates.push_back(c); } if (candidates.size() == 0) return; random_shuffle(candidates.begin(), candidates.end()); for (i = 0; i < candidates.size(); i++) { result.push_back(candidates[i]); }}#if 0voidLayoutManager::FindCandidateServers(vector<ChunkServerPtr> &result, const vector<ChunkServerPtr> &sources, const vector<ChunkServerPtr> &excludes, int rackId){ if (sources.size() < 1) return; vector<ServerSpace> ss; vector<ChunkServerPtr>::size_type i, j; vector<ChunkServerPtr>::const_iterator iter; ss.resize(sources.size()); for (i = 0, j = 0; i < sources.size(); i++) { ChunkServerPtr c = sources[i]; if ((rackId >= 0) && (c->GetRack() != rackId)) continue; if ((c->GetAvailSpace() < ((uint64_t) CHUNKSIZE)) || (!c->IsResponsiveServer()) || (c->IsRetiring())) { // one of: no space, non-responsive, retiring...we leave // the server alone continue; } if (excludes.size() > 0) { iter = find(excludes.begin(), excludes.end(), c); if (iter != excludes.end()) { continue; } } ss[j].serverIdx = i; ss[j].availSpace = c->GetAvailSpace(); ss[j].usedSpace = c->GetUsedSpace(); ss[j].loadEstimate = c->GetNumChunkWrites(); j++; } if (j == 0) return; ss.resize(j); sort(ss.begin(), ss.end()); result.reserve(ss.size()); for (i = 0; i < ss.size(); ++i) { result.push_back(sources[ss[i].serverIdx]); }}#endifvoidLayoutManager::SortServersByUtilization(vector<ChunkServerPtr> &servers){ vector<ServerSpaceUtil> ss; vector<ChunkServerPtr> temp; ss.resize(servers.size()); temp.resize(servers.size()); for (vector<ChunkServerPtr>::size_type i = 0; i < servers.size(); i++) { ss[i].serverIdx = i; ss[i].utilization = servers[i]->GetSpaceUtilization(); temp[i] = servers[i]; } sort(ss.begin(), ss.end()); for (vector<ChunkServerPtr>::size_type i = 0; i < servers.size(); i++) { servers[i] = temp[ss[i].serverIdx]; }}////// The algorithm for picking a set 
/// of servers to hold a chunk is: (1) pick the server with the most
/// amount of free space, and (2) to break ties, pick the one with the
/// least amount of used space.  This policy has the effect of doing
/// round-robin allocations.  The allocated space is something that we
/// track.  Note: We rely on the chunk servers to tell us how much space
/// is used up on the server.  Since servers can respond at different
/// rates, doing allocations based on allocated space ensures equitable
/// distribution; otherwise, if we were to do allocations based on the
/// amount of used space, then a slow responding server will get
/// pummelled with lots of chunks (i.e., used space will be updated on
/// the meta server at a slow rate, causing the meta server to think
/// that the chunk server has lot of space available).
///
int
LayoutManager::AllocateChunk(MetaAllocate *r)
{
	vector<ChunkServerPtr>::size_type i;
	vector<int> racks;

	if (r->numReplicas == 0) {
		// huh?  allocate a chunk with 0 replicas???
		return -EINVAL;
	}
	FindCandidateRacks(racks);
	if (racks.size() == 0)
		return -ENOSPC;
	r->servers.reserve(r->numReplicas);
	// spread the replicas evenly over the candidate racks (rounding up)
	uint32_t numServersPerRack = r->numReplicas / racks.size();
	if (r->numReplicas % racks.size())
		numServersPerRack++;
	// take the server local to the machine on which the client is on and
	// make that the master; this avoids a network transfer
	ChunkServerPtr localserver;
	vector <ChunkServerPtr>::iterator j;
	j = find_if(mChunkServers.begin(), mChunkServers.end(),
		MatchServerByHost(r->clientHost));
	if ((j != mChunkServers.end()) && (IsCandidateServer(*j)))
		localserver = *j;
	if (localserver)
		r->servers.push_back(localserver);
	for (uint32_t idx = 0; idx < racks.size(); idx++) {
		vector<ChunkServerPtr> candidates, dummy;
		if (r->servers.size() >= (uint32_t) r->numReplicas)
			break;
		FindCandidateServers(candidates, dummy, racks[idx]);
		if (candidates.size() == 0)
			continue;
		// take as many as we can from this rack
		uint32_t n = 0;
		// if the local server was already chosen, it counts against
		// its own rack's quota
		if (localserver && (racks[idx] == localserver->GetRack()))
			n = 1;
		for (uint32_t i = 0; i < candidates.size() && n < numServersPerRack; i++) {
			if (r->servers.size() >= (uint32_t) r->numReplicas)
				break;
			if (candidates[i] != localserver) {
				r->servers.push_back(candidates[i]);
				n++;
			}
		}
	}
	if (r->servers.size() == 0)
		return -ENOSPC;
	// the first server becomes the master and gets the write lease
	LeaseInfo l(WRITE_LEASE, mLeaseId, r->servers[0]);
	mLeaseId++;
	r->master = r->servers[0];
	r->servers[0]->AllocateChunk(r, l.leaseId);
	// the remaining servers get the allocation without a lease id (-1)
	for (i = 1; i < r->servers.size(); i++) {
		r->servers[i]->AllocateChunk(r, -1);
	}
	// record the placement in the chunk->server map
	ChunkPlacementInfo v;
	v.fid = r->fid;
	v.chunkServers = r->servers;
	v.chunkLeases.push_back(l);
	mChunkToServerMap[r->chunkId] = v;
	// couldn't meet the requested replication factor: schedule
	// re-replication to make up the shortfall
	if (r->servers.size() < (uint32_t) r->numReplicas)
		ChangeChunkReplication(r->chunkId);
	return 0;
}

///
/// Hand out (or re-use) a write lease for the chunk named in r.  On
/// success, returns 0 and fills in r->servers / r->master; otherwise a
/// negative errno.  isNewLease is cleared when an existing valid write
/// lease is re-used.
///
int
LayoutManager::GetChunkWriteLease(MetaAllocate *r, bool &isNewLease)
{
	ChunkPlacementInfo v;
	vector<ChunkServerPtr>::size_type i;
	vector<LeaseInfo>::iterator l;

	// XXX: This is a little too conservative.  We should
	// check if any server has told us about a lease for this
	// file; if no one we know about has a lease, then deny
	// issuing the lease during recovery---because there could
	// be some server who has a lease and hasn't told us yet.
	if (InRecovery()) {
		KFS_LOG_INFO("GetChunkWriteLease: InRecovery() => EBUSY");
		return -EBUSY;
	}
	// if no allocation has been done, can't grab any lease
	CSMapIter iter = mChunkToServerMap.find(r->chunkId);
	if (iter == mChunkToServerMap.end())
		return -EINVAL;
	v = iter->second;
	if (v.chunkServers.size() == 0)
		// all the associated servers are dead...so, fail
		// the allocation request.
		return -KFS::EDATAUNAVAIL;
	if (v.ongoingReplications > 0) {
		// Don't issue a write lease to a chunk that is being
		// re-replicated; this prevents replicas from diverging.
		KFS_LOG_VA_INFO("Write lease: %lld is being re-replicated => EBUSY",
				r->chunkId);
		return -EBUSY;
	}
	l = find_if(v.chunkLeases.begin(), v.chunkLeases.end(),
			ptr_fun(LeaseInfo::IsValidWriteLease));
	if (l != v.chunkLeases.end()) {
		LeaseInfo lease = *l;
		time_t now = time(0);
		string s = timeToStr(now);
#ifdef DEBUG
		assert(now <= lease.expires);
		KFS_LOG_DEBUG("write lease exists...no version bump");
#endif
		// valid write lease; so, tell the client where to go
		KFS_LOG_VA_INFO("Valid write lease exists for %lld (expires = %s)",
				r->chunkId, s.c_str());
		isNewLease = false;
		r->servers = v.chunkServers;
		r->master = lease.chunkServer;
		return 0;
	}
	// There is no valid write lease; to issue a new write lease, we need
	// to do a version # bump.  Do that only if we haven't yet handed out
	// valid read leases.
	l = find_if(v.chunkLeases.begin(), v.chunkLeases.end(),
			ptr_fun(LeaseInfo::IsValidLease));
	if (l != v.chunkLeases.end()) {
		KFS_LOG_DEBUG("GetChunkWriteLease: read lease => EBUSY");
		return -EBUSY;
	}
	// no one has a valid lease
	LeaseCleanup(r->chunkId, v);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -