⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chunkserver.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 2 页
字号:
                    allocateOp->layoutDone = true;                    // The op is no longer suspended.                    req->suspended = false;                    // send it on its merry way                    submit_request(req);                }        }        else if ((submittedOp->op == META_CHUNK_DELETE) ||		 (submittedOp->op == META_CHUNK_TRUNCATE) ||		 (submittedOp->op == META_CHUNK_HEARTBEAT) ||		 (submittedOp->op == META_CHUNK_STALENOTIFY) ||		 (submittedOp->op == META_CHUNK_VERSCHANGE) ||		 (submittedOp->op == META_CHUNK_RETIRE)) {                assert(req == NULL);		delete submittedOp;                        }        else if (submittedOp->op == META_CHUNK_REPLICATE) {		// This op is internally generated.  We need to notify		// the layout manager of this op's completion.  So, send		// it there.		MetaChunkReplicate *mcr = static_cast<MetaChunkReplicate *>(submittedOp);		KFS_LOG_VA_DEBUG("Meta chunk replicate for chunk %lld finished with version %lld, status: %d",				mcr->chunkId, mcr->chunkVersion, submittedOp->status);		submit_request(submittedOp);		// the op will get nuked after it is processed	}	else if (submittedOp->op == META_CHUNK_SIZE) {		// chunkserver has responded with the chunk's size.  So, update		// the meta-tree		submit_request(submittedOp);	}        else {                assert(!"Unknown op!");        }}////// The response sent by a chunkserver is of the form:/// OK \r\n/// Cseq: <seq #>\r\n/// Status: <status> \r\n/// {<other header/value pair>\r\n}*\r\n////// @param[in] buf Buffer containing the response/// @param[in] bufLen length of buf/// @param[out] prop  Properties object with the response header/values/// voidChunkServer::ParseResponse(char *buf, int bufLen,                           Properties &prop){        istringstream ist(buf);        const char separator = ':';        string respOk;        // KFS_LOG_VA_DEBUG("Got chunk-server-response: %s", buf);        ist >> respOk;        // Response better start with OK        if (respOk.compare("OK") != 0) {                KFS_LOG_VA_DEBUG("Didn't get an OK: instead, %s",                                 respOk.c_str());                return;        }        prop.loadProperties(ist, separator, false);}// Helper functor that matches ops by sequence #'sclass OpMatch {	seq_t myseq;public:	OpMatch(seq_t s) : myseq(s) { }	bool operator() (const MetaRequest *r) {		return (r->opSeqno == myseq);	}};////// Request/responses are matched based on sequence #'s.///MetaRequest *ChunkServer::FindMatchingRequest(seq_t cseq){        list<MetaRequest *>::iterator iter;	MetaRequest *op;	iter = find_if(mDispatchedReqs.begin(), mDispatchedReqs.end(), OpMatch(cseq));	if (iter == mDispatchedReqs.end())		return NULL;		op = *iter;	mDispatchedReqs.erase(iter);	return op;}////// Queue an RPC request///voidChunkServer::Enqueue(MetaRequest *r) {        mPendingReqs.enqueue(r);	globals().netKicker.Kick();}intChunkServer::AllocateChunk(MetaAllocate *r, int64_t leaseId){        MetaChunkAllocate *ca;        mAllocSpace += CHUNKSIZE;	UpdateNumChunkWrites(1);        ca = new MetaChunkAllocate(NextSeq(), r, this, leaseId);        // save a pointer to the request so that we can match up the        // response whenever we get it.	Enqueue(ca);        return 0;}intChunkServer::DeleteChunk(chunkId_t chunkId){	MetaChunkDelete *r;        mAllocSpace -= CHUNKSIZE;	if (IsRetiring()) {		EvacuateChunkDone(chunkId);	}	r = new MetaChunkDelete(NextSeq(), this, chunkId);	// save a pointer to the request so that we can match up the	// response whenever we get it.	Enqueue(r);	return 0;}intChunkServer::TruncateChunk(chunkId_t chunkId, off_t s){	MetaChunkTruncate *r;	mAllocSpace -= (CHUNKSIZE - s);	r = new MetaChunkTruncate(NextSeq(), this, chunkId, s);	// save a pointer to the request so that we can match up the	// response whenever we get it.	Enqueue(r);	return 0;}intChunkServer::GetChunkSize(fid_t fid, chunkId_t chunkId){	MetaChunkSize *r;	r = new MetaChunkSize(NextSeq(), this, fid, chunkId);	// save a pointer to the request so that we can match up the	// response whenever we get it.	Enqueue(r);	return 0;}intChunkServer::ReplicateChunk(fid_t fid, chunkId_t chunkId, seq_t chunkVersion,				const ServerLocation &loc){	MetaChunkReplicate *r;	r = new MetaChunkReplicate(NextSeq(), this, fid, chunkId, chunkVersion, loc);	r->server = shared_from_this();	mNumChunkWriteReplications++;	// save a pointer to the request so that we can match up the	// response whenever we get it.	Enqueue(r);	return 0;}voidChunkServer::Heartbeat(){	if (!mHelloDone) {		return;	}	if (mHeartbeatSent) {		string loc = mLocation.ToString();		time_t now = time(0);		if (now - mLastHeard > INACTIVE_SERVER_TIMEOUT) {			KFS_LOG_VA_INFO("Server %s has been non-responsive for too long; taking it down", loc.c_str());			// We are executing in the context of the network thread			// So, take the server down as though the net connection			// broke.			HandleRequest(EVENT_NET_ERROR, NULL);			return;		}		// If a request is outstanding, don't send one more		mHeartbeatSkipped = true;		KFS_LOG_VA_INFO("Skipping send of heartbeat to %s", loc.c_str());		return;	}	mHeartbeatSent = true;	mHeartbeatSkipped = false;        MetaChunkHeartbeat *r;        r = new MetaChunkHeartbeat(NextSeq(), this);        // save a pointer to the request so that we can match up the        // response whenever we get it.	Enqueue(r);}voidChunkServer::NotifyStaleChunks(const vector<chunkId_t> &staleChunkIds){	MetaChunkStaleNotify *r;	mAllocSpace -= (CHUNKSIZE * staleChunkIds.size());	r = new MetaChunkStaleNotify(NextSeq(), this);	r->staleChunkIds = staleChunkIds;	// save a pointer to the request so that we can match up the	// response whenever we get it.	Enqueue(r);}voidChunkServer::NotifyStaleChunk(chunkId_t staleChunkId){	MetaChunkStaleNotify *r;	mAllocSpace -= CHUNKSIZE;	r = new MetaChunkStaleNotify(NextSeq(), this);	r->staleChunkIds.push_back(staleChunkId);	// save a pointer to the request so that we can match up the	// response whenever we get it.	Enqueue(r);}voidChunkServer::NotifyChunkVersChange(fid_t fid, chunkId_t chunkId, seq_t chunkVers){        MetaChunkVersChange *r;	r = new MetaChunkVersChange(NextSeq(), this, fid, chunkId, chunkVers);	// save a pointer to the request so that we can match up the	// response whenever we get it.	Enqueue(r);}voidChunkServer::SetRetiring(){	mIsRetiring = true;	mRetireStartTime = time(NULL);	KFS_LOG_VA_INFO("Initiation of retire for chunks on %s : %d blocks to do",			ServerID().c_str(), mNumChunks);}voidChunkServer::EvacuateChunkDone(chunkId_t chunkId){	if (!mIsRetiring)		return;	mEvacuatingChunks.erase(chunkId);	if (mEvacuatingChunks.empty()) {		KFS_LOG_VA_INFO("Evacuation of chunks on %s is done; retiring",				ServerID().c_str());		Retire();	}}voidChunkServer::Retire(){	MetaChunkRetire *r;	r = new MetaChunkRetire(NextSeq(), this);	Enqueue(r);}//// Helper functor that dispatches an RPC request to the server.//class OpDispatcher {	ChunkServer *server;	NetConnectionPtr conn;public:	OpDispatcher(ChunkServer *s, NetConnectionPtr &c) :		server(s), conn(c) { }	void operator()(MetaRequest *r) {        	ostringstream os;        	MetaChunkRequest *cr = static_cast <MetaChunkRequest *> (r);        	if (!conn) {                	// Server is dead...so, drop the op                	r->status = -EIO;                	server->ResumeOp(r);			return;        	}        	assert(cr != NULL);        	// Get the request into string format        	cr->request(os);        	// Send it on its merry way        	conn->Write(os.str().c_str(), os.str().length());		if (cr->op == META_CHUNK_REPLICATE) {			MetaChunkReplicate *mcr = static_cast <MetaChunkReplicate *> (cr);			KFS_LOG_VA_INFO("Dispatched re-replication request: %s",					mcr->Show().c_str());		}		// Notify the server the op is dispatched		server->Dispatched(r);	}};voidChunkServer::Dispatch(){	OpDispatcher dispatcher(this, mNetConnection);	list<MetaRequest *> reqs;	MetaRequest *r;	while((r = mPendingReqs.dequeue_nowait())) {		reqs.push_back(r);	}	for_each(reqs.begin(), reqs.end(), dispatcher);	reqs.clear();}// Helper functor that fails an op with an error code.class OpFailer {	ChunkServer *server;	int errCode;public:	OpFailer(ChunkServer *s, int c) : server(s), errCode(c) { };	void operator() (MetaRequest *op) {                op->status = errCode;                server->ResumeOp(op);	}};voidChunkServer::FailDispatchedOps(){	for_each(mDispatchedReqs.begin(), mDispatchedReqs.end(), 				OpFailer(this, -EIO));	mDispatchedReqs.clear();}voidChunkServer::FailPendingOps(){	list<MetaRequest *> reqs;	MetaRequest *r;	while((r = mPendingReqs.dequeue_nowait())) {		reqs.push_back(r);	}	for_each(reqs.begin(), reqs.end(), OpFailer(this, -EIO));	reqs.clear();}inline float convertToMB(off_t bytes){	return bytes / (1024.0 * 1024.0);}inline float convertToGB(off_t bytes){	return bytes / (1024.0 * 1024.0 * 1024.0);}voidChunkServer::GetRetiringStatus(string &result){	if (!mIsRetiring)		return;	ostringstream ost;	char timebuf[64];	ctime_r(&mRetireStartTime, timebuf);	if (timebuf[24] == '\n')		timebuf[24] = '\0';	ost << "s=" << mLocation.hostname << ", p=" << mLocation.port 		<< ", started=" << timebuf 		<< ", numLeft=" << mEvacuatingChunks.size() << ", numDone=" 		<< mNumChunks - mEvacuatingChunks.size() << '\t';	result += ost.str();}voidChunkServer::Ping(string &result){	ostringstream ost;	time_t now = time(NULL);	bool isOverloaded = false;	// for nodes taken out of write allocation, send the info back; this allows	// the UI to color these nodes differently	if (GetSpaceUtilization() > MAX_SERVER_SPACE_UTIL_THRESHOLD)		isOverloaded = true;	if (mTotalSpace < (1L << 30)) {		ost << "s=" << mLocation.hostname << ", p=" << mLocation.port 	    		<< ", total=" << convertToMB(mTotalSpace) 			<< "(MB), used=" << convertToMB(mUsedSpace)			<< "(MB), util=" << GetSpaceUtilization() * 100.0 			<< "%, nblocks=" << mNumChunks 			<< ", lastheard=" << now - mLastHeard << " (sec)"			<< ", ncorrupt=" << mNumCorruptChunks			<< ", nchunksToMove=" << mChunksToMove.size();		if (isOverloaded)			ost << ", overloaded=1";		ost << "\t";	} else {		ost << "s=" << mLocation.hostname << ", p=" << mLocation.port 	    		<< ", total=" << convertToGB(mTotalSpace) 			<< "(GB), used=" << convertToGB(mUsedSpace)			<< "(GB), util=" << GetSpaceUtilization() * 100.0 			<< "%, nblocks=" << mNumChunks 			<< ", lastheard=" << now - mLastHeard << " (sec)"			<< ", ncorrupt=" << mNumCorruptChunks			<< ", nchunksToMove=" << mChunksToMove.size();		if (isOverloaded)			ost << ", overloaded=1";		ost << "\t";	}	result += ost.str();}voidChunkServer::SendResponse(MetaRequest *op){        ostringstream os;        op->response(os);        if (os.str().length() > 0)            mNetConnection->Write(os.str().c_str(), os.str().length());}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -