⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfsread.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 2 页
字号:
//// Issue a single read op to the server and get data back.//ssize_tKfsClientImpl::DoSmallReadFromServer(int fd, char *buf, size_t numBytes){    ChunkAttr *chunk = GetCurrChunk(fd);    ReadOp op(nextSeq(), chunk->chunkId, chunk->chunkVersion);    op.offset = mFileTable[fd]->currPos.chunkOffset;    op.numBytes = min(chunk->chunkSize, (off_t) numBytes);    op.AttachContentBuf(buf, numBytes);    // make sure we aren't overflowing...    assert(buf + op.numBytes <= buf + numBytes);    (void)DoOpCommon(&op, mFileTable[fd]->currPos.preferredServer);    VerifyChecksum(&op, mFileTable[fd]->currPos.preferredServer);    ssize_t numIO = (op.status >= 0) ? op.contentLength : op.status;    op.ReleaseContentBuf();    return numIO;}size_tKfsClientImpl::ZeroFillBuf(int fd, char *buf, size_t numBytes){    size_t numIO, bytesInFile, bytesInChunk;    ChunkAttr *chunk = GetCurrChunk(fd);    if (mFileTable[fd]->currPos.chunkOffset < (off_t) chunk->chunkSize)	return 0;		// more data in chunk    // We've hit End-of-chunk.  There are two cases here:    // 1. There is more data in the file and that data is in    // the next chunk    // 2. This chunk was filled with less data than what was    // "promised".  (Maybe, write got lost).    // In either case, zero-fill: the amount to zero-fill is    // in the min. of the two.    //    // Also, we can hit the end-of-chunk if we fail to locate a    // chunk.  This can happen if there is a hole in the file.    //    assert(mFileTable[fd]->currPos.fileOffset <=           (off_t) mFileTable[fd]->fattr.fileSize);    bytesInFile = mFileTable[fd]->fattr.fileSize -        mFileTable[fd]->currPos.fileOffset;    assert(chunk->chunkSize <= (off_t) KFS::CHUNKSIZE);    bytesInChunk = KFS::CHUNKSIZE - chunk->chunkSize;    numIO = min(bytesInChunk, bytesInFile);    // Fill in 0's based on space in the buffer....    numIO = min(numIO, numBytes);    // KFS_LOG_DEBUG("Zero-filling %d bytes for read @ %lld", numIO, mFileTable[fd]->currPos.chunkOffset);    memset(buf, 0, numIO);    return numIO;}size_tKfsClientImpl::CopyFromChunkBuf(int fd, char *buf, size_t numBytes){    size_t numIO;    FilePosition *pos = FdPos(fd);    ChunkBuffer *cb = FdBuffer(fd);    size_t start = pos->chunkOffset - cb->start;    // Wrong chunk in buffer or if the starting point in the buffer is    // "BEYOND" the current location of the file pointer, we don't    // have the data.  "BEYOND" => offset is before the starting point    // or offset is after the end of the buffer    if ((pos->chunkNum != cb->chunkno) ||        (pos->chunkOffset < cb->start) ||        (pos->chunkOffset >= (off_t) (cb->start + cb->length)))	return 0;    // first figure out how much data is available in the buffer    // to be copied out.    numIO = min(cb->length - start, numBytes);    // chunkBuf[0] corresponds to some offset in the chunk,    // which is defined by chunkBufStart.    // chunkOffset corresponds to the position in the chunk    // where the "filepointer" is currently at.    // Figure out where the data we want copied out starts    memcpy(buf, &cb->buf[start], numIO);    // KFS_LOG_DEBUG("Copying out data from chunk buf...%d bytes", numIO);    return numIO;}ssize_tKfsClientImpl::DoLargeReadFromServer(int fd, char *buf, size_t numBytes){    FilePosition *pos = FdPos(fd);    ChunkAttr *chunk = GetCurrChunk(fd);    vector<ReadOp *> ops;    assert(chunk->chunkSize - pos->chunkOffset >= 0);    size_t numAvail = min((size_t) (chunk->chunkSize - pos->chunkOffset), numBytes);    size_t numRead = 0;    while (numRead < numAvail) {	ReadOp *op = new ReadOp(nextSeq(), chunk->chunkId, chunk->chunkVersion);	op->numBytes = min(MAX_BYTES_PER_READ_IO, numAvail - numRead);        // op->numBytes = min(KFS::CHUNKSIZE, numAvail - numRead);	assert(op->numBytes > 0);	op->offset = pos->chunkOffset + numRead;	// if the read is going to straddle checksum block boundaries,	// break up the read into multiple reads: this simplifies	// server side code.  for each read request, a single checksum	// block will need to be read and after the checksum verifies,	// the server can "trim" the data that wasn't asked for.	if (OffsetToChecksumBlockStart(op->offset) != op->offset) {	    op->numBytes = OffsetToChecksumBlockEnd(op->offset) - op->offset;	}	op->AttachContentBuf(buf + numRead, op->numBytes);	numRead += op->numBytes;	ops.push_back(op);    }    // make sure we aren't overflowing...    assert(buf + numRead <= buf + numBytes);    struct timeval readStart, readEnd;    gettimeofday(&readStart, NULL);    {        ostringstream os;        os << pos->GetPreferredServerLocation().ToString().c_str() << ':'            << " c=" << chunk->chunkId << " o=" << pos->chunkOffset << " n=" << numBytes;        KFS_LOG_VA_DEBUG("Reading from %s", os.str().c_str());    }    ssize_t numIO = DoPipelinedRead(ops, pos->preferredServer);    /*    if (numIO < 0) {	KFS_LOG_DEBUG("Pipelined read from server failed...");    }    */    int retryStatus = 0;    for (vector<KfsOp *>::size_type i = 0; i < ops.size(); ++i) {	ReadOp *op = static_cast<ReadOp *> (ops[i]);	if (op->status < 0) {            if (NeedToRetryRead(op->status)) {                // preserve EIO so that we can avoid that server                if (retryStatus != -EIO)                    retryStatus = op->status;            }	    numIO = op->status;        }	else if (numIO >= 0)	    numIO += op->status;	op->ReleaseContentBuf();	delete op;    }    // If the op needs to be retried, pass that up    if (retryStatus != 0)        numIO = retryStatus;    gettimeofday(&readEnd, NULL);    double timeSpent = ComputeTimeDiff(readStart, readEnd);    {        ostringstream os;        os << pos->GetPreferredServerLocation().ToString().c_str() << ':'            << " c=" << chunk->chunkId << " o=" << pos->chunkOffset << " n=" << numBytes           << " got=" << numIO << " time=" << timeSpent;                if (timeSpent > 5.0) {            struct sockaddr_in saddr;            KFS_LOG_VA_INFO("Read done from %s", os.str().c_str());            if (pos->GetPreferredServerAddr(saddr) == 0) {                KFS_LOG_VA_DEBUG("Sending telemetry report about: %s", os.str().c_str());                double diskIOTime[MAX_IO_INFO_PER_PKT];                double elapsedTime[MAX_IO_INFO_PER_PKT];                vector<KfsOp *>::size_type count = 0;                for (; (count < ops.size()) && (count < MAX_IO_INFO_PER_PKT); count++) {                    ReadOp *op = static_cast<ReadOp *> (ops[count]);                    diskIOTime[count] = op->diskIOTime;                    elapsedTime[count] = op->elapsedTime;                }                mTelemetryReporter.publish(saddr.sin_addr, timeSpent, "READ",                                            count, diskIOTime, elapsedTime);            }        }            }    return numIO;}////// Common work for a read op that can be pipelined./// The idea is to plumb the pipe with a set of requests; then,/// whenever one finishes, submit a new request.////// @param[in] ops the vector of ops to be done/// @param[in] sock the socket on which we communicate with server////// @retval 0 on success; -1 on failure///intKfsClientImpl::DoPipelinedRead(vector<ReadOp *> &ops, TcpSocket *sock){    vector<ReadOp *>::size_type first = 0, next, minOps;    int res = 0;    ReadOp *op;    bool leaseExpired = false;    // plumb the pipe with 1MB    minOps = min((size_t) (MIN_BYTES_PIPELINE_IO / MAX_BYTES_PER_READ_IO), ops.size());    // plumb the pipe with a few ops    for (next = 0; next < minOps; ++next) {        op = ops[next];        gettimeofday(&op->submitTime, NULL);	res = DoOpSend(op, sock);	if (res < 0)	    return -1;    }    // run the pipe: whenever one op finishes, queue another    while (next < ops.size()) {        struct timeval now;	op = ops[first];	res = DoOpResponse(op, sock);	if (res < 0)	    return -1;        gettimeofday(&now, NULL);        op->elapsedTime = ComputeTimeDiff(op->submitTime, now);	++first;	op = ops[next];	if (!IsChunkLeaseGood(op->chunkId)) {	    leaseExpired = true;	    break;	}        gettimeofday(&op->submitTime, NULL);	res = DoOpSend(op, sock);	if (res < 0)	    return -1;	++next;    }    // get the response for the remaining ones    while (first < next) {        struct timeval now;	op = ops[first];	res = DoOpResponse(op, sock);	if (res < 0)	    return -1;        gettimeofday(&now, NULL);        op->elapsedTime = ComputeTimeDiff(op->submitTime, now);	if (leaseExpired)	    op->status = 0;	++first;    }    // do checksum verification    if (res >= 0) {        for (next = 0; next < ops.size(); next++) {            op = ops[next];            if (op->checksums.size() == 0)                continue;            VerifyChecksum(op, sock);        }    }    return 0;}boolKfsClientImpl::VerifyChecksum(ReadOp* op, TcpSocket* sock){    for (size_t pos = 0; pos < op->contentLength; pos += CHECKSUM_BLOCKSIZE) {        size_t len = min(CHECKSUM_BLOCKSIZE, (uint32_t) (op->contentLength - pos));        uint32_t cksum = ComputeBlockChecksum(op->contentBuf + pos, len);        uint32_t cksumIndex = pos / CHECKSUM_BLOCKSIZE;        if (op->checksums.size() < cksumIndex) {            // didn't get all the checksums            KFS_LOG_VA_DEBUG("Didn't get checksum for offset: %lld",                             op->offset + pos);            continue;        }        uint32_t serverCksum = op->checksums[cksumIndex];        if (serverCksum != cksum) {            if (sock) {                struct sockaddr_in saddr;                char ipname[INET_ADDRSTRLEN];                sock->GetPeerName((struct sockaddr *) &saddr);                inet_ntop(AF_INET, &(saddr.sin_addr), ipname, INET_ADDRSTRLEN);                KFS_LOG_VA_INFO("Checksum mismatch from %s starting @pos = %lld: got = %d, computed = %d for %s",                                ipname, op->offset + pos, serverCksum, cksum, op->Show().c_str());                mTelemetryReporter.publish(saddr.sin_addr, -1.0, "CHECKSUM_MISMATCH");            }            op->status = -KFS::EBADCKSUM;        }    }    return (op->status >= 0);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -