📄 request.cc
字号:
/*! * $Id: request.cc 237 2009-01-09 07:45:20Z sriramsrao $ * * \file request.cc * \brief process queue of outstanding metadata requests * \author Blake Lewis and Sriram Rao * * Copyright 2008 Quantcast Corp. * Copyright 2006-2008 Kosmix Corp. * * This file is part of Kosmos File System (KFS). * * Licensed under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. */#include <map>#include "common/Version.h"#include "kfstree.h"#include "queue.h"#include "request.h"#include "logger.h"#include "checkpoint.h"#include "util.h"#include "LayoutManager.h"#include "libkfsIO/Globals.h"using std::map;using std::string;using std::istringstream;using std::ifstream;using std::min;using namespace KFS;using namespace KFS::libkfsio;MetaQueue <MetaRequest> requestList;typedef void (*ReqHandler)(MetaRequest *r);map <MetaOp, ReqHandler> handler;typedef int (*ParseHandler)(Properties &, MetaRequest **);static int parseHandlerLookup(Properties &prop, MetaRequest **r);static int parseHandlerLookupPath(Properties &prop, MetaRequest **r);static int parseHandlerCreate(Properties &prop, MetaRequest **r);static int parseHandlerRemove(Properties &prop, MetaRequest **r);static int parseHandlerRename(Properties &prop, MetaRequest **r);static int parseHandlerMkdir(Properties &prop, MetaRequest **r);static int parseHandlerRmdir(Properties &prop, MetaRequest **r);static int parseHandlerReaddir(Properties &prop, MetaRequest **r);static int parseHandlerReaddirPlus(Properties &prop, MetaRequest **r);static int parseHandlerGetalloc(Properties &prop, MetaRequest **r);static int parseHandlerGetlayout(Properties &prop, MetaRequest **r);static int parseHandlerGetDirSummary(Properties &prop, MetaRequest **r);static int parseHandlerAllocate(Properties &prop, MetaRequest **r);static int parseHandlerTruncate(Properties &prop, MetaRequest **r);static int parseHandlerChangeFileReplication(Properties &prop, MetaRequest **r);static int parseHandlerRetireChunkserver(Properties &prop, MetaRequest **r);static int parseHandlerToggleRebalancing(Properties &prop, MetaRequest **r);static int parseHandlerExecuteRebalancePlan(Properties &prop, MetaRequest **r);static int parseHandlerLeaseAcquire(Properties &prop, MetaRequest **r);static int parseHandlerLeaseRenew(Properties &prop, MetaRequest **r);static int parseHandlerLeaseRelinquish(Properties &prop, MetaRequest **r);static int parseHandlerChunkCorrupt(Properties &prop, MetaRequest **r);static int parseHandlerHello(Properties &prop, MetaRequest **r);static int parseHandlerPing(Properties &prop, MetaRequest **r);static int parseHandlerStats(Properties &prop, MetaRequest **r);static int parseHandlerDumpChunkToServerMap(Properties &prop, MetaRequest **r);static int parseHandlerOpenFiles(Properties &prop, MetaRequest **r);static int parseHandlerToggleWORM(Properties &prop, MetaRequest **r);static int parseHandlerUpServers(Properties &prop, MetaRequest **r);/// command -> parsehandler maptypedef map<string, ParseHandler> ParseHandlerMap;typedef map<string, ParseHandler>::iterator ParseHandlerMapIter;// handlers for parsingParseHandlerMap gParseHandlers;// mapping for the counterstypedef map<MetaOp, Counter *> OpCounterMap;typedef map<MetaOp, Counter *>::iterator OpCounterMapIter;OpCounterMap gCounters;// see the comments in setClusterKey()string gClusterKey;string gMD5SumFn;bool gWormMode = false;static boolfile_exists(fid_t fid){ return metatree.getFattr(fid) != NULL;}static boolpath_exists(const string &pathname){ MetaFattr *fa = metatree.lookupPath(KFS::ROOTFID, pathname); return fa != NULL;}static boolis_dir(fid_t fid){ MetaFattr *fa = metatree.getFattr(fid); return fa != NULL && fa->type == KFS_DIR;}static voidAddCounter(const char *name, MetaOp opName){ Counter *c = new Counter(name); globals().counterManager.AddCounter(c); gCounters[opName] = c;}voidKFS::RegisterCounters(){ static int calledOnce = 0; if (calledOnce) return; calledOnce = 1; AddCounter("Get alloc", META_GETALLOC); AddCounter("Get layout", META_GETLAYOUT); AddCounter("Lookup", META_LOOKUP); AddCounter("Lookup Path", META_LOOKUP_PATH); AddCounter("Allocate", META_ALLOCATE); AddCounter("Truncate", META_TRUNCATE); AddCounter("Create", META_CREATE); AddCounter("Remove", META_REMOVE); AddCounter("Rename", META_RENAME); AddCounter("Mkdir", META_MKDIR); AddCounter("Rmdir", META_RMDIR); AddCounter("Change File Replication", META_CHANGE_FILE_REPLICATION); AddCounter("Lease Acquire", META_LEASE_ACQUIRE); AddCounter("Lease Renew", META_LEASE_RENEW); AddCounter("Lease Cleanup", META_LEASE_CLEANUP); AddCounter("Corrupt Chunk ", META_CHUNK_CORRUPT); AddCounter("Chunkserver Hello ", META_HELLO); AddCounter("Chunkserver Bye ", META_BYE); AddCounter("Chunkserver Retire Start", META_RETIRE_CHUNKSERVER); // AddCounter("Chunkserver Retire Done", META_CHUNK_RETIRE_DONE); AddCounter("Replication Checker ", META_CHUNK_REPLICATION_CHECK); AddCounter("Replication Done ", META_CHUNK_REPLICATE);}static voidUpdateCounter(MetaOp opName){ Counter *c; OpCounterMapIter iter; iter = gCounters.find(opName); if (iter == gCounters.end()) return; c = iter->second; c->Update(1);}/* * Submit a request to change the increment used for bumping up chunk version #. * @param[in] r The request that depends on chunk-version-increment being written * out to disk as part of completing the request processing. */voidKFS::ChangeIncarnationNumber(MetaRequest *r){ if (chunkVersionInc < 1) // disable this bumping for now ++chunkVersionInc; MetaChangeChunkVersionInc *ccvi = new MetaChangeChunkVersionInc(chunkVersionInc, r); submit_request(ccvi);}/* * Set the "key" for this cluster. All chunkservers connecting to the meta-data * server should provide this key in the hello message. * @param[in] key The desired cluster key*/voidKFS::setClusterKey(const char *key){ gClusterKey = key;}/* * A way of doing admission control on chunkservers is to ensure that they run * an "approved" binary. The approved list is defined by the MD5Sum of the * binary in an MD5Sum file. All chunkservers connecting to * the metaserver should provide their md5sum in the hello message. If the * md5sum is in the approved list, then the chunkserver is admitted to the * system. * @param[in] md5sumFn The filename with the list of MD5Sums*/voidKFS::setMD5SumFn(const char *md5sumFn){ gMD5SumFn = md5sumFn;}/* * Set WORM mode. In WORM mode, deletes are disabled. */voidKFS::setWORMMode(bool value){ gWormMode = value;}/* * Boilerplate code for specific request types. Cast to the * appropriate type, call the corresponding KFS tree routine, * then use the callback to return the results. */static voidhandle_lookup(MetaRequest *r){ MetaLookup *req = static_cast <MetaLookup *>(r); MetaFattr *fa = metatree.lookup(req->dir, req->name); req->status = (fa == NULL) ? -ENOENT : 0; if (fa != NULL) req->result = *fa;}static voidhandle_lookup_path(MetaRequest *r){ MetaLookupPath *req = static_cast <MetaLookupPath *>(r); MetaFattr *fa = metatree.lookupPath(req->root, req->path); req->status = (fa == NULL) ? -ENOENT : 0; if (fa != NULL) req->result = *fa;}static voidhandle_create(MetaRequest *r){ MetaCreate *req = static_cast <MetaCreate *>(r); fid_t fid = 0; if (!is_dir(req->dir)) { req->status = -ENOTDIR; return; } req->status = metatree.create(req->dir, req->name, &fid, req->numReplicas, req->exclusive); req->fid = fid;}static voidhandle_mkdir(MetaRequest *r){ MetaMkdir *req = static_cast <MetaMkdir *>(r); if (!is_dir(req->dir)) { req->status = -ENOTDIR; return; } fid_t fid = 0; req->status = metatree.mkdir(req->dir, req->name, &fid); req->fid = fid;}/*! * Specially named files (such as, those that end with ".tmp") can be * mutated by remove/rename. Otherwise, in WORM no deletes/renames are allowed. */static boolisWormMutationAllowed(const string &pathname){ string::size_type pos; pos = pathname.rfind(".tmp"); return pos != string::npos;}/*! * \brief Remove a file in a directory. Also, remove the chunks * associated with the file. For removing chunks, we send off * RPCs to the appropriate chunkservers. */static voidhandle_remove(MetaRequest *r){ MetaRemove *req = static_cast <MetaRemove *>(r); if (gWormMode && (!isWormMutationAllowed(req->name))) { // deletes are disabled in WORM mode except for specially named // files req->status = -EPERM; return; } req->status = metatree.remove(req->dir, req->name);}static voidhandle_rmdir(MetaRequest *r){ MetaRmdir *req = static_cast <MetaRmdir *>(r); if (gWormMode && (!isWormMutationAllowed(req->name))) { // deletes are disabled in WORM mode req->status = -EPERM; return; } req->status = metatree.rmdir(req->dir, req->name);}static voidhandle_readdir(MetaRequest *r){ MetaReaddir *req = static_cast <MetaReaddir *>(r); if (!file_exists(req->dir)) req->status = -ENOENT; else if (!is_dir(req->dir)) req->status = -ENOTDIR; else { vector<MetaDentry *> res; req->status = metatree.readdir(req->dir, res); // // Previously, req->v used to be vector<MetaDentry *>. This // meant that req->v carried out a pointer for something in the // metatree. Now, if the pointer was deleted, then req->v // contains dangling references and can corrupt memory. // Instead, make a copy of the dentry and we are good. // if (req->status == 0) { for (uint32_t i = 0; i < res.size(); i++) req->v.push_back(res[i]); } }}class EnumerateLocations { vector <ServerLocation> &v;public: EnumerateLocations(vector <ServerLocation> &result): v(result) { } void operator () (ChunkServerPtr c) { ServerLocation l = c->GetServerLocation(); v.push_back(l); }};class ListServerLocations { ostringstream &os;public: ListServerLocations(ostringstream &out): os(out) { } void operator () (const ServerLocation &s) { os << " " << s.ToString(); }};class EnumerateReaddirPlusInfo { ostringstream &os;public: EnumerateReaddirPlusInfo(ostringstream &o) : os(o) { } void operator()(MetaDentry *entry) { static string fname[] = { "empty", "file", "dir" }; MetaFattr *fa = metatree.lookup(entry->getDir(), entry->getName()); os << "Begin-entry" << "\r\n"; if (fa == NULL) { return; } os << "Name: " << entry->getName() << "\r\n"; os << "File-handle: " << toString(fa->id()) << "\r\n"; os << "Type: " << fname[fa->type] << "\r\n"; sendtime(os, "M-Time:", fa->mtime, "\r\n"); sendtime(os, "C-Time:", fa->ctime, "\r\n"); sendtime(os, "CR-Time:", fa->crtime, "\r\n"); if (fa->type == KFS_DIR) { return; } // for a file, get the layout and provide location of last chunk // so that the client can compute filesize vector<MetaChunkInfo*> chunkInfo; vector<ChunkServerPtr> c; int status = metatree.getalloc(fa->id(), chunkInfo); if ((status != 0) || (chunkInfo.size() == 0)) { os << "Chunk-count: 0\r\n"; os << "File-size: 0\r\n"; os << "Replication: " << toString(fa->numReplicas) << "\r\n"; return; } MetaChunkInfo* lastChunk = chunkInfo.back(); ChunkLayoutInfo l; l.offset = lastChunk->offset; l.chunkId = lastChunk->chunkId; l.chunkVersion = lastChunk->chunkVersion; if (gLayoutManager.GetChunkToServerMapping(l.chunkId, c) != 0) { // if all the servers hosting the chunk are // down...sigh... os << "Chunk-count: 0\r\n"; os << "File-size: 0\r\n"; os << "Replication: " << toString(fa->numReplicas) << "\r\n"; return; } // // we give the client all the info about the last block of the // file; we also tell the client what we know about the // filesize. if the value we send is -1, the client will figure // out the size. // for_each(c.begin(), c.end(), EnumerateLocations(l.locations)); os << "Chunk-count: " << toString(fa->chunkcount) << "\r\n"; os << "File-size: " << toString(fa->filesize) << "\r\n"; os << "Replication: " << toString(fa->numReplicas) << "\r\n"; os << "Chunk-offset: " << l.offset << "\r\n"; os << "Chunk-handle: " << l.chunkId << "\r\n"; os << "Chunk-version: " << l.chunkVersion << "\r\n"; os << "Num-replicas: " << l.locations.size() << "\r\n"; os << "Replicas: "; for_each(l.locations.begin(), l.locations.end(), ListServerLocations(os)); os << "\r\n"; }};static voidhandle_readdirplus(MetaRequest *r){ MetaReaddirPlus *req = static_cast <MetaReaddirPlus *>(r); if (!file_exists(req->dir)) { req->status = -ENOENT; return; } else if (!is_dir(req->dir)) { req->status = -ENOTDIR; return; } vector<MetaDentry *> res; req->status = metatree.readdir(req->dir, res); if (req->status != 0) return; // now that we have the entire directory read, for each entry in the // directory, get the attributes out. req->numEntries = res.size(); for_each(res.begin(), res.end(), EnumerateReaddirPlusInfo(req->v));}/*! * \brief Given a directory tree rooted at dir, return * the # of files/# of bytes in that tree. */static intgetDirSummary(fid_t dir, uint64_t &nFiles, uint64_t &nBytes){ int res; vector<MetaDentry *> dentries; res = metatree.readdir(dir, dentries); if (res != 0) return res; for (uint32_t i = 0; i < dentries.size(); i++) { MetaDentry *entry = dentries[i]; string entryName = entry->getName(); if ((entryName == ".") || (entryName == "..")) continue; MetaFattr *fa = metatree.lookup(entry->getDir(), entryName); if ((fa == NULL) || (fa->type == KFS_NONE)) continue; if (fa->type == KFS_DIR) { if (fa->id() == dir) continue; res = getDirSummary(fa->id(), nFiles, nBytes); if (res != 0) return res; continue; } nFiles++; if (fa->filesize >= 0) nBytes += fa->filesize; else { // we don't know the filesize; so, we assume all the // chunks upto the last one are full and use that value // as an approximation of the file size. if (fa->chunkcount > 0) nBytes += ((fa->chunkcount - 1) * CHUNKSIZE); else nBytes += CHUNKSIZE; } } // all good return 0;}static voidhandle_getDirSummary(MetaRequest *r){ MetaGetDirSummary *req = static_cast <MetaGetDirSummary *>(r); if (!is_dir(req->dir)) { req->status = -ENOTDIR; return; } req->status = getDirSummary(req->dir, req->numFiles, req->numBytes);}/*! * \brief Get the allocation information for a specific chunk in a file. */static voidhandle_getalloc(MetaRequest *r){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -