📄 chunkserver.cc
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: ChunkServer.cc 224 2008-12-16 22:48:11Z sriramsrao $ //// Created 2006/06/06// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// //----------------------------------------------------------------------------#include "ChunkServer.h"#include "LayoutManager.h"#include "NetDispatch.h"#include "util.h"#include "libkfsIO/Globals.h"using namespace KFS;using namespace libkfsio;#include <cassert>#include <string>#include <sstream>using std::string;using std::istringstream;#include <boost/scoped_array.hpp>using boost::scoped_array;#include "common/log.h"//// if a chunkserver is not responsive for over 10 mins, mark it down//const int32_t INACTIVE_SERVER_TIMEOUT = 600;ChunkServer::ChunkServer() : mSeqNo(1), mTimer(NULL), mHelloDone(false), mDown(false), mHeartbeatSent(false), mHeartbeatSkipped(false), mIsRetiring(false), mRackId(-1), mNumCorruptChunks(0), mTotalSpace(0), mUsedSpace(0), mAllocSpace(0), mNumChunks(0), mNumChunkWrites(0), mNumChunkWriteReplications(0), mNumChunkReadReplications(0){ // this is used in emulation mode...}ChunkServer::ChunkServer(NetConnectionPtr &conn) : mSeqNo(1), mNetConnection(conn), mHelloDone(false), mDown(false), mHeartbeatSent(false), mHeartbeatSkipped(false), mIsRetiring(false), mRackId(-1), mNumCorruptChunks(0), mTotalSpace(0), mUsedSpace(0), mAllocSpace(0), mNumChunks(0), mNumChunkWrites(0), mNumChunkWriteReplications(0), mNumChunkReadReplications(0){ mTimer = new ChunkServerTimeoutImpl(this); // Receive HELLO message SET_HANDLER(this, &ChunkServer::HandleHello); globals().netManager.RegisterTimeoutHandler(mTimer);}ChunkServer::~ChunkServer(){ // KFS_LOG_VA_DEBUG("Deleting %p", this); if (mNetConnection) mNetConnection->Close(); if (mTimer) { globals().netManager.UnRegisterTimeoutHandler(mTimer); delete mTimer; }}voidChunkServer::StopTimer(){ if (mTimer) { globals().netManager.UnRegisterTimeoutHandler(mTimer); delete mTimer; mTimer = NULL; }}////// Handle a HELLO message from the chunk server./// @param[in] code: The type of event that occurred/// @param[in] data: Data being passed in relative to the event that/// occurred./// @retval 0 to indicate successful event handling; -1 otherwise.///intChunkServer::HandleHello(int code, void *data){ IOBuffer *iobuf; int msgLen, retval; switch (code) { case EVENT_NET_READ: // We read something from the network. It // better be a HELLO message. iobuf = (IOBuffer *) data; if (IsMsgAvail(iobuf, &msgLen)) { retval = HandleMsg(iobuf, msgLen); if (retval < 0) { // Couldn't process hello // message...bye-bye mDown = true; gNetDispatch.GetChunkServerFactory()->RemoveServer(this); return -1; } if (retval > 0) { // not all data is available...so, hold on return 0; } mHelloDone = true; mLastHeard = time(NULL); // Hello message successfully // processed. Setup to handle RPCs SET_HANDLER(this, &ChunkServer::HandleRequest); } break; case EVENT_NET_WROTE: // Something went out on the network. break; case EVENT_NET_ERROR: // KFS_LOG_VA_DEBUG("Closing connection"); mDown = true; gNetDispatch.GetChunkServerFactory()->RemoveServer(this); break; default: assert(!"Unknown event"); return -1; } return 0;}////// Generic event handler. Decode the event that occurred and/// appropriately extract out the data and deal with the event./// @param[in] code: The type of event that occurred/// @param[in] data: Data being passed in relative to the event that/// occurred./// @retval 0 to indicate successful event handling; -1 otherwise.///intChunkServer::HandleRequest(int code, void *data){ IOBuffer *iobuf; int msgLen; MetaRequest *op; switch (code) { case EVENT_NET_READ: // We read something from the network. It is // either an RPC (such as hello) or a reply to // an RPC we sent earlier. iobuf = (IOBuffer *) data; while (IsMsgAvail(iobuf, &msgLen)) { HandleMsg(iobuf, msgLen); } break; case EVENT_CMD_DONE: op = (MetaRequest *) data; if (!mDown) { SendResponse(op); } // nothing left to be done...get rid of it delete op; break; case EVENT_NET_WROTE: // Something went out on the network. break; case EVENT_NET_ERROR: KFS_LOG_VA_INFO("Chunk server %s is down...", GetServerName()); StopTimer(); FailDispatchedOps(); // Take out the server from write-allocation mTotalSpace = mAllocSpace = mUsedSpace = 0; mNetConnection->Close(); mNetConnection.reset(); mDown = true; // force the server down thru the main loop to avoid races op = new MetaBye(0, shared_from_this()); op->clnt = this; gNetDispatch.GetChunkServerFactory()->RemoveServer(this); submit_request(op); break; default: assert(!"Unknown event"); return -1; } return 0;}////// We have a message from the chunk server. The message we got is one/// of:/// - a HELLO message from the server /// - it is a response to some command we previously sent/// - is an RPC from the chunkserver/// /// Of these, for the first and third case,create an op and/// send that down the pike; in the second case, retrieve the op from/// the pending list, attach the response, and push that down the pike.////// @param[in] iobuf: Buffer containing the command/// @param[in] msgLen: Length of the command in the buffer/// @retval 0 if we handled the message properly; -1 on error; /// 1 if there is more data needed for this message and we haven't/// yet received the data.intChunkServer::HandleMsg(IOBuffer *iobuf, int msgLen){ char buf[5]; if (!mHelloDone) { return HandleHelloMsg(iobuf, msgLen); } iobuf->CopyOut(buf, 3); buf[4] = '\0'; if (strncmp(buf, "OK", 2) == 0) { return HandleReply(iobuf, msgLen); } return HandleCmd(iobuf, msgLen);}/// Case #1: Handle Hello message from a chunkserver that/// just connected to us.intChunkServer::HandleHelloMsg(IOBuffer *iobuf, int msgLen){ scoped_array<char> buf, contentBuf; MetaRequest *op; MetaHello *helloOp; int i, nAvail; istringstream ist; buf.reset(new char[msgLen + 1]); iobuf->CopyOut(buf.get(), msgLen); buf[msgLen] = '\0'; assert(!mHelloDone); // We should only get a HELLO message here; anything // else is bad. if (ParseCommand(buf.get(), msgLen, &op) < 0) { KFS_LOG_VA_DEBUG("Aye?: %s", buf.get()); iobuf->Consume(msgLen); // we couldn't parse out hello return -1; } // we really ought to get only hello here if (op->op != META_HELLO) { KFS_LOG_VA_DEBUG("Only need hello...but: %s", buf.get()); iobuf->Consume(msgLen); delete op; // got a bogus command return -1; } helloOp = static_cast<MetaHello *> (op); KFS_LOG_VA_INFO("New server: \n%s", buf.get()); op->clnt = this; helloOp->server = shared_from_this(); // make sure we have the chunk ids... if (helloOp->contentLength > 0) { nAvail = iobuf->BytesConsumable() - msgLen; if (nAvail < helloOp->contentLength) { // need to wait for data... delete op; return 1; } // we have everything iobuf->Consume(msgLen); contentBuf.reset(new char[helloOp->contentLength + 1]); contentBuf[helloOp->contentLength] = '\0'; // get the chunkids iobuf->CopyOut(contentBuf.get(), helloOp->contentLength); iobuf->Consume(helloOp->contentLength); ist.str(contentBuf.get()); for (i = 0; i < helloOp->numChunks; ++i) { ChunkInfo c; ist >> c.fileId; ist >> c.chunkId; ist >> c.chunkVersion; helloOp->chunks.push_back(c); // KFS_LOG_VA_DEBUG("Server has chunk: %lld", chunkId); } } else { // Message is ready to be pushed down. So remove it. iobuf->Consume(msgLen); } // send it on its merry way submit_request(op); return 0;}////// Case #2: Handle an RPC from a chunkserver.///intChunkServer::HandleCmd(IOBuffer *iobuf, int msgLen){ scoped_array<char> buf; MetaRequest *op; buf.reset(new char[msgLen + 1]); iobuf->CopyOut(buf.get(), msgLen); buf[msgLen] = '\0'; assert(mHelloDone); // Message is ready to be pushed down. So remove it. iobuf->Consume(msgLen); if (ParseCommand(buf.get(), msgLen, &op) != 0) { return -1; } if (op->op == META_CHUNK_CORRUPT) { MetaChunkCorrupt *ccop = static_cast<MetaChunkCorrupt *>(op); ccop->server = shared_from_this(); } op->clnt = this; submit_request(op); /* if (iobuf->BytesConsumable() > 0) { KFS_LOG_VA_DEBUG("More command data likely available: "); } */ return 0;}////// Case #3: Handle a reply from a chunkserver to an RPC we/// previously sent.///intChunkServer::HandleReply(IOBuffer *iobuf, int msgLen){ scoped_array<char> buf, contentBuf; MetaRequest *op; int status; seq_t cseq; istringstream ist; Properties prop; MetaChunkRequest *submittedOp; buf.reset(new char[msgLen + 1]); iobuf->CopyOut(buf.get(), msgLen); buf[msgLen] = '\0'; assert(mHelloDone); // Message is ready to be pushed down. So remove it. iobuf->Consume(msgLen); // We got a response for a command we previously // sent. So, match the response to its request and // resume request processing. ParseResponse(buf.get(), msgLen, prop); cseq = prop.getValue("Cseq", (seq_t) -1); status = prop.getValue("Status", -1); op = FindMatchingRequest(cseq); if (op == NULL) { // Uh-oh...this can happen if the server restarts between sending // the message and getting reply back // assert(!"Unable to find command for a response"); KFS_LOG_VA_WARN("Unable to find command for response (cseq = %d)", cseq); return -1; } // KFS_LOG_VA_DEBUG("Got response for cseq=%d", cseq); submittedOp = static_cast <MetaChunkRequest *> (op); submittedOp->status = status; if (submittedOp->op == META_CHUNK_HEARTBEAT) { mTotalSpace = prop.getValue("Total-space", (long long) 0); mUsedSpace = prop.getValue("Used-space", (long long) 0); mNumChunks = prop.getValue("Num-chunks", 0); mAllocSpace = mUsedSpace + mNumChunkWrites * CHUNKSIZE; mHeartbeatSent = false; mLastHeard = time(NULL); } else if (submittedOp->op == META_CHUNK_REPLICATE) { MetaChunkReplicate *mcr = static_cast <MetaChunkReplicate *> (op); mcr->fid = prop.getValue("File-handle", (long long) 0); mcr->chunkVersion = prop.getValue("Chunk-version", (long long) 0); } else if (submittedOp->op == META_CHUNK_SIZE) { MetaChunkSize *mcs = static_cast <MetaChunkSize *> (op); mcs->chunkSize = prop.getValue("Size", (off_t) -1); } ResumeOp(op); return 0;}voidChunkServer::ResumeOp(MetaRequest *op){ MetaChunkRequest *submittedOp; MetaAllocate *allocateOp; MetaRequest *req; // get the original request and get rid of the // intermediate one we generated for the RPC. submittedOp = static_cast <MetaChunkRequest *> (op); req = submittedOp->req; // op types: // - allocate ops have an "original" request; this // needs to be reactivated, so that a response can // be sent to the client. // - delete ops are a fire-n-forget. there is no // other processing left to be done on them. // - heartbeat: update the space usage statistics and nuke // it, which is already done (in the caller of this method) // - stale-chunk notify: we tell the chunk server and that is it. // if (submittedOp->op == META_CHUNK_ALLOCATE) { assert(req && (req->op == META_ALLOCATE)); // if there is a non-zero status, don't throw it away if (req->status == 0) req->status = submittedOp->status; delete submittedOp; allocateOp = static_cast<MetaAllocate *> (req); allocateOp->numServerReplies++; // wait until we get replies from all servers if (allocateOp->numServerReplies == allocateOp->servers.size()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -