📄 kfsclient.cc
字号:
assert(valid_fd(fd) && !fa->isDirectory); struct timeval startTime, endTime; double timeTaken; gettimeofday(&startTime, NULL); AllocateOp op(nextSeq(), fa->fileId); FilePosition *pos = FdPos(fd); op.fileOffset = ((pos->fileOffset / KFS::CHUNKSIZE) * KFS::CHUNKSIZE); (void) DoMetaOpWithRetry(&op); if (op.status < 0) { // KFS_LOG_VA_DEBUG("AllocChunk(%d)", op.status); return op.status; } ChunkAttr chunk; chunk.chunkId = op.chunkId; chunk.chunkVersion = op.chunkVersion; chunk.chunkServerLoc = op.chunkServers; FdInfo(fd)->cattr[pos->chunkNum] = chunk; FdPos(fd)->ResetServers(); // for writes, [0] is the master; that is the preferred server if (op.chunkServers.size() > 0) { FdPos(fd)->SetPreferredServer(op.chunkServers[0]); SizeChunk(fd); } KFS_LOG_VA_DEBUG("Fileid: %lld, chunk : %lld, version: %lld, hosted on:", fa->fileId, chunk.chunkId, chunk.chunkVersion); for (uint32_t i = 0; i < op.chunkServers.size(); i++) { KFS_LOG_VA_DEBUG("%s", op.chunkServers[i].ToString().c_str()); } gettimeofday(&endTime, NULL); timeTaken = (endTime.tv_sec - startTime.tv_sec) + (endTime.tv_usec - startTime.tv_usec) * 1e-6; KFS_LOG_VA_DEBUG("Total Time to allocate chunk: %.4f secs", timeTaken); return op.status;}////// Given a chunk of file, find out where the chunk is hosted./// @param[in] fd The index for an entry in mFileTable[] for which/// we are trying find out chunk location info.////// @param[in] chunkNum The index in/// mFileTable[fd]->cattr[] corresponding to the chunk for/// which we are trying to get location info.//////intKfsClientImpl::LocateChunk(int fd, int chunkNum){ assert(valid_fd(fd) && !mFileTable[fd]->fattr.isDirectory); if (chunkNum < 0) return -EINVAL; map <int, ChunkAttr>::iterator c; c = mFileTable[fd]->cattr.find(chunkNum); // Avoid unnecessary look ups. if (c != mFileTable[fd]->cattr.end() && c->second.chunkId > 0) return 0; GetAllocOp op(nextSeq(), mFileTable[fd]->fattr.fileId, (off_t) chunkNum * KFS::CHUNKSIZE); (void)DoMetaOpWithRetry(&op); if (op.status < 0) { string errstr = ErrorCodeToStr(op.status); KFS_LOG_VA_DEBUG("LocateChunk (%d): %s", op.status, errstr.c_str()); return op.status; } ChunkAttr chunk; chunk.chunkId = op.chunkId; chunk.chunkVersion = op.chunkVersion; chunk.chunkServerLoc = op.chunkServers; mFileTable[fd]->cattr[chunkNum] = chunk; return 0;}boolKfsClientImpl::IsCurrChunkAttrKnown(int fd){ map <int, ChunkAttr> *c = &FdInfo(fd)->cattr; return c->find(FdPos(fd)->chunkNum) != c->end();}////// Helper function that does the work for sending out an op to the/// server.////// @param[in] op the op to be sent out/// @param[in] sock the socket on which we communicate with server/// @retval 0 on success; -1 on failure///intKFS::DoOpSend(KfsOp *op, TcpSocket *sock){ ostringstream os; if ((sock == NULL ) || (!sock->IsGood())) { // KFS_LOG_VA_DEBUG("Trying to do I/O on a closed socket..failing it"); op->status = -EHOSTUNREACH; return -1; } op->Request(os); int numIO = sock->DoSynchSend(os.str().c_str(), os.str().length()); if (numIO <= 0) { sock->Close(); KFS_LOG_DEBUG("Send failed...closing socket"); op->status = -EHOSTUNREACH; return -1; } if (op->contentLength > 0) { numIO = sock->DoSynchSend(op->contentBuf, op->contentLength); if (numIO <= 0) { sock->Close(); KFS_LOG_DEBUG("Send failed...closing socket"); op->status = -EHOSTUNREACH; return -1; } } return 0;}/// Get a response from the server. The response is assumed to/// terminate with "\r\n\r\n"./// @param[in/out] buf that should be filled with data from server/// @param[in] bufSize size of the buffer////// @param[out] delims the position in the buffer where "\r\n\r\n"/// occurs; in particular, the length of the response string that ends/// with last "\n" character. If the buffer got full and we couldn't/// find "\r\n\r\n", delims is set to -1.////// @param[in] sock the socket from which data should be read/// @retval # of bytes that were read; 0/-1 if there was an error///static intGetResponse(char *buf, int bufSize, int *delims, TcpSocket *sock){ *delims = -1; while (1) { struct timeval timeout = gDefaultTimeout; int nread = sock->DoSynchPeek(buf, bufSize, timeout); if (nread <= 0) return nread; for (int i = 4; i <= nread; i++) { if (i < 4) break; if ((buf[i - 3] == '\r') && (buf[i - 2] == '\n') && (buf[i - 1] == '\r') && (buf[i] == '\n')) { // valid stuff is from 0..i; so, length of resulting // string is i+1. memset(buf, '\0', bufSize); *delims = (i + 1); nread = sock->Recv(buf, *delims); return nread; } } } return -ENOBUFS;}////// From a response, extract out seq # and content-length.///static voidGetSeqContentLen(const char *resp, int respLen, kfsSeq_t *seq, int *contentLength){ string respStr(resp, respLen); Properties prop; istringstream ist(respStr); const char separator = ':'; prop.loadProperties(ist, separator, false); *seq = prop.getValue("Cseq", (kfsSeq_t) -1); *contentLength = prop.getValue("Content-length", 0);}////// Helper function that does the work of getting a response from the/// server and parsing it out.////// @param[in] op the op for which a response is to be gotten/// @param[in] sock the socket on which we communicate with server/// @retval 0 on success; -1 on failure///intKFS::DoOpResponse(KfsOp *op, TcpSocket *sock){ int numIO; char buf[CMD_BUF_SIZE]; int nread = 0, len; ssize_t navail, nleft; kfsSeq_t resSeq; int contentLen; bool printMatchingResponse = false; if ((sock == NULL) || (!sock->IsGood())) { op->status = -EHOSTUNREACH; KFS_LOG_DEBUG("Trying to do I/O on a closed socket..failing it"); return -1; } while (1) { memset(buf, '\0', CMD_BUF_SIZE); numIO = GetResponse(buf, CMD_BUF_SIZE, &len, sock); assert(numIO != -ENOBUFS); if (numIO <= 0) { if (numIO == -ENOBUFS) { op->status = -1; } else if (numIO == -ETIMEDOUT) { op->status = -ETIMEDOUT; KFS_LOG_DEBUG("Get response recv timed out..."); } else { KFS_LOG_DEBUG("Get response failed...closing socket"); sock->Close(); op->status = -EHOSTUNREACH; } return -1; } assert(len > 0); GetSeqContentLen(buf, len, &resSeq, &contentLen); if (resSeq == op->seq) { if (printMatchingResponse) { KFS_LOG_VA_DEBUG("Seq #'s match (after mismatch seq): Expect: %lld, got: %lld", op->seq, resSeq); } break; } KFS_LOG_VA_DEBUG("Seq #'s dont match: Expect: %lld, got: %lld", op->seq, resSeq); printMatchingResponse = true; if (contentLen > 0) { struct timeval timeout = gDefaultTimeout; int len = sock->DoSynchDiscard(contentLen, timeout); if (len != contentLen) { sock->Close(); op->status = -EHOSTUNREACH; return -1; } } } contentLen = op->contentLength; op->ParseResponseHeader(buf, len); if (op->contentLength == 0) { // restore it back: when a write op is sent out and this // method is invoked with the same op to get the response, the // op's status should get filled in; we shouldn't be stomping // over content length. op->contentLength = contentLen; return numIO; } // This is the annoying part...we may have read some of the data // related to attributes already. So, copy them out and then read // whatever else is left if (op->contentBufLen == 0) { op->contentBuf = new char[op->contentLength + 1]; op->contentBuf[op->contentLength] = '\0'; } // len bytes belongs to the RPC reply. Whatever is left after // stripping that data out is the data. navail = numIO - len; if (navail > 0) { assert(navail <= (ssize_t)op->contentLength); memcpy(op->contentBuf, buf + len, navail); } nleft = op->contentLength - navail; assert(nleft >= 0); if (nleft > 0) { struct timeval timeout = gDefaultTimeout; nread = sock->DoSynchRecv(op->contentBuf + navail, nleft, timeout); if (nread == -ETIMEDOUT) { KFS_LOG_DEBUG("Recv timed out..."); op->status = -ETIMEDOUT; } else if (nread <= 0) { KFS_LOG_DEBUG("Recv failed...closing socket"); op->status = -EHOSTUNREACH; sock->Close(); } if (nread <= 0) { return 0; } } return nread + numIO;}////// Common work for each op: build a request; send it to server; get a/// response; parse it.////// @param[in] op the op to be done/// @param[in] sock the socket on which we communicate with server////// @retval # of bytes read from the server.///intKFS::DoOpCommon(KfsOp *op, TcpSocket *sock){ if (sock == NULL) { KFS_LOG_VA_DEBUG("%s: send failed; no socket", op->Show().c_str()); assert(sock); return -EHOSTUNREACH; } int res = DoOpSend(op, sock); if (res < 0) { KFS_LOG_VA_DEBUG("%s: send failure code: %d", op->Show().c_str(), res); return res; } res = DoOpResponse(op, sock); if (res < 0) { KFS_LOG_VA_DEBUG("%s: recv failure code: %d", op->Show().c_str(), res); return res; } if (op->status < 0) { string errstr = ErrorCodeToStr(op->status); KFS_LOG_VA_DEBUG("%s failed with code(%d): %s", op->Show().c_str(), op->status, errstr.c_str()); } return res;}////// To compute the size of a file, determine what the last chunk in/// the file happens to be (from the meta server); then, for the last/// chunk, find its size and then add the size of remaining chunks/// (all of which are assumed to be full). The reason for asking the/// meta server about the last chunk (and simply using chunkCount) is/// that random writes with seeks affect where the "last" chunk of the/// file happens to be: for instance, a file could have chunkCount = 1, but/// that chunk could be the 10th chunk in the file---the first 9/// chunks are just holes.//struct RespondingServer { KfsClientImpl *client; const ChunkLayoutInfo &layout; int *status; off_t *size; RespondingServer(KfsClientImpl *cli, const ChunkLayoutInfo &lay, off_t *sz, int *st): client(cli), layout(lay), status(st), size(sz) { } bool operator() (ServerLocation loc) { TcpSocket sock; if (sock.Connect(loc) < 0) { *size = 0; *status = -1; return false; } SizeOp sop(client->nextSeq(), layout.chunkId, layout.chunkVersion); int numIO = DoOpCommon(&sop, &sock); if (numIO < 0 && !sock.IsGood()) { return false; } *status = sop.status; if (*status >= 0) *size = sop.size; return *status >= 0; }};struct RespondingServer2 { KfsClientImpl *client; const ChunkLayoutInfo &layout; RespondingServer2(KfsClientImpl *cli, const ChunkLayoutInfo &lay) : client(cli), layout(lay) { } ssize_t operator() (ServerLocation loc) { TcpSocket sock; if (sock.Connect(loc) < 0) { return -1; } SizeOp sop(client->nextSeq(), layout.chunkId, layout.chunkVersion); int numIO = DoOpCommon(&sop, &sock); if (numIO < 0 && !sock.IsGood()) { return -1; } return sop.size; }};off_tKfsClientImpl::ComputeFilesize(kfsFileId_t kfsfid){ GetLayoutOp lop(nextSeq(), kfsfid); (void)DoMetaOpWithRetry(&lop); if (lop.status < 0) { // XXX: This can only during concurrent I/O when someone is // deleting a file and we are trying to compute the size of // this file. For now, assert away. assert(lop.status != -ENOENT); return 0; } if (lop.ParseLayoutInfo()) { KFS_LOG_DEBUG("Unable to parse layout info"); return -1; } if (lop.chunks.size() == 0) return 0; vector <ChunkLayoutInfo>::reverse_iterator last = lop.chunks.rbegin(); off_t filesize = last->fileOffset; off_t endsize = 0; int rstatus = 0; RespondingServer responder(this, *last, &endsize, &rstatus); vector <ServerLocation>::iterator s = find_if(last->chunkServers.begin(), last->chunkServers.end(), responder); if (s != last->chunkServers.end()) { if (rstatus < 0) { KFS_LOG_VA_DEBUG("RespondingServer status %d", rstatus); return 0; } filesize += endsize; } return filesize;}voidKfsClientImpl::ComputeFilesizes(vector<KfsFileAttr> &fattrs, vector<FileChunkInfo> &lastChunkInfo){ for (uint32_t i = 0; i < lastChunkInfo.size(); i++) { if (lastChunkInfo[i].chunkCount == 0) continue; if (fattrs[i].fileSize >= 0) continue; for (uint32_t j = 0; j < lastChunkInfo[i].cattr.chunkServerLoc.size(); j++) { TcpSocket sock; ServerLocation loc = lastChunkInfo[i].cattr.chunkServerLoc[j]; // get all the filesizes we can from this server ComputeFilesizes(fattrs, lastChunkInfo, i, loc); } bool alldone = true; for (uint32_t j = i; j < fattrs.size(); j++) { if (fattrs[j].fileSize < 0) { alldone = false; break; } } if (alldone) break; }}voidKfsClientImpl::ComputeFilesizes(vector<KfsFileAttr> &fattrs, vector<FileChunkInfo> &lastChunkInfo, uint32_t startIdx, const ServerLocation &loc){ TcpSocket sock;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -