⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicator.cc

📁 nandflash文件系统源代码
💻 CC
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: Replicator.cc 210 2008-11-05 02:51:49Z sriramsrao $ //// Created 2007/01/17// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2007-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// \brief Code for dealing with a replicating a chunk.  The metaserver//asks a destination chunkserver to obtain a copy of a chunk from a source//chunkserver; in response, the destination chunkserver pulls the data//down and writes it out to disk.  At the end replication, the//destination chunkserver notifies the metaserver.////----------------------------------------------------------------------------#include "Replicator.h"#include "ChunkServer.h"#include "Utils.h"#include "libkfsIO/Globals.h"#include "libkfsIO/Checksum.h"#include <string>#include <sstream>#include "common/log.h"#include <boost/scoped_array.hpp>using boost::scoped_array;using std::string;using std::ostringstream;using std::istringstream;using namespace KFS;using namespace KFS::libkfsio;Replicator::Replicator(ReplicateChunkOp *op) :    mFileId(op->fid), mChunkId(op->chunkId),     mChunkVersion(op->chunkVersion),     mOwner(op), mDone(false),  mOffset(0), mChunkMetadataOp(0),     mReadOp(0), mWriteOp(op->chunkId, op->chunkVersion){    mReadOp.chunkId = op->chunkId;    mReadOp.chunkVersion = op->chunkVersion;    mReadOp.clnt = this;    mWriteOp.clnt = this;    mChunkMetadataOp.clnt = this;    mWriteOp.Reset();    mWriteOp.isFromReReplication = true;    SET_HANDLER(&mReadOp, &ReadOp::HandleReplicatorDone);}Replicator::~Replicator(){}voidReplicator::Start(RemoteSyncSMPtr &peer){#ifdef DEBUG    verifyExecutingOnEventProcessor();#endif    mPeer = peer;    mChunkMetadataOp.seq = mPeer->NextSeqnum();    mChunkMetadataOp.chunkId = mChunkId;    SET_HANDLER(this, &Replicator::HandleStartDone);    mPeer->Enqueue(&mChunkMetadataOp);}intReplicator::HandleStartDone(int code, void *data){#ifdef DEBUG    verifyExecutingOnEventProcessor();#endif    if (mChunkMetadataOp.status < 0) {        Terminate();        return 0;    }    mChunkSize = mChunkMetadataOp.chunkSize;    mChunkVersion = mChunkMetadataOp.chunkVersion;    mReadOp.chunkVersion = mWriteOp.chunkVersion = mChunkVersion;    // set the version to a value that will never be used; if    // replication is successful, we then bump up the counter.    if (gChunkManager.AllocChunk(mFileId, mChunkId, 0, true) < 0) {        Terminate();        return -1;    }    Read();    return 0;}voidReplicator::Read(){    ReplicatorPtr self = shared_from_this();#ifdef DEBUG    verifyExecutingOnEventProcessor();#endif    if (mOffset >= (off_t) mChunkSize) {        mDone = true;        Terminate();        return;    }    mReadOp.seq = mPeer->NextSeqnum();    mReadOp.status = 0;    mReadOp.offset = mOffset;    // read an MB     mReadOp.numBytes = 1 << 20;    mPeer->Enqueue(&mReadOp);        SET_HANDLER(this, &Replicator::HandleReadDone);}intReplicator::HandleReadDone(int code, void *data){#ifdef DEBUG    verifyExecutingOnEventProcessor();#endif    if (mReadOp.status < 0) {        KFS_LOG_VA_INFO("Read from peer %s failed with error: %d",                        mPeer->GetLocation().ToString().c_str(), mReadOp.status);        Terminate();        return 0;    }    delete mWriteOp.dataBuf;        mWriteOp.Reset();    mWriteOp.dataBuf = new IOBuffer();    mWriteOp.numBytes = mReadOp.dataBuf->BytesConsumable();    mWriteOp.dataBuf->Move(mReadOp.dataBuf, mWriteOp.numBytes);    mWriteOp.offset = mOffset;    mWriteOp.isFromReReplication = true;    // align the writes to checksum boundaries    if ((mWriteOp.numBytes >= CHECKSUM_BLOCKSIZE) &&        (mWriteOp.numBytes % CHECKSUM_BLOCKSIZE) != 0)        // round-down so to speak; whatever is left will be picked up by the next read        mWriteOp.numBytes = (mWriteOp.numBytes / CHECKSUM_BLOCKSIZE) * CHECKSUM_BLOCKSIZE;    SET_HANDLER(this, &Replicator::HandleWriteDone);    if (gChunkManager.WriteChunk(&mWriteOp) < 0) {        // abort everything        Terminate();        return -1;    }    return 0;}intReplicator::HandleWriteDone(int code, void *data){    ReplicatorPtr self = shared_from_this();#ifdef DEBUG    verifyExecutingOnEventProcessor();#endif    assert((code == EVENT_CMD_DONE) || (code == EVENT_DISK_WROTE));    if (mWriteOp.status < 0) {        KFS_LOG_VA_INFO("Write failed with error: %d", mWriteOp.status);        Terminate();        return 0;    }    mOffset += mWriteOp.numBytesIO;    Read();    return 0;}voidReplicator::Terminate(){#ifdef DEBUG    verifyExecutingOnEventProcessor();#endif    if (mDone) {        KFS_LOG_VA_INFO("Replication for %ld finished", mChunkId);        // now that replication is all done, set the version appropriately        gChunkManager.ChangeChunkVers(mFileId, mChunkId, mChunkVersion);        SET_HANDLER(this, &Replicator::HandleReplicationDone);                gChunkManager.ReplicationDone(mChunkId);        int res = gChunkManager.WriteChunkMetadata(mChunkId, &mWriteOp);        if (res == 0)            return;    }           KFS_LOG_VA_INFO("Replication for %ld failed from %s...cleaning up",                     mChunkId, mPeer->GetLocation().ToString().c_str());    gChunkManager.DeleteChunk(mChunkId);    mOwner->status = -1;    // Notify the owner of completion    mOwner->HandleEvent(EVENT_CMD_DONE, NULL);}// logging of the chunk meta data finished; we are all doneintReplicator::HandleReplicationDone(int code, void *data){    mOwner->status = 0;        // Notify the owner of completion    mOwner->HandleEvent(EVENT_CMD_DONE, (void *) &mChunkVersion);    return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -