⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfsops.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 4 页
字号:
static boolneedToForwardWrite(string &serverInfo, uint32_t numServers, int &myPos,                   ServerLocation &peerLoc,                    bool isWriteIdPresent, int64_t &writeId){    istringstream ist(serverInfo);    ServerLocation loc;    bool foundLocal = false;    int64_t id;    bool needToForward = false;    // the list of servers is ordered: we forward to the next one    // in the list.    for (uint32_t i = 0; i < numServers; i++) {        ist >> loc.hostname;        ist >> loc.port;        if (isWriteIdPresent)            ist >> id;                    if (gChunkServer.IsLocalServer(loc)) {            // return the position of where this server is present in the list            myPos = i;             foundLocal = true;            if (isWriteIdPresent)                writeId = id;            continue;        }        // forward if we are not the last in the list        if (foundLocal) {            needToForward = true;            break;        }    }    peerLoc = loc;    return needToForward;}voidWriteIdAllocOp::Execute(){    // check if we need to forward anywhere    bool needToForward = false, writeMaster = false;    int64_t dummy;    int myPos = 1;    ServerLocation peerLoc;    if (numServers > 0) {        needToForward = needToForwardWrite(servers, numServers, myPos, peerLoc, false, dummy);        writeMaster = (myPos == 0);        if (needToForward) {            status = ForwardToPeer(peerLoc);            if (status < 0) {                if (gLeaseClerk.IsLeaseValid(chunkId)) {                    // The write-id allocation has failed; we don't want to renew the lease.                    // Now, when the client forces a re-allocation, the                    // metaserver will do a version bump; when the node that                    // was dead comes back, we can detect it has missed a write                    gLeaseClerk.RelinquishLease(chunkId);                }                // can't forward to peer...so fail the write-id allocation                gLogger.Submit(this);                return;            }        }    }    if (writeMaster) {        // Notify the lease clerk that we are doing write.  This is to        // signal the lease clerk to renew the lease for the chunk when appropriate.        gLeaseClerk.DoingWrite(chunkId);    }        status = gChunkManager.AllocateWriteId(this);    if (status == 0)        writeIdStr = gChunkServer.GetMyLocation() + " " +             boost::lexical_cast<string>(writeId);    // don't need to forward: we are done    // clnt->HandleEvent(EVENT_CMD_DONE, this);    if (!needToForward) {        KFS_LOG_VA_INFO("Write-id alloc: %s", Show().c_str());        ReadChunkMetadata();    }}intWriteIdAllocOp::ForwardToPeer(const ServerLocation &loc){    ClientSM *client = static_cast<ClientSM *>(clnt);    assert(client != NULL);    RemoteSyncSMPtr peer = client->FindServer(loc);        if (!peer) {        KFS_LOG_VA_DEBUG("Unable to find syncSM to peer: %s", loc.ToString().c_str());        // flag the error        return -EHOSTUNREACH;    }    fwdedOp = new WriteIdAllocOp(peer->NextSeqnum(), this);    // when this op comes, need to notify "this"    fwdedOp->clnt = this;    // KFS_LOG_VA_INFO("Sending write to peer: %s",    // writeFwdOp->Show().c_str());    KFS_LOG_VA_INFO("Fwd'ing write-id alloc to peer: %s", fwdedOp->Show().c_str());    SET_HANDLER(this, &WriteIdAllocOp::HandlePeerReply);    peer->Enqueue(fwdedOp);        return 0;}intWriteIdAllocOp::HandlePeerReply(int code, void *data){    WriteIdAllocOp *op = static_cast<WriteIdAllocOp *> (data);    if (op->status < 0) {        status = op->status;        // return clnt->HandleEvent(EVENT_CMD_DONE, this);        if (gLeaseClerk.IsLeaseValid(chunkId)) {            // The write has failed; we don't want to renew the lease.            // Now, when the client forces a re-allocation, the            // metaserver will do a version bump; when the node that            // was dead comes back, we can detect it has missed a write            gLeaseClerk.RelinquishLease(chunkId);        }        KFS_LOG_VA_INFO("write-id alloc failed: %s, code = %d", Show().c_str(), status);                gLogger.Submit(this);        return 0;    }    writeIdStr.append(" ");    writeIdStr.append(op->writeIdStr);    ReadChunkMetadata();    return 0;}voidWriteIdAllocOp::ReadChunkMetadata(){    int res;    // Now, we are all done pending metadata read    SET_HANDLER(this, &WriteIdAllocOp::HandleDone);    // page in the chunk meta-data if needed    if (!gChunkManager.NeedToReadChunkMetadata(chunkId)) {        HandleDone(0, NULL);        return;    }    // if the read was successful, the call to read will callback handle-done    if ((res = gChunkManager.ReadChunkMetadata(chunkId, this)) < 0) {        KFS_LOG_VA_INFO("Unable read chunk metadata for chunk: %lld; error = %d", chunkId, res);        status = -EINVAL;        HandleDone(0, NULL);                return;    }}intWriteIdAllocOp::HandleDone(int code, void *data){    KFS_LOG_VA_INFO("Sending write-id alloc back (status = %d): %s", status, Show().c_str());    gLogger.Submit(this);    return 0;}void WritePrepareOp::Execute(){    ServerLocation peerLoc;    int myPos;    UpdateCounter(CMD_WRITE_PREPARE);    SET_HANDLER(this, &WritePrepareOp::HandleDone);            // check if we need to forward anywhere    bool needToForward = false, writeMaster;    needToForward = needToForwardWrite(servers, numServers, myPos, peerLoc, true, writeId);    writeMaster = (myPos == 0);    if (!gChunkManager.IsValidWriteId(writeId)) {        status = -EINVAL;        gLogger.Submit(this);        return;    }    if (!gChunkManager.IsChunkMetadataLoaded(chunkId)) {        if (gLeaseClerk.IsLeaseValid(chunkId)) {            // The write-id allocation has failed; we don't want to renew the lease.            // Now, when the client forces a re-allocation, the            // metaserver will do a version bump; when the node that            // was dead comes back, we can detect it has missed a write            gLeaseClerk.RelinquishLease(chunkId);        }                KFS_LOG_VA_INFO("Write prepare failed...checksums are not loaded; so lease expired for %ld",                        chunkId);        status = -KFS::ELEASEEXPIRED;        gLogger.Submit(this);        return;    }    if (writeMaster) {        // if we are the master, check the lease...        if (!gLeaseClerk.IsLeaseValid(chunkId)) {            KFS_LOG_VA_INFO("Write prepare failed...as lease expired for %ld",                             chunkId);            status = -KFS::ELEASEEXPIRED;            gLogger.Submit(this);            return;        }        // Notify the lease clerk that we are doing write.  This is to        // signal the lease clerk to renew the lease for the chunk when appropriate.        gLeaseClerk.DoingWrite(chunkId);    }    if (checksum != 0) {        uint32_t val = ComputeBlockChecksum(dataBuf, numBytes);        if (val != checksum) {            KFS_LOG_VA_INFO("Checksum mismatch: sent = %u, computed = %u for %s",                            checksum, val, Show().c_str());            status = -EBADCKSUM;            // so that the error goes out on a sync            gChunkManager.SetWriteStatus(writeId, status);            gLogger.Submit(this);            return;        }    }    if (needToForward) {        IOBuffer *clonedData = dataBuf->Clone();        status = ForwardToPeer(peerLoc, clonedData);        if (status < 0) {            delete clonedData;            // so that the error goes out on a sync            gChunkManager.SetWriteStatus(writeId, status);            if (gLeaseClerk.IsLeaseValid(chunkId)) {                // The write has failed; we don't want to renew the lease.                // Now, when the client forces a re-allocation, the                // metaserver will do a version bump; when the node that                // was dead comes back, we can detect it has missed a write                gLeaseClerk.RelinquishLease(chunkId);            }            // can't forward to peer...so fail the write            gLogger.Submit(this);            return;        }    }    // will clone only when the op is good    writeOp = gChunkManager.CloneWriteOp(writeId);    UpdateCounter(CMD_WRITE);    assert(writeOp != NULL);    writeOp->offset = offset;    writeOp->numBytes = numBytes;    writeOp->dataBuf = dataBuf;    writeOp->wpop = this;    writeOp->checksums.push_back(checksum);    dataBuf = NULL;    writeOp->enqueueTime = time(NULL);    KFS_LOG_VA_INFO("Writing to chunk=%lld, @offset=%lld, nbytes=%lld, checksum=%u",                    chunkId, offset, numBytes, checksum);    status = gChunkManager.WriteChunk(writeOp);        if (status < 0)        gLogger.Submit(this);}intWritePrepareOp::ForwardToPeer(const ServerLocation &loc, IOBuffer *dataBuf){    ClientSM *client = static_cast<ClientSM *>(clnt);    assert(client != NULL);    RemoteSyncSMPtr peer = client->FindServer(loc);        if (!peer) {        KFS_LOG_VA_INFO("Unable to find syncSM to peer: %s", loc.ToString().c_str());        // flag the error        return -EHOSTUNREACH;    }    writeFwdOp = new WritePrepareFwdOp(peer->NextSeqnum(), this, dataBuf, loc);    // KFS_LOG_VA_INFO("Sending write to peer: %s",    // writeFwdOp->Show().c_str());    KFS_LOG_VA_DEBUG("Fwd'ing write to peer: %s", writeFwdOp->Show().c_str());    peer->Enqueue(writeFwdOp);        return 0;}intWritePrepareOp::HandleDone(int code, void *data){    WritePrepareFwdOp *op = static_cast<WritePrepareFwdOp *> (data);#ifdef DEBUG    verifyExecutingOnEventProcessor();    #endif    if ((op != NULL) && (op->status < 0)) {        status = op->status;        if (gLeaseClerk.IsLeaseValid(chunkId)) {            // The write has failed; we don't want to renew the lease.            // Now, when the client forces a re-allocation, the            // metaserver will do a version bump; when the node that            // was dead comes back, we can detect it has missed a write            gLeaseClerk.RelinquishLease(chunkId);        }    }    numDone++;    if ((writeFwdOp == NULL) || (numDone >= 2)) {        gLogger.Submit(this);    }    return 0;}void WriteSyncOp::Execute(){    ServerLocation peerLoc;    int myPos;    bool needToWriteMetadata = true;    UpdateCounter(CMD_WRITE_SYNC);    KFS_LOG_VA_INFO("Executing write sync: %s", Show().c_str());    // check if we need to forward anywhere    bool needToForward = false;    if (numServers > 0) {        needToForward = needToForwardWrite(servers, numServers, myPos, peerLoc, true, writeId);        if (myPos == 0)            writeMaster = true;    }    writeOp = gChunkManager.CloneWriteOp(writeId);    if (writeOp == NULL) {        KFS_LOG_VA_INFO("Write sync failed...unable to find write-id: %ld", writeId);        status = -EINVAL;        gLogger.Submit(this);        return;    }        writeOp->enqueueTime = time(NULL);    if (writeOp->status < 0) {        // due to failures with data fwd'ing/checksum errors and such        status = writeOp->status;        gLogger.Submit(this);        return;    }    if (!gChunkManager.IsChunkMetadataLoaded(chunkId)) {        off_t csize;        gChunkManager.ChunkSize(chunkId, &csize);        if (csize > 0 && (csize >= (off_t) KFS::CHUNKSIZE)) {            // the metadata block could be paged out by a previous sync            needToWriteMetadata = false;        } else {            KFS_LOG_VA_INFO("Write sync failed...checksums got paged out?; so lease expired for %ld",                            chunkId);            status = -KFS::ELEASEEXPIRED;            gLogger.Submit(this);            return;        }    }    if (writeMaster) {        // if we are the master, check the lease...        if (!gLeaseClerk.IsLeaseValid(chunkId)) {            KFS_LOG_VA_INFO("Write sync failed...lease expired for %ld",                            chunkId);            status = -KFS::ELEASEEXPIRED;            gLogger.Submit(this);            return;        }        // Notify the lease clerk that we are doing write.  This is to        // signal the lease clerk to renew the lease for the chunk when appropriate.        gLeaseClerk.DoingWrite(chunkId);    }    if (needToForward) {        status = ForwardToPeer(peerLoc);        if (status < 0) {            // write can't be forwarded; so give up the lease, so that            // we can force re-allocation            gLeaseClerk.RelinquishLease(chunkId);            // can't forward to peer...so fail the write            gLogger.Submit(this);            return;        }    }    // commit writes on local/remote servers    SET_HANDLER(this, &WriteSyncOp::HandleDone);    if (!needToWriteMetadata) {        status = 0;        HandleDone(0, NULL);        return;    }            status = gChunkManager.WriteChunkMetadata(chunkId, this);    assert(status >= 0);    if (status < 0)        HandleDone(0, NULL);        // writeOp->wsop = this;    // XXX: validate id/version?    // validate the # of bytes is everything we got....otherwise, fail the op    // writeOp->Execute();}intWriteSyncOp::ForwardToPeer(const ServerLocation &loc){    ClientSM *client = static_cast<ClientSM *>(clnt);    assert(client != NULL);    RemoteSyncSMPtr peer = client->FindServer(loc);        if (!peer) {        KFS_LOG_VA_INFO("Unable to find syncSM to peer: %s", loc.ToString().c_str());        // flag the error        return -EHOSTUNREACH;    }    fwdedOp = new WriteSyncOp(peer->NextSeqnum(), chunkId, chunkVersion);    fwdedOp->numServers = numServers;    fwdedOp->servers = servers;    fwdedOp->clnt = this;    SET_HANDLER(fwdedOp, &WriteSyncOp::HandlePeerReply);    KFS_LOG_VA_DEBUG("Fwd'ing write-sync to peer: %s", fwdedOp->Show().c_str());    peer->Enqueue(fwdedOp);        return 0;}int WriteSyncOp::HandlePeerReply(int code, void *data){    assert(clnt != NULL);    return clnt->HandleEvent(code, this);}intWriteSyncOp::HandleDone(int code, void *data){    KfsOp *op = static_cast<KfsOp *> (data);#ifdef DEBUG    verifyExecutingOnEventProcessor();    #endif    if (op && (op->status < 0)) {        status = op->status;        KFS_LOG_VA_INFO("Peer (%s) returned: %d", op->Show().c_str(), op->status);    }    if ((status < 0) && (gLeaseClerk.IsLeaseValid(chunkId))) {        // The write has failed; we don't want to renew the lease.        // Now, when the client forces a re-allocation, the        // metaserver will do a version bump; when the node that        // was dead comes back, we can detect it has missed a write        gLeaseClerk.RelinquishLease(chunkId);    }    numDone++;    if ((fwdedOp == NULL) || (numDone >= 2)) {        // either no forwarding or local/fwding is also done...so, we are done        gLogger.Submit(this);    }    return 0;}void WriteOp::Execute(){    UpdateCounter(CMD_WRITE);    status = gChunkManager.WriteChunk(this);    if (status < 0) {        assert(wpop != NULL);        wpop->HandleEvent(EVENT_CMD_DONE, this);    }}voidSizeOp::Execute(){    UpdateCounter(CMD_SIZE);    status = gChunkManager.ChunkSize(chunkId, &size);    // clnt->HandleEvent(EVENT_CMD_DONE, this);    gLogger.Submit(this);}voidGetChunkMetadataOp::Execute(){    ChunkInfo_t ci;    UpdateCounter(CMD_GET_CHUNK_METADATA);    SET_HANDLER(this, &GetChunkMetadataOp::HandleChunkMetaReadDone);    if (gChunkManager.ReadChunkMetadata(chunkId, this) < 0) {        status = -EINVAL;        gLogger.Submit(this);    }}intGetChunkMetadataOp::HandleChunkMetaReadDone(int code, void *data){    status = *(int *) data;    if (status < 0) {        gLogger.Submit(this);        return 0;    }    uint32_t *checksums = NULL;    status = gChunkManager.GetChunkChecksums(chunkId, &checksums);    if ((status == 0) && (checksums != NULL)) {        chunkVersion = gChunkManager.GetChunkVersion(chunkId);        gChunkManager.ChunkSize(chunkId, &chunkSize);        dataBuf = new IOBuffer();        dataBuf->CopyIn((const char *) checksums, MAX_CHUNK_CHECKSUM_BLOCKS * sizeof(uint32_t));        numBytesIO = dataBuf->BytesConsumable();    }    // clnt->HandleEvent(EVENT_CMD_DONE, this);    gLogger.Submit(this);    return 0;}voidPingOp::Execute(){    totalSpace = gChunkManager.GetTotalSpace();    usedSpace = gChunkManager.GetUsedSpace();    status = 0;    // clnt->HandleEvent(EVENT_CMD_DONE, this);    gLogger.Submit(this);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -