📄 request.cc
字号:
MetaGetalloc *req = static_cast <MetaGetalloc *>(r); MetaChunkInfo *chunkInfo; vector<ChunkServerPtr> c; if (!file_exists(req->fid)) { KFS_LOG_VA_DEBUG("handle_getalloc: no such file %lld", req->fid); req->status = -ENOENT; return; } req->status = metatree.getalloc(req->fid, req->offset, &chunkInfo); if (req->status != 0) { KFS_LOG_VA_DEBUG( "handle_getalloc(%lld, %lld) = %d: kfsop failed", req->fid, req->offset, req->status); return; } req->chunkId = chunkInfo->chunkId; req->chunkVersion = chunkInfo->chunkVersion; if (gLayoutManager.GetChunkToServerMapping(req->chunkId, c) != 0) { KFS_LOG_DEBUG("handle_getalloc: no chunkservers"); req->status = -EAGAIN; return; } for_each(c.begin(), c.end(), EnumerateLocations(req->locations)); req->status = 0;}/*! * \brief Get the allocation information for a file. Determine * how many chunks there and where they are located. */static voidhandle_getlayout(MetaRequest *r){ MetaGetlayout *req = static_cast <MetaGetlayout *>(r); vector<MetaChunkInfo*> chunkInfo; vector<ChunkServerPtr> c; if (!file_exists(req->fid)) { req->status = -ENOENT; return; } req->status = metatree.getalloc(req->fid, chunkInfo); if (req->status != 0) return; for (vector<ChunkLayoutInfo>::size_type i = 0; i < chunkInfo.size(); i++) { ChunkLayoutInfo l; l.offset = chunkInfo[i]->offset; l.chunkId = chunkInfo[i]->chunkId; l.chunkVersion = chunkInfo[i]->chunkVersion; if (gLayoutManager.GetChunkToServerMapping(l.chunkId, c) != 0) { req->status = -EHOSTUNREACH; return; } for_each(c.begin(), c.end(), EnumerateLocations(l.locations)); req->v.push_back(l); } req->status = 0;}class ChunkVersionChanger { fid_t fid; chunkId_t chunkId; seq_t chunkVers;public: ChunkVersionChanger(fid_t f, chunkId_t c, seq_t v) : fid(f), chunkId(c), chunkVers(v) { } void operator() (ChunkServerPtr p) { p->NotifyChunkVersChange(fid, chunkId, chunkVers); }};/*! * \brief handle an allocation request for a chunk in a file. * \param[in] r write allocation request * * Write allocation proceeds as follows: * 1. The client has sent a write allocation request which has been * parsed and turned into an RPC request (which is handled here). * 2. We first get a unique chunk identifier (after validating the * fileid). * 3. We send the request to the layout manager to pick a location * for the chunk. * 4. The layout manager picks a location and sends an RPC to the * corresponding chunk server to create the chunk. * 5. When the RPC is going on, processing for this request is * suspended. * 6. When the RPC reply is received, this request gets re-activated * and we come back to this function. * 7. Assuming that the chunk server returned a success, we update * the metatree to link the chunkId with the fileid (from this * request). * 8. Processing for this request is now complete; it is logged and * a reply is sent back to the client. * * Versioning/Leases introduces a few wrinkles to the above steps: * In step #2, the metatree could return -EEXIST if an allocation * has been done for the <fid, offset>. In such a case, we need to * check with the layout manager to see if a new lease is required. * If a new lease is required, the layout manager bumps up the version * # for the chunk and notifies the chunkservers. The message has to * be suspended until the chunkservers ack. After the message is * restarted, we need to update the metatree to reflect the newer * version # before notifying the client. * * On the other hand, if a new lease isn't required, then the layout * manager tells us where the data has been placed; the process for * the request is therefore complete. */static voidhandle_allocate(MetaRequest *r){ MetaAllocate *req = static_cast<MetaAllocate *>(r); if (!req->layoutDone) { KFS_LOG_VA_DEBUG("Starting layout for req:%lld", req->opSeqno); // force an allocation req->chunkId = 0; // start at step #2 above. req->status = metatree.allocateChunkId( req->fid, req->offset, &req->chunkId, &req->chunkVersion, &req->numReplicas); if ((req->status != 0) && (req->status != -EEXIST)) { // we have a problem return; } if (req->status == -EEXIST) { bool isNewLease = false; // Get a (new) lease if possible req->status = gLayoutManager.GetChunkWriteLease(req, isNewLease); if (req->status != 0) { // couln't get the lease...bail return; } if (!isNewLease) { KFS_LOG_VA_DEBUG("Got valid lease for req:%lld", req->opSeqno); // we got a valid lease. so, return return; } // new lease and chunkservers have been notified // so, wait for them to ack } else if (gLayoutManager.AllocateChunk(req) != 0) { // we have a problem req->status = -ENOSPC; return; } // we have queued an RPC to the chunkserver. so, hold // off processing (step #5) req->suspended = true; return; } KFS_LOG_VA_DEBUG("Layout is done for req:%lld", req->opSeqno); if (req->status != 0) { // we have a problem: it is possible that the server // went down. ask the client to retry.... req->status = -KFS::EALLOCFAILED; metatree.getChunkVersion(req->fid, req->chunkId, &req->chunkVersion); if (req->chunkVersion > 0) { // reset version #'s at the chunkservers for_each(req->servers.begin(), req->servers.end(), ChunkVersionChanger(req->fid, req->chunkId, req->chunkVersion)); } else { // this is the first time the chunk was allocated. // since the allocation failed, remove existence of this chunk // on the metaserver. gLayoutManager.RemoveChunkToServerMapping(req->chunkId); } req->suspended = true; ChangeIncarnationNumber(req); return; } // layout is complete (step #6) req->suspended = false; // update the tree (step #7) and since we turned off the // suspend flag, the request will be logged and go on its // merry way. req->status = metatree.assignChunkId(req->fid, req->offset, req->chunkId, req->chunkVersion); if (req->status != 0) KFS_LOG_VA_DEBUG("Assign chunk id failed for %lld,%lld", req->fid, req->offset);}static voidhandle_truncate(MetaRequest *r){ MetaTruncate *req = static_cast <MetaTruncate *>(r); chunkOff_t allocOffset = 0; if (gWormMode) { req->status = -EPERM; return; } req->status = metatree.truncate(req->fid, req->offset, &allocOffset); if (req->status > 0) { // an allocation is needed MetaAllocate *alloc = new MetaAllocate(req->opSeqno, req->fid, allocOffset); KFS_LOG_VA_DEBUG("Suspending truncation due to alloc at offset: %lld", allocOffset); // tie things together alloc->req = r; req->suspended = true; handle_allocate(alloc); }}static voidhandle_rename(MetaRequest *r){ MetaRename *req = static_cast <MetaRename *>(r); if (gWormMode && ((!isWormMutationAllowed(req->oldname)) || path_exists(req->newname))) { // renames are disabled in WORM mode: otherwise, we could // overwrite an existing file req->status = -EPERM; return; } req->status = metatree.rename(req->dir, req->oldname, req->newname, req->overwrite);}static voidhandle_change_file_replication(MetaRequest *r){ MetaChangeFileReplication *req = static_cast <MetaChangeFileReplication *>(r); if (file_exists(req->fid)) req->status = metatree.changeFileReplication(req->fid, req->numReplicas); else req->status = -ENOENT;}static voidhandle_retire_chunkserver(MetaRequest *r){ MetaRetireChunkserver *req = static_cast <MetaRetireChunkserver *>(r); req->status = gLayoutManager.RetireServer(req->location, req->nSecsDown);}static voidhandle_toggle_rebalancing(MetaRequest *r){ MetaToggleRebalancing *req = static_cast <MetaToggleRebalancing *>(r); gLayoutManager.ToggleRebalancing(req->value); req->status = 0;}static voidhandle_toggle_worm(MetaRequest *r) { MetaToggleWORM *req = static_cast <MetaToggleWORM *>(r); setWORMMode(req->value); req->status = 0;}static voidhandle_execute_rebalanceplan(MetaRequest *r){ MetaExecuteRebalancePlan *req = static_cast <MetaExecuteRebalancePlan *>(r); req->status = gLayoutManager.LoadRebalancePlan(req->planPathname);}static voidhandle_log_rollover(MetaRequest *r){ r->status = 0;}static voidhandle_hello(MetaRequest *r){ MetaHello *req = static_cast <MetaHello *>(r); if (req->status < 0) { // bad hello request...possible cluster key mismatch return; } gLayoutManager.AddNewServer(req); req->status = 0;}static voidhandle_bye(MetaRequest *r){ MetaBye *req = static_cast <MetaBye *>(r); gLayoutManager.ServerDown(req->server.get()); req->status = 0;}static voidhandle_lease_acquire(MetaRequest *r){ MetaLeaseAcquire *req = static_cast <MetaLeaseAcquire *>(r); req->status = gLayoutManager.GetChunkReadLease(req);}static voidhandle_lease_renew(MetaRequest *r){ MetaLeaseRenew *req = static_cast <MetaLeaseRenew *>(r); req->status = gLayoutManager.LeaseRenew(req);}static voidhandle_lease_relinquish(MetaRequest *r){ MetaLeaseRelinquish *req = static_cast <MetaLeaseRelinquish *>(r); req->status = gLayoutManager.LeaseRelinquish(req); KFS_LOG_VA_INFO("Lease relinquish: %s, status = %d", r->Show().c_str(), req->status);}static voidhandle_lease_cleanup(MetaRequest *r){ MetaLeaseCleanup *req = static_cast <MetaLeaseCleanup *>(r); gLayoutManager.LeaseCleanup(); // some leases are gone. so, cleanup dumpster metatree.cleanupDumpster(); req->status = 0;}static voidhandle_chunk_corrupt(MetaRequest *r){ MetaChunkCorrupt *req = static_cast <MetaChunkCorrupt *>(r); gLayoutManager.ChunkCorrupt(req); req->status = 0;}static voidhandle_chunk_replication_check(MetaRequest *r){ MetaChunkReplicationCheck *req = static_cast <MetaChunkReplicationCheck *>(r); gLayoutManager.ChunkReplicationChecker(); req->status = 0;}static voidhandle_chunk_size_done(MetaRequest *r){ MetaChunkSize *req = static_cast <MetaChunkSize *>(r); if (req->chunkSize < 0) { req->status = -1; return; } MetaFattr *fa = metatree.getFattr(req->fid); if ((fa != NULL) && (fa->type == KFS_FILE)) { vector<MetaChunkInfo*> chunkInfo; int status = metatree.getalloc(fa->id(), chunkInfo); if ((status != 0) || (chunkInfo.size() == 0)) { return; } // only if we are looking at the last chunk of the file can we // set the size. MetaChunkInfo* lastChunk = chunkInfo.back(); if (req->chunkId == lastChunk->chunkId) { fa->filesize = (fa->chunkcount - 1) * CHUNKSIZE + req->chunkSize; } } req->status = 0;}static voidhandle_chunk_replication_done(MetaRequest *r){ MetaChunkReplicate *req = static_cast <MetaChunkReplicate *>(r); gLayoutManager.ChunkReplicationDone(req);}static voidhandle_change_chunkVersionInc(MetaRequest *r){ r->status = 0;}static voidhandle_ping(MetaRequest *r){ MetaPing *req = static_cast <MetaPing *>(r); req->status = 0; gLayoutManager.Ping(req->systemInfo, req->servers, req->downServers, req->retiringServers);}static voidhandle_upservers(MetaRequest *r){ MetaUpServers *req = static_cast <MetaUpServers *>(r); req->status = 0; gLayoutManager.UpServers(req->stringStream);}static voidhandle_dump_chunkToServerMap(MetaRequest *r){ MetaDumpChunkToServerMap *req = static_cast <MetaDumpChunkToServerMap *>(r); req->status = 0; gLayoutManager.DumpChunkToServerMap();}static voidhandle_stats(MetaRequest *r){ MetaStats *req = static_cast <MetaStats *>(r); ostringstream os; req->status = 0; globals().counterManager.Show(os); req->stats = os.str();}static voidhandle_open_files(MetaRequest *r){ MetaOpenFiles *req = static_cast <MetaOpenFiles *>(r); req->status = 0; gLayoutManager.GetOpenFiles(req->openForRead, req->openForWrite);}/* * Map request types to the functions that handle them. */static voidsetup_handlers(){ handler[META_LOOKUP] = handle_lookup; handler[META_LOOKUP_PATH] = handle_lookup_path; handler[META_CREATE] = handle_create; handler[META_MKDIR] = handle_mkdir; handler[META_REMOVE] = handle_remove; handler[META_RMDIR] = handle_rmdir; handler[META_READDIR] = handle_readdir; handler[META_READDIRPLUS] = handle_readdirplus; handler[META_GETDIRSUMMARY] = handle_getDirSummary; handler[META_GETALLOC] = handle_getalloc; handler[META_GETLAYOUT] = handle_getlayout; handler[META_ALLOCATE] = handle_allocate; handler[META_TRUNCATE] = handle_truncate; handler[META_RENAME] = handle_rename; handler[META_CHANGE_FILE_REPLICATION] = handle_change_file_replication; handler[META_LOG_ROLLOVER] = handle_log_rollover; handler[META_CHUNK_SIZE] = handle_chunk_size_done; handler[META_CHUNK_REPLICATE] = handle_chunk_replication_done; handler[META_CHUNK_REPLICATION_CHECK] = handle_chunk_replication_check; handler[META_RETIRE_CHUNKSERVER] = handle_retire_chunkserver; handler[META_TOGGLE_REBALANCING] = handle_toggle_rebalancing; handler[META_EXECUTE_REBALANCEPLAN] = handle_execute_rebalanceplan; handler[META_TOGGLE_WORM] = handle_toggle_worm; // Chunk server -> Meta server op handler[META_HELLO] = handle_hello; handler[META_BYE] = handle_bye; // Lease related ops handler[META_LEASE_ACQUIRE] = handle_lease_acquire; handler[META_LEASE_RENEW] = handle_lease_renew; handler[META_LEASE_RELINQUISH] = handle_lease_relinquish; handler[META_LEASE_CLEANUP] = handle_lease_cleanup; // Chunk version # increment/corrupt chunk handler[META_CHANGE_CHUNKVERSIONINC] = handle_change_chunkVersionInc; handler[META_CHUNK_CORRUPT] = handle_chunk_corrupt; // Monitoring RPCs handler[META_PING] = handle_ping; handler[META_STATS] = handle_stats; handler[META_DUMP_CHUNKTOSERVERMAP] = handle_dump_chunkToServerMap; handler[META_OPEN_FILES] = handle_open_files; handler[META_UPSERVERS] = handle_upservers; gParseHandlers["LOOKUP"] = parseHandlerLookup; gParseHandlers["LOOKUP_PATH"] = parseHandlerLookupPath; gParseHandlers["CREATE"] = parseHandlerCreate; gParseHandlers["MKDIR"] = parseHandlerMkdir; gParseHandlers["REMOVE"] = parseHandlerRemove; gParseHandlers["RMDIR"] = parseHandlerRmdir; gParseHandlers["READDIR"] = parseHandlerReaddir; gParseHandlers["READDIRPLUS"] = parseHandlerReaddirPlus; gParseHandlers["GETALLOC"] = parseHandlerGetalloc; gParseHandlers["GETLAYOUT"] = parseHandlerGetlayout; gParseHandlers["GETDIRSUMMARY"] = parseHandlerGetDirSummary; gParseHandlers["ALLOCATE"] = parseHandlerAllocate; gParseHandlers["TRUNCATE"] = parseHandlerTruncate; gParseHandlers["RENAME"] = parseHandlerRename; gParseHandlers["CHANGE_FILE_REPLICATION"] = parseHandlerChangeFileReplication; gParseHandlers["RETIRE_CHUNKSERVER"] = parseHandlerRetireChunkserver; gParseHandlers["EXECUTE_REBALANCEPLAN"] = parseHandlerExecuteRebalancePlan; gParseHandlers["TOGGLE_REBALANCING"] = parseHandlerToggleRebalancing; // Lease related ops gParseHandlers["LEASE_ACQUIRE"] = parseHandlerLeaseAcquire; gParseHandlers["LEASE_RENEW"] = parseHandlerLeaseRenew; gParseHandlers["LEASE_RELINQUISH"] = parseHandlerLeaseRelinquish; gParseHandlers["CORRUPT_CHUNK"] = parseHandlerChunkCorrupt;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -