📄 clientsm.cc
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: ClientSM.cc 210 2008-11-05 02:51:49Z sriramsrao $ //// Created 2006/03/23// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// //----------------------------------------------------------------------------#include "ClientSM.h"#include "ChunkManager.h"#include "ChunkServer.h"#include "Utils.h"#include "KfsOps.h"#include <string>#include <sstream>using std::string;using std::ostringstream;#include "common/log.h"#include "libkfsIO/Globals.h"#include <boost/scoped_array.hpp>using boost::scoped_array;using namespace KFS;using namespace KFS::libkfsio;ClientSM::ClientSM(NetConnectionPtr &conn) { mNetConnection = conn; SET_HANDLER(this, &ClientSM::HandleRequest);}ClientSM::~ClientSM(){ KfsOp *op; assert(mOps.empty()); while (!mOps.empty()) { op = mOps.front(); mOps.pop_front(); delete op; } gClientManager.Remove(this);}////// Send out the response to the client request. The response is/// generated by MetaRequest as per the protocol./// @param[in] op The request for which we finished execution.///voidClientSM::SendResponse(KfsOp *op){ ostringstream os; ReadOp *rop; string s = op->Show(); int len;#ifdef DEBUG verifyExecutingOnNetProcessor();#endif op->Response(os); KFS_LOG_VA_DEBUG("Command %s: Response status: %d\n", s.c_str(), op->status); len = os.str().length(); if (len > 0) mNetConnection->Write(os.str().c_str(), len); if (op->op == CMD_WRITE_SYNC) { KFS_LOG_VA_INFO("Ack'ing write sync: %s", op->Show().c_str()); } if (op->op == CMD_READ) { // need to send out the data read rop = static_cast<ReadOp *> (op); if (op->status >= 0) { KFS_LOG_VA_INFO("Read done: %s, status = %d", rop->Show().c_str(), rop->status); assert(rop->dataBuf->BytesConsumable() == rop->status); mNetConnection->Write(rop->dataBuf, rop->numBytesIO); } } else if (op->op == CMD_GET_CHUNK_METADATA) { GetChunkMetadataOp *gcm = static_cast<GetChunkMetadataOp *>(op); if (op->status >= 0) mNetConnection->Write(gcm->dataBuf, gcm->numBytesIO); }}////// Generic event handler. Decode the event that occurred and/// appropriately extract out the data and deal with the event./// @param[in] code: The type of event that occurred/// @param[in] data: Data being passed in relative to the event that/// occurred./// @retval 0 to indicate successful event handling; -1 otherwise.///intClientSM::HandleRequest(int code, void *data){ IOBuffer *iobuf; KfsOp *op; int cmdLen;#ifdef DEBUG verifyExecutingOnNetProcessor();#endif switch (code) { case EVENT_NET_READ: // We read something from the network. Run the RPC that // came in. iobuf = (IOBuffer *) data; while (IsMsgAvail(iobuf, &cmdLen)) { // if we don't have all the data for the command, wait if (!HandleClientCmd(iobuf, cmdLen)) break; } break; case EVENT_NET_WROTE: // Something went out on the network. For now, we don't // track it. Later, we may use it for tracking throttling // and such. break; case EVENT_CMD_DONE: // An op finished execution. Send response back in FIFO gChunkServer.OpFinished(); op = (KfsOp *) data; op->done = true; assert(!mOps.empty()); while (!mOps.empty()) { KfsOp *qop = mOps.front(); if (!qop->done) break; if (mNetConnection) SendResponse(qop); mOps.pop_front(); OpFinished(qop); delete qop; } if (mNetConnection) mNetConnection->StartFlush(); break; case EVENT_NET_ERROR: // KFS_LOG_VA_DEBUG("Closing connection"); if (mNetConnection) mNetConnection->Close(); // get rid of the connection to all the peers in daisy chain; // if there were any outstanding ops, they will all come back // to this method as EVENT_CMD_DONE and we clean them up above. ReleaseAllServers(mRemoteSyncers); // if there are any disk ops, wait for the ops to finish SET_HANDLER(this, &ClientSM::HandleTerminate); HandleTerminate(code, NULL); break; default: assert(!"Unknown event"); break; } return 0;}////// Termination handler. For the client state machine, we could have/// ops queued at the logger. So, for cleanup wait for all the/// outstanding ops to finish and then delete this. In this state,/// the only event that gets raised is that an op finished; anything/// else is bad.///intClientSM::HandleTerminate(int code, void *data){ KfsOp *op;#ifdef DEBUG verifyExecutingOnNetProcessor();#endif switch (code) { case EVENT_CMD_DONE: gChunkServer.OpFinished(); // An op finished execution. Send a response back op = (KfsOp *) data; op->done = true; if (op != mOps.front()) break; while (!mOps.empty()) { op = mOps.front(); if (!op->done) break; OpFinished(op); // we are done with the op mOps.pop_front(); delete op; } break; case EVENT_NET_ERROR: // clean things up break; default: assert(!"Unknown event"); break; } if (mOps.empty()) { // all ops are done...so, now, we can nuke ourself. assert(mPendingOps.empty()); delete this; } return 0;}////// We have a command in a buffer. It is possible that we don't have/// everything we need to execute it (for example, for a write we may/// not have received all the data the client promised). So, parse/// out the command and if we have everything execute it./// boolClientSM::HandleClientCmd(IOBuffer *iobuf, int cmdLen){ scoped_array<char> buf; KfsOp *op; size_t nAvail; buf.reset(new char[cmdLen + 1]); iobuf->CopyOut(buf.get(), cmdLen); buf[cmdLen] = '\0'; if (ParseCommand(buf.get(), cmdLen, &op) != 0) { iobuf->Consume(cmdLen); KFS_LOG_VA_DEBUG("Aye?: %s", buf.get()); // got a bogus command return true; } if (op->op == CMD_WRITE_PREPARE) { WritePrepareOp *wop = static_cast<WritePrepareOp *> (op); assert(wop != NULL); // if we don't have all the data for the write, hold on... nAvail = iobuf->BytesConsumable() - cmdLen; if (nAvail < wop->numBytes) { delete op; // we couldn't process the command...so, wait return false; } iobuf->Consume(cmdLen); wop->dataBuf = new IOBuffer(); wop->dataBuf->Move(iobuf, wop->numBytes); nAvail = wop->dataBuf->BytesConsumable(); // KFS_LOG_VA_DEBUG("Got command: %s", buf.get()); // KFS_LOG_VA_DEBUG("# of bytes avail for write: %lu", nAvail); } else { string s = op->Show(); KFS_LOG_VA_DEBUG("Got command: %s\n", s.c_str()); iobuf->Consume(cmdLen); } if (op->op == CMD_WRITE_SYNC) { KFS_LOG_VA_INFO("Received write sync: %s", op->Show().c_str()); // make the write sync depend on a previous write KfsOp *w = NULL; for (deque<KfsOp *>::iterator i = mOps.begin(); i != mOps.end(); i++) { if (((*i)->op == CMD_WRITE_PREPARE) || ((*i)->op == CMD_WRITE_PREPARE_FWD) || ((*i)->op == CMD_WRITE)) { w = *i; } } if (w != NULL) { OpPair p; op->clnt = this; p.op = w; p.dependentOp = op; mPendingOps.push_back(p); KFS_LOG_VA_DEBUG("Keeping write-sync (%d) pending and depends on %d", op->seq, p.op->seq); return true; } else { KFS_LOG_VA_DEBUG("Write-sync is being pushed down; no writes left... (%d ops left in q)", mOps.size()); } } mOps.push_back(op); op->clnt = this; // op->Execute(); gChunkServer.OpInserted(); SubmitOp(op); return true;}voidClientSM::OpFinished(KfsOp *doneOp){ // multiple ops could be waiting for a single op to finish... while (1) { if (mPendingOps.empty()) return; OpPair p; p = mPendingOps.front(); if (p.op->seq != doneOp->seq) { break; } KFS_LOG_VA_DEBUG("Submitting write-sync (%d) since %d finished", p.dependentOp->seq, p.op->seq); mOps.push_back(p.dependentOp); gChunkServer.OpInserted(); mPendingOps.pop_front(); SubmitOp(p.dependentOp); }}RemoteSyncSMPtrClientSM::FindServer(const ServerLocation &loc, bool connect){ return KFS::FindServer(mRemoteSyncers, loc, connect);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -