📄 kfsops.cc
字号:
static boolneedToForwardWrite(string &serverInfo, uint32_t numServers, int &myPos, ServerLocation &peerLoc, bool isWriteIdPresent, int64_t &writeId){ istringstream ist(serverInfo); ServerLocation loc; bool foundLocal = false; int64_t id; bool needToForward = false; // the list of servers is ordered: we forward to the next one // in the list. for (uint32_t i = 0; i < numServers; i++) { ist >> loc.hostname; ist >> loc.port; if (isWriteIdPresent) ist >> id; if (gChunkServer.IsLocalServer(loc)) { // return the position of where this server is present in the list myPos = i; foundLocal = true; if (isWriteIdPresent) writeId = id; continue; } // forward if we are not the last in the list if (foundLocal) { needToForward = true; break; } } peerLoc = loc; return needToForward;}voidWriteIdAllocOp::Execute(){ // check if we need to forward anywhere bool needToForward = false, writeMaster = false; int64_t dummy; int myPos = 1; ServerLocation peerLoc; if (numServers > 0) { needToForward = needToForwardWrite(servers, numServers, myPos, peerLoc, false, dummy); writeMaster = (myPos == 0); if (needToForward) { status = ForwardToPeer(peerLoc); if (status < 0) { if (gLeaseClerk.IsLeaseValid(chunkId)) { // The write-id allocation has failed; we don't want to renew the lease. // Now, when the client forces a re-allocation, the // metaserver will do a version bump; when the node that // was dead comes back, we can detect it has missed a write gLeaseClerk.RelinquishLease(chunkId); } // can't forward to peer...so fail the write-id allocation gLogger.Submit(this); return; } } } if (writeMaster) { // Notify the lease clerk that we are doing write. This is to // signal the lease clerk to renew the lease for the chunk when appropriate. gLeaseClerk.DoingWrite(chunkId); } status = gChunkManager.AllocateWriteId(this); if (status == 0) writeIdStr = gChunkServer.GetMyLocation() + " " + boost::lexical_cast<string>(writeId); // don't need to forward: we are done // clnt->HandleEvent(EVENT_CMD_DONE, this); if (!needToForward) { KFS_LOG_VA_INFO("Write-id alloc: %s", Show().c_str()); ReadChunkMetadata(); }}intWriteIdAllocOp::ForwardToPeer(const ServerLocation &loc){ ClientSM *client = static_cast<ClientSM *>(clnt); assert(client != NULL); RemoteSyncSMPtr peer = client->FindServer(loc); if (!peer) { KFS_LOG_VA_DEBUG("Unable to find syncSM to peer: %s", loc.ToString().c_str()); // flag the error return -EHOSTUNREACH; } fwdedOp = new WriteIdAllocOp(peer->NextSeqnum(), this); // when this op comes, need to notify "this" fwdedOp->clnt = this; // KFS_LOG_VA_INFO("Sending write to peer: %s", // writeFwdOp->Show().c_str()); KFS_LOG_VA_INFO("Fwd'ing write-id alloc to peer: %s", fwdedOp->Show().c_str()); SET_HANDLER(this, &WriteIdAllocOp::HandlePeerReply); peer->Enqueue(fwdedOp); return 0;}intWriteIdAllocOp::HandlePeerReply(int code, void *data){ WriteIdAllocOp *op = static_cast<WriteIdAllocOp *> (data); if (op->status < 0) { status = op->status; // return clnt->HandleEvent(EVENT_CMD_DONE, this); if (gLeaseClerk.IsLeaseValid(chunkId)) { // The write has failed; we don't want to renew the lease. // Now, when the client forces a re-allocation, the // metaserver will do a version bump; when the node that // was dead comes back, we can detect it has missed a write gLeaseClerk.RelinquishLease(chunkId); } KFS_LOG_VA_INFO("write-id alloc failed: %s, code = %d", Show().c_str(), status); gLogger.Submit(this); return 0; } writeIdStr.append(" "); writeIdStr.append(op->writeIdStr); ReadChunkMetadata(); return 0;}voidWriteIdAllocOp::ReadChunkMetadata(){ int res; // Now, we are all done pending metadata read SET_HANDLER(this, &WriteIdAllocOp::HandleDone); // page in the chunk meta-data if needed if (!gChunkManager.NeedToReadChunkMetadata(chunkId)) { HandleDone(0, NULL); return; } // if the read was successful, the call to read will callback handle-done if ((res = gChunkManager.ReadChunkMetadata(chunkId, this)) < 0) { KFS_LOG_VA_INFO("Unable read chunk metadata for chunk: %lld; error = %d", chunkId, res); status = -EINVAL; HandleDone(0, NULL); return; }}intWriteIdAllocOp::HandleDone(int code, void *data){ KFS_LOG_VA_INFO("Sending write-id alloc back (status = %d): %s", status, Show().c_str()); gLogger.Submit(this); return 0;}void WritePrepareOp::Execute(){ ServerLocation peerLoc; int myPos; UpdateCounter(CMD_WRITE_PREPARE); SET_HANDLER(this, &WritePrepareOp::HandleDone); // check if we need to forward anywhere bool needToForward = false, writeMaster; needToForward = needToForwardWrite(servers, numServers, myPos, peerLoc, true, writeId); writeMaster = (myPos == 0); if (!gChunkManager.IsValidWriteId(writeId)) { status = -EINVAL; gLogger.Submit(this); return; } if (!gChunkManager.IsChunkMetadataLoaded(chunkId)) { if (gLeaseClerk.IsLeaseValid(chunkId)) { // The write-id allocation has failed; we don't want to renew the lease. // Now, when the client forces a re-allocation, the // metaserver will do a version bump; when the node that // was dead comes back, we can detect it has missed a write gLeaseClerk.RelinquishLease(chunkId); } KFS_LOG_VA_INFO("Write prepare failed...checksums are not loaded; so lease expired for %ld", chunkId); status = -KFS::ELEASEEXPIRED; gLogger.Submit(this); return; } if (writeMaster) { // if we are the master, check the lease... if (!gLeaseClerk.IsLeaseValid(chunkId)) { KFS_LOG_VA_INFO("Write prepare failed...as lease expired for %ld", chunkId); status = -KFS::ELEASEEXPIRED; gLogger.Submit(this); return; } // Notify the lease clerk that we are doing write. This is to // signal the lease clerk to renew the lease for the chunk when appropriate. gLeaseClerk.DoingWrite(chunkId); } if (checksum != 0) { uint32_t val = ComputeBlockChecksum(dataBuf, numBytes); if (val != checksum) { KFS_LOG_VA_INFO("Checksum mismatch: sent = %u, computed = %u for %s", checksum, val, Show().c_str()); status = -EBADCKSUM; // so that the error goes out on a sync gChunkManager.SetWriteStatus(writeId, status); gLogger.Submit(this); return; } } if (needToForward) { IOBuffer *clonedData = dataBuf->Clone(); status = ForwardToPeer(peerLoc, clonedData); if (status < 0) { delete clonedData; // so that the error goes out on a sync gChunkManager.SetWriteStatus(writeId, status); if (gLeaseClerk.IsLeaseValid(chunkId)) { // The write has failed; we don't want to renew the lease. // Now, when the client forces a re-allocation, the // metaserver will do a version bump; when the node that // was dead comes back, we can detect it has missed a write gLeaseClerk.RelinquishLease(chunkId); } // can't forward to peer...so fail the write gLogger.Submit(this); return; } } // will clone only when the op is good writeOp = gChunkManager.CloneWriteOp(writeId); UpdateCounter(CMD_WRITE); assert(writeOp != NULL); writeOp->offset = offset; writeOp->numBytes = numBytes; writeOp->dataBuf = dataBuf; writeOp->wpop = this; writeOp->checksums.push_back(checksum); dataBuf = NULL; writeOp->enqueueTime = time(NULL); KFS_LOG_VA_INFO("Writing to chunk=%lld, @offset=%lld, nbytes=%lld, checksum=%u", chunkId, offset, numBytes, checksum); status = gChunkManager.WriteChunk(writeOp); if (status < 0) gLogger.Submit(this);}intWritePrepareOp::ForwardToPeer(const ServerLocation &loc, IOBuffer *dataBuf){ ClientSM *client = static_cast<ClientSM *>(clnt); assert(client != NULL); RemoteSyncSMPtr peer = client->FindServer(loc); if (!peer) { KFS_LOG_VA_INFO("Unable to find syncSM to peer: %s", loc.ToString().c_str()); // flag the error return -EHOSTUNREACH; } writeFwdOp = new WritePrepareFwdOp(peer->NextSeqnum(), this, dataBuf, loc); // KFS_LOG_VA_INFO("Sending write to peer: %s", // writeFwdOp->Show().c_str()); KFS_LOG_VA_DEBUG("Fwd'ing write to peer: %s", writeFwdOp->Show().c_str()); peer->Enqueue(writeFwdOp); return 0;}intWritePrepareOp::HandleDone(int code, void *data){ WritePrepareFwdOp *op = static_cast<WritePrepareFwdOp *> (data);#ifdef DEBUG verifyExecutingOnEventProcessor(); #endif if ((op != NULL) && (op->status < 0)) { status = op->status; if (gLeaseClerk.IsLeaseValid(chunkId)) { // The write has failed; we don't want to renew the lease. // Now, when the client forces a re-allocation, the // metaserver will do a version bump; when the node that // was dead comes back, we can detect it has missed a write gLeaseClerk.RelinquishLease(chunkId); } } numDone++; if ((writeFwdOp == NULL) || (numDone >= 2)) { gLogger.Submit(this); } return 0;}void WriteSyncOp::Execute(){ ServerLocation peerLoc; int myPos; bool needToWriteMetadata = true; UpdateCounter(CMD_WRITE_SYNC); KFS_LOG_VA_INFO("Executing write sync: %s", Show().c_str()); // check if we need to forward anywhere bool needToForward = false; if (numServers > 0) { needToForward = needToForwardWrite(servers, numServers, myPos, peerLoc, true, writeId); if (myPos == 0) writeMaster = true; } writeOp = gChunkManager.CloneWriteOp(writeId); if (writeOp == NULL) { KFS_LOG_VA_INFO("Write sync failed...unable to find write-id: %ld", writeId); status = -EINVAL; gLogger.Submit(this); return; } writeOp->enqueueTime = time(NULL); if (writeOp->status < 0) { // due to failures with data fwd'ing/checksum errors and such status = writeOp->status; gLogger.Submit(this); return; } if (!gChunkManager.IsChunkMetadataLoaded(chunkId)) { off_t csize; gChunkManager.ChunkSize(chunkId, &csize); if (csize > 0 && (csize >= (off_t) KFS::CHUNKSIZE)) { // the metadata block could be paged out by a previous sync needToWriteMetadata = false; } else { KFS_LOG_VA_INFO("Write sync failed...checksums got paged out?; so lease expired for %ld", chunkId); status = -KFS::ELEASEEXPIRED; gLogger.Submit(this); return; } } if (writeMaster) { // if we are the master, check the lease... if (!gLeaseClerk.IsLeaseValid(chunkId)) { KFS_LOG_VA_INFO("Write sync failed...lease expired for %ld", chunkId); status = -KFS::ELEASEEXPIRED; gLogger.Submit(this); return; } // Notify the lease clerk that we are doing write. This is to // signal the lease clerk to renew the lease for the chunk when appropriate. gLeaseClerk.DoingWrite(chunkId); } if (needToForward) { status = ForwardToPeer(peerLoc); if (status < 0) { // write can't be forwarded; so give up the lease, so that // we can force re-allocation gLeaseClerk.RelinquishLease(chunkId); // can't forward to peer...so fail the write gLogger.Submit(this); return; } } // commit writes on local/remote servers SET_HANDLER(this, &WriteSyncOp::HandleDone); if (!needToWriteMetadata) { status = 0; HandleDone(0, NULL); return; } status = gChunkManager.WriteChunkMetadata(chunkId, this); assert(status >= 0); if (status < 0) HandleDone(0, NULL); // writeOp->wsop = this; // XXX: validate id/version? // validate the # of bytes is everything we got....otherwise, fail the op // writeOp->Execute();}intWriteSyncOp::ForwardToPeer(const ServerLocation &loc){ ClientSM *client = static_cast<ClientSM *>(clnt); assert(client != NULL); RemoteSyncSMPtr peer = client->FindServer(loc); if (!peer) { KFS_LOG_VA_INFO("Unable to find syncSM to peer: %s", loc.ToString().c_str()); // flag the error return -EHOSTUNREACH; } fwdedOp = new WriteSyncOp(peer->NextSeqnum(), chunkId, chunkVersion); fwdedOp->numServers = numServers; fwdedOp->servers = servers; fwdedOp->clnt = this; SET_HANDLER(fwdedOp, &WriteSyncOp::HandlePeerReply); KFS_LOG_VA_DEBUG("Fwd'ing write-sync to peer: %s", fwdedOp->Show().c_str()); peer->Enqueue(fwdedOp); return 0;}int WriteSyncOp::HandlePeerReply(int code, void *data){ assert(clnt != NULL); return clnt->HandleEvent(code, this);}intWriteSyncOp::HandleDone(int code, void *data){ KfsOp *op = static_cast<KfsOp *> (data);#ifdef DEBUG verifyExecutingOnEventProcessor(); #endif if (op && (op->status < 0)) { status = op->status; KFS_LOG_VA_INFO("Peer (%s) returned: %d", op->Show().c_str(), op->status); } if ((status < 0) && (gLeaseClerk.IsLeaseValid(chunkId))) { // The write has failed; we don't want to renew the lease. // Now, when the client forces a re-allocation, the // metaserver will do a version bump; when the node that // was dead comes back, we can detect it has missed a write gLeaseClerk.RelinquishLease(chunkId); } numDone++; if ((fwdedOp == NULL) || (numDone >= 2)) { // either no forwarding or local/fwding is also done...so, we are done gLogger.Submit(this); } return 0;}void WriteOp::Execute(){ UpdateCounter(CMD_WRITE); status = gChunkManager.WriteChunk(this); if (status < 0) { assert(wpop != NULL); wpop->HandleEvent(EVENT_CMD_DONE, this); }}voidSizeOp::Execute(){ UpdateCounter(CMD_SIZE); status = gChunkManager.ChunkSize(chunkId, &size); // clnt->HandleEvent(EVENT_CMD_DONE, this); gLogger.Submit(this);}voidGetChunkMetadataOp::Execute(){ ChunkInfo_t ci; UpdateCounter(CMD_GET_CHUNK_METADATA); SET_HANDLER(this, &GetChunkMetadataOp::HandleChunkMetaReadDone); if (gChunkManager.ReadChunkMetadata(chunkId, this) < 0) { status = -EINVAL; gLogger.Submit(this); }}intGetChunkMetadataOp::HandleChunkMetaReadDone(int code, void *data){ status = *(int *) data; if (status < 0) { gLogger.Submit(this); return 0; } uint32_t *checksums = NULL; status = gChunkManager.GetChunkChecksums(chunkId, &checksums); if ((status == 0) && (checksums != NULL)) { chunkVersion = gChunkManager.GetChunkVersion(chunkId); gChunkManager.ChunkSize(chunkId, &chunkSize); dataBuf = new IOBuffer(); dataBuf->CopyIn((const char *) checksums, MAX_CHUNK_CHECKSUM_BLOCKS * sizeof(uint32_t)); numBytesIO = dataBuf->BytesConsumable(); } // clnt->HandleEvent(EVENT_CMD_DONE, this); gLogger.Submit(this); return 0;}voidPingOp::Execute(){ totalSpace = gChunkManager.GetTotalSpace(); usedSpace = gChunkManager.GetUsedSpace(); status = 0; // clnt->HandleEvent(EVENT_CMD_DONE, this); gLogger.Submit(this);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -