⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfswrite.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 2 页
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: KfsWrite.cc 213 2008-11-05 20:18:30Z sriramsrao $ //// Created 2006/10/02// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// All the code to deal with writes from the client.//----------------------------------------------------------------------------#include "KfsClient.h"#include "KfsClientInt.h"#include "common/properties.h"#include "common/log.h"#include "meta/kfstypes.h"#include "libkfsIO/Checksum.h"#include "Utils.h"#include <cerrno>#include <iostream>#include <string>using std::string;using std::ostringstream;using std::istringstream;using std::min;using std::max;using std::cout;using std::endl;using namespace KFS;static boolNeedToRetryAllocation(int status){    return ((status == -EHOSTUNREACH) ||	    (status == -ETIMEDOUT) ||	    (status == -EBUSY) ||	    (status == -EIO) ||            (status == -KFS::EBADVERS) ||	    (status == -KFS::EALLOCFAILED));}ssize_tKfsClientImpl::Write(int fd, const char *buf, size_t numBytes){    MutexLock l(&mMutex);    size_t nwrote = 0;    ssize_t numIO = 0;    int res;    if (!valid_fd(fd) || mFileTable[fd] == NULL || mFileTable[fd]->openMode == O_RDONLY) {        KFS_LOG_VA_INFO("Write to fd: %d failed---fd is likely closed", fd);	return -EBADF;    }    FileAttr *fa = FdAttr(fd);    if (fa->isDirectory)	return -EISDIR;    FilePosition *pos = FdPos(fd);    //    // Loop thru chunk after chunk until we write the desired #    // of bytes.    while (nwrote < numBytes) {	size_t nleft = numBytes - nwrote;        // Don't need this: if we don't have the lease, don't	// know where the chunk is, allocation will get that info.	// LocateChunk(fd, pos->chunkNum);	// need to retry here...	if ((res = DoAllocation(fd)) < 0) {	    // allocation failed...bail	    break;	}	if (pos->preferredServer == NULL) {	    numIO = OpenChunk(fd);	    if (numIO < 0) {		// KFS_LOG_VA_DEBUG("OpenChunk(%lld)", numIO);		break;	    }	}	if (nleft < ChunkBuffer::BUF_SIZE || FdBuffer(fd)->dirty) {	    // either the write is small or there is some dirty	    // data...so, aggregate as much as possible and then it'll	    // get flushed	    numIO = WriteToBuffer(fd, buf + nwrote, nleft);	} else {	    // write is big and there is nothing dirty...so,	    // write-thru	    numIO = WriteToServer(fd, pos->chunkOffset, buf + nwrote, nleft);	}	if (numIO < 0) {            if (numIO == -KFS::ELEASEEXPIRED) {                KFS_LOG_VA_INFO("Continuing to retry write for errorcode = %d", numIO);                continue;            }            KFS_LOG_VA_INFO("Write failed %s @offset: %lld: asked: %d, did: %d, errorcode = %d",                            mFileTable[fd]->pathname.c_str(), pos->fileOffset, numBytes, nwrote, numIO);	    break;	}	nwrote += numIO;	numIO = Seek(fd, numIO, SEEK_CUR);	if (numIO < 0) {	    // KFS_LOG_VA_DEBUG("Seek(%lld)", numIO);	    break;	}    }    if (nwrote == 0 && numIO < 0)	return numIO;    if (nwrote != numBytes) {	KFS_LOG_VA_DEBUG("----Write done: asked: %llu, got: %llu-----",			  numBytes, nwrote);    }    return nwrote;}ssize_tKfsClientImpl::WriteToBuffer(int fd, const char *buf, size_t numBytes){    ssize_t numIO;    size_t lastByte;    FilePosition *pos = FdPos(fd);    ChunkBuffer *cb = FdBuffer(fd);    // if the buffer has dirty data and this write doesn't abut it,    // or if buffer is full, flush the buffer before proceeding.    // XXX: Reconsider buffering to do a read-modify-write of    // large amounts of data.    if (cb->dirty && cb->chunkno != pos->chunkNum) {	int status = FlushBuffer(fd);	if (status < 0)	    return status;    }    cb->allocate();    off_t start = pos->chunkOffset - cb->start;    size_t previous = cb->length;    if (cb->dirty && ((start != (off_t) previous) ||	              (previous == ChunkBuffer::BUF_SIZE))) {	int status = FlushBuffer(fd);	if (status < 0)	    return status;    }    if (!cb->dirty) {	cb->chunkno = pos->chunkNum;	cb->start = pos->chunkOffset;	cb->length = 0;	cb->dirty = true;    }    // ensure that write doesn't straddle chunk boundaries    numBytes = min(numBytes, (size_t) (KFS::CHUNKSIZE - pos->chunkOffset));    if (numBytes == 0)	return 0;    // max I/O we can do    numIO = min(ChunkBuffer::BUF_SIZE - cb->length, numBytes);    assert(numIO > 0);    // KFS_LOG_VA_DEBUG("Buffer absorbs write...%d bytes", numIO);    // chunkBuf[0] corresponds to some offset in the chunk,    // which is defined by chunkBufStart.    // chunkOffset corresponds to the position in the chunk    // where the "filepointer" is currently at.    // Figure out where the data we want copied into starts    start = pos->chunkOffset - cb->start;    assert(start >= 0 && start < (off_t) ChunkBuffer::BUF_SIZE);    memcpy(&cb->buf[start], buf, numIO);    lastByte = start + numIO;    // update the size according to where the last byte just    // got written.    if (lastByte > cb->length)	cb->length = lastByte;    return numIO;}ssize_tKfsClientImpl::FlushBuffer(int fd){    ssize_t numIO = 0;    ChunkBuffer *cb = FdBuffer(fd);    if (cb->dirty) {	numIO = WriteToServer(fd, cb->start, cb->buf, cb->length);	if (numIO >= 0) {	    cb->dirty = false;            // we just flushed the buffer...so, there is no data in it            cb->length = 0;        }    }    return numIO;}ssize_tKfsClientImpl::WriteToServer(int fd, off_t offset, const char *buf, size_t numBytes){    assert(KFS::CHUNKSIZE - offset >= 0);    size_t numAvail = min(numBytes, (size_t) (KFS::CHUNKSIZE - offset));    int res = 0;    for (int retryCount = 0; retryCount < NUM_RETRIES_PER_OP; retryCount++) {	// Same as read: align writes to checksum block boundaries	if (offset + numAvail <= OffsetToChecksumBlockEnd(offset))	    res = DoSmallWriteToServer(fd, offset, buf, numBytes);	else {            struct timeval startTime, endTime;            double timeTaken;                        gettimeofday(&startTime, NULL);            	    res = DoLargeWriteToServer(fd, offset, buf, numBytes);            gettimeofday(&endTime, NULL);                        timeTaken = (endTime.tv_sec - startTime.tv_sec) +                (endTime.tv_usec - startTime.tv_usec) * 1e-6;            if (timeTaken > 5.0) {                ostringstream os;                ChunkAttr *chunk = GetCurrChunk(fd);                for (uint32_t i = 0; i < chunk->chunkServerLoc.size(); i++)                    os << chunk->chunkServerLoc[i].ToString().c_str() << ' ';                KFS_LOG_VA_INFO("Writes thru chain %s for chunk %lld are taking: %.3f secs",                                 os.str().c_str(), GetCurrChunk(fd)->chunkId, timeTaken);            }                        KFS_LOG_VA_DEBUG("Total Time to write data to server(s): %.4f secs", timeTaken);        }        if (res >= 0)            break;        // write failure...retry        ostringstream os;        ChunkAttr *chunk = GetCurrChunk(fd);                    for (uint32_t i = 0; i < chunk->chunkServerLoc.size(); i++)            os << chunk->chunkServerLoc[i].ToString().c_str() << ' ';        // whatever be the error, wait a bit and retry...        KFS_LOG_VA_INFO("Daisy-chain: %s; Will retry allocation/write on chunk %lld due to error code: %d",                         os.str().c_str(), GetCurrChunk(fd)->chunkId, res);        Sleep(LEASE_RETRY_DELAY_SECS);        // save the value of res; in case we tried too many times        // and are giving up, we need the error to propogate        int r;        if ((r = DoAllocation(fd, true)) < 0) {            KFS_LOG_VA_INFO("Re-allocation on chunk %lld failed because of error code = %d", r);            return r;        }    }    if (res < 0) {        // any other error        string errstr = ErrorCodeToStr(res);        ChunkAttr *chunk = GetCurrChunk(fd);        KFS_LOG_VA_INFO("Retries failed: Write on chunk %lld failed because of error: (code = %d) %s",                         chunk->chunkId, res, errstr.c_str());    }    return res;}intKfsClientImpl::DoAllocation(int fd, bool force){    ChunkAttr *chunk = NULL;    FileAttr *fa = FdAttr(fd);    int res = 0;    if (IsCurrChunkAttrKnown(fd))	chunk = GetCurrChunk(fd);    if (chunk && force)	chunk->didAllocation = false;    if ((chunk == NULL) || (chunk->chunkId == (kfsChunkId_t) -1) ||	(!chunk->didAllocation)) {	// if we couldn't locate the chunk, it must be a new one.	// also, if it is an existing chunk, force an allocation	// if needed (which'll cause version # bumps, lease	// handouts etc).	for (uint8_t retryCount = 0; retryCount < NUM_RETRIES_PER_OP; retryCount++) {	    if (retryCount) {		if (res == -EBUSY)		    // the metaserver says it can't get us a lease for		    // the chunk.  so, wait for a bit for the lease to		    // expire and then retry		    Sleep(LEASE_RETRY_DELAY_SECS);		else		    Sleep(RETRY_DELAY_SECS);	    }	    res = AllocChunk(fd);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -