⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 kfsclient.cc

📁 nandflash文件系统源代码
💻 CC
📖 第 1 页 / 共 5 页
字号:
    assert(valid_fd(fd) && !fa->isDirectory);    struct timeval startTime, endTime;    double timeTaken;        gettimeofday(&startTime, NULL);    AllocateOp op(nextSeq(), fa->fileId);    FilePosition *pos = FdPos(fd);    op.fileOffset = ((pos->fileOffset / KFS::CHUNKSIZE) * KFS::CHUNKSIZE);    (void) DoMetaOpWithRetry(&op);    if (op.status < 0) {	// KFS_LOG_VA_DEBUG("AllocChunk(%d)", op.status);	return op.status;    }    ChunkAttr chunk;    chunk.chunkId = op.chunkId;    chunk.chunkVersion = op.chunkVersion;    chunk.chunkServerLoc = op.chunkServers;    FdInfo(fd)->cattr[pos->chunkNum] = chunk;    FdPos(fd)->ResetServers();    // for writes, [0] is the master; that is the preferred server    if (op.chunkServers.size() > 0) {        FdPos(fd)->SetPreferredServer(op.chunkServers[0]);        SizeChunk(fd);    }    KFS_LOG_VA_DEBUG("Fileid: %lld, chunk : %lld, version: %lld, hosted on:",                     fa->fileId, chunk.chunkId, chunk.chunkVersion);    for (uint32_t i = 0; i < op.chunkServers.size(); i++) {	KFS_LOG_VA_DEBUG("%s", op.chunkServers[i].ToString().c_str());    }    gettimeofday(&endTime, NULL);        timeTaken = (endTime.tv_sec - startTime.tv_sec) +        (endTime.tv_usec - startTime.tv_usec) * 1e-6;    KFS_LOG_VA_DEBUG("Total Time to allocate chunk: %.4f secs", timeTaken);    return op.status;}////// Given a chunk of file, find out where the chunk is hosted./// @param[in] fd  The index for an entry in mFileTable[] for which/// we are trying find out chunk location info.////// @param[in] chunkNum  The index in/// mFileTable[fd]->cattr[] corresponding to the chunk for/// which we are trying to get location info.//////intKfsClientImpl::LocateChunk(int fd, int chunkNum){    assert(valid_fd(fd) && !mFileTable[fd]->fattr.isDirectory);    if (chunkNum < 0)	return -EINVAL;    map <int, ChunkAttr>::iterator c;    c = mFileTable[fd]->cattr.find(chunkNum);    // Avoid unnecessary look ups.    if (c != mFileTable[fd]->cattr.end() && c->second.chunkId > 0)	return 0;    GetAllocOp op(nextSeq(), mFileTable[fd]->fattr.fileId,		  (off_t) chunkNum * KFS::CHUNKSIZE);    (void)DoMetaOpWithRetry(&op);    if (op.status < 0) {	string errstr = ErrorCodeToStr(op.status);	KFS_LOG_VA_DEBUG("LocateChunk (%d): %s", op.status, errstr.c_str());	return op.status;    }    ChunkAttr chunk;    chunk.chunkId = op.chunkId;    chunk.chunkVersion = op.chunkVersion;    chunk.chunkServerLoc = op.chunkServers;    mFileTable[fd]->cattr[chunkNum] = chunk;    return 0;}boolKfsClientImpl::IsCurrChunkAttrKnown(int fd){    map <int, ChunkAttr> *c = &FdInfo(fd)->cattr;    return c->find(FdPos(fd)->chunkNum) != c->end();}////// Helper function that does the work for sending out an op to the/// server.////// @param[in] op the op to be sent out/// @param[in] sock the socket on which we communicate with server/// @retval 0 on success; -1 on failure///intKFS::DoOpSend(KfsOp *op, TcpSocket *sock){    ostringstream os;    if ((sock == NULL ) || (!sock->IsGood())) {	// KFS_LOG_VA_DEBUG("Trying to do I/O on a closed socket..failing it");	op->status = -EHOSTUNREACH;	return -1;    }    op->Request(os);    int numIO = sock->DoSynchSend(os.str().c_str(), os.str().length());    if (numIO <= 0) {	sock->Close();	KFS_LOG_DEBUG("Send failed...closing socket");	op->status = -EHOSTUNREACH;	return -1;    }    if (op->contentLength > 0) {	numIO = sock->DoSynchSend(op->contentBuf, op->contentLength);	if (numIO <= 0) {	    sock->Close();	    KFS_LOG_DEBUG("Send failed...closing socket");	    op->status = -EHOSTUNREACH;	    return -1;	}    }    return 0;}/// Get a response from the server.  The response is assumed to/// terminate with "\r\n\r\n"./// @param[in/out] buf that should be filled with data from server/// @param[in] bufSize size of the buffer////// @param[out] delims the position in the buffer where "\r\n\r\n"/// occurs; in particular, the length of the response string that ends/// with last "\n" character.  If the buffer got full and we couldn't/// find "\r\n\r\n", delims is set to -1.////// @param[in] sock the socket from which data should be read/// @retval # of bytes that were read; 0/-1 if there was an error///static intGetResponse(char *buf, int bufSize, int *delims, TcpSocket *sock){    *delims = -1;    while (1) {	struct timeval timeout = gDefaultTimeout;	int nread = sock->DoSynchPeek(buf, bufSize, timeout);	if (nread <= 0)	    return nread;	for (int i = 4; i <= nread; i++) {	    if (i < 4)		break;	    if ((buf[i - 3] == '\r') &&		(buf[i - 2] == '\n') &&		(buf[i - 1] == '\r') &&		(buf[i] == '\n')) {		// valid stuff is from 0..i; so, length of resulting		// string is i+1.		memset(buf, '\0', bufSize);		*delims = (i + 1);		nread = sock->Recv(buf, *delims);		return nread;	    }	}    }    return -ENOBUFS;}////// From a response, extract out seq # and content-length.///static voidGetSeqContentLen(const char *resp, int respLen,	         kfsSeq_t *seq, int *contentLength){    string respStr(resp, respLen);    Properties prop;    istringstream ist(respStr);    const char separator = ':';    prop.loadProperties(ist, separator, false);    *seq = prop.getValue("Cseq", (kfsSeq_t) -1);    *contentLength = prop.getValue("Content-length", 0);}////// Helper function that does the work of getting a response from the/// server and parsing it out.////// @param[in] op the op for which a response is to be gotten/// @param[in] sock the socket on which we communicate with server/// @retval 0 on success; -1 on failure///intKFS::DoOpResponse(KfsOp *op, TcpSocket *sock){    int numIO;    char buf[CMD_BUF_SIZE];    int nread = 0, len;    ssize_t navail, nleft;    kfsSeq_t resSeq;    int contentLen;    bool printMatchingResponse = false;    if ((sock == NULL) || (!sock->IsGood())) {	op->status = -EHOSTUNREACH;	KFS_LOG_DEBUG("Trying to do I/O on a closed socket..failing it");	return -1;    }    while (1) {	memset(buf, '\0', CMD_BUF_SIZE);	numIO = GetResponse(buf, CMD_BUF_SIZE, &len, sock);	assert(numIO != -ENOBUFS);	if (numIO <= 0) {	    if (numIO == -ENOBUFS) {		op->status = -1;	    } else if (numIO == -ETIMEDOUT) {		op->status = -ETIMEDOUT;		KFS_LOG_DEBUG("Get response recv timed out...");	    } else {		KFS_LOG_DEBUG("Get response failed...closing socket");		sock->Close();		op->status = -EHOSTUNREACH;	    }	    return -1;	}	assert(len > 0);	GetSeqContentLen(buf, len, &resSeq, &contentLen);	if (resSeq == op->seq) {            if (printMatchingResponse) {                KFS_LOG_VA_DEBUG("Seq #'s match (after mismatch seq): Expect: %lld, got: %lld",                                 op->seq, resSeq);            }	    break;	}	KFS_LOG_VA_DEBUG("Seq #'s dont match: Expect: %lld, got: %lld",                         op->seq, resSeq);        printMatchingResponse = true;        if (contentLen > 0) {            struct timeval timeout = gDefaultTimeout;            int len = sock->DoSynchDiscard(contentLen, timeout);            if (len != contentLen) {                sock->Close();                op->status = -EHOSTUNREACH;                return -1;            }        }    }    contentLen = op->contentLength;    op->ParseResponseHeader(buf, len);    if (op->contentLength == 0) {	// restore it back: when a write op is sent out and this	// method is invoked with the same op to get the response, the	// op's status should get filled in; we shouldn't be stomping	// over content length.	op->contentLength = contentLen;	return numIO;    }    // This is the annoying part...we may have read some of the data    // related to attributes already.  So, copy them out and then read    // whatever else is left    if (op->contentBufLen == 0) {	op->contentBuf = new char[op->contentLength + 1];	op->contentBuf[op->contentLength] = '\0';    }    // len bytes belongs to the RPC reply.  Whatever is left after    // stripping that data out is the data.    navail = numIO - len;    if (navail > 0) {	assert(navail <= (ssize_t)op->contentLength);	memcpy(op->contentBuf, buf + len, navail);    }    nleft = op->contentLength - navail;    assert(nleft >= 0);    if (nleft > 0) {	struct timeval timeout = gDefaultTimeout;	nread = sock->DoSynchRecv(op->contentBuf + navail, nleft, timeout);	if (nread == -ETIMEDOUT) {	    KFS_LOG_DEBUG("Recv timed out...");	    op->status = -ETIMEDOUT;	} else if (nread <= 0) {	    KFS_LOG_DEBUG("Recv failed...closing socket");	    op->status = -EHOSTUNREACH;	    sock->Close();	}	if (nread <= 0) {	    return 0;	}    }    return nread + numIO;}////// Common work for each op: build a request; send it to server; get a/// response; parse it.////// @param[in] op the op to be done/// @param[in] sock the socket on which we communicate with server////// @retval # of bytes read from the server.///intKFS::DoOpCommon(KfsOp *op, TcpSocket *sock){    if (sock == NULL) {	KFS_LOG_VA_DEBUG("%s: send failed; no socket", op->Show().c_str());	assert(sock);	return -EHOSTUNREACH;    }    int res = DoOpSend(op, sock);    if (res < 0) {	KFS_LOG_VA_DEBUG("%s: send failure code: %d", op->Show().c_str(), res);	return res;    }    res = DoOpResponse(op, sock);    if (res < 0) {	KFS_LOG_VA_DEBUG("%s: recv failure code: %d", op->Show().c_str(), res);	return res;    }    if (op->status < 0) {	string errstr = ErrorCodeToStr(op->status);	KFS_LOG_VA_DEBUG("%s failed with code(%d): %s", op->Show().c_str(), op->status, errstr.c_str());    }    return res;}////// To compute the size of a file, determine what the last chunk in/// the file happens to be (from the meta server); then, for the last/// chunk, find its size and then add the size of remaining chunks/// (all of which are assumed to be full).  The reason for asking the/// meta server about the last chunk (and simply using chunkCount) is/// that random writes with seeks affect where the "last" chunk of the/// file happens to be: for instance, a file could have chunkCount = 1, but/// that chunk could be the 10th chunk in the file---the first 9/// chunks are just holes.//struct RespondingServer {    KfsClientImpl *client;    const ChunkLayoutInfo &layout;    int *status;    off_t *size;    RespondingServer(KfsClientImpl *cli, const ChunkLayoutInfo &lay,		     off_t *sz, int *st):	    client(cli), layout(lay), status(st), size(sz) { }    bool operator() (ServerLocation loc)    {	TcpSocket sock;        if (sock.Connect(loc) < 0) {            *size = 0;            *status = -1;	    return false;        }	SizeOp sop(client->nextSeq(), layout.chunkId, layout.chunkVersion);	int numIO = DoOpCommon(&sop, &sock);	if (numIO < 0 && !sock.IsGood()) {            return false;        }	*status = sop.status;	if (*status >= 0)            *size = sop.size;        return *status >= 0;    }};struct RespondingServer2 {    KfsClientImpl *client;    const ChunkLayoutInfo &layout;    RespondingServer2(KfsClientImpl *cli, const ChunkLayoutInfo &lay) :        client(cli), layout(lay) { }    ssize_t operator() (ServerLocation loc)    {	TcpSocket sock;        if (sock.Connect(loc) < 0) {            return -1;        }	SizeOp sop(client->nextSeq(), layout.chunkId, layout.chunkVersion);	int numIO = DoOpCommon(&sop, &sock);	if (numIO < 0 && !sock.IsGood()) {            return -1;        }        return sop.size;    }};off_tKfsClientImpl::ComputeFilesize(kfsFileId_t kfsfid){    GetLayoutOp lop(nextSeq(), kfsfid);    (void)DoMetaOpWithRetry(&lop);    if (lop.status < 0) {	// XXX: This can only during concurrent I/O when someone is	// deleting a file and we are trying to compute the size of	// this file.  For now, assert away.	assert(lop.status != -ENOENT);	return 0;    }    if (lop.ParseLayoutInfo()) {	KFS_LOG_DEBUG("Unable to parse layout info");	return -1;    }    if (lop.chunks.size() == 0)	return 0;    vector <ChunkLayoutInfo>::reverse_iterator last = lop.chunks.rbegin();    off_t filesize = last->fileOffset;    off_t endsize = 0;    int rstatus = 0;    RespondingServer responder(this, *last, &endsize, &rstatus);    vector <ServerLocation>::iterator s =	    find_if(last->chunkServers.begin(), last->chunkServers.end(),			    responder);    if (s != last->chunkServers.end()) {	if (rstatus < 0) {	    KFS_LOG_VA_DEBUG("RespondingServer status %d", rstatus);	    return 0;	}	filesize += endsize;    }    return filesize;}voidKfsClientImpl::ComputeFilesizes(vector<KfsFileAttr> &fattrs, vector<FileChunkInfo> &lastChunkInfo){    for (uint32_t i = 0; i < lastChunkInfo.size(); i++) {        if (lastChunkInfo[i].chunkCount == 0)            continue;        if (fattrs[i].fileSize >= 0)            continue;        for (uint32_t j = 0; j < lastChunkInfo[i].cattr.chunkServerLoc.size(); j++) {            TcpSocket sock;            ServerLocation loc = lastChunkInfo[i].cattr.chunkServerLoc[j];            // get all the filesizes we can from this server            ComputeFilesizes(fattrs, lastChunkInfo, i, loc);        }        bool alldone = true;        for (uint32_t j = i; j < fattrs.size(); j++) {            if (fattrs[j].fileSize < 0) {                alldone = false;                break;            }        }        if (alldone)            break;            }}voidKfsClientImpl::ComputeFilesizes(vector<KfsFileAttr> &fattrs, vector<FileChunkInfo> &lastChunkInfo,                                uint32_t startIdx, const ServerLocation &loc){    TcpSocket sock;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -