⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clientsm.cc

📁 nandflash文件系统源代码
💻 CC
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: ClientSM.cc 210 2008-11-05 02:51:49Z sriramsrao $ //// Created 2006/03/23// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// //----------------------------------------------------------------------------#include "ClientSM.h"#include "ChunkManager.h"#include "ChunkServer.h"#include "Utils.h"#include "KfsOps.h"#include <string>#include <sstream>using std::string;using std::ostringstream;#include "common/log.h"#include "libkfsIO/Globals.h"#include <boost/scoped_array.hpp>using boost::scoped_array;using namespace KFS;using namespace KFS::libkfsio;ClientSM::ClientSM(NetConnectionPtr &conn) {    mNetConnection = conn;    SET_HANDLER(this, &ClientSM::HandleRequest);}ClientSM::~ClientSM(){    KfsOp *op;    assert(mOps.empty());    while (!mOps.empty()) {        op = mOps.front();        mOps.pop_front();        delete op;    }    gClientManager.Remove(this);}////// Send out the response to the client request.  The response is/// generated by MetaRequest as per the protocol./// @param[in] op The request for which we finished execution.///voidClientSM::SendResponse(KfsOp *op){    ostringstream os;    ReadOp *rop;    string s = op->Show();    int len;#ifdef DEBUG    verifyExecutingOnNetProcessor();#endif        op->Response(os);    KFS_LOG_VA_DEBUG("Command %s: Response status: %d\n",                      s.c_str(), op->status);    len = os.str().length();    if (len > 0)        mNetConnection->Write(os.str().c_str(), len);    if (op->op == CMD_WRITE_SYNC) {        KFS_LOG_VA_INFO("Ack'ing write sync: %s", op->Show().c_str());    }    if (op->op == CMD_READ) {        // need to send out the data read        rop = static_cast<ReadOp *> (op);        if (op->status >= 0) {            KFS_LOG_VA_INFO("Read done: %s, status = %d", rop->Show().c_str(), rop->status);            assert(rop->dataBuf->BytesConsumable() == rop->status);            mNetConnection->Write(rop->dataBuf, rop->numBytesIO);        }    } else if (op->op == CMD_GET_CHUNK_METADATA) {        GetChunkMetadataOp *gcm = static_cast<GetChunkMetadataOp *>(op);        if (op->status >= 0)            mNetConnection->Write(gcm->dataBuf, gcm->numBytesIO);                }}////// Generic event handler.  Decode the event that occurred and/// appropriately extract out the data and deal with the event./// @param[in] code: The type of event that occurred/// @param[in] data: Data being passed in relative to the event that/// occurred./// @retval 0 to indicate successful event handling; -1 otherwise.///intClientSM::HandleRequest(int code, void *data){    IOBuffer *iobuf;    KfsOp *op;    int cmdLen;#ifdef DEBUG    verifyExecutingOnNetProcessor();#endif        switch (code) {    case EVENT_NET_READ:	// We read something from the network.  Run the RPC that	// came in.	iobuf = (IOBuffer *) data;	while (IsMsgAvail(iobuf, &cmdLen)) {	    // if we don't have all the data for the command, wait	    if (!HandleClientCmd(iobuf, cmdLen))		break;	}	break;    case EVENT_NET_WROTE:	// Something went out on the network.  For now, we don't	// track it. Later, we may use it for tracking throttling	// and such.	break;    case EVENT_CMD_DONE:	// An op finished execution.  Send response back in FIFO        gChunkServer.OpFinished();            	op = (KfsOp *) data;	op->done = true;        assert(!mOps.empty());	while (!mOps.empty()) {	    KfsOp *qop = mOps.front();	    if (!qop->done)		break;            if (mNetConnection)                SendResponse(qop);	    mOps.pop_front();            OpFinished(qop);	    delete qop;	}        if (mNetConnection)            mNetConnection->StartFlush();	break;    case EVENT_NET_ERROR:	// KFS_LOG_VA_DEBUG("Closing connection");	if (mNetConnection)	    mNetConnection->Close();        // get rid of the connection to all the peers in daisy chain;        // if there were any outstanding ops, they will all come back        // to this method as EVENT_CMD_DONE and we clean them up above.        ReleaseAllServers(mRemoteSyncers);        // if there are any disk ops, wait for the ops to finish        SET_HANDLER(this, &ClientSM::HandleTerminate);        HandleTerminate(code, NULL);	break;    default:	assert(!"Unknown event");	break;    }    return 0;}////// Termination handler.  For the client state machine, we could have/// ops queued at the logger.  So, for cleanup wait for all the/// outstanding ops to finish and then delete this.  In this state,/// the only event that gets raised is that an op finished; anything/// else is bad.///intClientSM::HandleTerminate(int code, void *data){    KfsOp *op;#ifdef DEBUG    verifyExecutingOnNetProcessor();#endif        switch (code) {    case EVENT_CMD_DONE:        gChunkServer.OpFinished();	// An op finished execution.  Send a response back	op = (KfsOp *) data;	op->done = true;	if (op != mOps.front())	    break;	while (!mOps.empty()) {	    op = mOps.front();	    if (!op->done)		break;            OpFinished(op);	    // we are done with the op	    mOps.pop_front();	    delete op;	}	break;    case EVENT_NET_ERROR:        // clean things up        break;    default:	assert(!"Unknown event");	break;    }    if (mOps.empty()) {        // all ops are done...so, now, we can nuke ourself.        assert(mPendingOps.empty());        delete this;    }    return 0;}////// We have a command in a buffer.  It is possible that we don't have/// everything we need to execute it (for example, for a write we may/// not have received all the data the client promised).  So, parse/// out the command and if we have everything execute it./// boolClientSM::HandleClientCmd(IOBuffer *iobuf,                          int cmdLen){    scoped_array<char> buf;    KfsOp *op;    size_t nAvail;    buf.reset(new char[cmdLen + 1]);    iobuf->CopyOut(buf.get(), cmdLen);    buf[cmdLen] = '\0';        if (ParseCommand(buf.get(), cmdLen, &op) != 0) {        iobuf->Consume(cmdLen);        KFS_LOG_VA_DEBUG("Aye?: %s", buf.get());        // got a bogus command        return true;    }    if (op->op == CMD_WRITE_PREPARE) {        WritePrepareOp *wop = static_cast<WritePrepareOp *> (op);        assert(wop != NULL);        // if we don't have all the data for the write, hold on...        nAvail = iobuf->BytesConsumable() - cmdLen;                if (nAvail < wop->numBytes) {            delete op;            // we couldn't process the command...so, wait            return false;        }        iobuf->Consume(cmdLen);        wop->dataBuf = new IOBuffer();        wop->dataBuf->Move(iobuf, wop->numBytes);        nAvail = wop->dataBuf->BytesConsumable();        // KFS_LOG_VA_DEBUG("Got command: %s", buf.get());        // KFS_LOG_VA_DEBUG("# of bytes avail for write: %lu", nAvail);    } else {        string s = op->Show();        KFS_LOG_VA_DEBUG("Got command: %s\n", s.c_str());        iobuf->Consume(cmdLen);    }    if (op->op == CMD_WRITE_SYNC) {        KFS_LOG_VA_INFO("Received write sync: %s", op->Show().c_str());        // make the write sync depend on a previous write        KfsOp *w = NULL;        for (deque<KfsOp *>::iterator i = mOps.begin(); i != mOps.end(); i++) {            if (((*i)->op == CMD_WRITE_PREPARE) || ((*i)->op == CMD_WRITE_PREPARE_FWD) ||                ((*i)->op == CMD_WRITE)) {                                w = *i;            }        }        if (w != NULL) {            OpPair p;            op->clnt = this;            p.op = w;            p.dependentOp = op;            mPendingOps.push_back(p);            KFS_LOG_VA_DEBUG("Keeping write-sync (%d) pending and depends on %d",                              op->seq, p.op->seq);            return true;        } else {            KFS_LOG_VA_DEBUG("Write-sync is being pushed down; no writes left... (%d ops left in q)",                             mOps.size());        }    }    mOps.push_back(op);    op->clnt = this;    // op->Execute();    gChunkServer.OpInserted();    SubmitOp(op);    return true;}voidClientSM::OpFinished(KfsOp *doneOp){    // multiple ops could be waiting for a single op to finish...    while (1) {        if (mPendingOps.empty())            return;        OpPair p;        p = mPendingOps.front();        if (p.op->seq != doneOp->seq) {            break;        }        KFS_LOG_VA_DEBUG("Submitting write-sync (%d) since %d finished", p.dependentOp->seq,                          p.op->seq);        mOps.push_back(p.dependentOp);        gChunkServer.OpInserted();        mPendingOps.pop_front();        SubmitOp(p.dependentOp);    }}RemoteSyncSMPtrClientSM::FindServer(const ServerLocation &loc, bool connect){    return KFS::FindServer(mRemoteSyncers, loc, connect);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -