📄 kfsread.cc
字号:
//// Issue a single read op to the server and get data back.//ssize_tKfsClientImpl::DoSmallReadFromServer(int fd, char *buf, size_t numBytes){ ChunkAttr *chunk = GetCurrChunk(fd); ReadOp op(nextSeq(), chunk->chunkId, chunk->chunkVersion); op.offset = mFileTable[fd]->currPos.chunkOffset; op.numBytes = min(chunk->chunkSize, (off_t) numBytes); op.AttachContentBuf(buf, numBytes); // make sure we aren't overflowing... assert(buf + op.numBytes <= buf + numBytes); (void)DoOpCommon(&op, mFileTable[fd]->currPos.preferredServer); VerifyChecksum(&op, mFileTable[fd]->currPos.preferredServer); ssize_t numIO = (op.status >= 0) ? op.contentLength : op.status; op.ReleaseContentBuf(); return numIO;}size_tKfsClientImpl::ZeroFillBuf(int fd, char *buf, size_t numBytes){ size_t numIO, bytesInFile, bytesInChunk; ChunkAttr *chunk = GetCurrChunk(fd); if (mFileTable[fd]->currPos.chunkOffset < (off_t) chunk->chunkSize) return 0; // more data in chunk // We've hit End-of-chunk. There are two cases here: // 1. There is more data in the file and that data is in // the next chunk // 2. This chunk was filled with less data than what was // "promised". (Maybe, write got lost). // In either case, zero-fill: the amount to zero-fill is // in the min. of the two. // // Also, we can hit the end-of-chunk if we fail to locate a // chunk. This can happen if there is a hole in the file. // assert(mFileTable[fd]->currPos.fileOffset <= (off_t) mFileTable[fd]->fattr.fileSize); bytesInFile = mFileTable[fd]->fattr.fileSize - mFileTable[fd]->currPos.fileOffset; assert(chunk->chunkSize <= (off_t) KFS::CHUNKSIZE); bytesInChunk = KFS::CHUNKSIZE - chunk->chunkSize; numIO = min(bytesInChunk, bytesInFile); // Fill in 0's based on space in the buffer.... numIO = min(numIO, numBytes); // KFS_LOG_DEBUG("Zero-filling %d bytes for read @ %lld", numIO, mFileTable[fd]->currPos.chunkOffset); memset(buf, 0, numIO); return numIO;}size_tKfsClientImpl::CopyFromChunkBuf(int fd, char *buf, size_t numBytes){ size_t numIO; FilePosition *pos = FdPos(fd); ChunkBuffer *cb = FdBuffer(fd); size_t start = pos->chunkOffset - cb->start; // Wrong chunk in buffer or if the starting point in the buffer is // "BEYOND" the current location of the file pointer, we don't // have the data. "BEYOND" => offset is before the starting point // or offset is after the end of the buffer if ((pos->chunkNum != cb->chunkno) || (pos->chunkOffset < cb->start) || (pos->chunkOffset >= (off_t) (cb->start + cb->length))) return 0; // first figure out how much data is available in the buffer // to be copied out. numIO = min(cb->length - start, numBytes); // chunkBuf[0] corresponds to some offset in the chunk, // which is defined by chunkBufStart. // chunkOffset corresponds to the position in the chunk // where the "filepointer" is currently at. // Figure out where the data we want copied out starts memcpy(buf, &cb->buf[start], numIO); // KFS_LOG_DEBUG("Copying out data from chunk buf...%d bytes", numIO); return numIO;}ssize_tKfsClientImpl::DoLargeReadFromServer(int fd, char *buf, size_t numBytes){ FilePosition *pos = FdPos(fd); ChunkAttr *chunk = GetCurrChunk(fd); vector<ReadOp *> ops; assert(chunk->chunkSize - pos->chunkOffset >= 0); size_t numAvail = min((size_t) (chunk->chunkSize - pos->chunkOffset), numBytes); size_t numRead = 0; while (numRead < numAvail) { ReadOp *op = new ReadOp(nextSeq(), chunk->chunkId, chunk->chunkVersion); op->numBytes = min(MAX_BYTES_PER_READ_IO, numAvail - numRead); // op->numBytes = min(KFS::CHUNKSIZE, numAvail - numRead); assert(op->numBytes > 0); op->offset = pos->chunkOffset + numRead; // if the read is going to straddle checksum block boundaries, // break up the read into multiple reads: this simplifies // server side code. for each read request, a single checksum // block will need to be read and after the checksum verifies, // the server can "trim" the data that wasn't asked for. if (OffsetToChecksumBlockStart(op->offset) != op->offset) { op->numBytes = OffsetToChecksumBlockEnd(op->offset) - op->offset; } op->AttachContentBuf(buf + numRead, op->numBytes); numRead += op->numBytes; ops.push_back(op); } // make sure we aren't overflowing... assert(buf + numRead <= buf + numBytes); struct timeval readStart, readEnd; gettimeofday(&readStart, NULL); { ostringstream os; os << pos->GetPreferredServerLocation().ToString().c_str() << ':' << " c=" << chunk->chunkId << " o=" << pos->chunkOffset << " n=" << numBytes; KFS_LOG_VA_DEBUG("Reading from %s", os.str().c_str()); } ssize_t numIO = DoPipelinedRead(ops, pos->preferredServer); /* if (numIO < 0) { KFS_LOG_DEBUG("Pipelined read from server failed..."); } */ int retryStatus = 0; for (vector<KfsOp *>::size_type i = 0; i < ops.size(); ++i) { ReadOp *op = static_cast<ReadOp *> (ops[i]); if (op->status < 0) { if (NeedToRetryRead(op->status)) { // preserve EIO so that we can avoid that server if (retryStatus != -EIO) retryStatus = op->status; } numIO = op->status; } else if (numIO >= 0) numIO += op->status; op->ReleaseContentBuf(); delete op; } // If the op needs to be retried, pass that up if (retryStatus != 0) numIO = retryStatus; gettimeofday(&readEnd, NULL); double timeSpent = ComputeTimeDiff(readStart, readEnd); { ostringstream os; os << pos->GetPreferredServerLocation().ToString().c_str() << ':' << " c=" << chunk->chunkId << " o=" << pos->chunkOffset << " n=" << numBytes << " got=" << numIO << " time=" << timeSpent; if (timeSpent > 5.0) { struct sockaddr_in saddr; KFS_LOG_VA_INFO("Read done from %s", os.str().c_str()); if (pos->GetPreferredServerAddr(saddr) == 0) { KFS_LOG_VA_DEBUG("Sending telemetry report about: %s", os.str().c_str()); double diskIOTime[MAX_IO_INFO_PER_PKT]; double elapsedTime[MAX_IO_INFO_PER_PKT]; vector<KfsOp *>::size_type count = 0; for (; (count < ops.size()) && (count < MAX_IO_INFO_PER_PKT); count++) { ReadOp *op = static_cast<ReadOp *> (ops[count]); diskIOTime[count] = op->diskIOTime; elapsedTime[count] = op->elapsedTime; } mTelemetryReporter.publish(saddr.sin_addr, timeSpent, "READ", count, diskIOTime, elapsedTime); } } } return numIO;}////// Common work for a read op that can be pipelined./// The idea is to plumb the pipe with a set of requests; then,/// whenever one finishes, submit a new request.////// @param[in] ops the vector of ops to be done/// @param[in] sock the socket on which we communicate with server////// @retval 0 on success; -1 on failure///intKfsClientImpl::DoPipelinedRead(vector<ReadOp *> &ops, TcpSocket *sock){ vector<ReadOp *>::size_type first = 0, next, minOps; int res = 0; ReadOp *op; bool leaseExpired = false; // plumb the pipe with 1MB minOps = min((size_t) (MIN_BYTES_PIPELINE_IO / MAX_BYTES_PER_READ_IO), ops.size()); // plumb the pipe with a few ops for (next = 0; next < minOps; ++next) { op = ops[next]; gettimeofday(&op->submitTime, NULL); res = DoOpSend(op, sock); if (res < 0) return -1; } // run the pipe: whenever one op finishes, queue another while (next < ops.size()) { struct timeval now; op = ops[first]; res = DoOpResponse(op, sock); if (res < 0) return -1; gettimeofday(&now, NULL); op->elapsedTime = ComputeTimeDiff(op->submitTime, now); ++first; op = ops[next]; if (!IsChunkLeaseGood(op->chunkId)) { leaseExpired = true; break; } gettimeofday(&op->submitTime, NULL); res = DoOpSend(op, sock); if (res < 0) return -1; ++next; } // get the response for the remaining ones while (first < next) { struct timeval now; op = ops[first]; res = DoOpResponse(op, sock); if (res < 0) return -1; gettimeofday(&now, NULL); op->elapsedTime = ComputeTimeDiff(op->submitTime, now); if (leaseExpired) op->status = 0; ++first; } // do checksum verification if (res >= 0) { for (next = 0; next < ops.size(); next++) { op = ops[next]; if (op->checksums.size() == 0) continue; VerifyChecksum(op, sock); } } return 0;}boolKfsClientImpl::VerifyChecksum(ReadOp* op, TcpSocket* sock){ for (size_t pos = 0; pos < op->contentLength; pos += CHECKSUM_BLOCKSIZE) { size_t len = min(CHECKSUM_BLOCKSIZE, (uint32_t) (op->contentLength - pos)); uint32_t cksum = ComputeBlockChecksum(op->contentBuf + pos, len); uint32_t cksumIndex = pos / CHECKSUM_BLOCKSIZE; if (op->checksums.size() < cksumIndex) { // didn't get all the checksums KFS_LOG_VA_DEBUG("Didn't get checksum for offset: %lld", op->offset + pos); continue; } uint32_t serverCksum = op->checksums[cksumIndex]; if (serverCksum != cksum) { if (sock) { struct sockaddr_in saddr; char ipname[INET_ADDRSTRLEN]; sock->GetPeerName((struct sockaddr *) &saddr); inet_ntop(AF_INET, &(saddr.sin_addr), ipname, INET_ADDRSTRLEN); KFS_LOG_VA_INFO("Checksum mismatch from %s starting @pos = %lld: got = %d, computed = %d for %s", ipname, op->offset + pos, serverCksum, cksum, op->Show().c_str()); mTelemetryReporter.publish(saddr.sin_addr, -1.0, "CHECKSUM_MISMATCH"); } op->status = -KFS::EBADCKSUM; } } return (op->status >= 0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -