
layoutmanager.cc

Collection: nandflash file system source code
Page 1 of 5
//---------------------------------------------------------- -*- Mode: C++ -*-
// $Id: LayoutManager.cc 221 2008-11-23 18:54:46Z sriramsrao $
//
// Created 2006/06/06
// Author: Sriram Rao
//
// Copyright 2008 Quantcast Corp.
// Copyright 2006-2008 Kosmix Corp.
//
// This file is part of Kosmos File System (KFS).
//
// Licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// \file LayoutManager.cc
// \brief Handlers for chunk layout.
//
//----------------------------------------------------------------------------

#include <algorithm>
#include <functional>
#include <sstream>

#include "LayoutManager.h"
#include "kfstree.h"
#include "libkfsIO/Globals.h"

using std::for_each;
using std::find;
using std::ptr_fun;
using std::mem_fun;
using std::mem_fun_ref;
using std::bind2nd;
using std::sort;
using std::random_shuffle;
using std::remove_if;
using std::set;
using std::vector;
using std::map;
using std::min;
using std::endl;
using std::istringstream;

using namespace KFS;
using namespace KFS::libkfsio;

LayoutManager KFS::gLayoutManager;

/// Max # of concurrent read/write replications per node
///  -- write: is the # of chunks that the node can pull in from outside
///  -- read: is the # of chunks that the node is allowed to send out
///
const int MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE = 5;
const int MAX_CONCURRENT_READ_REPLICATIONS_PER_NODE = 10;

///
/// When placing chunks, we see the space available on the node as well as
/// we take our estimate of the # of writes on
/// the node as a hint for choosing servers; if a server is "loaded" we should
/// avoid sending traffic to it.  This value defines a watermark after which load
/// begins to be an issue.
///
const uint32_t CONCURRENT_WRITES_PER_NODE_WATERMARK = 10;

/// Helper functor that can be used to find a chunkid from a vector
/// of meta chunk info's.
class ChunkIdMatcher {
    chunkId_t myid;
public:
    ChunkIdMatcher(chunkId_t c) : myid(c) { }
    bool operator() (MetaChunkInfo *c) {
        return c->chunkId == myid;
    }
};

LayoutManager::LayoutManager() :
    mLeaseId(1), mNumOngoingReplications(0),
    mIsRebalancingEnabled(false), mIsExecutingRebalancePlan(false),
    mLastChunkRebalanced(1), mLastChunkReplicated(1),
    mRecoveryStartTime(0), mMinChunkserversToExitRecovery(1)
{
    pthread_mutex_init(&mChunkServersMutex, NULL);

    mReplicationTodoStats = new Counter("Num Replications Todo");
    mOngoingReplicationStats = new Counter("Num Ongoing Replications");
    mTotalReplicationStats = new Counter("Total Num Replications");
    mFailedReplicationStats = new Counter("Num Failed Replications");
    mStaleChunkCount = new Counter("Num Stale Chunks");

    // how much to be done before we are done
    globals().counterManager.AddCounter(mReplicationTodoStats);
    // how much are we doing right now
    globals().counterManager.AddCounter(mOngoingReplicationStats);
    globals().counterManager.AddCounter(mTotalReplicationStats);
    globals().counterManager.AddCounter(mFailedReplicationStats);
    globals().counterManager.AddCounter(mStaleChunkCount);
}

class MatchingServer {
    ServerLocation loc;
public:
    MatchingServer(const ServerLocation &l) : loc(l) { }
    bool operator() (ChunkServerPtr &s) {
        return s->MatchingServer(loc);
    }
};

//
// Try to match servers by hostname: for write allocation, we'd like to place
// one copy of the block on the same host on which the client is running.
//
class MatchServerByHost {
    string host;
public:
    MatchServerByHost(const string &s) : host(s) { }
    bool operator() (ChunkServerPtr &s) {
        ServerLocation l = s->GetServerLocation();
        return l.hostname == host;
    }
};

/// Add the newly joined server to the list of servers we have.  Also,
/// update our state to include the chunks hosted on this server.
void
LayoutManager::AddNewServer(MetaHello *r)
{
    ChunkServerPtr s;
    vector <chunkId_t> staleChunkIds;
    vector <ChunkInfo>::size_type i;
    vector <ChunkServerPtr>::iterator j;
    uint64_t allocSpace = r->chunks.size() * CHUNKSIZE;

    if (r->server->IsDown())
        return;

    s = r->server;
    s->SetServerLocation(r->location);
    s->SetSpace(r->totalSpace, r->usedSpace, allocSpace);
    s->SetRack(r->rackId);

    // If a previously dead server reconnects, reuse the server's
    // position in the list of chunk servers.  This is because in
    // the chunk->server mapping table, we use the chunkserver's
    // position in the list of connected servers to find it.
    //
    j = find_if(mChunkServers.begin(), mChunkServers.end(), MatchingServer(r->location));
    if (j != mChunkServers.end()) {
        KFS_LOG_VA_DEBUG("Duplicate server: %s, %d",
                         r->location.hostname.c_str(), r->location.port);
        return;
    }

    for (i = 0; i < r->chunks.size(); ++i) {
        vector<MetaChunkInfo *> v;
        vector<MetaChunkInfo *>::iterator chunk;
        int res = -1;

        metatree.getalloc(r->chunks[i].fileId, v);

        chunk = find_if(v.begin(), v.end(), ChunkIdMatcher(r->chunks[i].chunkId));
        if (chunk != v.end()) {
            MetaChunkInfo *mci = *chunk;

            if (mci->chunkVersion <= r->chunks[i].chunkVersion) {
                // This chunk is non-stale.  Verify that there are
                // sufficient copies; if there are too many, nuke some.
                ChangeChunkReplication(r->chunks[i].chunkId);

                res = UpdateChunkToServerMapping(r->chunks[i].chunkId,
                                                 s.get());
                assert(res >= 0);

                // get the chunksize for the last chunk of fid
                // stored on this server
                MetaFattr *fa = metatree.getFattr(r->chunks[i].fileId);
                if (fa->filesize < 0) {
                    MetaChunkInfo *lastChunk = v.back();

                    if (lastChunk->chunkId == r->chunks[i].chunkId)
                        s->GetChunkSize(r->chunks[i].fileId,
                                        r->chunks[i].chunkId);
                }

                if (mci->chunkVersion < r->chunks[i].chunkVersion) {
                    // version #'s differ.  have the chunkserver reset
                    // to what the metaserver has.
                    // XXX: This is all due to the issue with not logging
                    // the version # that the metaserver is issuing.  What is going
                    // on here is that,
                    //  -- client made a request
                    //  -- metaserver bumped the version; notified the chunkservers
                    //  -- the chunkservers write out the version bump on disk
                    //  -- the metaserver gets ack; writes out the version bump on disk
                    //  -- and then notifies the client
                    // Now, if the metaserver crashes before it writes out the
                    // version bump, it is possible that some chunkservers did the
                    // bump, but not the metaserver.  So, fix up.  To avoid other whacky
                    // scenarios, we increment the chunk version # by the incarnation stuff
                    // to avoid reissuing the same version # multiple times.
                    s->NotifyChunkVersChange(r->chunks[i].fileId,
                                             r->chunks[i].chunkId,
                                             mci->chunkVersion);
                }
            } else {
                KFS_LOG_VA_INFO("Old version for chunk id = %lld => stale",
                                r->chunks[i].chunkId);
            }
        }

        if (res < 0) {
            /// stale chunk
            KFS_LOG_VA_INFO("Non-existent chunk id = %lld => stale",
                            r->chunks[i].chunkId);
            staleChunkIds.push_back(r->chunks[i].chunkId);
            mStaleChunkCount->Update(1);
        }
    }

    if (staleChunkIds.size() > 0) {
        s->NotifyStaleChunks(staleChunkIds);
    }

    // prevent the network thread from wandering this list while we change it.
    pthread_mutex_lock(&mChunkServersMutex);
    mChunkServers.push_back(s);
    pthread_mutex_unlock(&mChunkServersMutex);

    vector<RackInfo>::iterator rackIter;

    rackIter = find_if(mRacks.begin(), mRacks.end(), RackMatcher(r->rackId));
    if (rackIter != mRacks.end()) {
        rackIter->addServer(s);
    } else {
        RackInfo ri(r->rackId);

        ri.addServer(s);
        mRacks.push_back(ri);
    }

    // Update the list since a new server is in
    CheckHibernatingServersStatus();
}

class MapPurger {
    CSMap &cmap;
    CRCandidateSet &crset;
    const ChunkServer *target;
public:
    MapPurger(CSMap &m, CRCandidateSet &c, const ChunkServer *t):
        cmap(m), crset(c), target(t) { }
    void operator () (const map<chunkId_t, ChunkPlacementInfo >::value_type p) {
        ChunkPlacementInfo c = p.second;
        vector <ChunkServerPtr>::iterator i;

        //
        // only chunks hosted on the target need to be checked for
        // replication level
        //
        i = find_if(c.chunkServers.begin(), c.chunkServers.end(),
                    ChunkServerMatcher(target));
        if (i == c.chunkServers.end())
            return;
        c.chunkServers.erase(remove_if(c.chunkServers.begin(), c.chunkServers.end(),
                                       ChunkServerMatcher(target)),
                             c.chunkServers.end());
        cmap[p.first] = c;
        // we need to check the replication level of this chunk
        crset.insert(p.first);
    }
};

class MapRetirer {
    CSMap &cmap;
    CRCandidateSet &crset;
    ChunkServer *retiringServer;
public:
    MapRetirer(CSMap &m, CRCandidateSet &c, ChunkServer *t):
        cmap(m), crset(c), retiringServer(t) { }
    void operator () (const map<chunkId_t, ChunkPlacementInfo >::value_type p) {
        ChunkPlacementInfo c = p.second;
        vector <ChunkServerPtr>::iterator i;

        i = find_if(c.chunkServers.begin(), c.chunkServers.end(),
                    ChunkServerMatcher(retiringServer));
        if (i == c.chunkServers.end())
            return;

        // we need to check the replication level of this chunk
        crset.insert(p.first);
        retiringServer->EvacuateChunk(p.first);
    }
};

class MapDumper {
    ofstream &ofs;
public:
    MapDumper(ofstream &o) : ofs(o) { }
    void operator () (const map<chunkId_t, ChunkPlacementInfo >::value_type p) {
        chunkId_t cid = p.first;
        ChunkPlacementInfo c = p.second;

        ofs << cid << ' ' << c.fid << ' ' << c.chunkServers.size() << ' ';
        for (uint32_t i = 0; i < c.chunkServers.size(); i++) {
            ofs << c.chunkServers[i]->ServerID() << ' '
                << c.chunkServers[i]->GetRack() << ' ';
        }
        ofs << endl;
    }
};

class MapDumperStream {
    ostringstream &ofs;
public:
    MapDumperStream(ostringstream &o) : ofs(o) { }
    void operator () (const map<chunkId_t, ChunkPlacementInfo >::value_type p) {
        chunkId_t cid = p.first;
        ChunkPlacementInfo c = p.second;

        ofs << cid << ' ' << c.fid << ' ' << c.chunkServers.size() << ' ';
        for (uint32_t i = 0; i < c.chunkServers.size(); i++) {
            ofs << c.chunkServers[i]->ServerID() << ' '
                << c.chunkServers[i]->GetRack() << ' ';
        }
        ofs << endl;
    }
};

//
// Dump out the chunk block map to a file.  The output can be used in emulation
// modes where we setup the block map and experiment.
//
void
LayoutManager::DumpChunkToServerMap()
{
    ofstream ofs;

    ofs.open("chunkmap.txt");

    for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(),
             MapDumper(ofs));
    ofs.flush();
    ofs.close();
}

// Dump chunk block map to response stream
void
LayoutManager::DumpChunkToServerMap(ostringstream &os)
{
    for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(),
             MapDumperStream(os));
}

void
LayoutManager::ServerDown(ChunkServer *server)
{
    vector <ChunkServerPtr>::iterator i =
        find_if(mChunkServers.begin(), mChunkServers.end(),
                ChunkServerMatcher(server));

    if (i == mChunkServers.end())
        return;

    vector<RackInfo>::iterator rackIter;

    rackIter = find_if(mRacks.begin(), mRacks.end(), RackMatcher(server->GetRack()));
    if (rackIter != mRacks.end()) {
        rackIter->removeServer(server);
        if (rackIter->getServers().size() == 0) {
            // the entire rack of servers is gone
            // so, take the rack out
            KFS_LOG_VA_INFO("All servers in rack %d are down; taking out the rack",
                            server->GetRack());
            mRacks.erase(rackIter);
        }
    }

    /// Fail all the ops that were sent/waiting for response from
    /// this server.
    server->FailPendingOps();

    // check if this server was sent to hibernation
    bool isHibernating = false;
    for (uint32_t j = 0; j < mHibernatingServers.size(); j++) {
        if (mHibernatingServers[j].location == server->GetServerLocation()) {
            // record all the blocks that need to be checked for
            // re-replication later
            MapPurger purge(mChunkToServerMap, mHibernatingServers[j].blocks, server);

            for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(), purge);
            isHibernating = true;
            break;
        }
    }

    if (!isHibernating) {
        MapPurger purge(mChunkToServerMap, mChunkReplicationCandidates, server);

        for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(), purge);
    }

    // for reporting purposes, record when it went down
    time_t now = time(NULL);
    string downSince = timeToStr(now);
    ServerLocation loc = server->GetServerLocation();
    const char *reason;

    if (isHibernating)
        reason = "Hibernated";
    else if (server->IsRetiring())
        reason = "Retired";
    else
        reason = "Unreachable";

    mDownServers << "s=" << loc.hostname << ", p=" << loc.port << ", down="
                 << downSince << ", reason=" << reason << "\t";

    mChunkServers.erase(i);
}

int
LayoutManager::RetireServer(const ServerLocation &loc, int downtime)
{
    ChunkServerPtr retiringServer;
    vector <ChunkServerPtr>::iterator i;

    i = find_if(mChunkServers.begin(), mChunkServers.end(), MatchingServer(loc));
    if (i == mChunkServers.end())
        return -1;

    retiringServer = *i;

    retiringServer->SetRetiring();
    if (downtime > 0) {
        HibernatingServerInfo_t hsi;

        hsi.location = retiringServer->GetServerLocation();
        hsi.sleepEndTime = time(0) + downtime;
        mHibernatingServers.push_back(hsi);
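
Note on the chunk map dump format: each line that MapDumper writes to chunkmap.txt above has the form "chunkId fileId numServers" followed by numServers pairs of server id and rack. The standalone reader below is only an illustrative sketch, not part of the KFS source; it assumes ServerID() prints a single whitespace-free token and GetRack() prints an integer.

// Illustrative only: a minimal reader for the chunkmap.txt format produced
// by MapDumper (hypothetical helper, not part of KFS).
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// One parsed line of chunkmap.txt.
struct DumpedChunk {
    long long chunkId;
    long long fileId;
    std::vector<std::pair<std::string, int> > replicas; // (server id, rack)
};

int main()
{
    std::ifstream in("chunkmap.txt");
    std::string line;
    std::vector<DumpedChunk> chunks;

    while (std::getline(in, line)) {
        std::istringstream iss(line);
        DumpedChunk c;
        size_t nservers = 0;

        // Leading fields: chunk id, file id, number of replicas.
        if (!(iss >> c.chunkId >> c.fileId >> nservers))
            continue; // skip blank or malformed lines
        // Then nservers (server id, rack) pairs.
        for (size_t i = 0; i < nservers; i++) {
            std::string serverId;
            int rack;
            if (iss >> serverId >> rack)
                c.replicas.push_back(std::make_pair(serverId, rack));
        }
        chunks.push_back(c);
    }
    std::cout << "read " << chunks.size() << " chunk entries" << std::endl;
    return 0;
}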
