⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 layoutmanager.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 5 页
字号:
	// Need space on the servers..otherwise, fail it	r->servers = v.chunkServers;	for (i = 0; i < r->servers.size(); i++) {		if (r->servers[i]->GetAvailSpace() < CHUNKSIZE)			return -ENOSPC;	}	isNewLease = true;	LeaseInfo lease(WRITE_LEASE, mLeaseId, r->servers[0]);	mLeaseId++;	v.chunkLeases.push_back(lease);	mChunkToServerMap[r->chunkId] = v;	// when issuing a new lease, bump the version # by the increment	r->chunkVersion += chunkVersionInc;	r->master = r->servers[0];	r->master->AllocateChunk(r, lease.leaseId);	for (i = 1; i < r->servers.size(); i++) {		r->servers[i]->AllocateChunk(r, -1);	}	KFS_LOG_VA_INFO("New write lease issued for %lld; version = %lld", r->chunkId, r->chunkVersion);	return 0;}/* * \brief Process a reqeuest for a READ lease.*/intLayoutManager::GetChunkReadLease(MetaLeaseAcquire *req){	ChunkPlacementInfo v;	if (InRecovery()) {		KFS_LOG_INFO("GetChunkReadLease: inRecovery() => EBUSY");		return -EBUSY;	}        CSMapIter iter = mChunkToServerMap.find(req->chunkId);        if (iter == mChunkToServerMap.end())                return -EINVAL;	// issue a read lease	LeaseInfo lease(READ_LEASE, mLeaseId);	mLeaseId++;	v = iter->second;	v.chunkLeases.push_back(lease);	mChunkToServerMap[req->chunkId] = v;	req->leaseId = lease.leaseId;	return 0;}class ValidLeaseIssued {	CSMap &chunkToServerMap;public:	ValidLeaseIssued(CSMap &m) : chunkToServerMap(m) { }	bool operator() (MetaChunkInfo *c) {		ChunkPlacementInfo v;		vector<LeaseInfo>::iterator l;		CSMapIter iter = chunkToServerMap.find(c->chunkId);		if (iter == chunkToServerMap.end())			return false;		v = iter->second;		l = find_if(v.chunkLeases.begin(), v.chunkLeases.end(),				ptr_fun(LeaseInfo::IsValidLease));		return (l != v.chunkLeases.end());	}};boolLayoutManager::IsValidLeaseIssued(const vector <MetaChunkInfo *> &c){	vector <MetaChunkInfo *>::const_iterator i;	i = find_if(c.begin(), c.end(), ValidLeaseIssued(mChunkToServerMap));	if (i == c.end())		return false;	KFS_LOG_VA_DEBUG("Valid lease issued on chunk: %lld",			(*i)->chunkId);	return true;}class LeaseIdMatcher {	int64_t myid;public:	LeaseIdMatcher(int64_t id) : myid(id) { }	bool operator() (const LeaseInfo &l) {		return l.leaseId == myid;	}};intLayoutManager::LeaseRenew(MetaLeaseRenew *req){	ChunkPlacementInfo v;	vector<LeaseInfo>::iterator l;        CSMapIter iter = mChunkToServerMap.find(req->chunkId);        if (iter == mChunkToServerMap.end()) {		if (InRecovery()) {			// Allow lease renewals during recovery			LeaseInfo lease(req->leaseType, req->leaseId);			if (req->leaseId > mLeaseId)				mLeaseId = req->leaseId + 1;			v.chunkLeases.push_back(lease);			mChunkToServerMap[req->chunkId] = v;			return 0;		}                return -EINVAL;	}	v = iter->second;	l = find_if(v.chunkLeases.begin(), v.chunkLeases.end(),			LeaseIdMatcher(req->leaseId));	if (l == v.chunkLeases.end())		return -EINVAL;	time_t now = time(0);	if (now > l->expires) {		// can't renew dead leases; get a new one		v.chunkLeases.erase(l);		return -ELEASEEXPIRED;	}	l->expires = now + LEASE_INTERVAL_SECS;	mChunkToServerMap[req->chunkId] = v;	return 0;}intLayoutManager::LeaseRelinquish(MetaLeaseRelinquish *req){	ChunkPlacementInfo v;	vector<LeaseInfo>::iterator l;        CSMapIter iter = mChunkToServerMap.find(req->chunkId);	if (iter == mChunkToServerMap.end())		return -ELEASEEXPIRED;		v = iter->second;	l = find_if(v.chunkLeases.begin(), v.chunkLeases.end(),			LeaseIdMatcher(req->leaseId));	if (l == v.chunkLeases.end())		return -EINVAL;	// the owner of the lease is giving up the lease; so, remove the lease	v.chunkLeases.erase(l);	mChunkToServerMap[req->chunkId] = v;	return 0;}////// Handling a corrupted chunk involves removing the mapping/// from chunk id->chunkserver that we know has it.///voidLayoutManager::ChunkCorrupt(MetaChunkCorrupt *r){	ChunkPlacementInfo v;	r->server->IncCorruptChunks();        CSMapIter iter = mChunkToServerMap.find(r->chunkId);	if (iter == mChunkToServerMap.end())		return;	v = iter->second;	if(v.fid != r->fid) {		KFS_LOG_VA_WARN("Server %s claims invalid chunk: <%lld, %lld> to be corrupt",				r->server->ServerID().c_str(), r->fid, r->chunkId);		return;	}	KFS_LOG_VA_INFO("Server %s claims file/chunk: <%lld, %lld> to be corrupt",			r->server->ServerID().c_str(), r->fid, r->chunkId);	v.chunkServers.erase(remove_if(v.chunkServers.begin(), v.chunkServers.end(),			ChunkServerMatcher(r->server.get())), v.chunkServers.end());	mChunkToServerMap[r->chunkId] = v;	// check the replication state when the replicaiton checker gets to it	ChangeChunkReplication(r->chunkId);	// this chunk has to be replicated from elsewhere; since this is no	// longer hosted on this server, take it out of its list of blocks	r->server->MovingChunkDone(r->chunkId);	if (r->server->IsRetiring()) {		r->server->EvacuateChunkDone(r->chunkId);	}}class ChunkDeletor {    chunkId_t chunkId;public:    ChunkDeletor(chunkId_t c) : chunkId(c) { }    void operator () (ChunkServerPtr &c) { c->DeleteChunk(chunkId); }};////// Deleting a chunk involves two things: (1) removing the/// mapping from chunk id->chunk server that has it; (2) sending/// an RPC to the associated chunk server to nuke out the chunk.///voidLayoutManager::DeleteChunk(chunkId_t chunkId){	vector<ChunkServerPtr> c;        // if we know anything about this chunk at all, then we        // process the delete request.	if (GetChunkToServerMapping(chunkId, c) != 0)		return;	// remove the mapping	mChunkToServerMap.erase(chunkId);	// submit an RPC request	for_each(c.begin(), c.end(), ChunkDeletor(chunkId));}class Truncator {    chunkId_t chunkId;    off_t sz;public:    Truncator(chunkId_t c, off_t s) : chunkId(c), sz(s) { }    void operator () (ChunkServerPtr &c) { c->TruncateChunk(chunkId, sz); }};////// To truncate a chunk, find the server that holds the chunk and/// submit an RPC request to it.///voidLayoutManager::TruncateChunk(chunkId_t chunkId, off_t sz){	vector<ChunkServerPtr> c;        // if we know anything about this chunk at all, then we        // process the truncate request.	if (GetChunkToServerMapping(chunkId, c) != 0)		return;	// submit an RPC request        Truncator doTruncate(chunkId, sz);	for_each(c.begin(), c.end(), doTruncate);}voidLayoutManager::AddChunkToServerMapping(chunkId_t chunkId, fid_t fid,					ChunkServer *c){	ChunkPlacementInfo v;        if (c == NULL) {		// Store an empty mapping to signify the presence of this		// particular chunkId.		v.fid = fid;		mChunkToServerMap[chunkId] = v;		return;        }	assert(ValidServer(c));	KFS_LOG_VA_DEBUG("Laying out chunk=%lld on server %s",			 chunkId, c->GetServerName());	if (UpdateChunkToServerMapping(chunkId, c) == 0)            return;	v.fid = fid;        v.chunkServers.push_back(c->shared_from_this());        mChunkToServerMap[chunkId] = v;}voidLayoutManager::RemoveChunkToServerMapping(chunkId_t chunkId){        CSMapIter iter = mChunkToServerMap.find(chunkId);        if (iter == mChunkToServerMap.end())                return;        mChunkToServerMap.erase(iter);}intLayoutManager::UpdateChunkToServerMapping(chunkId_t chunkId, ChunkServer *c){        // If the chunkid isn't present in the mapping table, it could be a        // stale chunk        CSMapIter iter = mChunkToServerMap.find(chunkId);        if (iter == mChunkToServerMap.end())                return -1;	/*	KFS_LOG_VA_DEBUG("chunk=%lld was laid out on server %s",			 chunkId, c->GetServerName());	*/        iter->second.chunkServers.push_back(c->shared_from_this());        return 0;}intLayoutManager::GetChunkToServerMapping(chunkId_t chunkId, vector<ChunkServerPtr> &c){        CSMapConstIter iter = mChunkToServerMap.find(chunkId);        if ((iter == mChunkToServerMap.end()) ||		(iter->second.chunkServers.size() == 0))                return -1;        c = iter->second.chunkServers;        return 0;}/// Wrapper class due to silly template/smart-ptr madnessclass Dispatcher {public:	Dispatcher() { }	void operator() (ChunkServerPtr &c) {		c->Dispatch();	}};voidLayoutManager::Dispatch(){	// this method is called in the context of the network thread.	// lock out the request processor to prevent changes to the list.	pthread_mutex_lock(&mChunkServersMutex);	for_each(mChunkServers.begin(), mChunkServers.end(), Dispatcher());	pthread_mutex_unlock(&mChunkServersMutex);}boolLayoutManager::ValidServer(ChunkServer *c){	vector <ChunkServerPtr>::const_iterator i;	i = find_if(mChunkServers.begin(), mChunkServers.end(),		ChunkServerMatcher(c));	return (i != mChunkServers.end());}class Pinger {	string &result;	// return the total/used for all the nodes in the cluster	uint64_t &totalSpace;	uint64_t &usedSpace;public:	Pinger(string &r, uint64_t &t, uint64_t &u) :		result(r), totalSpace(t), usedSpace(u) { }	void operator () (ChunkServerPtr &c)	{		c->Ping(result);		totalSpace += c->GetTotalSpace();		usedSpace += c->GetUsedSpace();	}};class RetiringStatus {	string &result;public:	RetiringStatus(string &r):result(r) { }	void operator () (ChunkServerPtr &c) { c->GetRetiringStatus(result); }};voidLayoutManager::Ping(string &systemInfo, string &upServers, string &downServers, string &retiringServers){	uint64_t totalSpace = 0, usedSpace = 0;	Pinger doPing(upServers, totalSpace, usedSpace);	for_each(mChunkServers.begin(), mChunkServers.end(), doPing);	downServers = mDownServers.str();	for_each(mChunkServers.begin(), mChunkServers.end(), RetiringStatus(retiringServers));	ostringstream os;	os << "Up since= " << timeToStr(mRecoveryStartTime) << '\t';	os << "Total space= " << totalSpace << '\t';	os << "Used space= " << usedSpace;	systemInfo = os.str();}class UpServersList {    ostringstream &os;public:	UpServersList(ostringstream &os): os(os) { }    void operator () (ChunkServerPtr &c) {        os << c->GetServerLocation().ToString() << endl;    }};voidLayoutManager::UpServers(ostringstream &os){    UpServersList upServers(os);    for_each(mChunkServers.begin(), mChunkServers.end(), upServers);}/// functor to tell if a lease has expiredclass LeaseExpired {	time_t now;public:	LeaseExpired(time_t n): now(n) { }	bool operator () (const LeaseInfo &l) { return now >= l.expires; }};class ChunkWriteDecrementor {public:	void operator() (ChunkServerPtr &c) { c->UpdateNumChunkWrites(-1); }};/// If the write lease on a chunk is expired, then decrement the # of writes/// on the servers that are involved in the write.class DecChunkWriteCount {	fid_t f;	chunkId_t c;public:	DecChunkWriteCount(fid_t fid, chunkId_t id) : f(fid), c(id) { }	void operator() (const LeaseInfo &l) {		if (l.leaseType != WRITE_LEASE)			return;		vector<ChunkServerPtr> servers;		gLayoutManager.GetChunkToServerMapping(c, servers);		for_each(servers.begin(), servers.end(), ChunkWriteDecrementor());		// get the chunk's size from one of the servers		if (servers.size() > 0)			servers[0]->GetChunkSize(f, c);	}};/// functor to that expires out leasesclass LeaseExpirer {	CSMap &cmap;	time_t now;public:	LeaseExpirer(CSMap &m, time_t n): cmap(m), now(n) { }	void operator () (const map<chunkId_t, ChunkPlacementInfo >::value_type p)	{		ChunkPlacementInfo c = p.second;		chunkId_t chunkId = p.first;		vector<LeaseInfo>::iterator i;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -