📄 chunkserver.h
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: ChunkServer.h 149 2008-09-10 05:31:35Z sriramsrao $ //// Created 2006/06/05// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// \file ChunkServer.h// \brief Object that handles the communication with an individual// chunk server. Model here is the following:// - For write-allocation, layout manager asks the ChunkServer object// to send an RPC to the chunk server.// - The ChunkServer object sends the RPC and holds on to the request// that triggered the RPC.// - Eventually, when the RPC reply is received, the request is// re-activated (alongwith the response) and is sent back down the pike.////----------------------------------------------------------------------------#ifndef META_CHUNKSERVER_H#define META_CHUNKSERVER_H#include <string>#include <sstream>#include <set>using std::string;using std::ostringstream;#include <boost/shared_ptr.hpp>#include <boost/enable_shared_from_this.hpp>#include <time.h>#include "libkfsIO/KfsCallbackObj.h"#include "libkfsIO/ITimeout.h"#include "libkfsIO/NetConnection.h"#include "request.h"#include "queue.h"#include "common/properties.h"namespace KFS{ /// Chunk server connects to the meta server, sends a HELLO /// message to configure its state with the meta server, and /// from then onwards, the meta server then drives the RPCs. /// Types of messages: /// Meta server --> Chunk server: Allocate, Free, Heartbeat /// /// Something to trigger timeouts so that we send heartbeat to /// the chunk server. class ChunkServerTimeoutImpl; class ChunkServer : public KfsCallbackObj, public boost::enable_shared_from_this<ChunkServer> { public: /// /// Sequence: /// Chunk server connects. /// - A new chunkserver sm is born /// - chunkserver sends a HELLO with config info /// - send/recv messages with that chunkserver. /// ChunkServer(); ChunkServer(NetConnectionPtr &conn); ~ChunkServer(); /// Handler to handle the HELLO message. This method /// gets from the net manager when it sees some data /// is available on the socket. int HandleHello(int code, void *data); /// Generic event handler to handle network /// events. This method gets from the net manager when /// it sees some data is available on the socket. int HandleRequest(int code, void *data); /// Enqueue a request to be dispatched to this server /// @param[in] r the request to be enqueued. virtual void Enqueue(MetaRequest *r); /// Send an RPC to allocate a chunk on this server. /// An RPC request is enqueued and the call returns. /// When the server replies to the RPC, the request /// processing resumes. /// @param[in] r the request associated with the RPC call. /// @param[in] leaseId the id associated with the write lease. /// @retval 0 on success; -1 on failure /// int AllocateChunk(MetaAllocate *r, int64_t leaseId); /// Send an RPC to delete a chunk on this server. /// An RPC request is enqueued and the call returns. /// When the server replies to the RPC, the request /// processing resumes. /// @param[in] chunkId name of the chunk that is being /// deleted. /// @retval 0 on success; -1 on failure /// int DeleteChunk(chunkId_t chunkId); /// Send an RPC to truncate a chunk. /// An RPC request is enqueued and the call returns. /// When the server replies to the RPC, the request /// processing resumes. /// @param[in] chunkId name of the chunk that is being /// truncated /// @param[in] s size to which chunk is being truncated to. /// @retval 0 on success; -1 on failure /// int TruncateChunk(chunkId_t chunkId, off_t s); /// /// Send a message to the server asking it to go down. /// void Retire(); /// Method to get the size of a chunk from a chunkserver. int GetChunkSize(fid_t fid, chunkId_t chunkId); /// Methods to handle (re) replication of a chunk. If there are /// insufficient copies of a chunk, we replicate it. int ReplicateChunk(fid_t fid, chunkId_t chunkId, seq_t chunkVersion, const ServerLocation &loc); /// Replication of a chunk finished. Update statistics void ReplicateChunkDone(chunkId_t chunkId) { mNumChunkWriteReplications--; assert(mNumChunkWriteReplications >= 0); if (mNumChunkWriteReplications < 0) mNumChunkWriteReplications = 0; MovingChunkDone(chunkId); } /// Accessor method to get # of replications that are being /// handled by this server. int GetNumChunkReplications() const { return mNumChunkWriteReplications; } /// During re-replication, we want to track how much b/w is /// being spent read requests for replication by the server. This /// is to prevent a server being overloaded and becoming /// unresponsive as we try to increase the # of replicas. int GetReplicationReadLoad() const { return mNumChunkReadReplications; } void UpdateReplicationReadLoad(int count) { mNumChunkReadReplications += count; if (mNumChunkReadReplications < 0) mNumChunkReadReplications = 0; } /// Periodically, send a heartbeat message to the /// chunk server. The message is enqueued to the list /// of RPCs that need to be dispatched; whenever the /// dispatcher sends them out, the message goes. void Heartbeat(); /// If a chunkserver isn't responding, don't send any /// write load towards it. We detect loaded servers to be /// those that don't respond to heartbeat messages. bool IsResponsiveServer() const { return !mHeartbeatSkipped; } /// To support scheduled down-time and allow maintenance to be /// done on the server node, we could "retire" a server; when the /// server is being retired, we evacuate the blocks on that server /// and re-replicate them elsewhere (on non-retiring nodes). /// During the stage where the server is being retired, we don't /// want to send any new write traffic to the server. /// void SetRetiring(); bool IsRetiring() const { return mIsRetiring; } void IncCorruptChunks() { mNumCorruptChunks++; } /// Provide some stats...useful for ops void GetRetiringStatus(string &result); /// Notify the server object that the chunk needs evacuation. void EvacuateChunk(chunkId_t chunkId) { mEvacuatingChunks.insert(chunkId); } std::set<chunkId_t> GetEvacuatingChunks() { return mEvacuatingChunks; } /// When the plan is read in, the set of chunks that /// need to be moved to this node is updated. void AddToChunksToMove(chunkId_t chunkId) { mChunksToMove.insert(chunkId); } std::set<chunkId_t> GetChunksToMove() { return mChunksToMove; } void ClearChunksToMove() { mChunksToMove.clear(); } /// Whenever this node re-replicates a chunk that was targeted /// for rebalancing, update the set. void MovingChunkDone(chunkId_t chunkId) { mChunksToMove.erase(chunkId); } /// Evacuation of a chunk that maybe hosted on this server is /// done; if this server is retiring and all chunks on this are /// evacuated, we can tell the server to retire. void EvacuateChunkDone(chunkId_t chunkId); /// Whenever the layout manager determines that this /// server has stale chunks, it queues an RPC to /// notify the chunk server of the stale data. void NotifyStaleChunks(const vector<chunkId_t> &staleChunks); void NotifyStaleChunk(chunkId_t staleChunk); /// There is a difference between the version # as stored /// at the chunkserver and what is on the metaserver. By sending /// this message, the metaserver is asking the chunkserver to change /// the version # to what is passed in. void NotifyChunkVersChange(fid_t fid, chunkId_t chunkId, seq_t chunkVers); /// Dispatch all the pending RPCs to the chunk server. virtual void Dispatch(); /// An op has been dispatched. Stash a pointer to that op /// in the list of dispatched ops. /// @param[in] op The op that was just dispatched void Dispatched(MetaRequest *r) { mDispatchedReqs.push_back(r); } /// /// We sent a request; we got a reply. Take the op /// which has the response values filled in and resume /// processing for that op. /// @param[in] op The op for which we got a reply /// from a chunkserver. /// void ResumeOp(MetaRequest *op); /// Accessor method to get the host name/port ServerLocation GetServerLocation() const { return mLocation; } string ServerID() { return mLocation.ToString(); } /// Check if the hostname/port matches what is passed in /// @param[in] name name to match /// @param[in] port port # to match /// @retval true if a match occurs; false otherwise bool MatchingServer(const ServerLocation &loc) const {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -