⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 layoutmanager.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 5 页
字号:
		// NOTE(review): tail of a functor operator() (LeaseExpirer) whose
		// definition begins before this excerpt. It removes expired leases
		// for one chunk and writes the updated placement info back.
		i = remove_if(c.chunkLeases.begin(), c.chunkLeases.end(),
			LeaseExpired(now));
		// For every expired lease, release the write-count reference it held.
		for_each(i, c.chunkLeases.end(), DecChunkWriteCount(c.fid, chunkId));
		// trim the list
		c.chunkLeases.erase(i, c.chunkLeases.end());
		cmap[p.first] = c;
	}
};

// Walk the whole chunk->server map and expire stale leases on every chunk.
void
LayoutManager::LeaseCleanup()
{
	time_t now = time(0);
	for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(),
		LeaseExpirer(mChunkToServerMap, now));
}

// Cleanup the leases for a particular chunk: drop ALL leases (not just
// expired ones) and release the write-count each one held on the chunk.
void
LayoutManager::LeaseCleanup(chunkId_t chunkId, ChunkPlacementInfo &v)
{
	for_each(v.chunkLeases.begin(), v.chunkLeases.end(),
		DecChunkWriteCount(v.fid, chunkId));
	v.chunkLeases.clear();
}

// Predicate: true when the chunkserver is in the process of retiring.
class RetiringServerPred {
public:
	RetiringServerPred() { }
	bool operator()(const ChunkServerPtr &c) {
		return c->IsRetiring();
	}
};

// Functor: tell a chunkserver that evacuation/replication of the given
// chunk is finished.
class ReplicationDoneNotifier {
	chunkId_t cid;
public:
	ReplicationDoneNotifier(chunkId_t c) : cid(c) { }
	void operator()(ChunkServerPtr &s) {
		s->EvacuateChunkDone(cid);
	}
};

// Functor: collect the rack ids of a set of chunkservers into `racks`.
// When excludeRetiring is set, racks are only counted for servers that
// are not retiring (so a rack hosting only a retiring server still
// counts as a candidate destination).
class RackSetter {
	set<int> &racks;
	bool excludeRetiringServers;
public:
	RackSetter(set<int> &r, bool excludeRetiring = false) :
		racks(r), excludeRetiringServers(excludeRetiring) { }
	void operator()(const ChunkServerPtr &s) {
		if (excludeRetiringServers && s->IsRetiring())
			return;
		racks.insert(s->GetRack());
	}
};

// Pick destination servers for making extraReplicas additional copies of
// chunkId and kick off the replication.  Returns the number of
// replications actually started (0 when no candidate server was found).
int
LayoutManager::ReplicateChunk(chunkId_t chunkId, const ChunkPlacementInfo &clli,
				uint32_t extraReplicas)
{
	vector<int> racks;
	set<int> excludeRacks;
	// find a place
	vector<ChunkServerPtr> candidates;
	// two steps here: first, exclude the racks on which chunks are already
	// placed; if we can't find a unique rack, then put it wherever
	// for accounting purposes, ignore the rack(s) which contain a retiring
	// chunkserver; we'd like to move the block within that rack if
	// possible.
	for_each(clli.chunkServers.begin(), clli.chunkServers.end(),
		RackSetter(excludeRacks, true));

	FindCandidateRacks(racks, excludeRacks);
	if (racks.size() == 0) {
		// no new rack is available to put the chunk
		// take what we got
		FindCandidateRacks(racks);
		if (racks.size() == 0)
			// no rack is available
			return 0;
	}

	// Spread the new replicas across the chosen racks (ceiling division,
	// so the remainder is absorbed one-per-rack).
	uint32_t numServersPerRack = extraReplicas / racks.size();
	if (extraReplicas % racks.size())
		numServersPerRack++;
	for (uint32_t idx = 0; idx < racks.size(); idx++) {
		if (candidates.size() >= extraReplicas)
			break;
		vector<ChunkServerPtr> servers;
		// find candidates other than those that are already hosting the
		// chunk
		FindCandidateServers(servers, clli.chunkServers, racks[idx]);
		// take as many as we can from this rack
		for (uint32_t i = 0; i < servers.size() && i < numServersPerRack; i++) {
			if (candidates.size() >= extraReplicas)
				break;
			candidates.push_back(servers[i]);
		}
	}
	if (candidates.size() == 0)
		return 0;
	return ReplicateChunk(chunkId, clli, extraReplicas, candidates);
}

// For each destination in `candidates`, choose a source server hosting the
// chunk (preferring a retiring, responsive server with read bandwidth to
// spare) and issue the replication RPC.  Returns how many replications
// were actually started; updates the replication statistics counters.
int
LayoutManager::ReplicateChunk(chunkId_t chunkId, const ChunkPlacementInfo &clli,
				uint32_t extraReplicas, const
				vector<ChunkServerPtr> &candidates)
{
	ChunkServerPtr c, dataServer;
	vector<MetaChunkInfo *> v;
	vector<MetaChunkInfo *>::iterator chunk;
	fid_t fid = clli.fid;
	int numDone = 0;

	/*
	metatree.getalloc(fid, v);
	chunk = find_if(v.begin(), v.end(), ChunkIdMatcher(chunkId));
	if (chunk == v.end()) {
		panic("missing chunk", true);
	}
	MetaChunkInfo *mci = *chunk;
	*/

	for (uint32_t i = 0; i < candidates.size() && i < extraReplicas; i++) {
		vector<ChunkServerPtr>::const_iterator iter;

		c = candidates[i];
		// Don't send too many replications to a server
		if (c->GetNumChunkReplications() > MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE)
			continue;
#ifdef DEBUG
		// verify that we got good candidates: the destination must not
		// already be hosting the chunk
		iter = find(clli.chunkServers.begin(), clli.chunkServers.end(), c);
		if (iter != clli.chunkServers.end()) {
			assert(!"Not possible...");
		}
#endif
		// prefer a server that is being retired to the other nodes as
		// the source of the chunk replication
		iter = find_if(clli.chunkServers.begin(), clli.chunkServers.end(),
				RetiringServerPred());
		const char *reason;
		if (iter != clli.chunkServers.end()) {
			reason = " evacuating chunk ";
			// only use the retiring server if it is responsive and
			// has replication-read bandwidth available
			if (((*iter)->GetReplicationReadLoad() <
				MAX_CONCURRENT_READ_REPLICATIONS_PER_NODE) &&
				(*iter)->IsResponsiveServer())
				dataServer = *iter;
		} else {
			reason = " re-replication ";
		}

		// if we can't find a retiring server, pick a server that has read b/w available
		for (uint32_t j = 0; (!dataServer) &&
				(j < clli.chunkServers.size()); j++) {
			if ((clli.chunkServers[j]->GetReplicationReadLoad() >=
				MAX_CONCURRENT_READ_REPLICATIONS_PER_NODE) ||
				(!(clli.chunkServers[j]->IsResponsiveServer())))
				continue;
			dataServer = clli.chunkServers[j];
		}
		if (dataServer) {
			ServerLocation srcLocation = dataServer->GetServerLocation();
			ServerLocation dstLocation = c->GetServerLocation();
			KFS_LOG_VA_INFO("Starting re-replication for chunk %lld (from: %s to %s) reason = %s",
					chunkId,
					srcLocation.ToString().c_str(),
					dstLocation.ToString().c_str(), reason);
			// account for the read load this replication puts on the source
			dataServer->UpdateReplicationReadLoad(1);
			/*
			c->ReplicateChunk(fid, chunkId, mci->chunkVersion,
				dataServer->GetServerLocation());
			*/
			// have the chunkserver get the version: -1 means the
			// destination asks the source for the chunk version
			c->ReplicateChunk(fid, chunkId, -1,
				dataServer->GetServerLocation());
			numDone++;
		}
		// reset so the next iteration re-selects a source
		dataServer.reset();
	}

	if (numDone > 0) {
		mTotalReplicationStats->Update(1);
		mOngoingReplicationStats->Update(numDone);
	}
	return numDone;
}

// Decide whether chunkId can be (re-)replicated right now, and compute how
// many copies to add/remove via the out-parameter extraReplicas:
//    - extraReplicas > 0: make that many extra copies
//    - extraReplicas == 0: nothing to do; drop from the candidate set
//    - extraReplicas < 0: too many copies; delete some
// Returns false when the chunk must be left alone for now (an active write
// lease, or a read lease blocking a delete); returning true with
// extraReplicas == 0 tells the caller to remove the chunk from the
// replication-candidate set.
bool
LayoutManager::CanReplicateChunkNow(chunkId_t chunkId,
				ChunkPlacementInfo &c,
				int &extraReplicas)
{
	vector<LeaseInfo>::iterator l;

	// Don't replicate chunks for which a write lease
	// has been issued.
	l = find_if(c.chunkLeases.begin(), c.chunkLeases.end(),
		ptr_fun(LeaseInfo::IsValidWriteLease));
	if (l != c.chunkLeases.end())
		return false;

	extraReplicas = 0;
	// Can't re-replicate a chunk if we don't have a copy! so,
	// take out this chunk from the candidate set.
	if (c.chunkServers.size() == 0)
		return true;

	MetaFattr *fa = metatree.getFattr(c.fid);
	if (fa == NULL)
		// No file attr.  So, take out this chunk
		// from the candidate set.
		return true;

	// check if the chunk still exists
	vector<MetaChunkInfo *> v;
	vector<MetaChunkInfo *>::iterator chunk;
	metatree.getalloc(c.fid, v);
	chunk = find_if(v.begin(), v.end(), ChunkIdMatcher(chunkId));
	if (chunk == v.end()) {
		// This chunk doesn't exist in this file anymore.
		// So, take out this chunk from the candidate set.
		return true;
	}

	// if any of the chunkservers are retiring, we need to make copies
	// so, first determine how many copies we need because one of the
	// servers hosting the chunk is going down
	int numRetiringServers = count_if(c.chunkServers.begin(), c.chunkServers.end(),
					RetiringServerPred());
	// now, determine if we have sufficient copies
	if (numRetiringServers > 0) {
		if (c.chunkServers.size() - numRetiringServers < (uint32_t) fa->numReplicas) {
			// we need to make this many copies: # of servers that are
			// retiring plus the # this chunk is under-replicated
			extraReplicas = numRetiringServers + (fa->numReplicas - c.chunkServers.size());
		} else {
			// we got sufficient copies even after accounting for
			// the retiring server.  so, take out this chunk from
			// replication candidates set.
			extraReplicas = 0;
		}
		return true;
	}

	// May need to re-replicate this chunk:
	//    - extraReplicas > 0 means make extra copies;
	//    - extraReplicas == 0, take out this chunkid from the candidate set
	//    - extraReplicas < 0, means we got too many copies; delete some
	extraReplicas = fa->numReplicas - c.chunkServers.size();

	if (extraReplicas < 0) {
		//
		// We need to delete additional copies; however, if
		// there is a valid (read) lease issued on the chunk,
		// then leave the chunk alone for now; we'll look at
		// deleting it when the lease has expired.  This is for
		// safety: if a client was reading from the copy of the
		// chunk that we are trying to delete, the client will
		// see the deletion and will have to failover; avoid
		// unnecessary failovers
		//
		l = find_if(c.chunkLeases.begin(), c.chunkLeases.end(),
				ptr_fun(LeaseInfo::IsValidLease));
		if (l != c.chunkLeases.end())
			return false;
	}

	return true;
}

// Functor applied to every chunkserver: for a retiring server, reconcile
// its list of chunks still awaiting evacuation.  Chunks unknown to the
// chunk->server map are acknowledged as done; known ones are (re-)inserted
// into the replication-candidate set.
class EvacuateChunkChecker {
	CRCandidateSet &candidates;
	CSMap &chunkToServerMap;
public:
	EvacuateChunkChecker(CRCandidateSet &c, CSMap &m) :
		candidates(c), chunkToServerMap(m) {}
	void operator()(ChunkServerPtr c) {
		if (!c->IsRetiring())
			return;
		CRCandidateSet leftover = c->GetEvacuatingChunks();
		for (CRCandidateSetIter citer = leftover.begin(); citer !=
			leftover.end(); ++citer) {
			chunkId_t chunkId = *citer;
			CSMapIter iter = chunkToServerMap.find(chunkId);
			if (iter == chunkToServerMap.end()) {
				// metaserver doesn't know this chunk at all:
				// nothing to evacuate, tell the server it is done
				c->EvacuateChunkDone(chunkId);
				KFS_LOG_VA_INFO("%s has bogus block %ld",
					c->GetServerLocation().ToString().c_str(), chunkId);
			} else {
				// XXX
				// if we don't think this chunk is on this
				// server, then we should update the view...
				candidates.insert(chunkId);
				KFS_LOG_VA_INFO("%s has block %ld that wasn't in replication candidates",
					c->GetServerLocation().ToString().c_str(), chunkId);
			}
		}
	}
};

// Check on servers that told us they were going down temporarily
// ("hibernating").  If a server's grace period expired and it did not
// reconnect, queue all of its blocks for re-replication.  Entries for
// servers that came back, or whose window expired, are removed.
void
LayoutManager::CheckHibernatingServersStatus()
{
	time_t now = time(0);

	vector <HibernatingServerInfo_t>::iterator iter = mHibernatingServers.begin();
	vector<ChunkServerPtr>::iterator i;

	while (iter != mHibernatingServers.end()) {
		i = find_if(mChunkServers.begin(), mChunkServers.end(),
			MatchingServer(iter->location));
		if ((i == mChunkServers.end()) && (now < iter->sleepEndTime)) {
			// within the time window where the server is sleeping
			// so, move on
			iter++;
			continue;
		}
		if (i != mChunkServers.end()) {
			KFS_LOG_VA_INFO("Hibernated server (%s) is back as promised...",
					iter->location.ToString().c_str());
		} else {
			// server hasn't come back as promised...so, check
			// re-replication for the blocks that were on that node
			KFS_LOG_VA_INFO("Hibernated server (%s) is not back as promised...",
				iter->location.ToString().c_str());
			mChunkReplicationCandidates.insert(iter->blocks.begin(), iter->blocks.end());
		}
		// erase invalidates iter; restart the scan from the beginning
		mHibernatingServers.erase(iter);
		iter = mHibernatingServers.begin();
	}
}

// Periodic driver of re-replication: walk the candidate set, start
// replications / deletions as needed, prune finished entries, and then
// trigger rebalancing.  Time-boxed to ~5 seconds per invocation so the
// metaserver can keep serving other requests.
void
LayoutManager::ChunkReplicationChecker()
{
	if (InRecovery()) {
		return;
	}

	CheckHibernatingServersStatus();

	// There is a set of chunks that are affected: their server went down
	// or there is a change in their degree of replication.  in either
	// case, walk this set of chunkid's and work on their replication amount.
	chunkId_t chunkId;
	CRCandidateSet delset;
	int extraReplicas, numOngoing;
	uint32_t numIterations = 0;
	struct timeval start;

	gettimeofday(&start, NULL);

	for (CRCandidateSetIter citer = mChunkReplicationCandidates.begin();
		citer != mChunkReplicationCandidates.end(); ++citer) {
		chunkId = *citer;

		struct timeval now;
		gettimeofday(&now, NULL);
		if (ComputeTimeDiff(start, now) > 5.0)
			// if we have spent more than 5 seconds here, stop;
			// serve other requests
			break;

		CSMapIter iter = mChunkToServerMap.find(chunkId);
		if (iter == mChunkToServerMap.end()) {
			// unknown chunk: schedule removal from the candidate set
			delset.insert(chunkId);
			continue;
		}

		if (iter->second.ongoingReplications > 0)
			// this chunk is being re-replicated; we'll check later
			continue;

		if (!CanReplicateChunkNow(iter->first, iter->second, extraReplicas))
			continue;

		if (extraReplicas > 0) {
			numOngoing = ReplicateChunk(iter->first, iter->second, extraReplicas);
			iter->second.ongoingReplications += numOngoing;
			if (numOngoing > 0) {
				mNumOngoingReplications++;
				mLastChunkReplicated = chunkId;
				numIterations++;
			}
		} else if (extraReplicas == 0) {
			delset.insert(chunkId);
		} else {
			// negative => too many copies; delete the surplus
			DeleteAddlChunkReplicas(iter->first, iter->second, -extraReplicas);
			delset.insert(chunkId);
		}

		if (numIterations > mChunkServers.size() *
			MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE)
			// throttle...we are handing out
			break;
	}

	if (delset.size() > 0) {
		for (CRCandidateSetIter citer = delset.begin();
			citer != delset.end(); ++citer) {
			// Notify the retiring servers of any of their chunks have
			// been evacuated---such as, if there were too many copies of those
			// chunks, we are done evacuating them
			chunkId = *citer;
			CSMapIter iter = mChunkToServerMap.find(chunkId);
			if (iter != mChunkToServerMap.end())
				for_each(iter->second.chunkServers.begin(), iter->second.chunkServers.end(),
					ReplicationDoneNotifier(chunkId));
			mChunkReplicationCandidates.erase(*citer);
		}
	}

	if (mChunkReplicationCandidates.size() == 0) {
		// if there are any retiring servers, we need to make sure that
		// the servers don't think there is a block to be replicated
		// if there is any such, let us get them into the set of
		// candidates...need to know why this happens
		for_each(mChunkServers.begin(), mChunkServers.end(),
			EvacuateChunkChecker(mChunkReplicationCandidates, mChunkToServerMap));
	}

	RebalanceServers();

	mReplicationTodoStats->Set(mChunkReplicationCandidates.size());
}

// NOTE(review): definition continues past this excerpt — body incomplete here.
void
LayoutManager::FindReplicationWorkForServer(ChunkServerPtr &server, chunkId_t chunkReplicated)
{
	chunkId_t chunkId;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -