⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfswrite.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 2 页
字号:
	    if ((res >= 0) || (!NeedToRetryAllocation(res))) {		break;	    }	    // allocation failed...retry	}	if (res < 0)	    // allocation failed...bail	    return res;	chunk = GetCurrChunk(fd);	assert(chunk != NULL);	chunk->didAllocation = true;	if (force) {	    KFS_LOG_VA_INFO("Forced allocation: chunk=%lld, version=%lld",                            chunk->chunkId, chunk->chunkVersion);	}        else {            ++fa->chunkCount;        }    }    return 0;}ssize_tKfsClientImpl::DoSmallWriteToServer(int fd, off_t offset, const char *buf, size_t numBytes){    return DoLargeWriteToServer(fd, offset, buf, numBytes);}ssize_tKfsClientImpl::DoLargeWriteToServer(int fd, off_t offset, const char *buf, size_t numBytes){    size_t numAvail, numWrote = 0;    ssize_t numIO;    FilePosition *pos = FdPos(fd);    ChunkAttr *chunk = GetCurrChunk(fd);    ServerLocation loc = chunk->chunkServerLoc[0];    TcpSocket *masterSock = FdPos(fd)->GetChunkServerSocket(loc);    vector<WritePrepareOp *> ops;    vector<WriteInfo> writeId;    assert(KFS::CHUNKSIZE - offset >= 0);    numAvail = min(numBytes, (size_t) (KFS::CHUNKSIZE - offset));    // cout << "Pushing to server: " << offset << ' ' << numBytes << endl;    // get the write id    numIO = AllocateWriteId(fd, offset, numBytes, writeId, masterSock);    if (numIO < 0)        return numIO;    // Split the write into a bunch of smaller ops    while (numWrote < numAvail) {	WritePrepareOp *op = new WritePrepareOp(nextSeq(), chunk->chunkId,	                                        chunk->chunkVersion, writeId);	op->numBytes = min(MAX_BYTES_PER_WRITE_IO, numAvail - numWrote);	if ((op->numBytes % CHECKSUM_BLOCKSIZE) != 0) {	    // if the write isn't aligned to end on a checksum block	    // boundary, then break the write into two parts:	    //(1) start at offset and end on a checksum block boundary	    //(2) is the rest, which is less than the size of checksum	    //block	    // This simplifies chunkserver code: either the writes are	    // multiples of checksum blocks or a single write which is	    // smaller than a checksum block.	    if (op->numBytes > CHECKSUM_BLOCKSIZE)		op->numBytes = (op->numBytes / CHECKSUM_BLOCKSIZE) * CHECKSUM_BLOCKSIZE;	    // else case #2 from above comment and op->numBytes is setup right	}	assert(op->numBytes > 0);	op->offset = offset + numWrote;	// similar to read, breakup the write if it is straddling	// checksum block boundaries.	if (OffsetToChecksumBlockStart(op->offset) != op->offset) {	    op->numBytes = min((size_t) (OffsetToChecksumBlockEnd(op->offset) - op->offset),	                       op->numBytes);	}	op->AttachContentBuf(buf + numWrote, op->numBytes);	op->contentLength = op->numBytes;        op->checksum = ComputeBlockChecksum(op->contentBuf, op->contentLength);        {            ostringstream os;            os << "@offset: " << op->offset << " nbytes: " << op->numBytes               << " cksum: " << op->checksum;            KFS_LOG_VA_DEBUG("%s", os.str().c_str());        }	numWrote += op->numBytes;	ops.push_back(op);    }    // For pipelined data push to work, we break the write into a    // sequence of smaller ops and push them to the master; the master    // then forwards each op to one replica, who then forwards to    // next.    numIO = DoPipelinedWrite(fd, ops, masterSock);    if (numIO < 0) {        //        // the write failed; caller will do the retry        //        KFS_LOG_VA_INFO("Write failed...chunk = %lld, version = %lld, offset = %lld, error = %d",                        ops[0]->chunkId, ops[0]->chunkVersion, ops[0]->offset,                        numIO);    }    // figure out how much was committed    numIO = 0;    for (vector<KfsOp *>::size_type i = 0; i < ops.size(); ++i) {	WritePrepareOp *op = static_cast<WritePrepareOp *> (ops[i]);	if (op->status < 0)	    numIO = op->status;	else if (numIO >= 0)	    numIO += op->status;	op->ReleaseContentBuf();	delete op;    }    if (numIO >= 0 && (off_t)chunk->chunkSize < offset + numIO) {	// grow the chunksize only if we wrote past the last byte in the chunk	chunk->chunkSize = offset + numIO;	// if we wrote past the last byte of the file, then grow the	// file size.  Note that, chunks 0..chunkNum-1 are assumed to	// be full.  So, take the size of the last chunk and to that	// add the size of the "full" chunks to get the size	FileAttr *fa = FdAttr(fd);	off_t eow = chunk->chunkSize + (pos->chunkNum  * KFS::CHUNKSIZE);	fa->fileSize = max(fa->fileSize, eow);    }    if (numIO != (ssize_t) numBytes) {	KFS_LOG_VA_DEBUG("Wrote to server (fd = %d), %lld bytes, was asked %lld bytes",	                 fd, numIO, numBytes);    } else {        KFS_LOG_VA_DEBUG("Wrote to server (fd = %d), %lld bytes",                         fd, numIO);    }    return numIO;}intKfsClientImpl::AllocateWriteId(int fd, off_t offset, size_t numBytes,                               vector<WriteInfo> &writeId, TcpSocket *masterSock){    ChunkAttr *chunk = GetCurrChunk(fd);    WriteIdAllocOp op(nextSeq(), chunk->chunkId, chunk->chunkVersion, offset, numBytes);    int res;    op.chunkServerLoc = chunk->chunkServerLoc;    res = DoOpSend(&op, masterSock);    if ((res < 0) || (op.status < 0)) {        if (op.status < 0)            return op.status;        return res;    }    res = DoOpResponse(&op, masterSock);    if ((res < 0) || (op.status < 0)) {        if (op.status < 0)            return op.status;        return res;    }    // get rid of any old stuff    writeId.clear();    writeId.reserve(op.chunkServerLoc.size());    istringstream ist(op.writeIdStr);    for (uint32_t i = 0; i < chunk->chunkServerLoc.size(); i++) {        ServerLocation loc;        int64_t id;        ist >> loc.hostname;        ist >> loc.port;        ist >> id;	writeId.push_back(WriteInfo(loc, id));    }    return 0;}intKfsClientImpl::PushData(int fd, vector<WritePrepareOp *> &ops,                         uint32_t start, uint32_t count, TcpSocket *masterSock){    uint32_t last = min((size_t) (start + count), ops.size());    int res = 0;    for (uint32_t i = start; i < last; i++) {                res = DoOpSend(ops[i], masterSock);        if (res < 0)            break;    }    return res;}intKfsClientImpl::SendCommit(int fd, vector<WriteInfo> &writeId, TcpSocket *masterSock,                          WriteSyncOp &sop){    ChunkAttr *chunk = GetCurrChunk(fd);    int res = 0;    sop.Init(nextSeq(), chunk->chunkId, chunk->chunkVersion, writeId);    res = DoOpSend(&sop, masterSock);    if (res < 0)        return sop.status;    return 0;    }intKfsClientImpl::GetCommitReply(WriteSyncOp &sop, TcpSocket *masterSock){    int res;    res = DoOpResponse(&sop, masterSock);    if (res < 0)        return sop.status;    return sop.status;}intKfsClientImpl::DoPipelinedWrite(int fd, vector<WritePrepareOp *> &ops, TcpSocket *masterSock){    int res;    vector<WritePrepareOp *>::size_type next, minOps;    WriteSyncOp syncOp[2];    if (ops.size() == 0)        return 0;    // push the data to the server; to avoid bursting the server with    // a full chunk, do it in a pipelined fashion:    //  -- send 512K of data; do a flush; send another 512K; do another flush    //  -- every time we get an ack back, we send another 512K    //      // we got 2 commits: current is the one we just sent; previous is    // the one for which we are expecting a reply    int prevCommit = 0;    int currCommit = 1;      minOps = min((size_t) (MIN_BYTES_PIPELINE_IO / MAX_BYTES_PER_WRITE_IO) / 2, ops.size());    res = PushData(fd, ops, 0, minOps, masterSock);    if (res < 0)        goto error_out;    res = SendCommit(fd, ops[0]->writeInfo, masterSock, syncOp[0]);    if (res < 0)        goto error_out;      for (next = minOps; next < ops.size(); next += minOps) {        res = PushData(fd, ops, next, minOps, masterSock);        if (res < 0)            goto error_out;        res = SendCommit(fd, ops[next]->writeInfo, masterSock, syncOp[currCommit]);        if (res < 0)            goto error_out;        res = GetCommitReply(syncOp[prevCommit], masterSock);        prevCommit = currCommit;        currCommit++;        currCommit %= 2;        if (res < 0)            // the commit for previous failed; there is still the            // business of getting the reply for the "current" one            // that we sent out.            break;    }    res = GetCommitReply(syncOp[prevCommit], masterSock);  error_out:    if (res < 0) {        // res will be -1; we need to pick out the error from the op that failed        for (uint32_t i = 0; i < ops.size(); i++) {            if (ops[i]->status < 0) {                res = ops[i]->status;                break;            }        }    }    // set the status for each op: either all were successful or none was.    for (uint32_t i = 0; i < ops.size(); i++) {        if (res < 0)             ops[i]->status = res;        else            ops[i]->status = ops[i]->numBytes;    }    return res;}intKfsClientImpl::IssueCommit(int fd, vector<WriteInfo> &writeId, TcpSocket *masterSock){    ChunkAttr *chunk = GetCurrChunk(fd);    WriteSyncOp sop(nextSeq(), chunk->chunkId, chunk->chunkVersion, writeId);    int res;    res = DoOpSend(&sop, masterSock);    if (res < 0)        return sop.status;    res = DoOpResponse(&sop, masterSock);    if (res < 0)        return sop.status;    return sop.status;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -