📄 kfsops.cc
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: KfsOps.cc 232 2009-01-01 20:40:09Z lohitvijayarenu $ //// Created 2006/05/26// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// Code for parsing commands sent to the Chunkserver and generating// responses that summarize the result of their execution.//// //----------------------------------------------------------------------------#include "KfsOps.h"#include "common/Version.h"#include "common/kfstypes.h"#include "libkfsIO/Globals.h"#include "meta/thread.h"#include "meta/queue.h"#include "libkfsIO/Checksum.h"#include "ChunkManager.h"#include "ChunkServer.h"#include "LeaseClerk.h"#include "Replicator.h"#include "Utils.h"#include <algorithm>#include <boost/lexical_cast.hpp>using std::map;using std::string;using std::ofstream;using std::istringstream;using std::ostringstream;using std::for_each;using std::vector;using namespace KFS;using namespace KFS::libkfsio;typedef int (*ParseHandler)(Properties &, KfsOp **);/// command -> parsehandler maptypedef map<string, ParseHandler> ParseHandlerMap;typedef map<string, ParseHandler>::iterator ParseHandlerMapIter;// handlers for parsingParseHandlerMap gParseHandlers;// Counters for the various opstypedef map<KfsOp_t, Counter *> OpCounterMap;typedef map<KfsOp_t, Counter *>::iterator OpCounterMapIter;OpCounterMap gCounters;Counter gCtrWriteMaster("Write Master");Counter gCtrWriteDuration("Write Duration");const char *KFS_VERSION_STR = "KFS/1.0";// various parse handlersint parseHandlerOpen(Properties &prop, KfsOp **c);int parseHandlerClose(Properties &prop, KfsOp **c);int parseHandlerRead(Properties &prop, KfsOp **c);int parseHandlerWriteIdAlloc(Properties &prop, KfsOp **c);int parseHandlerWritePrepare(Properties &prop, KfsOp **c);int parseHandlerWriteSync(Properties &prop, KfsOp **c);int parseHandlerSize(Properties &prop, KfsOp **c);int parseHandlerGetChunkMetadata(Properties &prop, KfsOp **c);int parseHandlerAllocChunk(Properties &prop, KfsOp **c);int parseHandlerDeleteChunk(Properties &prop, KfsOp **c);int parseHandlerTruncateChunk(Properties &prop, KfsOp **c);int parseHandlerReplicateChunk(Properties &prop, KfsOp **c);int parseHandlerHeartbeat(Properties &prop, KfsOp **c);int parseHandlerChangeChunkVers(Properties &prop, KfsOp **c);int parseHandlerStaleChunks(Properties &prop, KfsOp **c);int parseHandlerRetire(Properties &prop, KfsOp **c);int parseHandlerPing(Properties &prop, KfsOp **c);int parseHandlerDumpChunkMap(Properties &prop, KfsOp **c);int parseHandlerStats(Properties &prop, KfsOp **c);static TimeoutOp timeoutOp(0);voidKFS::SubmitOp(KfsOp *op){ op->type = OP_REQUEST; op->Execute();}voidKFS::SubmitOpResponse(KfsOp *op){ op->type = OP_RESPONSE; op->HandleEvent(EVENT_CMD_DONE, op);}voidKFS::verifyExecutingOnEventProcessor(){ return;}voidKFS::InitParseHandlers(){ gParseHandlers["OPEN"] = parseHandlerOpen; gParseHandlers["CLOSE"] = parseHandlerClose; gParseHandlers["READ"] = parseHandlerRead; gParseHandlers["WRITE_ID_ALLOC"] = parseHandlerWriteIdAlloc; gParseHandlers["WRITE_PREPARE"] = parseHandlerWritePrepare; gParseHandlers["WRITE_SYNC"] = parseHandlerWriteSync; gParseHandlers["SIZE"] = parseHandlerSize; gParseHandlers["GET_CHUNK_METADATA"] = parseHandlerGetChunkMetadata; gParseHandlers["ALLOCATE"] = parseHandlerAllocChunk; gParseHandlers["DELETE"] = parseHandlerDeleteChunk; gParseHandlers["TRUNCATE"] = parseHandlerTruncateChunk; gParseHandlers["REPLICATE"] = parseHandlerReplicateChunk; gParseHandlers["HEARTBEAT"] = parseHandlerHeartbeat; gParseHandlers["STALE_CHUNKS"] = parseHandlerStaleChunks; gParseHandlers["CHUNK_VERS_CHANGE"] = parseHandlerChangeChunkVers; gParseHandlers["RETIRE"] = parseHandlerRetire; gParseHandlers["PING"] = parseHandlerPing; gParseHandlers["DUMP_CHUNKMAP"] = parseHandlerDumpChunkMap; gParseHandlers["STATS"] = parseHandlerStats;}static voidAddCounter(const char *name, KfsOp_t opName){ Counter *c; c = new Counter(name); globals().counterManager.AddCounter(c); gCounters[opName] = c;}voidKFS::RegisterCounters(){ static int calledOnce = 0; if (calledOnce) return; calledOnce = 1; AddCounter("Open", CMD_OPEN); AddCounter("Read", CMD_READ); AddCounter("Write Prepare", CMD_WRITE_PREPARE); AddCounter("Write Sync", CMD_WRITE_SYNC); AddCounter("Write (AIO)", CMD_WRITE); AddCounter("Size", CMD_SIZE); AddCounter("Get Chunk Metadata", CMD_GET_CHUNK_METADATA); AddCounter("Alloc", CMD_ALLOC_CHUNK); AddCounter("Delete", CMD_DELETE_CHUNK); AddCounter("Truncate", CMD_TRUNCATE_CHUNK); AddCounter("Replicate", CMD_REPLICATE_CHUNK); AddCounter("Heartbeat", CMD_HEARTBEAT); AddCounter("Change Chunk Vers", CMD_CHANGE_CHUNK_VERS); globals().counterManager.AddCounter(&gCtrWriteMaster); globals().counterManager.AddCounter(&gCtrWriteDuration);}static voidUpdateCounter(KfsOp_t opName){ Counter *c; OpCounterMapIter iter; iter = gCounters.find(opName); if (iter == gCounters.end()) return; c = iter->second; c->Update(1);}#if 0static voidDecrementCounter(KfsOp_t opName){ Counter *c; OpCounterMapIter iter; iter = gCounters.find(opName); if (iter == gCounters.end()) return; c = iter->second; c->Update(-1);}#endifstatic voidUpdateMsgProcessingTime(const KfsOp *op) { struct timeval timeNow; float timeSpent; gettimeofday(&timeNow, NULL); timeSpent = ComputeTimeDiff(op->startTime, timeNow); OpCounterMapIter iter = gCounters.find(op->op); if (iter == gCounters.end()) return; Counter *c = iter->second; c->Update(timeSpent);}KfsOp::~KfsOp(){ UpdateMsgProcessingTime(this);}////// Given a command in a buffer, parse it out and build a "Command"/// structure which can then be executed. For parsing, we take the/// string representation of a command and build a Properties object/// out of it; we can then pull the various headers in whatever order/// we choose./// Commands are of the form:/// <COMMAND NAME> \r\n/// {header: value \r\n}+\r\n////// The general model in parsing the client command:/// 1. Each command has its own parser/// 2. Extract out the command name and find the parser for that/// command/// 3. Dump the header/value pairs into a properties object, so that we/// can extract the header/value fields in any order./// 4. Finally, call the parser for the command sent by the client.////// @param[in] cmdBuf: buffer containing the request sent by the client/// @param[in] cmdLen: length of cmdBuf/// @param[out] res: A piece of memory allocated by calling new that/// contains the data for the request. It is the caller's/// responsibility to delete the memory returned in res./// @retval 0 on success; -1 if there is an error/// intKFS::ParseCommand(char *cmdBuf, int cmdLen, KfsOp **res){ const char *delims = " \r\n"; // header/value pairs are separated by a : const char separator = ':'; string cmdStr; string::size_type cmdEnd; Properties prop; istringstream ist(cmdBuf); ParseHandlerMapIter entry; ParseHandler handler; // get the first line and find the command name ist >> cmdStr; // trim the command cmdEnd = cmdStr.find_first_of(delims); if (cmdEnd != string::npos) { cmdStr.erase(cmdEnd); } // find the parse handler and parse the thing entry = gParseHandlers.find(cmdStr); if (entry == gParseHandlers.end()) return -1; handler = entry->second; prop.loadProperties(ist, separator, false); return (*handler)(prop, res);}voidparseCommon(Properties &prop, kfsSeq_t &seq){ seq = prop.getValue("Cseq", (kfsSeq_t) -1);}intparseHandlerOpen(Properties &prop, KfsOp **c){ kfsSeq_t seq; OpenOp *oc; string openMode; parseCommon(prop, seq); oc = new OpenOp(seq); oc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); openMode = prop.getValue("Intent", ""); // XXX: need to do a string compare oc->openFlags = O_RDWR; *c = oc; return 0;}intparseHandlerClose(Properties &prop, KfsOp **c){ kfsSeq_t seq; CloseOp *cc; parseCommon(prop, seq); cc = new CloseOp(seq); cc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); *c = cc; return 0;}intparseHandlerRead(Properties &prop, KfsOp **c){ kfsSeq_t seq; ReadOp *rc; parseCommon(prop, seq); rc = new ReadOp(seq); rc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); rc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1); rc->offset = prop.getValue("Offset", (off_t) 0); rc->numBytes = prop.getValue("Num-bytes", (long long) 0); *c = rc; return 0;}intparseHandlerWriteIdAlloc(Properties &prop, KfsOp **c){ kfsSeq_t seq; WriteIdAllocOp *wi; parseCommon(prop, seq); wi = new WriteIdAllocOp(seq); wi->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); wi->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1); wi->offset = prop.getValue("Offset", (off_t) 0); wi->numBytes = prop.getValue("Num-bytes", (long long) 0); wi->numServers = prop.getValue("Num-servers", 0); wi->servers = prop.getValue("Servers", ""); *c = wi; return 0;}intparseHandlerWritePrepare(Properties &prop, KfsOp **c){ kfsSeq_t seq; WritePrepareOp *wp; parseCommon(prop, seq); wp = new WritePrepareOp(seq); wp->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); wp->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1); wp->offset = prop.getValue("Offset", (off_t) 0); wp->numBytes = prop.getValue("Num-bytes", (long long) 0); wp->numServers = prop.getValue("Num-servers", 0); wp->servers = prop.getValue("Servers", ""); wp->checksum = (uint32_t) prop.getValue("Checksum", (off_t) 0); *c = wp; return 0;}intparseHandlerWriteSync(Properties &prop, KfsOp **c){ kfsSeq_t seq; WriteSyncOp *ws; kfsChunkId_t cid; int64_t chunkVers; parseCommon(prop, seq); cid = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); chunkVers = prop.getValue("Chunk-version", (int64_t) -1); ws = new WriteSyncOp(seq, cid, chunkVers); ws->numServers = prop.getValue("Num-servers", 0); ws->servers = prop.getValue("Servers", ""); *c = ws; return 0;}intparseHandlerSize(Properties &prop, KfsOp **c){ kfsSeq_t seq; SizeOp *sc; parseCommon(prop, seq); sc = new SizeOp(seq); sc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); sc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1); *c = sc; return 0;}intparseHandlerGetChunkMetadata(Properties &prop, KfsOp **c){ kfsSeq_t seq; GetChunkMetadataOp *gcm; parseCommon(prop, seq); gcm = new GetChunkMetadataOp(seq); gcm->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); *c = gcm; return 0;}intparseHandlerAllocChunk(Properties &prop, KfsOp **c){ kfsSeq_t seq; AllocChunkOp *cc; parseCommon(prop, seq); cc = new AllocChunkOp(seq); cc->fileId = prop.getValue("File-handle", (kfsFileId_t) -1); cc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); cc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1); // if the leaseId is a positive value, then this server has the // lease on this chunk. cc->leaseId = prop.getValue("Lease-id", (int64_t) -1); *c = cc; return 0;}intparseHandlerDeleteChunk(Properties &prop, KfsOp **c){ kfsSeq_t seq; DeleteChunkOp *cc; parseCommon(prop, seq); cc = new DeleteChunkOp(seq); cc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); *c = cc; return 0;}intparseHandlerTruncateChunk(Properties &prop, KfsOp **c){ kfsSeq_t seq; TruncateChunkOp *tc; parseCommon(prop, seq); tc = new TruncateChunkOp(seq); tc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); tc->chunkSize = prop.getValue("Chunk-size", (long long) 0); *c = tc; return 0;}intparseHandlerReplicateChunk(Properties &prop, KfsOp **c){ kfsSeq_t seq; ReplicateChunkOp *rc; string s; parseCommon(prop, seq); rc = new ReplicateChunkOp(seq); rc->fid = prop.getValue("File-handle", (kfsFileId_t) -1); rc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); rc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1); s = prop.getValue("Chunk-location", ""); if (s != "") { rc->location.FromString(s); } *c = rc; return 0;}intparseHandlerChangeChunkVers(Properties &prop, KfsOp **c){ kfsSeq_t seq; ChangeChunkVersOp *rc; parseCommon(prop, seq); rc = new ChangeChunkVersOp(seq); rc->fileId = prop.getValue("File-handle", (kfsFileId_t) -1); rc->chunkId = prop.getValue("Chunk-handle", (kfsChunkId_t) -1); rc->chunkVersion = prop.getValue("Chunk-version", (int64_t) -1); *c = rc; return 0;}intparseHandlerHeartbeat(Properties &prop, KfsOp **c){ kfsSeq_t seq; HeartbeatOp *hb; parseCommon(prop, seq); hb = new HeartbeatOp(seq); *c = hb; return 0;}intparseHandlerRetire(Properties &prop, KfsOp **c){ kfsSeq_t seq; parseCommon(prop, seq); *c = new RetireOp(seq); return 0;}intparseHandlerStaleChunks(Properties &prop, KfsOp **c){ kfsSeq_t seq; StaleChunksOp *sc; parseCommon(prop, seq); sc = new StaleChunksOp(seq); sc->contentLength = prop.getValue("Content-length", 0); sc->numStaleChunks = prop.getValue("Num-chunks", 0); *c = sc; return 0;}intparseHandlerPing(Properties &prop, KfsOp **c)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -