⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 chunkserver.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 2 页
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: ChunkServer.cc 224 2008-12-16 22:48:11Z sriramsrao $ //// Created 2006/06/06// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// //----------------------------------------------------------------------------#include "ChunkServer.h"#include "LayoutManager.h"#include "NetDispatch.h"#include "util.h"#include "libkfsIO/Globals.h"using namespace KFS;using namespace libkfsio;#include <cassert>#include <string>#include <sstream>using std::string;using std::istringstream;#include <boost/scoped_array.hpp>using boost::scoped_array;#include "common/log.h"//// if a chunkserver is not responsive for over 10 mins, mark it down//const int32_t INACTIVE_SERVER_TIMEOUT = 600;ChunkServer::ChunkServer() :    mSeqNo(1), mTimer(NULL),	mHelloDone(false), mDown(false), mHeartbeatSent(false),	mHeartbeatSkipped(false), mIsRetiring(false), mRackId(-1), 	mNumCorruptChunks(0), mTotalSpace(0), mUsedSpace(0), mAllocSpace(0), 	mNumChunks(0), mNumChunkWrites(0), 	mNumChunkWriteReplications(0), mNumChunkReadReplications(0){	// this is used in emulation mode...}ChunkServer::ChunkServer(NetConnectionPtr &conn) :	mSeqNo(1), mNetConnection(conn), 	mHelloDone(false), mDown(false), mHeartbeatSent(false),	mHeartbeatSkipped(false), mIsRetiring(false), mRackId(-1), 	mNumCorruptChunks(0), mTotalSpace(0), mUsedSpace(0), mAllocSpace(0), 	mNumChunks(0), mNumChunkWrites(0), 	mNumChunkWriteReplications(0), mNumChunkReadReplications(0){        mTimer = new ChunkServerTimeoutImpl(this);        // Receive HELLO message        SET_HANDLER(this, &ChunkServer::HandleHello);        globals().netManager.RegisterTimeoutHandler(mTimer);}ChunkServer::~ChunkServer(){	// KFS_LOG_VA_DEBUG("Deleting %p", this);        if (mNetConnection)                mNetConnection->Close();	if (mTimer) {        	globals().netManager.UnRegisterTimeoutHandler(mTimer);        	delete mTimer;	}}voidChunkServer::StopTimer(){	if (mTimer) {        	globals().netManager.UnRegisterTimeoutHandler(mTimer);        	delete mTimer;		mTimer = NULL;	}}////// Handle a HELLO message from the chunk server./// @param[in] code: The type of event that occurred/// @param[in] data: Data being passed in relative to the event that/// occurred./// @retval 0 to indicate successful event handling; -1 otherwise.///intChunkServer::HandleHello(int code, void *data){	IOBuffer *iobuf;	int msgLen, retval;	switch (code) {	case EVENT_NET_READ:		// We read something from the network.  It		// better be a HELLO message.		iobuf = (IOBuffer *) data;		if (IsMsgAvail(iobuf, &msgLen)) {			retval = HandleMsg(iobuf, msgLen);			if (retval < 0) {				 // Couldn't process hello				 // message...bye-bye				mDown = true;				gNetDispatch.GetChunkServerFactory()->RemoveServer(this);				return -1;			}			if (retval > 0) {				// not all data is available...so, hold on				return 0;			}			mHelloDone = true;			mLastHeard = time(NULL);			// Hello message successfully			// processed.  Setup to handle RPCs			SET_HANDLER(this, &ChunkServer::HandleRequest);		}		break;	 case EVENT_NET_WROTE:		// Something went out on the network.  		break;	 case EVENT_NET_ERROR:		// KFS_LOG_VA_DEBUG("Closing connection");		mDown = true;		gNetDispatch.GetChunkServerFactory()->RemoveServer(this);		break;	 default:		assert(!"Unknown event");		return -1;	}	return 0;}////// Generic event handler.  Decode the event that occurred and/// appropriately extract out the data and deal with the event./// @param[in] code: The type of event that occurred/// @param[in] data: Data being passed in relative to the event that/// occurred./// @retval 0 to indicate successful event handling; -1 otherwise.///intChunkServer::HandleRequest(int code, void *data){	IOBuffer *iobuf;	int msgLen;	MetaRequest *op;	switch (code) {	case EVENT_NET_READ:		// We read something from the network.  It is		// either an RPC (such as hello) or a reply to		// an RPC we sent earlier.		iobuf = (IOBuffer *) data;		while (IsMsgAvail(iobuf, &msgLen)) {			HandleMsg(iobuf, msgLen);		}		break;	case EVENT_CMD_DONE:		op = (MetaRequest *) data;		if (!mDown) {			SendResponse(op);		}			// nothing left to be done...get rid of it		delete op;		break;	case EVENT_NET_WROTE:		// Something went out on the network.  		break;	case EVENT_NET_ERROR:		KFS_LOG_VA_INFO("Chunk server %s is down...", GetServerName());		StopTimer();		FailDispatchedOps();				// Take out the server from write-allocation		mTotalSpace = mAllocSpace = mUsedSpace = 0;				mNetConnection->Close();		mNetConnection.reset();		mDown = true;		// force the server down thru the main loop to avoid races		op = new MetaBye(0, shared_from_this());		op->clnt = this;		gNetDispatch.GetChunkServerFactory()->RemoveServer(this);		submit_request(op);				break;	default:		assert(!"Unknown event");		return -1;	}	return 0;}////// We have a message from the chunk server.  The message we got is one/// of:///  -  a HELLO message from the server ///  -  it is a response to some command we previously sent///  -  is an RPC from the chunkserver/// /// Of these, for the first and third case,create an op and/// send that down the pike; in the second case, retrieve the op from/// the pending list, attach the response, and push that down the pike.////// @param[in] iobuf: Buffer containing the command/// @param[in] msgLen: Length of the command in the buffer/// @retval 0 if we handled the message properly; -1 on error; ///   1 if there is more data needed for this message and we haven't///   yet received the data.intChunkServer::HandleMsg(IOBuffer *iobuf, int msgLen){	char buf[5];	if (!mHelloDone) {		return HandleHelloMsg(iobuf, msgLen);	}        	iobuf->CopyOut(buf, 3);	buf[4] = '\0';	if (strncmp(buf, "OK", 2) == 0) {		return HandleReply(iobuf, msgLen);	}	return HandleCmd(iobuf, msgLen);}/// Case #1: Handle Hello message from a chunkserver that/// just connected to us.intChunkServer::HandleHelloMsg(IOBuffer *iobuf, int msgLen){	scoped_array<char> buf, contentBuf;        MetaRequest *op;        MetaHello *helloOp;        int i, nAvail;	istringstream ist;        buf.reset(new char[msgLen + 1]);        iobuf->CopyOut(buf.get(), msgLen);        buf[msgLen] = '\0';        assert(!mHelloDone);            // We should only get a HELLO message here; anything        // else is bad.        if (ParseCommand(buf.get(), msgLen, &op) < 0) {            KFS_LOG_VA_DEBUG("Aye?: %s", buf.get());            iobuf->Consume(msgLen);            // we couldn't parse out hello            return -1;        }        // we really ought to get only hello here        if (op->op != META_HELLO) {            KFS_LOG_VA_DEBUG("Only  need hello...but: %s", buf.get());            iobuf->Consume(msgLen);            delete op;            // got a bogus command            return -1;        }        helloOp = static_cast<MetaHello *> (op);        KFS_LOG_VA_INFO("New server: \n%s", buf.get());        op->clnt = this;        helloOp->server = shared_from_this();        // make sure we have the chunk ids...        if (helloOp->contentLength > 0) {            nAvail = iobuf->BytesConsumable() - msgLen;            if (nAvail < helloOp->contentLength) {                // need to wait for data...                delete op;                return 1;            }            // we have everything            iobuf->Consume(msgLen);            contentBuf.reset(new char[helloOp->contentLength + 1]);            contentBuf[helloOp->contentLength] = '\0';                                    // get the chunkids            iobuf->CopyOut(contentBuf.get(), helloOp->contentLength);            iobuf->Consume(helloOp->contentLength);            ist.str(contentBuf.get());            for (i = 0; i < helloOp->numChunks; ++i) {                ChunkInfo c;                ist >> c.fileId;                ist >> c.chunkId;                ist >> c.chunkVersion;                helloOp->chunks.push_back(c);                // KFS_LOG_VA_DEBUG("Server has chunk: %lld", chunkId);            }        } else {            // Message is ready to be pushed down.  So remove it.            iobuf->Consume(msgLen);        }        // send it on its merry way        submit_request(op);        return 0;}////// Case #2: Handle an RPC from a chunkserver.///intChunkServer::HandleCmd(IOBuffer *iobuf, int msgLen){	scoped_array<char> buf;        MetaRequest *op;        buf.reset(new char[msgLen + 1]);        iobuf->CopyOut(buf.get(), msgLen);        buf[msgLen] = '\0';        assert(mHelloDone);            // Message is ready to be pushed down.  So remove it.        iobuf->Consume(msgLen);        if (ParseCommand(buf.get(), msgLen, &op) != 0) {            return -1;        }	if (op->op == META_CHUNK_CORRUPT) {		MetaChunkCorrupt *ccop = static_cast<MetaChunkCorrupt *>(op);		ccop->server = shared_from_this();	}        op->clnt = this;        submit_request(op);	/*        if (iobuf->BytesConsumable() > 0) {                KFS_LOG_VA_DEBUG("More command data likely available: ");        }	*/        return 0;}////// Case #3: Handle a reply from a chunkserver to an RPC we/// previously sent.///intChunkServer::HandleReply(IOBuffer *iobuf, int msgLen){	scoped_array<char> buf, contentBuf;        MetaRequest *op;        int status;        seq_t cseq;	istringstream ist;        Properties prop;        MetaChunkRequest *submittedOp;        buf.reset(new char[msgLen + 1]);        iobuf->CopyOut(buf.get(), msgLen);        buf[msgLen] = '\0';        assert(mHelloDone);            // Message is ready to be pushed down.  So remove it.        iobuf->Consume(msgLen);        // We got a response for a command we previously        // sent.  So, match the response to its request and        // resume request processing.        ParseResponse(buf.get(), msgLen, prop);        cseq = prop.getValue("Cseq", (seq_t) -1);        status = prop.getValue("Status", -1);        op = FindMatchingRequest(cseq);        if (op == NULL) {            // Uh-oh...this can happen if the server restarts between sending	    // the message and getting reply back            // assert(!"Unable to find command for a response");	    KFS_LOG_VA_WARN("Unable to find command for response (cseq = %d)", cseq);            return -1;        }        // KFS_LOG_VA_DEBUG("Got response for cseq=%d", cseq);        submittedOp = static_cast <MetaChunkRequest *> (op);        submittedOp->status = status;        if (submittedOp->op == META_CHUNK_HEARTBEAT) {            mTotalSpace = prop.getValue("Total-space", (long long) 0);            mUsedSpace = prop.getValue("Used-space", (long long) 0);            mNumChunks = prop.getValue("Num-chunks", 0);	    mAllocSpace = mUsedSpace + mNumChunkWrites * CHUNKSIZE;	    mHeartbeatSent = false;	    mLastHeard = time(NULL);	} else if (submittedOp->op == META_CHUNK_REPLICATE) {	    MetaChunkReplicate *mcr = static_cast <MetaChunkReplicate *> (op);	    mcr->fid = prop.getValue("File-handle", (long long) 0);	    mcr->chunkVersion = prop.getValue("Chunk-version", (long long) 0);	} else if (submittedOp->op == META_CHUNK_SIZE) {	    MetaChunkSize *mcs = static_cast <MetaChunkSize *> (op);	    mcs->chunkSize = prop.getValue("Size", (off_t) -1);	}        ResumeOp(op);            return 0;}voidChunkServer::ResumeOp(MetaRequest *op){        MetaChunkRequest *submittedOp;        MetaAllocate *allocateOp;        MetaRequest *req;        // get the original request and get rid of the        // intermediate one we generated for the RPC.        submittedOp = static_cast <MetaChunkRequest *> (op);        req = submittedOp->req;        // op types:        // - allocate ops have an "original" request; this        //    needs to be reactivated, so that a response can        //    be sent to the client.        // -  delete ops are a fire-n-forget. there is no        //    other processing left to be done on them.        // -  heartbeat: update the space usage statistics and nuke        // it, which is already done (in the caller of this method)        // -  stale-chunk notify: we tell the chunk server and that is it.        //        if (submittedOp->op == META_CHUNK_ALLOCATE) {                assert(req && (req->op == META_ALLOCATE));		// if there is a non-zero status, don't throw it away		if (req->status == 0)                	req->status = submittedOp->status;		delete submittedOp;                                allocateOp = static_cast<MetaAllocate *> (req);                allocateOp->numServerReplies++;                // wait until we get replies from all servers                if (allocateOp->numServerReplies == allocateOp->servers.size()) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -