📄 kfsops.cc
字号:
voidDumpChunkMapOp::Execute(){ // Dump chunk map gChunkManager.DumpChunkMap(); status = 0; gLogger.Submit(this);}voidStatsOp::Execute(){ ostringstream os; os << "Num aios: " << globals().diskManager.NumDiskIOOutstanding() << "\r\n"; os << "Num ops: " << gChunkServer.GetNumOps() << "\r\n"; globals().counterManager.Show(os); stats = os.str(); status = 0; // clnt->HandleEvent(EVENT_CMD_DONE, this); gLogger.Submit(this);}intAllocChunkOp::HandleChunkMetaWriteDone(int code, void *data){ gLogger.Submit(this); return 0;}intTruncateChunkOp::HandleChunkMetaWriteDone(int code, void *data){ gLogger.Submit(this); return 0;}intChangeChunkVersOp::HandleChunkMetaWriteDone(int code, void *data){ gLogger.Submit(this); return 0;}////// Generate response for an op based on the KFS protocol.///voidKfsOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Status: " << status << "\r\n\r\n";}voidSizeOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; if (status < 0) { os << "Status: " << status << "\r\n\r\n"; return; } os << "Status: " << status << "\r\n"; os << "Size: " << size << "\r\n\r\n";}voidGetChunkMetadataOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; if (status < 0) { os << "Status: " << status << "\r\n\r\n"; return; } os << "Status: " << status << "\r\n"; os << "Chunk-handle: " << chunkId << "\r\n"; os << "Chunk-version: " << chunkVersion << "\r\n"; os << "Size: " << chunkSize << "\r\n"; os << "Content-length: " << numBytesIO << "\r\n\r\n";}voidReadOp::Response(ostringstream &os){ // DecrementCounter(CMD_READ); os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; if (status < 0) { os << "Status: " << status << "\r\n\r\n"; return; } os << "Status: " << status << "\r\n"; os << "DiskIOtime: " << diskIOTime << "\r\n"; os << "Checksum-entries: " << checksum.size() << "\r\n"; if (checksum.size() == 0) { os << "Checksums: " << 0 << "\r\n"; } else { os << "Checksums: "; for (uint32_t i = 0; i < checksum.size(); i++) os << checksum[i] << ' '; os << "\r\n"; } os << "Content-length: " << numBytesIO << "\r\n\r\n";}voidWriteIdAllocOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; if (status < 0) { os << "Status: " << status << "\r\n\r\n"; return; } os << "Status: " << status << "\r\n"; // os << "Write-id: " << writeId << "\r\n\r\n"; os << "Write-id: " << writeIdStr << "\r\n\r\n";}voidWritePrepareOp::Response(ostringstream &os){ // no reply for a prepare...the reply is covered by sync if (1) return; os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; if (status < 0) { os << "Status: " << status << "\r\n\r\n"; return; } os << "Status: " << status << "\r\n\r\n";}voidSizeOp::Request(ostringstream &os){ os << "SIZE \r\n"; os << "Cseq: " << seq << "\r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Chunk-handle: " << chunkId << "\r\n"; os << "Chunk-version: " << chunkVersion << "\r\n\r\n";}voidGetChunkMetadataOp::Request(ostringstream &os){ os << "GET_CHUNK_METADATA \r\n"; os << "Cseq: " << seq << "\r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Chunk-handle: " << chunkId << "\r\n\r\n";}voidReadOp::Request(ostringstream &os){ os << "READ \r\n"; os << "Cseq: " << seq << "\r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Chunk-handle: " << chunkId << "\r\n"; os << "Chunk-version: " << chunkVersion << "\r\n"; os << "Offset: " << offset << "\r\n"; os << "Num-bytes: " << numBytes << "\r\n\r\n";}voidWriteIdAllocOp::Request(ostringstream &os){ os << "WRITE_ID_ALLOC\r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Chunk-handle:" << chunkId << "\r\n"; os << "Chunk-version:" << chunkVersion << "\r\n"; os << "Offset: " << offset << "\r\n"; os << "Num-bytes: " << numBytes << "\r\n"; os << "Num-servers: " << numServers << "\r\n"; os << "Servers: " << servers << "\r\n\r\n";}voidWritePrepareFwdOp::Request(ostringstream &os){ os << "WRITE_PREPARE\r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Chunk-handle:" << owner->chunkId << "\r\n"; os << "Chunk-version:" << owner->chunkVersion << "\r\n"; os << "Offset: " << owner->offset << "\r\n"; os << "Num-bytes: " << owner->numBytes << "\r\n"; os << "Checksum: " << owner->checksum << "\r\n"; os << "Num-servers: " << owner->numServers << "\r\n"; os << "Servers: " << owner->servers << "\r\n\r\n";}voidWriteSyncOp::Request(ostringstream &os){ os << "WRITE_SYNC\r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Chunk-handle:" << chunkId << "\r\n"; os << "Chunk-version:" << chunkVersion << "\r\n"; os << "Num-servers: " << numServers << "\r\n"; os << "Servers: " << servers << "\r\n\r\n";}voidWriteSyncOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Status: " << status << "\r\n\r\n"; }voidAllocChunkOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Status: " << status << "\r\n\r\n";}voidHeartbeatOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Status: " << status << "\r\n"; os << "Total-space: " << totalSpace << "\r\n"; os << "Used-space: " << usedSpace << "\r\n"; os << "Num-chunks: " << numChunks << "\r\n\r\n";}voidStaleChunksOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Status: " << status << "\r\n\r\n";}voidReplicateChunkOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Status: " << status << "\r\n"; if (status == 0) { os << "File-handle: " << fid << "\r\n"; os << "Chunk-version: " << chunkVersion << "\r\n"; } os << "\r\n";}voidPingOp::Response(ostringstream &os){ ServerLocation loc = gMetaServerSM.GetLocation(); os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Status: " << status << "\r\n"; os << "Meta-server-host: " << loc.hostname << "\r\n"; os << "Meta-server-port: " << loc.port << "\r\n"; os << "Total-space: " << totalSpace << "\r\n"; os << "Used-space: " << usedSpace << "\r\n\r\n";}voidDumpChunkMapOp::Response(ostringstream &os){ ostringstream v; gChunkManager.DumpChunkMap(v); os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Status: " << status << "\r\n"; os << "Content-length: " << v.str().length() << "\r\n\r\n"; if (v.str().length() > 0) os << v.str();}voidStatsOp::Response(ostringstream &os){ os << "OK\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Status: " << status << "\r\n"; os << stats << "\r\n";}////////////////////////////////////////////////// Now the handle done's....////////////////////////////////////////////////intSizeOp::HandleDone(int code, void *data){ // notify the owning object that the op finished clnt->HandleEvent(EVENT_CMD_DONE, this); return 0;}intGetChunkMetadataOp::HandleDone(int code, void *data){ // notify the owning object that the op finished clnt->HandleEvent(EVENT_CMD_DONE, this); return 0;}intReplicateChunkOp::HandleDone(int code, void *data){ if (data != NULL) chunkVersion = * (kfsSeq_t *) data; else chunkVersion = -1; gLogger.Submit(this); return 0;}intWritePrepareFwdOp::HandleDone(int code, void *data){ // data fwding is finished; notify owner return owner->HandleEvent(EVENT_CMD_DONE, this);}class ReadChunkMetaNotifier { int res;public: ReadChunkMetaNotifier(int r) : res(r) { } void operator()(KfsOp *op) { op->HandleEvent(EVENT_CMD_DONE, &res); }};intReadChunkMetaOp::HandleDone(int code, void *data){ int res = -EINVAL; if (code == EVENT_DISK_ERROR) { status = -1; if (data != NULL) { status = *(int *) data; KFS_LOG_VA_INFO("Disk error: errno = %d, chunkid = %lld", status, chunkId); gChunkManager.ChunkIOFailed(chunkId, status); res = status; } } else if (code == EVENT_DISK_READ) { IOBuffer *dataBuf = (IOBuffer *) data; if (dataBuf->BytesConsumable() >= (int) sizeof(DiskChunkInfo_t)) { DiskChunkInfo_t dci; dataBuf->CopyOut((char *) &dci, sizeof(DiskChunkInfo_t)); res = gChunkManager.SetChunkMetadata(dci); } } gChunkManager.ReadChunkMetadataDone(chunkId); clnt->HandleEvent(EVENT_CMD_DONE, (void *) &res); for_each(waiters.begin(), waiters.end(), ReadChunkMetaNotifier(res)); delete this; return 0;}WriteOp::~WriteOp(){ if (isWriteIdHolder) { // track how long it took for the write to finish up: // enqueueTime tracks when the last write was done to this // writeid struct timeval lastWriteTime; lastWriteTime.tv_sec = enqueueTime; lastWriteTime.tv_usec = 0; float timeSpent = ComputeTimeDiff(startTime, lastWriteTime); if (timeSpent < 1e-6) timeSpent = 0.0; if (timeSpent > 5.0) { gChunkServer.SendTelemetryReport(CMD_WRITE, timeSpent); } // we don't want write id's to pollute stats gettimeofday(&startTime, NULL); gCtrWriteDuration.Update(1); gCtrWriteDuration.Update(timeSpent); } if (dataBuf != NULL) delete dataBuf; if (rop != NULL) { rop->wop = NULL; assert(rop->dataBuf == NULL); delete rop; } if (diskConnection) diskConnection->Close();}WriteIdAllocOp::~WriteIdAllocOp(){ if (fwdedOp != NULL) delete fwdedOp;}WritePrepareOp::~WritePrepareOp(){ // on a successful prepare, dataBuf should be moved to a write op. assert((status != 0) || (dataBuf == NULL)); if (dataBuf != NULL) delete dataBuf; if (writeFwdOp != NULL) delete writeFwdOp; if (writeOp != NULL) delete writeOp;}WriteSyncOp::~WriteSyncOp(){ off_t chunkSize = 0; if (fwdedOp != NULL) delete fwdedOp; if (writeOp != NULL) delete writeOp; gChunkManager.ChunkSize(chunkId, &chunkSize); if ((chunkSize > 0) && (chunkSize >= (off_t) KFS::CHUNKSIZE)) { // we are done with all the writing to this chunk; so, close it gChunkManager.CloseChunk(chunkId); }}voidLeaseRenewOp::Request(ostringstream &os){ os << "LEASE_RENEW\r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Chunk-handle:" << chunkId << "\r\n"; os << "Lease-id: " << leaseId << "\r\n"; os << "Lease-type: " << leaseType << "\r\n\r\n";}intLeaseRenewOp::HandleDone(int code, void *data){ KfsOp *op = (KfsOp *) data; assert(op == this); return op->clnt->HandleEvent(EVENT_CMD_DONE, data);}voidLeaseRelinquishOp::Request(ostringstream &os){ os << "LEASE_RELINQUISH\r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Chunk-handle:" << chunkId << "\r\n"; os << "Lease-id: " << leaseId << "\r\n"; os << "Lease-type: " << leaseType << "\r\n\r\n";}intLeaseRelinquishOp::HandleDone(int code, void *data){ KfsOp *op = (KfsOp *) data; assert(op == this); delete this; return 0;}voidCorruptChunkOp::Request(ostringstream &os){ os << "CORRUPT_CHUNK\r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Cseq: " << seq << "\r\n"; os << "File-handle:" << fid << "\r\n"; os << "Chunk-handle:" << chunkId << "\r\n\r\n";}intCorruptChunkOp::HandleDone(int code, void *data){ // Thank you metaserver for replying :-) delete this; return 0;}class PrintChunkInfo { ostringstream &os;public: PrintChunkInfo(ostringstream &o) : os(o) { } void operator() (ChunkInfo_t &c) { os << c.fileId << ' '; os << c.chunkId << ' '; os << c.chunkVersion << ' '; }};voidHelloMetaOp::Request(ostringstream &os){ ostringstream chunkInfo; os << "HELLO \r\n"; os << "Version: " << KFS_VERSION_STR << "\r\n"; os << "Cseq: " << seq << "\r\n"; os << "Chunk-server-name: " << myLocation.hostname << "\r\n"; os << "Chunk-server-port: " << myLocation.port << "\r\n"; os << "Cluster-key: " << clusterKey << "\r\n"; os << "MD5Sum: " << md5sum << "\r\n"; os << "Rack-id: " << rackId << "\r\n"; os << "Total-space: " << totalSpace << "\r\n"; os << "Used-space: " << usedSpace << "\r\n"; // now put in the chunk information os << "Num-chunks: " << chunks.size() << "\r\n"; // figure out the content-length first... for_each(chunks.begin(), chunks.end(), PrintChunkInfo(chunkInfo)); os << "Content-length: " << chunkInfo.str().length() << "\r\n\r\n"; os << chunkInfo.str().c_str();}// timeout op to the event processor goingvoidTimeoutOp::Execute(){ gChunkManager.Timeout(); gLeaseClerk.Timeout(); // do not delete "this" since it is either a member variable of // the ChunkManagerTimeoutImpl or a static object. // bump the seq # so we know how many times it got executed seq++;}voidKillRemoteSyncOp::Execute(){ RemoteSyncSM *remoteSyncSM = static_cast<RemoteSyncSM *>(clnt); assert(remoteSyncSM != NULL); remoteSyncSM->Finish();}voidHelloMetaOp::Execute(){ totalSpace = gChunkManager.GetTotalSpace(); usedSpace = gChunkManager.GetUsedSpace(); gChunkManager.GetHostedChunks(chunks); status = 0; gLogger.Submit(this);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -