📄 kfsread.cc
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: KfsRead.cc 226 2008-12-19 06:21:46Z mikeov $//// Created 2006/10/02// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// All the code to deal with read.//----------------------------------------------------------------------------#include "KfsClientInt.h"#include "common/config.h"#include "common/properties.h"#include "common/log.h"#include "libkfsIO/Checksum.h"#include "Utils.h"#include <cerrno>#include <iostream>#include <string>using std::string;using std::ostringstream;using std::istringstream;using std::min;using std::max;using namespace KFS;static double ComputeTimeDiff(const struct timeval &startTime, const struct timeval &endTime){ float timeSpent; timeSpent = (endTime.tv_sec * 1e6 + endTime.tv_usec) - (startTime.tv_sec * 1e6 + startTime.tv_usec); return timeSpent / 1e6;}static boolNeedToRetryRead(int status){ return ((status == -KFS::EBADVERS) || (status == -KFS::EBADCKSUM) || (status == -KFS::ESERVERBUSY) || (status == -EHOSTUNREACH) || (status == -EINVAL) || (status == -EIO) || (status == -EAGAIN) || (status == -ETIMEDOUT));}static boolNeedToChangeReplica(int errcode){ return ((errcode == -EHOSTUNREACH) || (errcode == -ETIMEDOUT) || (errcode == -EIO));}ssize_tKfsClientImpl::Read(int fd, char *buf, size_t numBytes){ MutexLock l(&mMutex); size_t nread = 0, nleft; ssize_t numIO = 0; if (!valid_fd(fd) || mFileTable[fd] == NULL || mFileTable[fd]->openMode == O_WRONLY) { KFS_LOG_VA_INFO("Read to fd: %d failed---fd is likely closed", fd); return -EBADF; } FilePosition *pos = FdPos(fd); FileAttr *fa = FdAttr(fd); if (fa->isDirectory) return -EISDIR; // flush buffer so sizes are updated properly ChunkBuffer *cb = FdBuffer(fd); if (cb->dirty) FlushBuffer(fd); cb->allocate(); // Loop thru chunk after chunk until we either get the desired # // of bytes or we hit EOF. while (nread < numBytes) { // // Basic invariant: when we enter this loop, the connections // we have to the chunkservers (if any) are correct. As we // read thru a file, we call seek whenever we have data to // hand out to the client. As we cross chunk boundaries, seek // will invalidate our current set of connections and force us // to get new ones via a call to OpenChunk(). This same // principle holds for write code path as well. // if (!IsChunkReadable(fd)) break; if (pos->fileOffset >= (off_t) fa->fileSize) { KFS_LOG_VA_DEBUG("Current pointer (%lld) is past EOF (%lld) ...so, done", pos->fileOffset, fa->fileSize); break; } nleft = numBytes - nread; numIO = ReadChunk(fd, buf + nread, nleft); if (numIO < 0) break; nread += numIO; Seek(fd, numIO, SEEK_CUR); } if ((pos->fileOffset < (off_t) fa->fileSize) && (nread < numBytes)) { FilePosition *pos = FdPos(fd); string s; if ((pos != NULL) && (pos->preferredServer != NULL)) { s = pos->GetPreferredServerLocation().ToString(); } KFS_LOG_VA_INFO("Read done from %s on %s: @offset: %lld: asked: %d, returning %d, errorcode = %d", s.c_str(), mFileTable[fd]->pathname.c_str(), pos->fileOffset, numBytes, (int) nread, (int) numIO); } return nread;}boolKfsClientImpl::IsChunkReadable(int fd){ FilePosition *pos = FdPos(fd); int res = -1; ChunkAttr *chunk = NULL; for (int retryCount = 0; retryCount < NUM_RETRIES_PER_OP; retryCount++) { res = LocateChunk(fd, pos->chunkNum); if (res >= 0) { chunk = GetCurrChunk(fd); if (pos->preferredServer == NULL && chunk->chunkId != (kfsChunkId_t)-1) { // use nonblocking connect to chunkserver; if one fails to // connect, we switch to another replica. res = OpenChunk(fd, true); if (res < 0) { if (pos->preferredServer != NULL) pos->AvoidServer(pos->preferredServerLocation); continue; } } break; } if (res == -EAGAIN) { // could be that all 3 servers are temporarily down Sleep(RETRY_DELAY_SECS); continue; } else { // we can't locate the chunk...fail return false; } } if (res < 0) return false; return IsChunkLeaseGood(chunk->chunkId);}boolKfsClientImpl::IsChunkLeaseGood(kfsChunkId_t chunkId){ if (chunkId > 0) { if ((!mLeaseClerk.IsLeaseValid(chunkId)) && (GetLease(chunkId) < 0)) { // couldn't get a valid lease return false; } if (mLeaseClerk.ShouldRenewLease(chunkId)) { RenewLease(chunkId); } } return true;}ssize_tKfsClientImpl::ReadChunk(int fd, char *buf, size_t numBytes){ ssize_t numIO; ChunkAttr *chunk; FilePosition *pos = FdPos(fd); int retryCount = 0; assert(valid_fd(fd)); assert(pos->fileOffset < (off_t) mFileTable[fd]->fattr.fileSize); numIO = CopyFromChunkBuf(fd, buf, numBytes); if (numIO > 0) return numIO; chunk = GetCurrChunk(fd); while (retryCount < NUM_RETRIES_PER_OP) { if (pos->preferredServer == NULL) { int status; // we come into this function with a connection to some // chunkserver; as part of the read, the connection // broke. so, we need to "re-figure" where the chunk is. if (chunk->chunkId < 0) { status = LocateChunk(fd, pos->chunkNum); if (status < 0) { retryCount++; Sleep(RETRY_DELAY_SECS); continue; } } // we know where the chunk is.... assert(chunk->chunkId != (kfsChunkId_t) -1); // we are here because we are handling failover/version # // mismatch retryCount++; Sleep(RETRY_DELAY_SECS); status = OpenChunk(fd, true); if (NeedToChangeReplica(status)) { // we couldn't read the data off the disk from the server; // when we retry, we need to pick another replica if (pos->preferredServer != NULL) { string s = pos->GetPreferredServerLocation().ToString(); KFS_LOG_VA_INFO("Got error=%d from server %s for %s @offset: %lld; avoiding server", numIO, s.c_str(), mFileTable[fd]->pathname.c_str(), pos->fileOffset); } chunk->AvoidServer(pos->preferredServerLocation); pos->AvoidServer(pos->preferredServerLocation); continue; } if (status < 0) { // open failed..so, bail return status; } } numIO = ZeroFillBuf(fd, buf, numBytes); if (numIO > 0) return numIO; if (numBytes < ChunkBuffer::BUF_SIZE) { // small reads...so buffer the data ChunkBuffer *cb = FdBuffer(fd); numIO = ReadFromServer(fd, cb->buf, ChunkBuffer::BUF_SIZE); if (numIO > 0) { cb->chunkno = pos->chunkNum; cb->start = pos->chunkOffset; cb->length = numIO; numIO = CopyFromChunkBuf(fd, buf, numBytes); } } else { // big read...forget buffering numIO = ReadFromServer(fd, buf, numBytes); } if ((numIO >= 0) || (!NeedToRetryRead(numIO))) { // either we got data or it is an error which doesn't // require a retry of the read. break; } if (NeedToChangeReplica(numIO)) { // we couldn't read the data off the disk from the server; // when we retry, we need to pick another replica if (pos->preferredServer != NULL) { string s = pos->GetPreferredServerLocation().ToString(); KFS_LOG_VA_INFO("Got error=%d from server %s for %s @offset: %lld; avoiding server", numIO, s.c_str(), mFileTable[fd]->pathname.c_str(), pos->fileOffset); } chunk->AvoidServer(pos->preferredServerLocation); pos->AvoidServer(pos->preferredServerLocation); continue; } // KFS_LOG_DEBUG("Need to retry read..."); // Ok...so, we need to retry the read. so, re-determine where // the chunk went and then retry. chunk->chunkId = -1; pos->ResetServers(); } return numIO;}ssize_tKfsClientImpl::ReadFromServer(int fd, char *buf, size_t numBytes){ size_t numAvail; ChunkAttr *chunk = GetCurrChunk(fd); FilePosition *pos = FdPos(fd); int res; assert(chunk->chunkSize - pos->chunkOffset >= 0); numAvail = min((size_t) (chunk->chunkSize - pos->chunkOffset), numBytes); // Align the reads to checksum block boundaries, so that checksum // verification on the server can be done efficiently: if the read falls // within a checksum block, issue it as one read; otherwise, split // the read into multiple reads. if (pos->chunkOffset + numAvail <= OffsetToChecksumBlockEnd(pos->chunkOffset)) res = DoSmallReadFromServer(fd, buf, numBytes); else res = DoLargeReadFromServer(fd, buf, numBytes); return res;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -