⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 remotesyncsm.cc

📁 nandflash文件系统源代码
💻 CC
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: RemoteSyncSM.cc 235 2009-01-07 23:34:55Z sriramsrao $ //// Created 2006/09/27// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// //----------------------------------------------------------------------------#include "RemoteSyncSM.h"#include "Utils.h"#include "ChunkServer.h"#include "libkfsIO/NetManager.h"#include "libkfsIO/Globals.h"#include "common/log.h"#include "common/properties.h"#include <cerrno>#include <sstream>#include <algorithm>using std::find_if;using std::for_each;using std::istringstream;using std::ostringstream;using std::list;using namespace KFS;using namespace KFS::libkfsio;#include <boost/scoped_array.hpp>using boost::scoped_array;RemoteSyncSM::~RemoteSyncSM(){    if (mTimer)        globals().netManager.UnRegisterTimeoutHandler(mTimer);    delete mTimer;    if (mNetConnection)        mNetConnection->Close();    assert(mDispatchedOps.size() == 0);    assert(mPendingOps.empty());}boolRemoteSyncSM::Connect(){    TcpSocket *sock;    int res;    KFS_LOG_VA_DEBUG("Trying to connect to: %s", mLocation.ToString().c_str());    sock = new TcpSocket();    // do a non-blocking connect    res = sock->Connect(mLocation, true);    if ((res < 0) && (res != -EINPROGRESS)) {        KFS_LOG_VA_INFO("Connect to remote server (%s) failed: code = %d",                        mLocation.ToString().c_str(), res);        delete sock;        return false;    }    if (mTimer == NULL) {        mTimer = new RemoteSyncSMTimeoutImpl(this);        globals().netManager.RegisterTimeoutHandler(mTimer);    }    KFS_LOG_VA_INFO("Connect to remote server (%s) succeeded (res = %d)...",                    mLocation.ToString().c_str(), res);    SET_HANDLER(this, &RemoteSyncSM::HandleEvent);    mNetConnection.reset(new NetConnection(sock, this));    mNetConnection->SetDoingNonblockingConnect();        // If there is no activity on this socket for 30 minutes, we want    // to be notified, so that we can close connection.    mNetConnection->SetInactivityTimeout(30 * 60);    // Add this to the poll vector    globals().netManager.AddConnection(mNetConnection);    return true;}voidRemoteSyncSM::Enqueue(KfsOp *op){    ostringstream os;    if (!mNetConnection) {        mLastRequestSent = mLastResponseRecd = time(0);        if (!Connect()) {            KFS_LOG_VA_INFO("Connect to peer %s failed; failing ops", mLocation.ToString().c_str());            mDispatchedOps.push_back(op);            FailAllOps();            return;        }    }    if (!mNetConnection->IsGood()) {        KFS_LOG_VA_INFO("Lost the connection to peer %s; failing ops", mLocation.ToString().c_str());        mDispatchedOps.push_back(op);        FailAllOps();        return;    }    op->Request(os);    mNetConnection->Write(os.str().c_str(), os.str().length());    if (op->op == CMD_WRITE_PREPARE_FWD) {        // send the data as well        WritePrepareFwdOp *wpfo = static_cast<WritePrepareFwdOp *>(op);                mNetConnection->Write(wpfo->dataBuf, wpfo->dataBuf->BytesConsumable());        // fire'n'forget        op->status = 0;        KFS::SubmitOpResponse(op);                }    else {        mLastRequestSent = time(0);        mDispatchedOps.push_back(op);    }    mNetConnection->StartFlush();}intRemoteSyncSM::HandleEvent(int code, void *data){    IOBuffer *iobuf;    int msgLen, res;    // take a ref to prevent the object from being deleted    // while we are still in this function.    RemoteSyncSMPtr self = shared_from_this();    const char *reason = "error";#ifdef DEBUG    verifyExecutingOnNetProcessor();#endif        switch (code) {    case EVENT_NET_READ:	// We read something from the network.  Run the RPC that	// came in if we got all the data for the RPC	iobuf = (IOBuffer *) data;        mLastResponseRecd = time(0);	while (IsMsgAvail(iobuf, &msgLen)) {	    res = HandleResponse(iobuf, msgLen);            if (res < 0)                // maybe the response isn't fully available                break;	}	break;    case EVENT_NET_WROTE:	// Something went out on the network.  For now, we don't	// track it. Later, we may use it for tracking throttling	// and such.	break;            case EVENT_INACTIVITY_TIMEOUT:    	reason = "inactivity timeout";    case EVENT_NET_ERROR:        // If there is an error or there is no activity on the socket        // for N mins, we close the connection. 	KFS_LOG_VA_INFO("Closing connection to peer: %s due to %s", mLocation.ToString().c_str(), reason);	if (mNetConnection)	    mNetConnection->Close();        if (mTimer != NULL) {            globals().netManager.UnRegisterTimeoutHandler(mTimer);            delete mTimer;            mTimer = NULL;        }        // we are done...        Finish();	break;    default:	assert(!"Unknown event");	break;    }    return 0;}intRemoteSyncSM::HandleResponse(IOBuffer *iobuf, int msgLen){    const char separator = ':';    scoped_array<char> buf(new char[msgLen + 1]);    Properties prop;    kfsSeq_t seqNum;    int status;    list<KfsOp *>::iterator i;    size_t numBytes;    int64_t nAvail;    iobuf->CopyOut(buf.get(), msgLen);    buf[msgLen] = '\0';    istringstream ist(buf.get());    prop.loadProperties(ist, separator, false);    seqNum = prop.getValue("Cseq", (kfsSeq_t) -1);    status = prop.getValue("Status", -1);    numBytes = prop.getValue("Content-length", (long long) 0);    // if we don't have all the data for the write, hold on...    nAvail = iobuf->BytesConsumable() - msgLen;            if (nAvail < (int64_t) numBytes) {        // the data isn't here yet...wait...        return -1;    }    // now, we got everything...    iobuf->Consume(msgLen);    // find the matching op    i = find_if(mDispatchedOps.begin(), mDispatchedOps.end(),                 OpMatcher(seqNum));    if (i != mDispatchedOps.end()) {        KfsOp *op = *i;        op->status = status;        mDispatchedOps.erase(i);        if (op->op == CMD_WRITE_ID_ALLOC) {            WriteIdAllocOp *wiao = static_cast<WriteIdAllocOp *>(op);            wiao->writeIdStr = prop.getValue("Write-id", "");        } else if (op->op == CMD_READ) {            ReadOp *rop = static_cast<ReadOp *> (op);            if (rop->dataBuf == NULL)                rop->dataBuf = new IOBuffer();            rop->dataBuf->Move(iobuf, numBytes);        } else if (op->op == CMD_SIZE) {            SizeOp *sop = static_cast<SizeOp *>(op);            sop->size = prop.getValue("Size", 0);        } else if (op->op == CMD_GET_CHUNK_METADATA) {            GetChunkMetadataOp *gcm = static_cast<GetChunkMetadataOp *>(op);            gcm->chunkVersion = prop.getValue("Chunk-version", 0);            gcm->chunkSize = prop.getValue("Size", 0);            if (gcm->dataBuf == NULL)                gcm->dataBuf = new IOBuffer();            gcm->dataBuf->Move(iobuf, numBytes);                    }        // op->HandleEvent(EVENT_DONE, op);        KFS::SubmitOpResponse(op);    } else {        KFS_LOG_VA_DEBUG("Discarding a reply for unknown seq #: %d", seqNum);    }    return 0;}// Helper functor that fails an op with an error code.class OpFailer {    int errCode;public:    OpFailer(int c) : errCode(c) { };    void operator() (KfsOp *op) {        op->status = errCode;        // op->HandleEvent(EVENT_DONE, op);        KFS::SubmitOpResponse(op);    }};voidRemoteSyncSM::FailAllOps(){    KfsOp *op;    // get rid of the pending ones as well    while ((op = mPendingOps.dequeue_nowait()) != NULL) {        mDispatchedOps.push_back(op);    }        for_each(mDispatchedOps.begin(), mDispatchedOps.end(),             OpFailer(-EHOSTUNREACH));    mDispatchedOps.clear();}voidRemoteSyncSM::Finish(){#ifdef DEBUG    verifyExecutingOnEventProcessor();#endif        FailAllOps();    // if the object was owned by the chunkserver, have it release the reference    gChunkServer.RemoveServer(this);}//// Utility functions to operate on a list of remotesync servers//class RemoteSyncSMMatcher {    ServerLocation myLoc;public:    RemoteSyncSMMatcher(const ServerLocation &loc) :        myLoc(loc) { }    bool operator() (RemoteSyncSMPtr other) {        return other->GetLocation() == myLoc;    }};RemoteSyncSMPtrKFS::FindServer(list<RemoteSyncSMPtr> &remoteSyncers, const ServerLocation &location,                 bool connect){    list<RemoteSyncSMPtr>::iterator i;    RemoteSyncSMPtr peer;    i = find_if(remoteSyncers.begin(), remoteSyncers.end(),                RemoteSyncSMMatcher(location));    if (i != remoteSyncers.end()) {        peer = *i;        return peer;    }    if (!connect)        return peer;    peer.reset(new RemoteSyncSM(location));    if (peer->Connect()) {        remoteSyncers.push_back(peer);    } else {        // we couldn't connect...so, force destruction        peer.reset();    }    return peer;}voidKFS::RemoveServer(list<RemoteSyncSMPtr> &remoteSyncers, RemoteSyncSM *target){    list<RemoteSyncSMPtr>::iterator i;    i = find_if(remoteSyncers.begin(), remoteSyncers.end(),                RemoteSyncSMMatcher(target->GetLocation()));    if (i != remoteSyncers.end()) {        remoteSyncers.erase(i);    }}voidKFS::ReleaseAllServers(list<RemoteSyncSMPtr> &remoteSyncers){    list<RemoteSyncSMPtr>::iterator i;    while (1) {        i = remoteSyncers.begin();        if (i == remoteSyncers.end())            break;        RemoteSyncSMPtr r = *i;        remoteSyncers.erase(i);        r->Finish();    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -