//---------------------------------------------------------- -*- Mode: C++ -*-
// $Id: LayoutManager.cc 221 2008-11-23 18:54:46Z sriramsrao $
//
// Created 2006/06/06
// Author: Sriram Rao
//
// Copyright 2008 Quantcast Corp.
// Copyright 2006-2008 Kosmix Corp.
//
// This file is part of Kosmos File System (KFS).
//
// Licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// \file LayoutManager.cc
// \brief Handlers for chunk layout.
//
//----------------------------------------------------------------------------

#include <algorithm>
#include <functional>
#include <sstream>

#include "LayoutManager.h"
#include "kfstree.h"
#include "libkfsIO/Globals.h"

using std::for_each;
using std::find;
using std::find_if;
using std::ptr_fun;
using std::mem_fun;
using std::mem_fun_ref;
using std::bind2nd;
using std::sort;
using std::random_shuffle;
using std::remove_if;
using std::set;
using std::vector;
using std::map;
using std::min;
using std::endl;
using std::istringstream;

using namespace KFS;
using namespace KFS::libkfsio;

LayoutManager KFS::gLayoutManager;

/// Max # of concurrent read/write replications per node
///  -- write: is the # of chunks that the node can pull in from outside
///  -- read: is the # of chunks that the node is allowed to send out
///
const int MAX_CONCURRENT_WRITE_REPLICATIONS_PER_NODE = 5;
const int MAX_CONCURRENT_READ_REPLICATIONS_PER_NODE = 10;

///
/// When placing chunks, we look at the space available on a node and also
/// use our estimate of the # of writes on the node as a hint for choosing
/// servers; if a server is "loaded" we should avoid sending traffic to it.
/// This value defines a watermark after which load begins to be an issue.
///
const uint32_t CONCURRENT_WRITES_PER_NODE_WATERMARK = 10;

/// Helper functor that can be used to find a chunkid from a vector
/// of meta chunk info's.
class ChunkIdMatcher {
    chunkId_t myid;
public:
    ChunkIdMatcher(chunkId_t c) : myid(c) { }
    bool operator() (MetaChunkInfo *c) {
        return c->chunkId == myid;
    }
};

LayoutManager::LayoutManager() :
    mLeaseId(1), mNumOngoingReplications(0),
    mIsRebalancingEnabled(false), mIsExecutingRebalancePlan(false),
    mLastChunkRebalanced(1), mLastChunkReplicated(1),
    mRecoveryStartTime(0), mMinChunkserversToExitRecovery(1)
{
    pthread_mutex_init(&mChunkServersMutex, NULL);

    mReplicationTodoStats = new Counter("Num Replications Todo");
    mOngoingReplicationStats = new Counter("Num Ongoing Replications");
    mTotalReplicationStats = new Counter("Total Num Replications");
    mFailedReplicationStats = new Counter("Num Failed Replications");
    mStaleChunkCount = new Counter("Num Stale Chunks");

    // how much to be done before we are done
    globals().counterManager.AddCounter(mReplicationTodoStats);
    // how much are we doing right now
    globals().counterManager.AddCounter(mOngoingReplicationStats);
    globals().counterManager.AddCounter(mTotalReplicationStats);
    globals().counterManager.AddCounter(mFailedReplicationStats);
    globals().counterManager.AddCounter(mStaleChunkCount);
}

class MatchingServer {
    ServerLocation loc;
public:
    MatchingServer(const ServerLocation &l) : loc(l) { }
    bool operator() (ChunkServerPtr &s) {
        return s->MatchingServer(loc);
    }
};

//
// Try to match servers by hostname: for write allocation, we'd like to place
// one copy of the block on the same host on which the client is running.
//
class MatchServerByHost {
    string host;
public:
    MatchServerByHost(const string &s) : host(s) { }
    bool operator() (ChunkServerPtr &s) {
        ServerLocation l = s->GetServerLocation();
        return l.hostname == host;
    }
};

/// Add the newly joined server to the list of servers we have. Also,
/// update our state to include the chunks hosted on this server.
void
LayoutManager::AddNewServer(MetaHello *r)
{
    ChunkServerPtr s;
    vector<chunkId_t> staleChunkIds;
    vector<ChunkInfo>::size_type i;
    vector<ChunkServerPtr>::iterator j;
    uint64_t allocSpace = r->chunks.size() * CHUNKSIZE;

    if (r->server->IsDown())
        return;

    s = r->server;
    s->SetServerLocation(r->location);
    s->SetSpace(r->totalSpace, r->usedSpace, allocSpace);
    s->SetRack(r->rackId);

    // If a previously dead server reconnects, reuse the server's
    // position in the list of chunk servers. This is because in
    // the chunk->server mapping table, we use the chunkserver's
    // position in the list of connected servers to find it.
    //
    j = find_if(mChunkServers.begin(), mChunkServers.end(),
            MatchingServer(r->location));
    if (j != mChunkServers.end()) {
        KFS_LOG_VA_DEBUG("Duplicate server: %s, %d",
                r->location.hostname.c_str(), r->location.port);
        return;
    }

    for (i = 0; i < r->chunks.size(); ++i) {
        vector<MetaChunkInfo *> v;
        vector<MetaChunkInfo *>::iterator chunk;
        int res = -1;

        metatree.getalloc(r->chunks[i].fileId, v);

        chunk = find_if(v.begin(), v.end(),
                ChunkIdMatcher(r->chunks[i].chunkId));
        if (chunk != v.end()) {
            MetaChunkInfo *mci = *chunk;

            if (mci->chunkVersion <= r->chunks[i].chunkVersion) {
                // This chunk is non-stale. Verify that there are
                // sufficient copies; if there are too many, nuke some.
                ChangeChunkReplication(r->chunks[i].chunkId);

                res = UpdateChunkToServerMapping(r->chunks[i].chunkId, s.get());
                assert(res >= 0);

                // get the chunksize for the last chunk of fid
                // stored on this server
                MetaFattr *fa = metatree.getFattr(r->chunks[i].fileId);
                if (fa->filesize < 0) {
                    MetaChunkInfo *lastChunk = v.back();

                    if (lastChunk->chunkId == r->chunks[i].chunkId)
                        s->GetChunkSize(r->chunks[i].fileId, r->chunks[i].chunkId);
                }

                if (mci->chunkVersion < r->chunks[i].chunkVersion) {
                    // version #'s differ. have the chunkserver reset
                    // to what the metaserver has.
                    // XXX: This is all due to the issue with not logging
                    // the version # that the metaserver is issuing. What is going
                    // on here is that,
                    //  -- client made a request
                    //  -- metaserver bumped the version; notified the chunkservers
                    //  -- the chunkservers write out the version bump on disk
                    //  -- the metaserver gets ack; writes out the version bump on disk
                    //  -- and then notifies the client
                    // Now, if the metaserver crashes before it writes out the
                    // version bump, it is possible that some chunkservers did the
                    // bump, but not the metaserver. So, fix up. To avoid other whacky
                    // scenarios, we increment the chunk version # by the incarnation stuff
                    // to avoid reissuing the same version # multiple times.
                    s->NotifyChunkVersChange(r->chunks[i].fileId,
                            r->chunks[i].chunkId,
                            mci->chunkVersion);
                }
            } else {
                KFS_LOG_VA_INFO("Old version for chunk id = %lld => stale",
                        r->chunks[i].chunkId);
            }
        }

        if (res < 0) {
            /// stale chunk
            KFS_LOG_VA_INFO("Non-existent chunk id = %lld => stale",
                    r->chunks[i].chunkId);
            staleChunkIds.push_back(r->chunks[i].chunkId);
            mStaleChunkCount->Update(1);
        }
    }

    if (staleChunkIds.size() > 0) {
        s->NotifyStaleChunks(staleChunkIds);
    }

    // prevent the network thread from wandering this list while we change it.
    pthread_mutex_lock(&mChunkServersMutex);
    mChunkServers.push_back(s);
    pthread_mutex_unlock(&mChunkServersMutex);

    vector<RackInfo>::iterator rackIter;

    rackIter = find_if(mRacks.begin(), mRacks.end(), RackMatcher(r->rackId));
    if (rackIter != mRacks.end()) {
        rackIter->addServer(s);
    } else {
        RackInfo ri(r->rackId);

        ri.addServer(s);
        mRacks.push_back(ri);
    }

    // Update the list since a new server is in
    CheckHibernatingServersStatus();
}

class MapPurger {
    CSMap &cmap;
    CRCandidateSet &crset;
    const ChunkServer *target;
public:
    MapPurger(CSMap &m, CRCandidateSet &c, const ChunkServer *t) :
        cmap(m), crset(c), target(t) { }
    void operator () (const map<chunkId_t, ChunkPlacementInfo>::value_type p) {
        ChunkPlacementInfo c = p.second;
        vector<ChunkServerPtr>::iterator i;

        //
        // only chunks hosted on the target need to be checked for
        // replication level
        //
        i = find_if(c.chunkServers.begin(), c.chunkServers.end(),
                ChunkServerMatcher(target));
        if (i == c.chunkServers.end())
            return;

        c.chunkServers.erase(remove_if(c.chunkServers.begin(), c.chunkServers.end(),
                ChunkServerMatcher(target)), c.chunkServers.end());
        cmap[p.first] = c;
        // we need to check the replication level of this chunk
        crset.insert(p.first);
    }
};

class MapRetirer {
    CSMap &cmap;
    CRCandidateSet &crset;
    ChunkServer *retiringServer;
public:
    MapRetirer(CSMap &m, CRCandidateSet &c, ChunkServer *t) :
        cmap(m), crset(c), retiringServer(t) { }
    void operator () (const map<chunkId_t, ChunkPlacementInfo>::value_type p) {
        ChunkPlacementInfo c = p.second;
        vector<ChunkServerPtr>::iterator i;

        i = find_if(c.chunkServers.begin(), c.chunkServers.end(),
                ChunkServerMatcher(retiringServer));
        if (i == c.chunkServers.end())
            return;

        // we need to check the replication level of this chunk
        crset.insert(p.first);
        retiringServer->EvacuateChunk(p.first);
    }
};

class MapDumper {
    ofstream &ofs;
public:
    MapDumper(ofstream &o) : ofs(o) { }
    void operator () (const map<chunkId_t, ChunkPlacementInfo>::value_type p) {
        chunkId_t cid = p.first;
        ChunkPlacementInfo c = p.second;

        ofs << cid << ' ' << c.fid << ' ' << c.chunkServers.size() << ' ';
        for (uint32_t i = 0; i < c.chunkServers.size(); i++) {
            ofs << c.chunkServers[i]->ServerID() << ' '
                << c.chunkServers[i]->GetRack() << ' ';
        }
        ofs << endl;
    }
};

class MapDumperStream {
    ostringstream &ofs;
public:
    MapDumperStream(ostringstream &o) : ofs(o) { }
    void operator () (const map<chunkId_t, ChunkPlacementInfo>::value_type p) {
        chunkId_t cid = p.first;
        ChunkPlacementInfo c = p.second;

        ofs << cid << ' ' << c.fid << ' ' << c.chunkServers.size() << ' ';
        for (uint32_t i = 0; i < c.chunkServers.size(); i++) {
            ofs << c.chunkServers[i]->ServerID() << ' '
                << c.chunkServers[i]->GetRack() << ' ';
        }
        ofs << endl;
    }
};
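// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original file): the two dumper
// functors above write one whitespace-separated record per chunk of the form
//     <chunkId> <fid> <numServers> {<serverId> <rack>}...
// The stand-alone reader below shows how such a record could be parsed, for
// example when feeding "chunkmap.txt" into an emulation/experimentation tool.
// DumpedChunkEntry and ParseChunkMapLine are hypothetical names used only for
// this sketch; each serverId is treated as a single whitespace-free token,
// since the dump itself uses a space as the field separator.
// ---------------------------------------------------------------------------
#include <stdint.h>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

struct DumpedChunkEntry {
    int64_t chunkId;
    int64_t fid;
    // one (serverId, rack) pair per replica of the chunk
    std::vector<std::pair<std::string, int> > servers;
};

// Parse one line of the chunk map dump; returns false if the line is malformed.
static bool
ParseChunkMapLine(const std::string &line, DumpedChunkEntry &entry)
{
    std::istringstream ist(line);
    size_t numServers = 0;

    if (!(ist >> entry.chunkId >> entry.fid >> numServers))
        return false;
    for (size_t i = 0; i < numServers; i++) {
        std::string serverId;
        int rack;

        if (!(ist >> serverId >> rack))
            return false;
        entry.servers.push_back(std::make_pair(serverId, rack));
    }
    return true;
}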
//
// Dump out the chunk block map to a file.  The output can be used in
// emulation modes where we set up the block map and experiment.
//
void
LayoutManager::DumpChunkToServerMap()
{
    ofstream ofs;

    ofs.open("chunkmap.txt");
    for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(),
            MapDumper(ofs));
    ofs.flush();
    ofs.close();
}

// Dump chunk block map to response stream
void
LayoutManager::DumpChunkToServerMap(ostringstream &os)
{
    for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(),
            MapDumperStream(os));
}

void
LayoutManager::ServerDown(ChunkServer *server)
{
    vector<ChunkServerPtr>::iterator i =
        find_if(mChunkServers.begin(), mChunkServers.end(),
                ChunkServerMatcher(server));

    if (i == mChunkServers.end())
        return;

    vector<RackInfo>::iterator rackIter;

    rackIter = find_if(mRacks.begin(), mRacks.end(),
            RackMatcher(server->GetRack()));
    if (rackIter != mRacks.end()) {
        rackIter->removeServer(server);
        if (rackIter->getServers().size() == 0) {
            // the entire rack of servers is gone
            // so, take the rack out
            KFS_LOG_VA_INFO("All servers in rack %d are down; taking out the rack",
                    server->GetRack());
            mRacks.erase(rackIter);
        }
    }

    /// Fail all the ops that were sent/waiting for response from
    /// this server.
    server->FailPendingOps();

    // check if this server was sent to hibernation
    bool isHibernating = false;
    for (uint32_t j = 0; j < mHibernatingServers.size(); j++) {
        if (mHibernatingServers[j].location == server->GetServerLocation()) {
            // record all the blocks that need to be checked for
            // re-replication later
            MapPurger purge(mChunkToServerMap, mHibernatingServers[j].blocks, server);

            for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(), purge);
            isHibernating = true;
            break;
        }
    }

    if (!isHibernating) {
        MapPurger purge(mChunkToServerMap, mChunkReplicationCandidates, server);

        for_each(mChunkToServerMap.begin(), mChunkToServerMap.end(), purge);
    }

    // for reporting purposes, record when it went down
    time_t now = time(NULL);
    string downSince = timeToStr(now);
    ServerLocation loc = server->GetServerLocation();
    const char *reason;

    if (isHibernating)
        reason = "Hibernated";
    else if (server->IsRetiring())
        reason = "Retired";
    else
        reason = "Unreachable";

    mDownServers << "s=" << loc.hostname << ", p=" << loc.port
            << ", down=" << downSince << ", reason=" << reason << "\t";

    mChunkServers.erase(i);
}

int
LayoutManager::RetireServer(const ServerLocation &loc, int downtime)
{
    ChunkServerPtr retiringServer;
    vector<ChunkServerPtr>::iterator i;

    i = find_if(mChunkServers.begin(), mChunkServers.end(),
            MatchingServer(loc));
    if (i == mChunkServers.end())
        return -1;

    retiringServer = *i;

    retiringServer->SetRetiring();
    if (downtime > 0) {
        HibernatingServerInfo_t hsi;

        hsi.location = retiringServer->GetServerLocation();
        hsi.sleepEndTime = time(0) + downtime;
        mHibernatingServers.push_back(hsi);