📄 replicator.cc
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: Replicator.cc 210 2008-11-05 02:51:49Z sriramsrao $ //// Created 2007/01/17// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2007-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// \brief Code for dealing with a replicating a chunk. The metaserver//asks a destination chunkserver to obtain a copy of a chunk from a source//chunkserver; in response, the destination chunkserver pulls the data//down and writes it out to disk. At the end replication, the//destination chunkserver notifies the metaserver.////----------------------------------------------------------------------------#include "Replicator.h"#include "ChunkServer.h"#include "Utils.h"#include "libkfsIO/Globals.h"#include "libkfsIO/Checksum.h"#include <string>#include <sstream>#include "common/log.h"#include <boost/scoped_array.hpp>using boost::scoped_array;using std::string;using std::ostringstream;using std::istringstream;using namespace KFS;using namespace KFS::libkfsio;Replicator::Replicator(ReplicateChunkOp *op) : mFileId(op->fid), mChunkId(op->chunkId), mChunkVersion(op->chunkVersion), mOwner(op), mDone(false), mOffset(0), mChunkMetadataOp(0), mReadOp(0), mWriteOp(op->chunkId, op->chunkVersion){ mReadOp.chunkId = op->chunkId; mReadOp.chunkVersion = op->chunkVersion; mReadOp.clnt = this; mWriteOp.clnt = this; mChunkMetadataOp.clnt = this; mWriteOp.Reset(); mWriteOp.isFromReReplication = true; SET_HANDLER(&mReadOp, &ReadOp::HandleReplicatorDone);}Replicator::~Replicator(){}voidReplicator::Start(RemoteSyncSMPtr &peer){#ifdef DEBUG verifyExecutingOnEventProcessor();#endif mPeer = peer; mChunkMetadataOp.seq = mPeer->NextSeqnum(); mChunkMetadataOp.chunkId = mChunkId; SET_HANDLER(this, &Replicator::HandleStartDone); mPeer->Enqueue(&mChunkMetadataOp);}intReplicator::HandleStartDone(int code, void *data){#ifdef DEBUG verifyExecutingOnEventProcessor();#endif if (mChunkMetadataOp.status < 0) { Terminate(); return 0; } mChunkSize = mChunkMetadataOp.chunkSize; mChunkVersion = mChunkMetadataOp.chunkVersion; mReadOp.chunkVersion = mWriteOp.chunkVersion = mChunkVersion; // set the version to a value that will never be used; if // replication is successful, we then bump up the counter. if (gChunkManager.AllocChunk(mFileId, mChunkId, 0, true) < 0) { Terminate(); return -1; } Read(); return 0;}voidReplicator::Read(){ ReplicatorPtr self = shared_from_this();#ifdef DEBUG verifyExecutingOnEventProcessor();#endif if (mOffset >= (off_t) mChunkSize) { mDone = true; Terminate(); return; } mReadOp.seq = mPeer->NextSeqnum(); mReadOp.status = 0; mReadOp.offset = mOffset; // read an MB mReadOp.numBytes = 1 << 20; mPeer->Enqueue(&mReadOp); SET_HANDLER(this, &Replicator::HandleReadDone);}intReplicator::HandleReadDone(int code, void *data){#ifdef DEBUG verifyExecutingOnEventProcessor();#endif if (mReadOp.status < 0) { KFS_LOG_VA_INFO("Read from peer %s failed with error: %d", mPeer->GetLocation().ToString().c_str(), mReadOp.status); Terminate(); return 0; } delete mWriteOp.dataBuf; mWriteOp.Reset(); mWriteOp.dataBuf = new IOBuffer(); mWriteOp.numBytes = mReadOp.dataBuf->BytesConsumable(); mWriteOp.dataBuf->Move(mReadOp.dataBuf, mWriteOp.numBytes); mWriteOp.offset = mOffset; mWriteOp.isFromReReplication = true; // align the writes to checksum boundaries if ((mWriteOp.numBytes >= CHECKSUM_BLOCKSIZE) && (mWriteOp.numBytes % CHECKSUM_BLOCKSIZE) != 0) // round-down so to speak; whatever is left will be picked up by the next read mWriteOp.numBytes = (mWriteOp.numBytes / CHECKSUM_BLOCKSIZE) * CHECKSUM_BLOCKSIZE; SET_HANDLER(this, &Replicator::HandleWriteDone); if (gChunkManager.WriteChunk(&mWriteOp) < 0) { // abort everything Terminate(); return -1; } return 0;}intReplicator::HandleWriteDone(int code, void *data){ ReplicatorPtr self = shared_from_this();#ifdef DEBUG verifyExecutingOnEventProcessor();#endif assert((code == EVENT_CMD_DONE) || (code == EVENT_DISK_WROTE)); if (mWriteOp.status < 0) { KFS_LOG_VA_INFO("Write failed with error: %d", mWriteOp.status); Terminate(); return 0; } mOffset += mWriteOp.numBytesIO; Read(); return 0;}voidReplicator::Terminate(){#ifdef DEBUG verifyExecutingOnEventProcessor();#endif if (mDone) { KFS_LOG_VA_INFO("Replication for %ld finished", mChunkId); // now that replication is all done, set the version appropriately gChunkManager.ChangeChunkVers(mFileId, mChunkId, mChunkVersion); SET_HANDLER(this, &Replicator::HandleReplicationDone); gChunkManager.ReplicationDone(mChunkId); int res = gChunkManager.WriteChunkMetadata(mChunkId, &mWriteOp); if (res == 0) return; } KFS_LOG_VA_INFO("Replication for %ld failed from %s...cleaning up", mChunkId, mPeer->GetLocation().ToString().c_str()); gChunkManager.DeleteChunk(mChunkId); mOwner->status = -1; // Notify the owner of completion mOwner->HandleEvent(EVENT_CMD_DONE, NULL);}// logging of the chunk meta data finished; we are all doneintReplicator::HandleReplicationDone(int code, void *data){ mOwner->status = 0; // Notify the owner of completion mOwner->HandleEvent(EVENT_CMD_DONE, (void *) &mChunkVersion); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -