📄 kfsops.cc
字号:
{ kfsSeq_t seq; PingOp *po; parseCommon(prop, seq); po = new PingOp(seq); *c = po; return 0;}intparseHandlerDumpChunkMap(Properties &prop, KfsOp **c){ kfsSeq_t seq; DumpChunkMapOp *po; parseCommon(prop, seq); po = new DumpChunkMapOp(seq); *c = po; return 0;}intparseHandlerStats(Properties &prop, KfsOp **c){ kfsSeq_t seq; StatsOp *so; parseCommon(prop, seq); so = new StatsOp(seq); *c = so; return 0;}////// Generic event handler for tracking completion of an event/// execution. Push the op to the logger and the net thread will pick/// it up and dispatch it.///intKfsOp::HandleDone(int code, void *data){ gLogger.Submit(this); return 0;}////// A read op finished. Set the status and the # of bytes read/// alongwith the data and notify the client.///intReadOp::HandleDone(int code, void *data){ IOBuffer *b; off_t chunkSize = 0; // DecrementCounter(CMD_READ);#ifdef DEBUG verifyExecutingOnEventProcessor();#endif if (code == EVENT_DISK_ERROR) { status = -1; if (data != NULL) { status = *(int *) data; KFS_LOG_VA_INFO("Disk error: errno = %d, chunkid = %lld", status, chunkId); } gChunkManager.ChunkIOFailed(chunkId, status); } else if (code == EVENT_DISK_READ) { if (dataBuf == NULL) { dataBuf = new IOBuffer(); } b = (IOBuffer *) data; // Order matters...when we append b, we take the data from b // and put it into our buffer. dataBuf->Append(b); // verify checksum gChunkManager.ReadChunkDone(this); numBytesIO = dataBuf->BytesConsumable(); if (status == 0) // checksum verified status = numBytesIO; } if (status >= 0) { if (numBytesIO < (int) CHECKSUM_BLOCKSIZE) { uint32_t cks; cks = ComputeBlockChecksum(dataBuf, numBytesIO); checksum.push_back(cks); } else { checksum = ComputeChecksums(dataBuf, numBytesIO); } // send the disk IO time back to client for telemetry reporting struct timeval timeNow; gettimeofday(&timeNow, NULL); diskIOTime = ComputeTimeDiff(startTime, timeNow); } gChunkManager.ChunkSize(chunkId, &chunkSize); if (wop != NULL) { // if the read was triggered by a write, then resume execution of write wop->Execute(); return 0; } if ((chunkSize > 0 && (offset + numBytesIO >= (off_t) chunkSize)) && (!gLeaseClerk.IsLeaseValid(chunkId))) { // If we have read the full chunk, close out the fd. The // observation is that reads are sequential and when we // finished a chunk, the client will move to the next one. gChunkManager.CloseChunk(chunkId); } gLogger.Submit(this); return 0;}intReadOp::HandleReplicatorDone(int code, void *data){ // notify the replicator object that the read it had submitted to // the peer has finished. return clnt->HandleEvent(code, data);}intWriteOp::HandleWriteDone(int code, void *data){ // DecrementCounter(CMD_WRITE); if (isFromReReplication) { if (code == EVENT_DISK_WROTE) { status = *(int *) data; numBytesIO = status; } else { status = -1; } return clnt->HandleEvent(code, this); } assert(wpop != NULL); if (code == EVENT_DISK_ERROR) { // eat up everything that was sent dataBuf->Consume(numBytes); status = -1; if (data != NULL) { status = *(int *) data; KFS_LOG_VA_INFO("Disk error: errno = %d, chunkid = %lld", status, chunkId); } gChunkManager.ChunkIOFailed(chunkId, status); wpop->HandleEvent(EVENT_CMD_DONE, this); return 0; } else if (code == EVENT_DISK_WROTE) { status = *(int *) data; numBytesIO = status; SET_HANDLER(this, &WriteOp::HandleSyncDone); // queue the sync op only if we are all done with writing to // this chunk: if ((size_t) numBytesIO != (size_t) numBytes) { // write didn't do everything that was asked; we need to retry KFS_LOG_VA_INFO("Write on chunk did less: asked = %ld, did = %ld; asking clnt to retry", numBytes, numBytesIO); status = -EAGAIN; } waitForSyncDone = false; if (offset + numBytesIO >= (off_t) KFS::CHUNKSIZE) { // If we have written till the end of the chunk, close out the // fd. The observation is that writes are sequential and when // we finished a chunk, the client will move to the next // one.#if 0 if (gChunkManager.Sync(this) < 0) { KFS_LOG_DEBUG("Sync failed..."); // eat up everything that was sent dataBuf->Consume(numBytes); // Sync failed status = -1; // clnt->HandleEvent(EVENT_CMD_DONE, this); return wsop->HandleEvent(EVENT_CMD_DONE, this); }#endif } if (!waitForSyncDone) { // KFS_LOG_DEBUG("Queued sync; not waiting for sync to finish..."); // sync is queued; no need to wait for it to finish return HandleSyncDone(EVENT_SYNC_DONE, 0); } } return 0;}////// A write op finished. Set the status and the # of bytes written/// and notify the owning write commit op.///intWriteOp::HandleSyncDone(int code, void *data){ // eat up everything that was sent dataBuf->Consume(numBytes); if (code != EVENT_SYNC_DONE) { status = -1; } if (status >= 0) { gChunkManager.ChunkSize(chunkId, &chunkSize); SET_HANDLER(this, &WriteOp::HandleLoggingDone); gLogger.Submit(this); } else { wpop->HandleEvent(EVENT_CMD_DONE, this); } return 0;}intWriteOp::HandleLoggingDone(int code, void *data){ assert(wpop != NULL); return wpop->HandleEvent(EVENT_CMD_DONE, this);}////// Handlers for ops that need logging. This method is invoked by the/// logger thread. So, don't access any globals here---otherwise, we/// need to add locking.///voidAllocChunkOp::Log(ofstream &ofs){ ofs << "ALLOCATE " << chunkId << ' ' << fileId << ' '; ofs << chunkVersion << "\n"; assert(!ofs.fail());}/// Resetting a chunk's version # is equivalent to doing an allocation/// of an existing chunk.voidChangeChunkVersOp::Log(ofstream &ofs){ ofs << "CHANGE_CHUNK_VERS " << chunkId << ' ' << fileId << ' '; ofs << chunkVersion << "\n"; assert(!ofs.fail());}voidDeleteChunkOp::Log(ofstream &ofs){ ofs << "DELETE " << chunkId << "\n"; assert(!ofs.fail());}voidWriteOp::Log(ofstream &ofs){ ofs << "WRITE " << chunkId << ' ' << chunkSize << ' '; ofs << offset << ' '; ofs << checksums.size(); for (vector<uint32_t>::size_type i = 0; i < checksums.size(); i++) { ofs << ' ' << checksums[i]; } ofs << "\n"; assert(!ofs.fail());}voidTruncateChunkOp::Log(ofstream &ofs){ ofs << "TRUNCATE " << chunkId << ' ' << chunkSize << "\n"; assert(!ofs.fail());}// For replicating a chunk, we log nothing. We don't write out info// about the chunk in checkpoint files until replication is complete.// This way, if we ever crash during chunk-replication, we'll simply// nuke out the chunk on startup.voidReplicateChunkOp::Log(ofstream &ofs){}////// Handlers for executing the various ops. If the op execution is/// "in-line", that is the op doesn't block, then when the execution/// is finished, the op is handed off to the logger; the net thread/// will drain the logger and then notify the client. Otherwise, the op is queued/// for execution and the client gets notified whenever the op/// finishes execution.///voidOpenOp::Execute(){ status = gChunkManager.OpenChunk(chunkId, openFlags); UpdateCounter(CMD_OPEN); //clnt->HandleEvent(EVENT_CMD_DONE, this); gLogger.Submit(this);}voidCloseOp::Execute(){ UpdateCounter(CMD_CLOSE); gChunkManager.CloseChunk(chunkId); status = 0; // clnt->HandleEvent(EVENT_CMD_DONE, this); gLogger.Submit(this);}voidAllocChunkOp::Execute(){ UpdateCounter(CMD_ALLOC_CHUNK); int res; // page in the chunk meta-data if needed if (!gChunkManager.NeedToReadChunkMetadata(chunkId)) { HandleChunkMetaReadDone(0, NULL); return; } SET_HANDLER(this, &AllocChunkOp::HandleChunkMetaReadDone); if ((res = gChunkManager.ReadChunkMetadata(chunkId, this)) < 0) { KFS_LOG_VA_INFO("Unable read chunk metadata for chunk: %lld; error = %d", chunkId, res); status = -EINVAL; gLogger.Submit(this); return; }}intAllocChunkOp::HandleChunkMetaReadDone(int code, void *data){ status = gChunkManager.AllocChunk(fileId, chunkId, chunkVersion); // if (status < 0) { // failed; nothing to log // clnt->HandleEvent(EVENT_CMD_DONE, this); // return; // } if (leaseId >= 0) { gCtrWriteMaster.Update(1); gLeaseClerk.RegisterLease(chunkId, leaseId); } if (status < 0) gLogger.Submit(this); // Submit the op and wait to be notified SET_HANDLER(this, &AllocChunkOp::HandleChunkMetaWriteDone); status = gChunkManager.WriteChunkMetadata(chunkId, this); if (status < 0) gLogger.Submit(this); return 0;}voidDeleteChunkOp::Execute(){ UpdateCounter(CMD_DELETE_CHUNK); status = gChunkManager.DeleteChunk(chunkId); // if (status < 0) // // failed; nothing to log // clnt->HandleEvent(EVENT_CMD_DONE, this); // else gLogger.Submit(this);}voidTruncateChunkOp::Execute(){ UpdateCounter(CMD_TRUNCATE_CHUNK); SET_HANDLER(this, &TruncateChunkOp::HandleChunkMetaReadDone); if (gChunkManager.ReadChunkMetadata(chunkId, this) < 0) { status = -EINVAL; gLogger.Submit(this); }}intTruncateChunkOp::HandleChunkMetaReadDone(int code, void *data){ status = *(int *) data; if (status < 0) { gLogger.Submit(this); return 0; } status = gChunkManager.TruncateChunk(chunkId, chunkSize); if (status < 0) { gLogger.Submit(this); return 0; } SET_HANDLER(this, &TruncateChunkOp::HandleChunkMetaWriteDone); status = gChunkManager.WriteChunkMetadata(chunkId, this); if (status < 0) gLogger.Submit(this); return 0;}voidReplicateChunkOp::Execute(){ RemoteSyncSMPtr peer = gChunkServer.FindServer(location); UpdateCounter(CMD_REPLICATE_CHUNK);#ifdef DEBUG string s = location.ToString(); KFS_LOG_VA_DEBUG("Replicating chunk: %ld from %s", chunkId, s.c_str());#endif if (!peer) { string s = location.ToString(); KFS_LOG_VA_INFO("Unable to find peer: %s", s.c_str()); status = -1; gLogger.Submit(this); return; } replicator.reset(new Replicator(this)); // Get the animation going... SET_HANDLER(this, &ReplicateChunkOp::HandleDone); replicator->Start(peer);}voidChangeChunkVersOp::Execute(){ UpdateCounter(CMD_CHANGE_CHUNK_VERS); SET_HANDLER(this, &ChangeChunkVersOp::HandleChunkMetaReadDone); if (gChunkManager.ReadChunkMetadata(chunkId, this) < 0) { status = -EINVAL; gLogger.Submit(this); }}intChangeChunkVersOp::HandleChunkMetaReadDone(int code, void *data){ status = *(int *) data; if (status < 0) { gLogger.Submit(this); return 0; } status = gChunkManager.ChangeChunkVers(fileId, chunkId, chunkVersion); if (status < 0) { gLogger.Submit(this); return 0; } SET_HANDLER(this, &ChangeChunkVersOp::HandleChunkMetaWriteDone); status = gChunkManager.WriteChunkMetadata(chunkId, this); if (status < 0) gLogger.Submit(this); return 0;}// This is the heartbeat sent by the meta servervoidHeartbeatOp::Execute(){ UpdateCounter(CMD_HEARTBEAT); totalSpace = gChunkManager.GetTotalSpace(); usedSpace = gChunkManager.GetUsedSpace(); numChunks = gChunkManager.GetNumChunks(); status = 0; // clnt->HandleEvent(EVENT_CMD_DONE, this); gLogger.Submit(this);}voidRetireOp::Execute(){ // we are told to retire...so, bow out KFS_LOG_INFO("We have been asked to retire...bye"); StopNetProcessor(0);}voidStaleChunksOp::Execute(){ vector<kfsChunkId_t>::size_type i; for (i = 0; i < staleChunkIds.size(); ++i) { gChunkManager.StaleChunk(staleChunkIds[i]); } status = 0; // clnt->HandleEvent(EVENT_CMD_DONE, this); gLogger.Submit(this);}voidReadOp::Execute(){ UpdateCounter(CMD_READ); SET_HANDLER(this, &ReadOp::HandleChunkMetaReadDone); if (gChunkManager.ReadChunkMetadata(chunkId, this) < 0) { status = -EINVAL; gLogger.Submit(this); }}intReadOp::HandleChunkMetaReadDone(int code, void *data){ status = *(int *) data; if (status < 0) { gLogger.Submit(this); return 0; } SET_HANDLER(this, &ReadOp::HandleDone); status = gChunkManager.ReadChunk(this); if (status < 0) { // clnt->HandleEvent(EVENT_CMD_DONE, this); if (wop == NULL) { // we are done with this op; this needs draining gLogger.Submit(this); } else { // resume execution of write wop->Execute(); } } return 0;}//// Handling of writes is done in multiple steps:// 1. The client allocates a chunk from the metaserver; the metaserver// picks a set of hosting chunkservers and nominates one of the// server's as the "master" for the transaction.// 2. The client pushes data for a write via a WritePrepareOp to each// of the hosting chunkservers (in any order).// 3. The chunkserver in turn enqueues the write with the ChunkManager// object. The ChunkManager assigns an id to the write. NOTE:// nothing is written out to disk at this point.// 4. After the client has pushed out data to replica chunk-servers// and gotten write-id's, the client does a WriteSync to the master.// 5. The master retrieves the write corresponding to the write-id and// commits the write to disk.// 6. The master then sends out a WriteCommit to each of the replica// chunkservers asking them to commit the write; this commit message// is sent concurrently to all the replicas.// 7. After the replicas reply, the master replies to the client with// status from individual servers and how much got written on each.//
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -