📄 chunkserver.cc
字号:
allocateOp->layoutDone = true; // The op is no longer suspended. req->suspended = false; // send it on its merry way submit_request(req); } } else if ((submittedOp->op == META_CHUNK_DELETE) || (submittedOp->op == META_CHUNK_TRUNCATE) || (submittedOp->op == META_CHUNK_HEARTBEAT) || (submittedOp->op == META_CHUNK_STALENOTIFY) || (submittedOp->op == META_CHUNK_VERSCHANGE) || (submittedOp->op == META_CHUNK_RETIRE)) { assert(req == NULL); delete submittedOp; } else if (submittedOp->op == META_CHUNK_REPLICATE) { // This op is internally generated. We need to notify // the layout manager of this op's completion. So, send // it there. MetaChunkReplicate *mcr = static_cast<MetaChunkReplicate *>(submittedOp); KFS_LOG_VA_DEBUG("Meta chunk replicate for chunk %lld finished with version %lld, status: %d", mcr->chunkId, mcr->chunkVersion, submittedOp->status); submit_request(submittedOp); // the op will get nuked after it is processed } else if (submittedOp->op == META_CHUNK_SIZE) { // chunkserver has responded with the chunk's size. So, update // the meta-tree submit_request(submittedOp); } else { assert(!"Unknown op!"); }}////// The response sent by a chunkserver is of the form:/// OK \r\n/// Cseq: <seq #>\r\n/// Status: <status> \r\n/// {<other header/value pair>\r\n}*\r\n////// @param[in] buf Buffer containing the response/// @param[in] bufLen length of buf/// @param[out] prop Properties object with the response header/values/// voidChunkServer::ParseResponse(char *buf, int bufLen, Properties &prop){ istringstream ist(buf); const char separator = ':'; string respOk; // KFS_LOG_VA_DEBUG("Got chunk-server-response: %s", buf); ist >> respOk; // Response better start with OK if (respOk.compare("OK") != 0) { KFS_LOG_VA_DEBUG("Didn't get an OK: instead, %s", respOk.c_str()); return; } prop.loadProperties(ist, separator, false);}// Helper functor that matches ops by sequence #'sclass OpMatch { seq_t myseq;public: OpMatch(seq_t s) : myseq(s) { } bool operator() (const MetaRequest *r) { return (r->opSeqno == myseq); }};////// Request/responses are matched based on sequence #'s.///MetaRequest *ChunkServer::FindMatchingRequest(seq_t cseq){ list<MetaRequest *>::iterator iter; MetaRequest *op; iter = find_if(mDispatchedReqs.begin(), mDispatchedReqs.end(), OpMatch(cseq)); if (iter == mDispatchedReqs.end()) return NULL; op = *iter; mDispatchedReqs.erase(iter); return op;}////// Queue an RPC request///voidChunkServer::Enqueue(MetaRequest *r) { mPendingReqs.enqueue(r); globals().netKicker.Kick();}intChunkServer::AllocateChunk(MetaAllocate *r, int64_t leaseId){ MetaChunkAllocate *ca; mAllocSpace += CHUNKSIZE; UpdateNumChunkWrites(1); ca = new MetaChunkAllocate(NextSeq(), r, this, leaseId); // save a pointer to the request so that we can match up the // response whenever we get it. Enqueue(ca); return 0;}intChunkServer::DeleteChunk(chunkId_t chunkId){ MetaChunkDelete *r; mAllocSpace -= CHUNKSIZE; if (IsRetiring()) { EvacuateChunkDone(chunkId); } r = new MetaChunkDelete(NextSeq(), this, chunkId); // save a pointer to the request so that we can match up the // response whenever we get it. Enqueue(r); return 0;}intChunkServer::TruncateChunk(chunkId_t chunkId, off_t s){ MetaChunkTruncate *r; mAllocSpace -= (CHUNKSIZE - s); r = new MetaChunkTruncate(NextSeq(), this, chunkId, s); // save a pointer to the request so that we can match up the // response whenever we get it. Enqueue(r); return 0;}intChunkServer::GetChunkSize(fid_t fid, chunkId_t chunkId){ MetaChunkSize *r; r = new MetaChunkSize(NextSeq(), this, fid, chunkId); // save a pointer to the request so that we can match up the // response whenever we get it. Enqueue(r); return 0;}intChunkServer::ReplicateChunk(fid_t fid, chunkId_t chunkId, seq_t chunkVersion, const ServerLocation &loc){ MetaChunkReplicate *r; r = new MetaChunkReplicate(NextSeq(), this, fid, chunkId, chunkVersion, loc); r->server = shared_from_this(); mNumChunkWriteReplications++; // save a pointer to the request so that we can match up the // response whenever we get it. Enqueue(r); return 0;}voidChunkServer::Heartbeat(){ if (!mHelloDone) { return; } if (mHeartbeatSent) { string loc = mLocation.ToString(); time_t now = time(0); if (now - mLastHeard > INACTIVE_SERVER_TIMEOUT) { KFS_LOG_VA_INFO("Server %s has been non-responsive for too long; taking it down", loc.c_str()); // We are executing in the context of the network thread // So, take the server down as though the net connection // broke. HandleRequest(EVENT_NET_ERROR, NULL); return; } // If a request is outstanding, don't send one more mHeartbeatSkipped = true; KFS_LOG_VA_INFO("Skipping send of heartbeat to %s", loc.c_str()); return; } mHeartbeatSent = true; mHeartbeatSkipped = false; MetaChunkHeartbeat *r; r = new MetaChunkHeartbeat(NextSeq(), this); // save a pointer to the request so that we can match up the // response whenever we get it. Enqueue(r);}voidChunkServer::NotifyStaleChunks(const vector<chunkId_t> &staleChunkIds){ MetaChunkStaleNotify *r; mAllocSpace -= (CHUNKSIZE * staleChunkIds.size()); r = new MetaChunkStaleNotify(NextSeq(), this); r->staleChunkIds = staleChunkIds; // save a pointer to the request so that we can match up the // response whenever we get it. Enqueue(r);}voidChunkServer::NotifyStaleChunk(chunkId_t staleChunkId){ MetaChunkStaleNotify *r; mAllocSpace -= CHUNKSIZE; r = new MetaChunkStaleNotify(NextSeq(), this); r->staleChunkIds.push_back(staleChunkId); // save a pointer to the request so that we can match up the // response whenever we get it. Enqueue(r);}voidChunkServer::NotifyChunkVersChange(fid_t fid, chunkId_t chunkId, seq_t chunkVers){ MetaChunkVersChange *r; r = new MetaChunkVersChange(NextSeq(), this, fid, chunkId, chunkVers); // save a pointer to the request so that we can match up the // response whenever we get it. Enqueue(r);}voidChunkServer::SetRetiring(){ mIsRetiring = true; mRetireStartTime = time(NULL); KFS_LOG_VA_INFO("Initiation of retire for chunks on %s : %d blocks to do", ServerID().c_str(), mNumChunks);}voidChunkServer::EvacuateChunkDone(chunkId_t chunkId){ if (!mIsRetiring) return; mEvacuatingChunks.erase(chunkId); if (mEvacuatingChunks.empty()) { KFS_LOG_VA_INFO("Evacuation of chunks on %s is done; retiring", ServerID().c_str()); Retire(); }}voidChunkServer::Retire(){ MetaChunkRetire *r; r = new MetaChunkRetire(NextSeq(), this); Enqueue(r);}//// Helper functor that dispatches an RPC request to the server.//class OpDispatcher { ChunkServer *server; NetConnectionPtr conn;public: OpDispatcher(ChunkServer *s, NetConnectionPtr &c) : server(s), conn(c) { } void operator()(MetaRequest *r) { ostringstream os; MetaChunkRequest *cr = static_cast <MetaChunkRequest *> (r); if (!conn) { // Server is dead...so, drop the op r->status = -EIO; server->ResumeOp(r); return; } assert(cr != NULL); // Get the request into string format cr->request(os); // Send it on its merry way conn->Write(os.str().c_str(), os.str().length()); if (cr->op == META_CHUNK_REPLICATE) { MetaChunkReplicate *mcr = static_cast <MetaChunkReplicate *> (cr); KFS_LOG_VA_INFO("Dispatched re-replication request: %s", mcr->Show().c_str()); } // Notify the server the op is dispatched server->Dispatched(r); }};voidChunkServer::Dispatch(){ OpDispatcher dispatcher(this, mNetConnection); list<MetaRequest *> reqs; MetaRequest *r; while((r = mPendingReqs.dequeue_nowait())) { reqs.push_back(r); } for_each(reqs.begin(), reqs.end(), dispatcher); reqs.clear();}// Helper functor that fails an op with an error code.class OpFailer { ChunkServer *server; int errCode;public: OpFailer(ChunkServer *s, int c) : server(s), errCode(c) { }; void operator() (MetaRequest *op) { op->status = errCode; server->ResumeOp(op); }};voidChunkServer::FailDispatchedOps(){ for_each(mDispatchedReqs.begin(), mDispatchedReqs.end(), OpFailer(this, -EIO)); mDispatchedReqs.clear();}voidChunkServer::FailPendingOps(){ list<MetaRequest *> reqs; MetaRequest *r; while((r = mPendingReqs.dequeue_nowait())) { reqs.push_back(r); } for_each(reqs.begin(), reqs.end(), OpFailer(this, -EIO)); reqs.clear();}inline float convertToMB(off_t bytes){ return bytes / (1024.0 * 1024.0);}inline float convertToGB(off_t bytes){ return bytes / (1024.0 * 1024.0 * 1024.0);}voidChunkServer::GetRetiringStatus(string &result){ if (!mIsRetiring) return; ostringstream ost; char timebuf[64]; ctime_r(&mRetireStartTime, timebuf); if (timebuf[24] == '\n') timebuf[24] = '\0'; ost << "s=" << mLocation.hostname << ", p=" << mLocation.port << ", started=" << timebuf << ", numLeft=" << mEvacuatingChunks.size() << ", numDone=" << mNumChunks - mEvacuatingChunks.size() << '\t'; result += ost.str();}voidChunkServer::Ping(string &result){ ostringstream ost; time_t now = time(NULL); bool isOverloaded = false; // for nodes taken out of write allocation, send the info back; this allows // the UI to color these nodes differently if (GetSpaceUtilization() > MAX_SERVER_SPACE_UTIL_THRESHOLD) isOverloaded = true; if (mTotalSpace < (1L << 30)) { ost << "s=" << mLocation.hostname << ", p=" << mLocation.port << ", total=" << convertToMB(mTotalSpace) << "(MB), used=" << convertToMB(mUsedSpace) << "(MB), util=" << GetSpaceUtilization() * 100.0 << "%, nblocks=" << mNumChunks << ", lastheard=" << now - mLastHeard << " (sec)" << ", ncorrupt=" << mNumCorruptChunks << ", nchunksToMove=" << mChunksToMove.size(); if (isOverloaded) ost << ", overloaded=1"; ost << "\t"; } else { ost << "s=" << mLocation.hostname << ", p=" << mLocation.port << ", total=" << convertToGB(mTotalSpace) << "(GB), used=" << convertToGB(mUsedSpace) << "(GB), util=" << GetSpaceUtilization() * 100.0 << "%, nblocks=" << mNumChunks << ", lastheard=" << now - mLastHeard << " (sec)" << ", ncorrupt=" << mNumCorruptChunks << ", nchunksToMove=" << mChunksToMove.size(); if (isOverloaded) ost << ", overloaded=1"; ost << "\t"; } result += ost.str();}voidChunkServer::SendResponse(MetaRequest *op){ ostringstream os; op->response(os); if (os.str().length() > 0) mNetConnection->Write(os.str().c_str(), os.str().length());}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -