⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 layoutmanager.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 5 页
字号:
	int extraReplicas = 0, numOngoing;	vector<ChunkServerPtr> c;	if (server->IsRetiring())		return;	c.push_back(server);	// try to start where we were done with this server	CRCandidateSetIter citer = mChunkReplicationCandidates.find(chunkReplicated + 1);	if (citer == mChunkReplicationCandidates.end()) {		// try to start where we left off last time; if that chunk has		// disappeared, find something "closeby"		citer = mChunkReplicationCandidates.upper_bound(mLastChunkReplicated);	}	if (citer == mChunkReplicationCandidates.end()) {		mLastChunkReplicated = 1;		citer = mChunkReplicationCandidates.begin();	}	struct timeval start, now;	gettimeofday(&start, NULL);	for (; citer != mChunkReplicationCandidates.end(); ++citer) {		gettimeofday(&now, NULL);		if (ComputeTimeDiff(start, now) > 0.2)			// if we have spent more than 200 m-seconds here, stop			// serve other requests			break;		chunkId = *citer;        	CSMapIter iter = mChunkToServerMap.find(chunkId);        	if (iter == mChunkToServerMap.end()) {			continue;		}		if (iter->second.ongoingReplications > 0)			continue;		// if the chunk is already hosted on this server, the chunk isn't a candidate for		// work to be sent to this server.		if (IsChunkHostedOnServer(iter->second.chunkServers, server) ||			(!CanReplicateChunkNow(iter->first, iter->second, extraReplicas)))			continue;		if (extraReplicas > 0) {			if (mRacks.size() > 1) {				// when there is more than one rack, since we				// are re-replicating a chunk, we don't want to put two				// copies of a chunk on the same rack.  if one				// of the nodes is retiring, we don't want to				// count the node in this rack set---we want to				// put a block on to the same rack.				set<int> excludeRacks;				for_each(iter->second.chunkServers.begin(), iter->second.chunkServers.end(),					RackSetter(excludeRacks, true));				if (excludeRacks.find(server->GetRack()) != excludeRacks.end())					continue;			}			numOngoing = ReplicateChunk(iter->first, iter->second, 1, c);			iter->second.ongoingReplications += numOngoing;			if (numOngoing > 0) {				mNumOngoingReplications++;				mLastChunkReplicated = chunkId;			}		}		if (server->GetNumChunkReplications() > MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE)			break;	}	// if there is any room left to do more work...	ExecuteRebalancePlan(server);}voidLayoutManager::ChunkReplicationDone(MetaChunkReplicate *req){	vector<ChunkServerPtr>::iterator source;	mOngoingReplicationStats->Update(-1);	// Book-keeping....	CSMapIter iter = mChunkToServerMap.find(req->chunkId);	if (iter != mChunkToServerMap.end()) {		iter->second.ongoingReplications--;		if (iter->second.ongoingReplications == 0)			// if all the replications for this chunk are done,			// then update the global counter.			mNumOngoingReplications--;		if (iter->second.ongoingReplications < 0)			// sanity...			iter->second.ongoingReplications = 0;	}	req->server->ReplicateChunkDone(req->chunkId);	source = find_if(mChunkServers.begin(), mChunkServers.end(),			MatchingServer(req->srcLocation));	if (source !=  mChunkServers.end()) {		(*source)->UpdateReplicationReadLoad(-1);	}	if (req->status != 0) {		// Replication failed...we will try again later		KFS_LOG_VA_INFO("%s: re-replication for chunk %lld failed, code = %d",			req->server->GetServerLocation().ToString().c_str(),			req->chunkId, req->status);		mFailedReplicationStats->Update(1);		return;	}	// replication succeeded: book-keeping	// if any of the hosting servers were being "retired", notify them that	// re-replication of any chunks hosted on them is finished	if (iter != mChunkToServerMap.end()) {		for_each(iter->second.chunkServers.begin(), iter->second.chunkServers.end(),			ReplicationDoneNotifier(req->chunkId));	}	// validate that the server got the latest copy of the chunk	vector<MetaChunkInfo *> v;	vector<MetaChunkInfo *>::iterator chunk;	metatree.getalloc(req->fid, v);	chunk = find_if(v.begin(), v.end(), ChunkIdMatcher(req->chunkId));	if (chunk == v.end()) {		// Chunk disappeared -> stale; this chunk will get nuked		KFS_LOG_VA_INFO("Re-replicate: chunk (%lld) disappeared => so, stale",				req->chunkId);		mFailedReplicationStats->Update(1);		req->server->NotifyStaleChunk(req->chunkId);		return;	}	MetaChunkInfo *mci = *chunk;	if (mci->chunkVersion != req->chunkVersion) {		// Version that we replicated has changed...so, stale		KFS_LOG_VA_INFO("Re-replicate: chunk (%lld) version changed (was=%lld, now=%lld) => so, stale",				req->chunkId, req->chunkVersion, mci->chunkVersion);		mFailedReplicationStats->Update(1);		req->server->NotifyStaleChunk(req->chunkId);		return;	}	// Yaeee...all good...	KFS_LOG_VA_DEBUG("%s reports that re-replication for chunk %lld is all done",			req->server->GetServerLocation().ToString().c_str(), req->chunkId);	UpdateChunkToServerMapping(req->chunkId, req->server.get());	// since this server is now free, send more work its way...	if (req->server->GetNumChunkReplications() <= 1) {		FindReplicationWorkForServer(req->server, req->chunkId);	}}//// To delete additional copies of a chunk, find the servers that have the least// amount of space and delete the chunk from there.  In addition, also pay// attention to rack-awareness: if two copies are on the same rack, then we pick// the server that is the most loaded and delete it there//voidLayoutManager::DeleteAddlChunkReplicas(chunkId_t chunkId, ChunkPlacementInfo &clli,				uint32_t extraReplicas){	vector<ChunkServerPtr> servers = clli.chunkServers, copiesToDiscard;	uint32_t numReplicas = servers.size() - extraReplicas;	set<int> chosenRacks;	// if any of the copies are on nodes that are retiring, leave the copies	// alone; we will reclaim space later if needed        int numRetiringServers = count_if(servers.begin(), servers.end(),					RetiringServerPred());	if (numRetiringServers > 0)		return;	// We get servers sorted by increasing amount of space utilization; so the candidates	// we want to delete are at the end	SortServersByUtilization(servers);	for_each(servers.begin(), servers.end(), RackSetter(chosenRacks));	if (chosenRacks.size() == numReplicas) {		// we need to keep as many copies as racks.  so, find the extra		// copies on a given rack and delete them		clli.chunkServers.clear();		chosenRacks.clear();		for (uint32_t i = 0; i < servers.size(); i++) {			if (chosenRacks.find(servers[i]->GetRack()) ==				chosenRacks.end()) {				chosenRacks.insert(servers[i]->GetRack());				clli.chunkServers.push_back(servers[i]);			} else {				// second copy on the same rack				copiesToDiscard.push_back(servers[i]);			}		}	} else {		clli.chunkServers = servers;		// Get rid of the extra stuff from the end		clli.chunkServers.resize(numReplicas);		// The first N are what we want to keep; the rest should go.		copiesToDiscard.insert(copiesToDiscard.end(), servers.begin() + numReplicas, servers.end());	}	mChunkToServerMap[chunkId] = clli;	ostringstream msg;	msg << "Chunk " << chunkId << " lives on: \n";	for (uint32_t i = 0; i < clli.chunkServers.size(); i++) {		msg << clli.chunkServers[i]->GetServerLocation().ToString() << ' '			<< clli.chunkServers[i]->GetRack() << "; ";	}	msg << "\n";	msg << "Discarding chunk on: ";	for (uint32_t i = 0; i < copiesToDiscard.size(); i++) {		msg << copiesToDiscard[i]->GetServerLocation().ToString() << ' '			<< copiesToDiscard[i]->GetRack() << " ";	}	msg << "\n";	KFS_LOG_VA_INFO("%s", msg.str().c_str());	for_each(copiesToDiscard.begin(), copiesToDiscard.end(), ChunkDeletor(chunkId));}voidLayoutManager::ChangeChunkReplication(chunkId_t chunkId){	mChunkReplicationCandidates.insert(chunkId);}//// Check if the server is part of the set of the servers hosting the chunk//boolLayoutManager::IsChunkHostedOnServer(const vector<ChunkServerPtr> &hosters,					const ChunkServerPtr &server){	vector<ChunkServerPtr>::const_iterator iter;	iter = find(hosters.begin(), hosters.end(), server);	return iter != hosters.end();}class LoadedServerPred {public:	LoadedServerPred() { }	bool operator()(const ChunkServerPtr &s) const {		return s->GetSpaceUtilization() > MAX_SERVER_SPACE_UTIL_THRESHOLD;	}};//// We are trying to move a chunk between two servers on the same rack.  For a// given chunk, we try to find as many "migration pairs" (source/destination// nodes) within the respective racks.//voidLayoutManager::FindIntraRackRebalanceCandidates(vector<ChunkServerPtr> &candidates,			const vector<ChunkServerPtr> &nonloadedServers,			const ChunkPlacementInfo &clli){	vector<ChunkServerPtr>::const_iterator iter;	for (uint32_t i = 0; i < clli.chunkServers.size(); i++) {		vector<ChunkServerPtr> servers;		if (clli.chunkServers[i]->GetSpaceUtilization() <			MAX_SERVER_SPACE_UTIL_THRESHOLD) {			continue;		}		//we have a loaded server; find another non-loaded		//server within the same rack (case 1 from above)		FindCandidateServers(servers, nonloadedServers, clli.chunkServers,					clli.chunkServers[i]->GetRack());		if (servers.size() == 0) {			// nothing available within the rack to do the move			continue;		}		// make sure that we are not putting 2 copies of a chunk on the		// same server		for (uint32_t j = 0; j < servers.size(); j++) {			iter = find(candidates.begin(), candidates.end(), servers[j]);			if (iter == candidates.end()) {				candidates.push_back(servers[j]);				break;			}		}	}}//// For rebalancing, for a chunk, we could not find a candidate server on the same rack as a// loaded server.  Hence, we are trying to move the chunk between two servers on two// different racks.  So, find a migration pair: source/destination on two different racks.//voidLayoutManager::FindInterRackRebalanceCandidate(ChunkServerPtr &candidate,			const vector<ChunkServerPtr> &nonloadedServers,			const ChunkPlacementInfo &clli){	vector<ChunkServerPtr> servers;	FindCandidateServers(servers, nonloadedServers, clli.chunkServers);	if (servers.size() == 0) {		return;	}	// XXX: in emulation mode, we have 0 racks due to compile issues	if (mRacks.size() <= 1)		return;	// if we had only one rack then the intra-rack move should have found a	// candidate.	assert(mRacks.size() > 1);	if (mRacks.size() <= 1)		return;	// For the candidate we pick, we want to enforce the property that all	// the copies of the chunks are on different racks.	set<int> excludeRacks;        for_each(clli.chunkServers.begin(), clli.chunkServers.end(),	                RackSetter(excludeRacks));	for (uint32_t i = 0; i < servers.size(); i++) {		set<int>::iterator iter = excludeRacks.find(servers[i]->GetRack());		if (iter == excludeRacks.end()) {			candidate = servers[i];			return;		}	}}//// Periodically, if we find that some chunkservers have LOT (> 80% free) of space// and if others are loaded (i.e., < 30% free space), move chunks around.  This// helps with keeping better disk space utilization (and maybe load).//intLayoutManager::RebalanceServers(){	if ((InRecovery()) || (mChunkServers.size() == 0)) {		return 0;	}	// if we are doing rebalancing based on a plan, execute as	// much of the plan as there is room.	ExecuteRebalancePlan();	for_each(mRacks.begin(), mRacks.end(), mem_fun_ref(&RackInfo::computeSpace));	if (!mIsRebalancingEnabled)		return 0;	vector<ChunkServerPtr> servers = mChunkServers;	vector<ChunkServerPtr> loadedServers, nonloadedServers;	int extraReplicas, numBlocksMoved = 0;	for (uint32_t i = 0; i < servers.size(); i++) {		if (servers[i]->IsRetiring())			continue;		if (servers[i]->GetSpaceUtilization() < MIN_SERVER_SPACE_UTIL_THRESHOLD)			nonloadedServers.push_back(servers[i]);		else if (servers[i]->GetSpaceUtilization() > MAX_SERVER_SPACE_UTIL_THRESHOLD)			loadedServers.push_back(servers[i]);	}	if ((nonloadedServers.size() == 0) || (loadedServers.size() == 0))		return 0;	bool allbusy = false;	// try to start where we left off last time; if that chunk has	// disappeared, find something "closeby"	CSMapIter iter = mChunkToServerMap.find(mLastChunkRebalanced);	if (iter == mChunkToServerMap.end())		iter = mChunkToServerMap.upper_bound(mLastChunkRebalanced);	for (; iter != mChunkToServerMap.end(); iter++) {		allbusy = true;		for (uint32_t i = 0; i < nonloadedServers.size(); i++) {			if (nonloadedServers[i]->GetNumChunkReplications() <				MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE) {				allbusy = false;				break;			}		}		if (allbusy)			break;		if (numBlocksMoved > 200)			break;		chunkId_t chunkId = iter->first;		ChunkPlacementInfo &clli = iter->second;		vector<ChunkServerPtr> candidates;		// chunk could be moved around if it is hosted on a loaded server		vector<ChunkServerPtr>::const_iterator csp;		csp = find_if(clli.chunkServers.begin(), clli.chunkServers.end(),				LoadedServerPred());		if (csp == clli.chunkServers.end())			continue;		// we have seen this chunkId; next time, we'll start time from		// around here		mLastChunkRebalanced = chunkId;		// If this chunk is already being replicated or it is busy, skip		if ((clli.ongoingReplications > 0) ||			(!CanReplicateChunkNow(chunkId, clli, extraReplicas)))				continue;		// if we got too many copies of this chunk, don't bother		if (extraReplicas < 0)			continue;		// there are two ways nodes can be added:		//  1. new nodes to an existing rack: in this case, we want to		//  migrate chunk within a rack		//  2. new rack of nodes gets added: in this case, we want to		//  migrate chunks to the new rack as long as we don't get more		//  than one copy onto the same rack		FindIntraRackRebalanceCandidates(candidates, nonloadedServers, clli);		if (candidates.size() == 0) {			ChunkServerPtr cand;			FindInterRackRebalanceCandidate(cand, nonloadedServers, clli);			if (cand)				candidates.push_back(cand);		}		if (candidates.size() == 0)			// no candidates :-(			continue;		// get the chunk version		vector<MetaChunkInfo *> v;		vector<MetaChunkInfo *>::iterator chunk;		metatree.getalloc(clli.fid, v);		chunk = fin

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -