⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfsread.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 2 页
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: KfsRead.cc 226 2008-12-19 06:21:46Z mikeov $//// Created 2006/10/02// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// All the code to deal with read.//----------------------------------------------------------------------------#include "KfsClientInt.h"#include "common/config.h"#include "common/properties.h"#include "common/log.h"#include "libkfsIO/Checksum.h"#include "Utils.h"#include <cerrno>#include <iostream>#include <string>using std::string;using std::ostringstream;using std::istringstream;using std::min;using std::max;using namespace KFS;static double ComputeTimeDiff(const struct timeval &startTime, const struct timeval &endTime){    float timeSpent;    timeSpent = (endTime.tv_sec * 1e6 + endTime.tv_usec) -         (startTime.tv_sec * 1e6 + startTime.tv_usec);    return timeSpent / 1e6;}static boolNeedToRetryRead(int status){    return ((status == -KFS::EBADVERS) ||            (status == -KFS::EBADCKSUM) ||            (status == -KFS::ESERVERBUSY) ||            (status == -EHOSTUNREACH) ||            (status == -EINVAL) ||            (status == -EIO) ||            (status == -EAGAIN) ||            (status == -ETIMEDOUT));}static boolNeedToChangeReplica(int errcode){    return ((errcode == -EHOSTUNREACH) || (errcode == -ETIMEDOUT) || (errcode == -EIO));}ssize_tKfsClientImpl::Read(int fd, char *buf, size_t numBytes){    MutexLock l(&mMutex);    size_t nread = 0, nleft;    ssize_t numIO = 0;    if (!valid_fd(fd) || mFileTable[fd] == NULL || mFileTable[fd]->openMode == O_WRONLY) {        KFS_LOG_VA_INFO("Read to fd: %d failed---fd is likely closed", fd);        	return -EBADF;    }    FilePosition *pos = FdPos(fd);    FileAttr *fa = FdAttr(fd);    if (fa->isDirectory)	return -EISDIR;    // flush buffer so sizes are updated properly    ChunkBuffer *cb = FdBuffer(fd);    if (cb->dirty)	FlushBuffer(fd);    cb->allocate();        // Loop thru chunk after chunk until we either get the desired #    // of bytes or we hit EOF.    while (nread < numBytes) {        //        // Basic invariant: when we enter this loop, the connections        // we have to the chunkservers (if any) are correct.  As we        // read thru a file, we call seek whenever we have data to        // hand out to the client.  As we cross chunk boundaries, seek        // will invalidate our current set of connections and force us        // to get new ones via a call to OpenChunk().   This same        // principle holds for write code path as well.        //	if (!IsChunkReadable(fd))	    break;	if (pos->fileOffset >= (off_t) fa->fileSize) {	    KFS_LOG_VA_DEBUG("Current pointer (%lld) is past EOF (%lld) ...so, done",	                     pos->fileOffset, fa->fileSize);	    break;	}	nleft = numBytes - nread;	numIO = ReadChunk(fd, buf + nread, nleft);	if (numIO < 0)	    break;	nread += numIO;	Seek(fd, numIO, SEEK_CUR);    }    if ((pos->fileOffset < (off_t) fa->fileSize) && (nread < numBytes)) {        FilePosition *pos = FdPos(fd);        string s;        if ((pos != NULL) && (pos->preferredServer != NULL)) {            s = pos->GetPreferredServerLocation().ToString();        }                KFS_LOG_VA_INFO("Read done from %s on %s: @offset: %lld: asked: %d, returning %d, errorcode = %d",                        s.c_str(), mFileTable[fd]->pathname.c_str(), pos->fileOffset, numBytes, (int) nread, (int) numIO);    }    return nread;}boolKfsClientImpl::IsChunkReadable(int fd){    FilePosition *pos = FdPos(fd);    int res = -1;    ChunkAttr *chunk = NULL;    for (int retryCount = 0; retryCount < NUM_RETRIES_PER_OP; retryCount++) {        res = LocateChunk(fd, pos->chunkNum);        if (res >= 0) {            chunk = GetCurrChunk(fd);            if (pos->preferredServer == NULL && chunk->chunkId != (kfsChunkId_t)-1) {                // use nonblocking connect to chunkserver; if one fails to                // connect, we switch to another replica.                 res = OpenChunk(fd, true);                if (res < 0) {                    if (pos->preferredServer != NULL)                        pos->AvoidServer(pos->preferredServerLocation);                    continue;                }            }            break;        }        if (res == -EAGAIN) {            // could be that all 3 servers are temporarily down            Sleep(RETRY_DELAY_SECS);            continue;        } else {            // we can't locate the chunk...fail            return false;        }    }    if (res < 0)        return false;    return IsChunkLeaseGood(chunk->chunkId);}boolKfsClientImpl::IsChunkLeaseGood(kfsChunkId_t chunkId){    if (chunkId > 0) {	if ((!mLeaseClerk.IsLeaseValid(chunkId)) &&	    (GetLease(chunkId) < 0)) {	    // couldn't get a valid lease	    return false;	}	if (mLeaseClerk.ShouldRenewLease(chunkId)) {	    RenewLease(chunkId);	}    }    return true;}ssize_tKfsClientImpl::ReadChunk(int fd, char *buf, size_t numBytes){    ssize_t numIO;    ChunkAttr *chunk;    FilePosition *pos = FdPos(fd);    int retryCount = 0;    assert(valid_fd(fd));    assert(pos->fileOffset < (off_t) mFileTable[fd]->fattr.fileSize);    numIO = CopyFromChunkBuf(fd, buf, numBytes);    if (numIO > 0)	return numIO;    chunk = GetCurrChunk(fd);    while (retryCount < NUM_RETRIES_PER_OP) {	if (pos->preferredServer == NULL) {            int status;            // we come into this function with a connection to some            // chunkserver; as part of the read, the connection            // broke.  so, we need to "re-figure" where the chunk is.            if (chunk->chunkId < 0) {                status = LocateChunk(fd, pos->chunkNum);                if (status < 0) {                    retryCount++;                    Sleep(RETRY_DELAY_SECS);                    continue;                }            }            // we know where the chunk is....            assert(chunk->chunkId != (kfsChunkId_t) -1);            // we are here because we are handling failover/version #            // mismatch	    retryCount++;	    Sleep(RETRY_DELAY_SECS);	    status = OpenChunk(fd, true);            if (NeedToChangeReplica(status)) {                // we couldn't read the data off the disk from the server;                // when we retry, we need to pick another replica                                if (pos->preferredServer != NULL) {                    string s = pos->GetPreferredServerLocation().ToString();                    KFS_LOG_VA_INFO("Got error=%d from server %s for %s @offset: %lld; avoiding server",                                numIO, s.c_str(), mFileTable[fd]->pathname.c_str(), pos->fileOffset);                }                chunk->AvoidServer(pos->preferredServerLocation);                pos->AvoidServer(pos->preferredServerLocation);                continue;            }	    if (status < 0) {                // open failed..so, bail	        return status;            }	}	numIO = ZeroFillBuf(fd, buf, numBytes);	if (numIO > 0)	    return numIO;	if (numBytes < ChunkBuffer::BUF_SIZE) {	    // small reads...so buffer the data	    ChunkBuffer *cb = FdBuffer(fd);	    numIO = ReadFromServer(fd, cb->buf, ChunkBuffer::BUF_SIZE);	    if (numIO > 0) {	        cb->chunkno = pos->chunkNum;	        cb->start = pos->chunkOffset;	        cb->length = numIO;	        numIO = CopyFromChunkBuf(fd, buf, numBytes);	    }	} else {	    // big read...forget buffering	    numIO = ReadFromServer(fd, buf, numBytes);	}        if ((numIO >= 0) || (!NeedToRetryRead(numIO))) {            // either we got data or it is an error which doesn't            // require a retry of the read.            break;        }        if (NeedToChangeReplica(numIO)) {            // we couldn't read the data off the disk from the server;            // when we retry, we need to pick another replica            if (pos->preferredServer != NULL) {                string s = pos->GetPreferredServerLocation().ToString();                KFS_LOG_VA_INFO("Got error=%d from server %s for %s @offset: %lld; avoiding server",                                numIO, s.c_str(), mFileTable[fd]->pathname.c_str(), pos->fileOffset);            }            chunk->AvoidServer(pos->preferredServerLocation);            pos->AvoidServer(pos->preferredServerLocation);            continue;        }        // KFS_LOG_DEBUG("Need to retry read...");        // Ok...so, we need to retry the read.  so, re-determine where        // the chunk went and then retry.        chunk->chunkId = -1;        pos->ResetServers();    }    return numIO;}ssize_tKfsClientImpl::ReadFromServer(int fd, char *buf, size_t numBytes){    size_t numAvail;    ChunkAttr *chunk = GetCurrChunk(fd);    FilePosition *pos = FdPos(fd);    int res;    assert(chunk->chunkSize - pos->chunkOffset >= 0);    numAvail = min((size_t) (chunk->chunkSize - pos->chunkOffset),                   numBytes);    // Align the reads to checksum block boundaries, so that checksum    // verification on the server can be done efficiently: if the read falls    // within a checksum block, issue it as one read; otherwise, split    // the read into multiple reads.    if (pos->chunkOffset + numAvail <=	OffsetToChecksumBlockEnd(pos->chunkOffset))	res = DoSmallReadFromServer(fd, buf, numBytes);    else	res = DoLargeReadFromServer(fd, buf, numBytes);    return res;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -