⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfsops.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 4 页
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: KfsOps.cc 232 2009-01-01 20:40:09Z lohitvijayarenu $ //// Created 2006/05/26// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// Code for parsing commands sent to the Chunkserver and generating// responses that summarize the result of their execution.//// //----------------------------------------------------------------------------#include "KfsOps.h"#include "common/Version.h"#include "common/kfstypes.h"#include "libkfsIO/Globals.h"#include "meta/thread.h"#include "meta/queue.h"#include "libkfsIO/Checksum.h"#include "ChunkManager.h"#include "ChunkServer.h"#include "LeaseClerk.h"#include "Replicator.h"#include "Utils.h"#include <algorithm>#include <boost/lexical_cast.hpp>using std::map;using std::string;using std::ofstream;using std::istringstream;using std::ostringstream;using std::for_each;using std::vector;using namespace KFS;using namespace KFS::libkfsio;typedef int (*ParseHandler)(Properties &, KfsOp **);/// command -> parsehandler maptypedef map<string, ParseHandler> ParseHandlerMap;typedef map<string, ParseHandler>::iterator ParseHandlerMapIter;// handlers for parsingParseHandlerMap	gParseHandlers;// Counters for the various opstypedef map<KfsOp_t, Counter *> OpCounterMap;typedef map<KfsOp_t, Counter *>::iterator OpCounterMapIter;OpCounterMap gCounters;Counter gCtrWriteMaster("Write Master");Counter gCtrWriteDuration("Write Duration");const char *KFS_VERSION_STR = "KFS/1.0";// various parse handlersint parseHandlerOpen(Properties &prop, KfsOp **c);int parseHandlerClose(Properties &prop, KfsOp **c);int parseHandlerRead(Properties &prop, KfsOp **c);int parseHandlerWriteIdAlloc(Properties &prop, KfsOp **c);int parseHandlerWritePrepare(Properties &prop, KfsOp **c);int parseHandlerWriteSync(Properties &prop, KfsOp **c);int parseHandlerSize(Properties &prop, KfsOp **c);int parseHandlerGetChunkMetadata(Properties &prop, KfsOp **c);int parseHandlerAllocChunk(Properties &prop, KfsOp **c);int parseHandlerDeleteChunk(Properties &prop, KfsOp **c);int parseHandlerTruncateChunk(Properties &prop, KfsOp **c);int parseHandlerReplicateChunk(Properties &prop, KfsOp **c);int parseHandlerHeartbeat(Properties &prop, KfsOp **c);int parseHandlerChangeChunkVers(Properties &prop, KfsOp **c);int parseHandlerStaleChunks(Properties &prop, KfsOp **c);int parseHandlerRetire(Properties &prop, KfsOp **c);int parseHandlerPing(Properties &prop, KfsOp **c);int parseHandlerDumpChunkMap(Properties &prop, KfsOp **c);int parseHandlerStats(Properties &prop, KfsOp **c);static TimeoutOp timeoutOp(0);voidKFS::SubmitOp(KfsOp *op){    op->type = OP_REQUEST;    op->Execute();}voidKFS::SubmitOpResponse(KfsOp *op){    op->type = OP_RESPONSE;    op->HandleEvent(EVENT_CMD_DONE, op);}voidKFS::verifyExecutingOnEventProcessor(){    return;}voidKFS::InitParseHandlers(){    gParseHandlers["OPEN"] = parseHandlerOpen;    gParseHandlers["CLOSE"] = parseHandlerClose;    gParseHandlers["READ"] = parseHandlerRead;    gParseHandlers["WRITE_ID_ALLOC"] = parseHandlerWriteIdAlloc;    gParseHandlers["WRITE_PREPARE"] = parseHandlerWritePrepare;    gParseHandlers["WRITE_SYNC"] = parseHandlerWriteSync;    gParseHandlers["SIZE"] = parseHandlerSize;    gParseHandlers["GET_CHUNK_METADATA"] = parseHandlerGetChunkMetadata;    gParseHandlers["ALLOCATE"] = parseHandlerAllocChunk;    gParseHandlers["DELETE"] = parseHandlerDeleteChunk;    gParseHandlers["TRUNCATE"] = parseHandlerTruncateChunk;    gParseHandlers["REPLICATE"] = parseHandlerReplicateChunk;    gParseHandlers["HEARTBEAT"] = parseHandlerHeartbeat;    gParseHandlers["STALE_CHUNKS"] = parseHandlerStaleChunks;    gParseHandlers["CHUNK_VERS_CHANGE"] = parseHandlerChangeChunkVers;    gParseHandlers["RETIRE"] = parseHandlerRetire;    gParseHandlers["PING"] = parseHandlerPing;    gParseHandlers["DUMP_CHUNKMAP"] = parseHandlerDumpChunkMap;    gParseHandlers["STATS"] = parseHandlerStats;}static voidAddCounter(const char *name, KfsOp_t opName){    Counter *c;        c = new Counter(name);    globals().counterManager.AddCounter(c);    gCounters[opName] = c;}voidKFS::RegisterCounters(){    static int calledOnce = 0;    if (calledOnce)        return;    calledOnce = 1;    AddCounter("Open", CMD_OPEN);    AddCounter("Read", CMD_READ);    AddCounter("Write Prepare", CMD_WRITE_PREPARE);    AddCounter("Write Sync", CMD_WRITE_SYNC);    AddCounter("Write (AIO)", CMD_WRITE);    AddCounter("Size", CMD_SIZE);    AddCounter("Get Chunk Metadata", CMD_GET_CHUNK_METADATA);    AddCounter("Alloc", CMD_ALLOC_CHUNK);    AddCounter("Delete", CMD_DELETE_CHUNK);    AddCounter("Truncate", CMD_TRUNCATE_CHUNK);    AddCounter("Replicate", CMD_REPLICATE_CHUNK);    AddCounter("Heartbeat", CMD_HEARTBEAT);    AddCounter("Change Chunk Vers", CMD_CHANGE_CHUNK_VERS);    globals().counterManager.AddCounter(&gCtrWriteMaster);    globals().counterManager.AddCounter(&gCtrWriteDuration);}static voidUpdateCounter(KfsOp_t opName){    Counter *c;    OpCounterMapIter iter;        iter = gCounters.find(opName);    if (iter == gCounters.end())        return;    c = iter->second;    c->Update(1);}#if 0static voidDecrementCounter(KfsOp_t opName){    Counter *c;    OpCounterMapIter iter;        iter = gCounters.find(opName);    if (iter == gCounters.end())        return;    c = iter->second;    c->Update(-1);}#endifstatic voidUpdateMsgProcessingTime(const KfsOp *op) {    struct timeval timeNow;    float timeSpent;    gettimeofday(&timeNow, NULL);    timeSpent = ComputeTimeDiff(op->startTime, timeNow);    OpCounterMapIter iter = gCounters.find(op->op);    if (iter == gCounters.end())        return;    Counter *c = iter->second;    c->Update(timeSpent);}KfsOp::~KfsOp(){    UpdateMsgProcessingTime(this);}////// Given a command in a buffer, parse it out and build a "Command"/// structure which can then be executed.  For parsing, we take the/// string representation of a command and build a Properties object/// out of it; we can then pull the various headers in whatever order/// we choose./// Commands are of the form:/// <COMMAND NAME> \r\n/// {header: value \r\n}+\r\n//////  The general model in parsing the client command:/// 1. Each command has its own parser/// 2. Extract out the command name and find the parser for that/// command/// 3. Dump the header/value pairs into a properties object, so that we/// can extract the header/value fields in any order./// 4. Finally, call the parser for the command sent by the client.////// @param[in] cmdBuf: buffer containing the request sent by the client/// @param[in] cmdLen: length of cmdBuf/// @param[out] res: A piece of memory allocated by calling new that/// contains the data for the request.  It is the caller's/// responsibility to delete the memory returned in res./// @retval 0 on success;  -1 if there is an error/// intKFS::ParseCommand(char *cmdBuf, int cmdLen, KfsOp **res){    const char *delims = " \r\n";    // header/value pairs are separated by a :    const char separator = ':';    string cmdStr;    string::size_type cmdEnd;    Properties prop;    istringstream ist(cmdBuf);    ParseHandlerMapIter entry;    ParseHandler handler;        // get the first line and find the command name    ist >> cmdStr;    // trim the command    cmdEnd = cmdStr.find_first_of(delims);    if (cmdEnd != string::npos) {        cmdStr.erase(cmdEnd);    }        // find the parse handler and parse the thing    entry = gParseHandlers.find(cmdStr);    if (entry == gParseHandlers.end())        return -1;    handler = entry->second;    prop.loadProperties(ist, separator, false);    return (*handler)(prop, res);}voidparseCommon(Properties &prop, kfsSeq_t &seq){    seq = prop.getValue("Cseq", (kfsSeq_t) -1);}intparseHandlerOpen(Properties &prop, KfsOp **c){    kfsSeq_t seq;    OpenOp *oc;    string openMode;    parseCommon(prop, seq);    oc = new OpenOp(seq);    oc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    openMode = prop.getValue("Intent", "");    // XXX: need to do a string compare    oc->openFlags = O_RDWR;    *c = oc;    return 0;}intparseHandlerClose(Properties &prop, KfsOp **c){    kfsSeq_t seq;    CloseOp *cc;    parseCommon(prop, seq);    cc = new CloseOp(seq);    cc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    *c = cc;    return 0;}intparseHandlerRead(Properties &prop, KfsOp **c){    kfsSeq_t seq;    ReadOp *rc;    parseCommon(prop, seq);    rc = new ReadOp(seq);    rc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    rc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1);    rc->offset = prop.getValue("Offset", (off_t) 0);    rc->numBytes = prop.getValue("Num-bytes", (long long) 0);    *c = rc;    return 0;}intparseHandlerWriteIdAlloc(Properties &prop, KfsOp **c){    kfsSeq_t seq;    WriteIdAllocOp *wi;    parseCommon(prop, seq);    wi = new WriteIdAllocOp(seq);    wi->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    wi->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1);    wi->offset = prop.getValue("Offset", (off_t) 0);    wi->numBytes = prop.getValue("Num-bytes", (long long) 0);    wi->numServers = prop.getValue("Num-servers", 0);    wi->servers = prop.getValue("Servers", "");    *c = wi;    return 0;}intparseHandlerWritePrepare(Properties &prop, KfsOp **c){    kfsSeq_t seq;    WritePrepareOp *wp;    parseCommon(prop, seq);    wp = new WritePrepareOp(seq);    wp->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    wp->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1);    wp->offset = prop.getValue("Offset", (off_t) 0);    wp->numBytes = prop.getValue("Num-bytes", (long long) 0);    wp->numServers = prop.getValue("Num-servers", 0);    wp->servers = prop.getValue("Servers", "");    wp->checksum = (uint32_t) prop.getValue("Checksum", (off_t) 0);    *c = wp;    return 0;}intparseHandlerWriteSync(Properties &prop, KfsOp **c){    kfsSeq_t seq;    WriteSyncOp *ws;    kfsChunkId_t cid;    int64_t chunkVers;    parseCommon(prop, seq);    cid = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    chunkVers = prop.getValue("Chunk-version", (int64_t) -1);        ws = new WriteSyncOp(seq, cid, chunkVers);    ws->numServers = prop.getValue("Num-servers", 0);    ws->servers = prop.getValue("Servers", "");    *c = ws;    return 0;}intparseHandlerSize(Properties &prop, KfsOp **c){    kfsSeq_t seq;    SizeOp *sc;    parseCommon(prop, seq);    sc = new SizeOp(seq);    sc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    sc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1);    *c = sc;    return 0;}intparseHandlerGetChunkMetadata(Properties &prop, KfsOp **c){    kfsSeq_t seq;    GetChunkMetadataOp *gcm;    parseCommon(prop, seq);    gcm = new GetChunkMetadataOp(seq);    gcm->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    *c = gcm;    return 0;}intparseHandlerAllocChunk(Properties &prop, KfsOp **c){    kfsSeq_t seq;    AllocChunkOp *cc;    parseCommon(prop, seq);    cc = new AllocChunkOp(seq);    cc->fileId = prop.getValue("File-handle", (kfsFileId_t) -1);    cc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    cc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1);    // if the leaseId is a positive value, then this server has the    // lease on this chunk.    cc->leaseId = prop.getValue("Lease-id", (int64_t) -1);    *c = cc;    return 0;}intparseHandlerDeleteChunk(Properties &prop, KfsOp **c){    kfsSeq_t seq;    DeleteChunkOp *cc;    parseCommon(prop, seq);    cc = new DeleteChunkOp(seq);    cc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    *c = cc;    return 0;}intparseHandlerTruncateChunk(Properties &prop, KfsOp **c){    kfsSeq_t seq;    TruncateChunkOp *tc;    parseCommon(prop, seq);    tc = new TruncateChunkOp(seq);    tc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    tc->chunkSize = prop.getValue("Chunk-size", (long long) 0);    *c = tc;    return 0;}intparseHandlerReplicateChunk(Properties &prop, KfsOp **c){    kfsSeq_t seq;    ReplicateChunkOp *rc;    string s;    parseCommon(prop, seq);    rc = new ReplicateChunkOp(seq);    rc->fid = prop.getValue("File-handle", (kfsFileId_t) -1);    rc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    rc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1);    s = prop.getValue("Chunk-location", "");    if (s != "") {        rc->location.FromString(s);    }    *c = rc;    return 0;}intparseHandlerChangeChunkVers(Properties &prop, KfsOp **c){    kfsSeq_t seq;    ChangeChunkVersOp *rc;    parseCommon(prop, seq);    rc = new ChangeChunkVersOp(seq);    rc->fileId = prop.getValue("File-handle", (kfsFileId_t) -1);    rc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1);    rc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1);    *c = rc;    return 0;}intparseHandlerHeartbeat(Properties &prop, KfsOp **c){    kfsSeq_t seq;    HeartbeatOp *hb;    parseCommon(prop, seq);    hb = new HeartbeatOp(seq);    *c = hb;    return 0;}intparseHandlerRetire(Properties &prop, KfsOp **c){    kfsSeq_t seq;    parseCommon(prop, seq);    *c = new RetireOp(seq);    return 0;}intparseHandlerStaleChunks(Properties &prop, KfsOp **c){    kfsSeq_t seq;    StaleChunksOp *sc;    parseCommon(prop, seq);    sc = new StaleChunksOp(seq);    sc->contentLength = prop.getValue("Content-length", 0);    sc->numStaleChunks = prop.getValue("Num-chunks", 0);    *c = sc;    return 0;}intparseHandlerPing(Properties &prop, KfsOp **c)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -