📄 kfswrite.cc
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: KfsWrite.cc 213 2008-11-05 20:18:30Z sriramsrao $ //// Created 2006/10/02// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// All the code to deal with writes from the client.//----------------------------------------------------------------------------#include "KfsClient.h"#include "KfsClientInt.h"#include "common/properties.h"#include "common/log.h"#include "meta/kfstypes.h"#include "libkfsIO/Checksum.h"#include "Utils.h"#include <cerrno>#include <iostream>#include <string>using std::string;using std::ostringstream;using std::istringstream;using std::min;using std::max;using std::cout;using std::endl;using namespace KFS;static boolNeedToRetryAllocation(int status){ return ((status == -EHOSTUNREACH) || (status == -ETIMEDOUT) || (status == -EBUSY) || (status == -EIO) || (status == -KFS::EBADVERS) || (status == -KFS::EALLOCFAILED));}ssize_tKfsClientImpl::Write(int fd, const char *buf, size_t numBytes){ MutexLock l(&mMutex); size_t nwrote = 0; ssize_t numIO = 0; int res; if (!valid_fd(fd) || mFileTable[fd] == NULL || mFileTable[fd]->openMode == O_RDONLY) { KFS_LOG_VA_INFO("Write to fd: %d failed---fd is likely closed", fd); return -EBADF; } FileAttr *fa = FdAttr(fd); if (fa->isDirectory) return -EISDIR; FilePosition *pos = FdPos(fd); // // Loop thru chunk after chunk until we write the desired # // of bytes. while (nwrote < numBytes) { size_t nleft = numBytes - nwrote; // Don't need this: if we don't have the lease, don't // know where the chunk is, allocation will get that info. // LocateChunk(fd, pos->chunkNum); // need to retry here... if ((res = DoAllocation(fd)) < 0) { // allocation failed...bail break; } if (pos->preferredServer == NULL) { numIO = OpenChunk(fd); if (numIO < 0) { // KFS_LOG_VA_DEBUG("OpenChunk(%lld)", numIO); break; } } if (nleft < ChunkBuffer::BUF_SIZE || FdBuffer(fd)->dirty) { // either the write is small or there is some dirty // data...so, aggregate as much as possible and then it'll // get flushed numIO = WriteToBuffer(fd, buf + nwrote, nleft); } else { // write is big and there is nothing dirty...so, // write-thru numIO = WriteToServer(fd, pos->chunkOffset, buf + nwrote, nleft); } if (numIO < 0) { if (numIO == -KFS::ELEASEEXPIRED) { KFS_LOG_VA_INFO("Continuing to retry write for errorcode = %d", numIO); continue; } KFS_LOG_VA_INFO("Write failed %s @offset: %lld: asked: %d, did: %d, errorcode = %d", mFileTable[fd]->pathname.c_str(), pos->fileOffset, numBytes, nwrote, numIO); break; } nwrote += numIO; numIO = Seek(fd, numIO, SEEK_CUR); if (numIO < 0) { // KFS_LOG_VA_DEBUG("Seek(%lld)", numIO); break; } } if (nwrote == 0 && numIO < 0) return numIO; if (nwrote != numBytes) { KFS_LOG_VA_DEBUG("----Write done: asked: %llu, got: %llu-----", numBytes, nwrote); } return nwrote;}ssize_tKfsClientImpl::WriteToBuffer(int fd, const char *buf, size_t numBytes){ ssize_t numIO; size_t lastByte; FilePosition *pos = FdPos(fd); ChunkBuffer *cb = FdBuffer(fd); // if the buffer has dirty data and this write doesn't abut it, // or if buffer is full, flush the buffer before proceeding. // XXX: Reconsider buffering to do a read-modify-write of // large amounts of data. if (cb->dirty && cb->chunkno != pos->chunkNum) { int status = FlushBuffer(fd); if (status < 0) return status; } cb->allocate(); off_t start = pos->chunkOffset - cb->start; size_t previous = cb->length; if (cb->dirty && ((start != (off_t) previous) || (previous == ChunkBuffer::BUF_SIZE))) { int status = FlushBuffer(fd); if (status < 0) return status; } if (!cb->dirty) { cb->chunkno = pos->chunkNum; cb->start = pos->chunkOffset; cb->length = 0; cb->dirty = true; } // ensure that write doesn't straddle chunk boundaries numBytes = min(numBytes, (size_t) (KFS::CHUNKSIZE - pos->chunkOffset)); if (numBytes == 0) return 0; // max I/O we can do numIO = min(ChunkBuffer::BUF_SIZE - cb->length, numBytes); assert(numIO > 0); // KFS_LOG_VA_DEBUG("Buffer absorbs write...%d bytes", numIO); // chunkBuf[0] corresponds to some offset in the chunk, // which is defined by chunkBufStart. // chunkOffset corresponds to the position in the chunk // where the "filepointer" is currently at. // Figure out where the data we want copied into starts start = pos->chunkOffset - cb->start; assert(start >= 0 && start < (off_t) ChunkBuffer::BUF_SIZE); memcpy(&cb->buf[start], buf, numIO); lastByte = start + numIO; // update the size according to where the last byte just // got written. if (lastByte > cb->length) cb->length = lastByte; return numIO;}ssize_tKfsClientImpl::FlushBuffer(int fd){ ssize_t numIO = 0; ChunkBuffer *cb = FdBuffer(fd); if (cb->dirty) { numIO = WriteToServer(fd, cb->start, cb->buf, cb->length); if (numIO >= 0) { cb->dirty = false; // we just flushed the buffer...so, there is no data in it cb->length = 0; } } return numIO;}ssize_tKfsClientImpl::WriteToServer(int fd, off_t offset, const char *buf, size_t numBytes){ assert(KFS::CHUNKSIZE - offset >= 0); size_t numAvail = min(numBytes, (size_t) (KFS::CHUNKSIZE - offset)); int res = 0; for (int retryCount = 0; retryCount < NUM_RETRIES_PER_OP; retryCount++) { // Same as read: align writes to checksum block boundaries if (offset + numAvail <= OffsetToChecksumBlockEnd(offset)) res = DoSmallWriteToServer(fd, offset, buf, numBytes); else { struct timeval startTime, endTime; double timeTaken; gettimeofday(&startTime, NULL); res = DoLargeWriteToServer(fd, offset, buf, numBytes); gettimeofday(&endTime, NULL); timeTaken = (endTime.tv_sec - startTime.tv_sec) + (endTime.tv_usec - startTime.tv_usec) * 1e-6; if (timeTaken > 5.0) { ostringstream os; ChunkAttr *chunk = GetCurrChunk(fd); for (uint32_t i = 0; i < chunk->chunkServerLoc.size(); i++) os << chunk->chunkServerLoc[i].ToString().c_str() << ' '; KFS_LOG_VA_INFO("Writes thru chain %s for chunk %lld are taking: %.3f secs", os.str().c_str(), GetCurrChunk(fd)->chunkId, timeTaken); } KFS_LOG_VA_DEBUG("Total Time to write data to server(s): %.4f secs", timeTaken); } if (res >= 0) break; // write failure...retry ostringstream os; ChunkAttr *chunk = GetCurrChunk(fd); for (uint32_t i = 0; i < chunk->chunkServerLoc.size(); i++) os << chunk->chunkServerLoc[i].ToString().c_str() << ' '; // whatever be the error, wait a bit and retry... KFS_LOG_VA_INFO("Daisy-chain: %s; Will retry allocation/write on chunk %lld due to error code: %d", os.str().c_str(), GetCurrChunk(fd)->chunkId, res); Sleep(LEASE_RETRY_DELAY_SECS); // save the value of res; in case we tried too many times // and are giving up, we need the error to propogate int r; if ((r = DoAllocation(fd, true)) < 0) { KFS_LOG_VA_INFO("Re-allocation on chunk %lld failed because of error code = %d", r); return r; } } if (res < 0) { // any other error string errstr = ErrorCodeToStr(res); ChunkAttr *chunk = GetCurrChunk(fd); KFS_LOG_VA_INFO("Retries failed: Write on chunk %lld failed because of error: (code = %d) %s", chunk->chunkId, res, errstr.c_str()); } return res;}intKfsClientImpl::DoAllocation(int fd, bool force){ ChunkAttr *chunk = NULL; FileAttr *fa = FdAttr(fd); int res = 0; if (IsCurrChunkAttrKnown(fd)) chunk = GetCurrChunk(fd); if (chunk && force) chunk->didAllocation = false; if ((chunk == NULL) || (chunk->chunkId == (kfsChunkId_t) -1) || (!chunk->didAllocation)) { // if we couldn't locate the chunk, it must be a new one. // also, if it is an existing chunk, force an allocation // if needed (which'll cause version # bumps, lease // handouts etc). for (uint8_t retryCount = 0; retryCount < NUM_RETRIES_PER_OP; retryCount++) { if (retryCount) { if (res == -EBUSY) // the metaserver says it can't get us a lease for // the chunk. so, wait for a bit for the lease to // expire and then retry Sleep(LEASE_RETRY_DELAY_SECS); else Sleep(RETRY_DELAY_SECS); } res = AllocChunk(fd);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -