📄 layoutmanager.cc
字号:
// Need space on the servers..otherwise, fail it r->servers = v.chunkServers; for (i = 0; i < r->servers.size(); i++) { if (r->servers[i]->GetAvailSpace() < CHUNKSIZE) return -ENOSPC; } isNewLease = true; LeaseInfo lease(WRITE_LEASE, mLeaseId, r->servers[0]); mLeaseId++; v.chunkLeases.push_back(lease); mChunkToServerMap[r->chunkId] = v; // when issuing a new lease, bump the version # by the increment r->chunkVersion += chunkVersionInc; r->master = r->servers[0]; r->master->AllocateChunk(r, lease.leaseId); for (i = 1; i < r->servers.size(); i++) { r->servers[i]->AllocateChunk(r, -1); } KFS_LOG_VA_INFO("New write lease issued for %lld; version = %lld", r->chunkId, r->chunkVersion); return 0;}/* * \brief Process a reqeuest for a READ lease.*/intLayoutManager::GetChunkReadLease(MetaLeaseAcquire *req){ ChunkPlacementInfo v; if (InRecovery()) { KFS_LOG_INFO("GetChunkReadLease: inRecovery() => EBUSY"); return -EBUSY; } CSMapIter iter = mChunkToServerMap.find(req->chunkId); if (iter == mChunkToServerMap.end()) return -EINVAL; // issue a read lease LeaseInfo lease(READ_LEASE, mLeaseId); mLeaseId++; v = iter->second; v.chunkLeases.push_back(lease); mChunkToServerMap[req->chunkId] = v; req->leaseId = lease.leaseId; return 0;}class ValidLeaseIssued { CSMap &chunkToServerMap;public: ValidLeaseIssued(CSMap &m) : chunkToServerMap(m) { } bool operator() (MetaChunkInfo *c) { ChunkPlacementInfo v; vector<LeaseInfo>::iterator l; CSMapIter iter = chunkToServerMap.find(c->chunkId); if (iter == chunkToServerMap.end()) return false; v = iter->second; l = find_if(v.chunkLeases.begin(), v.chunkLeases.end(), ptr_fun(LeaseInfo::IsValidLease)); return (l != v.chunkLeases.end()); }};boolLayoutManager::IsValidLeaseIssued(const vector <MetaChunkInfo *> &c){ vector <MetaChunkInfo *>::const_iterator i; i = find_if(c.begin(), c.end(), ValidLeaseIssued(mChunkToServerMap)); if (i == c.end()) return false; KFS_LOG_VA_DEBUG("Valid lease issued on chunk: %lld", (*i)->chunkId); return true;}class LeaseIdMatcher { int64_t myid;public: LeaseIdMatcher(int64_t id) : myid(id) { } bool operator() (const LeaseInfo &l) { return l.leaseId == myid; }};intLayoutManager::LeaseRenew(MetaLeaseRenew *req){ ChunkPlacementInfo v; vector<LeaseInfo>::iterator l; CSMapIter iter = mChunkToServerMap.find(req->chunkId); if (iter == mChunkToServerMap.end()) { if (InRecovery()) { // Allow lease renewals during recovery LeaseInfo lease(req->leaseType, req->leaseId); if (req->leaseId > mLeaseId) mLeaseId = req->leaseId + 1; v.chunkLeases.push_back(lease); mChunkToServerMap[req->chunkId] = v; return 0; } return -EINVAL; } v = iter->second; l = find_if(v.chunkLeases.begin(), v.chunkLeases.end(), LeaseIdMatcher(req->leaseId)); if (l == v.chunkLeases.end()) return -EINVAL; time_t now = time(0); if (now > l->expires) { // can't renew dead leases; get a new one v.chunkLeases.erase(l); return -ELEASEEXPIRED; } l->expires = now + LEASE_INTERVAL_SECS; mChunkToServerMap[req->chunkId] = v; return 0;}intLayoutManager::LeaseRelinquish(MetaLeaseRelinquish *req){ ChunkPlacementInfo v; vector<LeaseInfo>::iterator l; CSMapIter iter = mChunkToServerMap.find(req->chunkId); if (iter == mChunkToServerMap.end()) return -ELEASEEXPIRED; v = iter->second; l = find_if(v.chunkLeases.begin(), v.chunkLeases.end(), LeaseIdMatcher(req->leaseId)); if (l == v.chunkLeases.end()) return -EINVAL; // the owner of the lease is giving up the lease; so, remove the lease v.chunkLeases.erase(l); mChunkToServerMap[req->chunkId] = v; return 0;}////// Handling a corrupted chunk involves removing the mapping/// from chunk id->chunkserver that we know has it.///voidLayoutManager::ChunkCorrupt(MetaChunkCorrupt *r){ ChunkPlacementInfo v; r->server->IncCorruptChunks(); CSMapIter iter = mChunkToServerMap.find(r->chunkId); if (iter == mChunkToServerMap.end()) return; v = iter->second; if(v.fid != r->fid) { KFS_LOG_VA_WARN("Server %s claims invalid chunk: <%lld, %lld> to be corrupt", r->server->ServerID().c_str(), r->fid, r->chunkId); return; } KFS_LOG_VA_INFO("Server %s claims file/chunk: <%lld, %lld> to be corrupt", r->server->ServerID().c_str(), r->fid, r->chunkId); v.chunkServers.erase(remove_if(v.chunkServers.begin(), v.chunkServers.end(), ChunkServerMatcher(r->server.get())), v.chunkServers.end()); mChunkToServerMap[r->chunkId] = v; // check the replication state when the replicaiton checker gets to it ChangeChunkReplication(r->chunkId); // this chunk has to be replicated from elsewhere; since this is no // longer hosted on this server, take it out of its list of blocks r->server->MovingChunkDone(r->chunkId); if (r->server->IsRetiring()) { r->server->EvacuateChunkDone(r->chunkId); }}class ChunkDeletor { chunkId_t chunkId;public: ChunkDeletor(chunkId_t c) : chunkId(c) { } void operator () (ChunkServerPtr &c) { c->DeleteChunk(chunkId); }};////// Deleting a chunk involves two things: (1) removing the/// mapping from chunk id->chunk server that has it; (2) sending/// an RPC to the associated chunk server to nuke out the chunk.///voidLayoutManager::DeleteChunk(chunkId_t chunkId){ vector<ChunkServerPtr> c; // if we know anything about this chunk at all, then we // process the delete request. if (GetChunkToServerMapping(chunkId, c) != 0) return; // remove the mapping mChunkToServerMap.erase(chunkId); // submit an RPC request for_each(c.begin(), c.end(), ChunkDeletor(chunkId));}class Truncator { chunkId_t chunkId; off_t sz;public: Truncator(chunkId_t c, off_t s) : chunkId(c), sz(s) { } void operator () (ChunkServerPtr &c) { c->TruncateChunk(chunkId, sz); }};////// To truncate a chunk, find the server that holds the chunk and/// submit an RPC request to it.///voidLayoutManager::TruncateChunk(chunkId_t chunkId, off_t sz){ vector<ChunkServerPtr> c; // if we know anything about this chunk at all, then we // process the truncate request. if (GetChunkToServerMapping(chunkId, c) != 0) return; // submit an RPC request Truncator doTruncate(chunkId, sz); for_each(c.begin(), c.end(), doTruncate);}voidLayoutManager::AddChunkToServerMapping(chunkId_t chunkId, fid_t fid, ChunkServer *c){ ChunkPlacementInfo v; if (c == NULL) { // Store an empty mapping to signify the presence of this // particular chunkId. v.fid = fid; mChunkToServerMap[chunkId] = v; return; } assert(ValidServer(c)); KFS_LOG_VA_DEBUG("Laying out chunk=%lld on server %s", chunkId, c->GetServerName()); if (UpdateChunkToServerMapping(chunkId, c) == 0) return; v.fid = fid; v.chunkServers.push_back(c->shared_from_this()); mChunkToServerMap[chunkId] = v;}voidLayoutManager::RemoveChunkToServerMapping(chunkId_t chunkId){ CSMapIter iter = mChunkToServerMap.find(chunkId); if (iter == mChunkToServerMap.end()) return; mChunkToServerMap.erase(iter);}intLayoutManager::UpdateChunkToServerMapping(chunkId_t chunkId, ChunkServer *c){ // If the chunkid isn't present in the mapping table, it could be a // stale chunk CSMapIter iter = mChunkToServerMap.find(chunkId); if (iter == mChunkToServerMap.end()) return -1; /* KFS_LOG_VA_DEBUG("chunk=%lld was laid out on server %s", chunkId, c->GetServerName()); */ iter->second.chunkServers.push_back(c->shared_from_this()); return 0;}intLayoutManager::GetChunkToServerMapping(chunkId_t chunkId, vector<ChunkServerPtr> &c){ CSMapConstIter iter = mChunkToServerMap.find(chunkId); if ((iter == mChunkToServerMap.end()) || (iter->second.chunkServers.size() == 0)) return -1; c = iter->second.chunkServers; return 0;}/// Wrapper class due to silly template/smart-ptr madnessclass Dispatcher {public: Dispatcher() { } void operator() (ChunkServerPtr &c) { c->Dispatch(); }};voidLayoutManager::Dispatch(){ // this method is called in the context of the network thread. // lock out the request processor to prevent changes to the list. pthread_mutex_lock(&mChunkServersMutex); for_each(mChunkServers.begin(), mChunkServers.end(), Dispatcher()); pthread_mutex_unlock(&mChunkServersMutex);}boolLayoutManager::ValidServer(ChunkServer *c){ vector <ChunkServerPtr>::const_iterator i; i = find_if(mChunkServers.begin(), mChunkServers.end(), ChunkServerMatcher(c)); return (i != mChunkServers.end());}class Pinger { string &result; // return the total/used for all the nodes in the cluster uint64_t &totalSpace; uint64_t &usedSpace;public: Pinger(string &r, uint64_t &t, uint64_t &u) : result(r), totalSpace(t), usedSpace(u) { } void operator () (ChunkServerPtr &c) { c->Ping(result); totalSpace += c->GetTotalSpace(); usedSpace += c->GetUsedSpace(); }};class RetiringStatus { string &result;public: RetiringStatus(string &r):result(r) { } void operator () (ChunkServerPtr &c) { c->GetRetiringStatus(result); }};voidLayoutManager::Ping(string &systemInfo, string &upServers, string &downServers, string &retiringServers){ uint64_t totalSpace = 0, usedSpace = 0; Pinger doPing(upServers, totalSpace, usedSpace); for_each(mChunkServers.begin(), mChunkServers.end(), doPing); downServers = mDownServers.str(); for_each(mChunkServers.begin(), mChunkServers.end(), RetiringStatus(retiringServers)); ostringstream os; os << "Up since= " << timeToStr(mRecoveryStartTime) << '\t'; os << "Total space= " << totalSpace << '\t'; os << "Used space= " << usedSpace; systemInfo = os.str();}class UpServersList { ostringstream &os;public: UpServersList(ostringstream &os): os(os) { } void operator () (ChunkServerPtr &c) { os << c->GetServerLocation().ToString() << endl; }};voidLayoutManager::UpServers(ostringstream &os){ UpServersList upServers(os); for_each(mChunkServers.begin(), mChunkServers.end(), upServers);}/// functor to tell if a lease has expiredclass LeaseExpired { time_t now;public: LeaseExpired(time_t n): now(n) { } bool operator () (const LeaseInfo &l) { return now >= l.expires; }};class ChunkWriteDecrementor {public: void operator() (ChunkServerPtr &c) { c->UpdateNumChunkWrites(-1); }};/// If the write lease on a chunk is expired, then decrement the # of writes/// on the servers that are involved in the write.class DecChunkWriteCount { fid_t f; chunkId_t c;public: DecChunkWriteCount(fid_t fid, chunkId_t id) : f(fid), c(id) { } void operator() (const LeaseInfo &l) { if (l.leaseType != WRITE_LEASE) return; vector<ChunkServerPtr> servers; gLayoutManager.GetChunkToServerMapping(c, servers); for_each(servers.begin(), servers.end(), ChunkWriteDecrementor()); // get the chunk's size from one of the servers if (servers.size() > 0) servers[0]->GetChunkSize(f, c); }};/// functor to that expires out leasesclass LeaseExpirer { CSMap &cmap; time_t now;public: LeaseExpirer(CSMap &m, time_t n): cmap(m), now(n) { } void operator () (const map<chunkId_t, ChunkPlacementInfo >::value_type p) { ChunkPlacementInfo c = p.second; chunkId_t chunkId = p.first; vector<LeaseInfo>::iterator i;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -