📄 kfswrite.cc
字号:
if ((res >= 0) || (!NeedToRetryAllocation(res))) { break; } // allocation failed...retry } if (res < 0) // allocation failed...bail return res; chunk = GetCurrChunk(fd); assert(chunk != NULL); chunk->didAllocation = true; if (force) { KFS_LOG_VA_INFO("Forced allocation: chunk=%lld, version=%lld", chunk->chunkId, chunk->chunkVersion); } else { ++fa->chunkCount; } } return 0;}ssize_tKfsClientImpl::DoSmallWriteToServer(int fd, off_t offset, const char *buf, size_t numBytes){ return DoLargeWriteToServer(fd, offset, buf, numBytes);}ssize_tKfsClientImpl::DoLargeWriteToServer(int fd, off_t offset, const char *buf, size_t numBytes){ size_t numAvail, numWrote = 0; ssize_t numIO; FilePosition *pos = FdPos(fd); ChunkAttr *chunk = GetCurrChunk(fd); ServerLocation loc = chunk->chunkServerLoc[0]; TcpSocket *masterSock = FdPos(fd)->GetChunkServerSocket(loc); vector<WritePrepareOp *> ops; vector<WriteInfo> writeId; assert(KFS::CHUNKSIZE - offset >= 0); numAvail = min(numBytes, (size_t) (KFS::CHUNKSIZE - offset)); // cout << "Pushing to server: " << offset << ' ' << numBytes << endl; // get the write id numIO = AllocateWriteId(fd, offset, numBytes, writeId, masterSock); if (numIO < 0) return numIO; // Split the write into a bunch of smaller ops while (numWrote < numAvail) { WritePrepareOp *op = new WritePrepareOp(nextSeq(), chunk->chunkId, chunk->chunkVersion, writeId); op->numBytes = min(MAX_BYTES_PER_WRITE_IO, numAvail - numWrote); if ((op->numBytes % CHECKSUM_BLOCKSIZE) != 0) { // if the write isn't aligned to end on a checksum block // boundary, then break the write into two parts: //(1) start at offset and end on a checksum block boundary //(2) is the rest, which is less than the size of checksum //block // This simplifies chunkserver code: either the writes are // multiples of checksum blocks or a single write which is // smaller than a checksum block. if (op->numBytes > CHECKSUM_BLOCKSIZE) op->numBytes = (op->numBytes / CHECKSUM_BLOCKSIZE) * CHECKSUM_BLOCKSIZE; // else case #2 from above comment and op->numBytes is setup right } assert(op->numBytes > 0); op->offset = offset + numWrote; // similar to read, breakup the write if it is straddling // checksum block boundaries. if (OffsetToChecksumBlockStart(op->offset) != op->offset) { op->numBytes = min((size_t) (OffsetToChecksumBlockEnd(op->offset) - op->offset), op->numBytes); } op->AttachContentBuf(buf + numWrote, op->numBytes); op->contentLength = op->numBytes; op->checksum = ComputeBlockChecksum(op->contentBuf, op->contentLength); { ostringstream os; os << "@offset: " << op->offset << " nbytes: " << op->numBytes << " cksum: " << op->checksum; KFS_LOG_VA_DEBUG("%s", os.str().c_str()); } numWrote += op->numBytes; ops.push_back(op); } // For pipelined data push to work, we break the write into a // sequence of smaller ops and push them to the master; the master // then forwards each op to one replica, who then forwards to // next. numIO = DoPipelinedWrite(fd, ops, masterSock); if (numIO < 0) { // // the write failed; caller will do the retry // KFS_LOG_VA_INFO("Write failed...chunk = %lld, version = %lld, offset = %lld, error = %d", ops[0]->chunkId, ops[0]->chunkVersion, ops[0]->offset, numIO); } // figure out how much was committed numIO = 0; for (vector<KfsOp *>::size_type i = 0; i < ops.size(); ++i) { WritePrepareOp *op = static_cast<WritePrepareOp *> (ops[i]); if (op->status < 0) numIO = op->status; else if (numIO >= 0) numIO += op->status; op->ReleaseContentBuf(); delete op; } if (numIO >= 0 && (off_t)chunk->chunkSize < offset + numIO) { // grow the chunksize only if we wrote past the last byte in the chunk chunk->chunkSize = offset + numIO; // if we wrote past the last byte of the file, then grow the // file size. Note that, chunks 0..chunkNum-1 are assumed to // be full. So, take the size of the last chunk and to that // add the size of the "full" chunks to get the size FileAttr *fa = FdAttr(fd); off_t eow = chunk->chunkSize + (pos->chunkNum * KFS::CHUNKSIZE); fa->fileSize = max(fa->fileSize, eow); } if (numIO != (ssize_t) numBytes) { KFS_LOG_VA_DEBUG("Wrote to server (fd = %d), %lld bytes, was asked %lld bytes", fd, numIO, numBytes); } else { KFS_LOG_VA_DEBUG("Wrote to server (fd = %d), %lld bytes", fd, numIO); } return numIO;}intKfsClientImpl::AllocateWriteId(int fd, off_t offset, size_t numBytes, vector<WriteInfo> &writeId, TcpSocket *masterSock){ ChunkAttr *chunk = GetCurrChunk(fd); WriteIdAllocOp op(nextSeq(), chunk->chunkId, chunk->chunkVersion, offset, numBytes); int res; op.chunkServerLoc = chunk->chunkServerLoc; res = DoOpSend(&op, masterSock); if ((res < 0) || (op.status < 0)) { if (op.status < 0) return op.status; return res; } res = DoOpResponse(&op, masterSock); if ((res < 0) || (op.status < 0)) { if (op.status < 0) return op.status; return res; } // get rid of any old stuff writeId.clear(); writeId.reserve(op.chunkServerLoc.size()); istringstream ist(op.writeIdStr); for (uint32_t i = 0; i < chunk->chunkServerLoc.size(); i++) { ServerLocation loc; int64_t id; ist >> loc.hostname; ist >> loc.port; ist >> id; writeId.push_back(WriteInfo(loc, id)); } return 0;}intKfsClientImpl::PushData(int fd, vector<WritePrepareOp *> &ops, uint32_t start, uint32_t count, TcpSocket *masterSock){ uint32_t last = min((size_t) (start + count), ops.size()); int res = 0; for (uint32_t i = start; i < last; i++) { res = DoOpSend(ops[i], masterSock); if (res < 0) break; } return res;}intKfsClientImpl::SendCommit(int fd, vector<WriteInfo> &writeId, TcpSocket *masterSock, WriteSyncOp &sop){ ChunkAttr *chunk = GetCurrChunk(fd); int res = 0; sop.Init(nextSeq(), chunk->chunkId, chunk->chunkVersion, writeId); res = DoOpSend(&sop, masterSock); if (res < 0) return sop.status; return 0; }intKfsClientImpl::GetCommitReply(WriteSyncOp &sop, TcpSocket *masterSock){ int res; res = DoOpResponse(&sop, masterSock); if (res < 0) return sop.status; return sop.status;}intKfsClientImpl::DoPipelinedWrite(int fd, vector<WritePrepareOp *> &ops, TcpSocket *masterSock){ int res; vector<WritePrepareOp *>::size_type next, minOps; WriteSyncOp syncOp[2]; if (ops.size() == 0) return 0; // push the data to the server; to avoid bursting the server with // a full chunk, do it in a pipelined fashion: // -- send 512K of data; do a flush; send another 512K; do another flush // -- every time we get an ack back, we send another 512K // // we got 2 commits: current is the one we just sent; previous is // the one for which we are expecting a reply int prevCommit = 0; int currCommit = 1; minOps = min((size_t) (MIN_BYTES_PIPELINE_IO / MAX_BYTES_PER_WRITE_IO) / 2, ops.size()); res = PushData(fd, ops, 0, minOps, masterSock); if (res < 0) goto error_out; res = SendCommit(fd, ops[0]->writeInfo, masterSock, syncOp[0]); if (res < 0) goto error_out; for (next = minOps; next < ops.size(); next += minOps) { res = PushData(fd, ops, next, minOps, masterSock); if (res < 0) goto error_out; res = SendCommit(fd, ops[next]->writeInfo, masterSock, syncOp[currCommit]); if (res < 0) goto error_out; res = GetCommitReply(syncOp[prevCommit], masterSock); prevCommit = currCommit; currCommit++; currCommit %= 2; if (res < 0) // the commit for previous failed; there is still the // business of getting the reply for the "current" one // that we sent out. break; } res = GetCommitReply(syncOp[prevCommit], masterSock); error_out: if (res < 0) { // res will be -1; we need to pick out the error from the op that failed for (uint32_t i = 0; i < ops.size(); i++) { if (ops[i]->status < 0) { res = ops[i]->status; break; } } } // set the status for each op: either all were successful or none was. for (uint32_t i = 0; i < ops.size(); i++) { if (res < 0) ops[i]->status = res; else ops[i]->status = ops[i]->numBytes; } return res;}intKfsClientImpl::IssueCommit(int fd, vector<WriteInfo> &writeId, TcpSocket *masterSock){ ChunkAttr *chunk = GetCurrChunk(fd); WriteSyncOp sop(nextSeq(), chunk->chunkId, chunk->chunkVersion, writeId); int res; res = DoOpSend(&sop, masterSock); if (res < 0) return sop.status; res = DoOpResponse(&sop, masterSock); if (res < 0) return sop.status; return sop.status;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -