📄 kfsclient.cc
字号:
vector<ServerLocation>::const_iterator iter; if (sock.Connect(loc) < 0) { return; } for (uint32_t i = startIdx; i < lastChunkInfo.size(); i++) { if (lastChunkInfo[i].chunkCount == 0) continue; if (fattrs[i].fileSize >= 0) continue; iter = find_if(lastChunkInfo[i].cattr.chunkServerLoc.begin(), lastChunkInfo[i].cattr.chunkServerLoc.end(), MatchingServer(loc)); if (iter == lastChunkInfo[i].cattr.chunkServerLoc.end()) continue; SizeOp sop(nextSeq(), lastChunkInfo[i].cattr.chunkId, lastChunkInfo[i].cattr.chunkVersion); int numIO = DoOpCommon(&sop, &sock); if (numIO < 0 && !sock.IsGood()) { return; } if (sop.status >= 0) { lastChunkInfo[i].cattr.chunkSize = sop.size; fattrs[i].fileSize = lastChunkInfo[i].lastChunkOffset + lastChunkInfo[i].cattr.chunkSize; } }}// A simple functor to match chunkserver by hostnameclass ChunkserverMatcher { string myHostname;public: ChunkserverMatcher(const string &l) : myHostname(l) { } bool operator()(const ServerLocation &loc) const { return loc.hostname == myHostname; }};class ChunkserverMatcherByIp { in_addr_t hostaddr;public: ChunkserverMatcherByIp(const string &hostname) { hostaddr = inet_addr(hostname.c_str()); } bool operator()(in_addr &l) const { return hostaddr == l.s_addr; }};intKfsClientImpl::OpenChunk(int fd, bool nonblockingConnect){ if (!IsCurrChunkAttrKnown(fd)) { // Nothing known about this chunk return -EINVAL; } ChunkAttr *chunk = GetCurrChunk(fd); if (chunk->chunkId == (kfsChunkId_t) -1) { chunk->chunkSize = 0; // don't send bogus chunk id's return -EINVAL; } // try the local server first vector <ServerLocation>::iterator s = find_if(chunk->chunkServerLoc.begin(), chunk->chunkServerLoc.end(), ChunkserverMatcher(mHostname)); if (s != chunk->chunkServerLoc.end()) { FdPos(fd)->SetPreferredServer(*s, nonblockingConnect); if (FdPos(fd)->GetPreferredServer() != NULL) { KFS_LOG_VA_DEBUG("Picking local server: %s", s->ToString().c_str()); return SizeChunk(fd); } } vector<ServerLocation> loc = chunk->chunkServerLoc; // take out the slow servers if we can mTelemetryReporter.getNotification(mSlowNodes); bool allNodesSlow = mSlowNodes.size() > 0; if (allNodesSlow) { for (vector<ServerLocation>::size_type i = 0; i != loc.size(); ++i) { vector<struct in_addr>::iterator iter = find_if(mSlowNodes.begin(), mSlowNodes.end(), ChunkserverMatcherByIp(loc[i].hostname)); if (iter == mSlowNodes.end()) { // not all nodes are slow; so, we can eliminate slow nodes allNodesSlow = false; break; } } } try_again: // pick one at random avoiding slow nodes random_shuffle(loc.begin(), loc.end()); for (vector<ServerLocation>::size_type i = 0; (FdPos(fd)->GetPreferredServer() == NULL && i != loc.size()); i++) { if (!allNodesSlow) { vector<struct in_addr>::iterator iter = find_if(mSlowNodes.begin(), mSlowNodes.end(), ChunkserverMatcherByIp(loc[i].hostname)); if (iter != mSlowNodes.end()) { KFS_LOG_VA_INFO("For chunk %lld, avoiding slow node: %s", chunk->chunkId, loc[i].ToString().c_str()); continue; } } FdPos(fd)->SetPreferredServer(loc[i], nonblockingConnect); if (FdPos(fd)->GetPreferredServer() != NULL) KFS_LOG_VA_DEBUG("For chunk %lld, randomly chose: %s", chunk->chunkId, loc[i].ToString().c_str()); } if (FdPos(fd)->GetPreferredServer() == NULL) { if (!allNodesSlow) { // the non-slow node isn't responding; so try one of the slow nodes allNodesSlow = true; KFS_LOG_VA_INFO("Retrying to find a server for chunk = %lld", chunk->chunkId); goto try_again; } KFS_LOG_VA_INFO("Unable to find a server for chunk = %lld", chunk->chunkId); } return (FdPos(fd)->GetPreferredServer() == NULL) ? -EHOSTUNREACH : SizeChunk(fd);}intKfsClientImpl::SizeChunk(int fd){ ChunkAttr *chunk = GetCurrChunk(fd); assert(FdPos(fd)->preferredServer != NULL); if (FdPos(fd)->preferredServer == NULL) return -EHOSTUNREACH; SizeOp op(nextSeq(), chunk->chunkId, chunk->chunkVersion); (void)DoOpCommon(&op, FdPos(fd)->preferredServer); chunk->chunkSize = op.size; KFS_LOG_VA_DEBUG("Chunk: %lld, size = %zd", chunk->chunkId, chunk->chunkSize); return op.status;}////// Wrapper for retrying ops with the metaserver.///intKfsClientImpl::DoMetaOpWithRetry(KfsOp *op){ int res; if (!mMetaServerSock.IsGood()) ConnectToMetaServer(); for (int attempt = 0; attempt < NUM_RETRIES_PER_OP; attempt++) { res = DoOpCommon(op, &mMetaServerSock); if (op->status != -EHOSTUNREACH && op->status != -ETIMEDOUT) break; Sleep(RETRY_DELAY_SECS); ConnectToMetaServer(); // re-issue the op with a new sequence # op->seq = nextSeq(); } return res;}static boolnull_fte(const FileTableEntry *ft){ return (ft == NULL);}//// Rank entries by access time, but putting all directories before files//static boolfte_compare(const FileTableEntry *first, const FileTableEntry *second){ bool dir1 = first->fattr.isDirectory; bool dir2 = second->fattr.isDirectory; if (dir1 == dir2) return first->lastAccessTime < second->lastAccessTime; else if ((!dir1) && (first->openMode == 0)) return dir1; else if ((!dir2) && (second->openMode == 0)) return dir2; return dir1;}intKfsClientImpl::FindFreeFileTableEntry(){ vector <FileTableEntry *>::iterator b = mFileTable.begin(); vector <FileTableEntry *>::iterator e = mFileTable.end(); vector <FileTableEntry *>::iterator i = find_if(b, e, null_fte); if (i != e) return i - b; // Use NULL entries first int last = mFileTable.size(); if (last != MAX_FILES) { // Grow vector up to max. size mFileTable.push_back(NULL); return last; } // recycle directory entries or files open for attribute caching vector <FileTableEntry *>::iterator oldest = min_element(b, e, fte_compare); if ((*oldest)->fattr.isDirectory || ((*oldest)->openMode == 0)) { ReleaseFileTableEntry(oldest - b); return oldest - b; } return -EMFILE; // No luck}class FTMatcher { kfsFileId_t parentFid; string myname;public: FTMatcher(kfsFileId_t f, const char *n): parentFid(f), myname(n) { } bool operator () (FileTableEntry *ft) { return (ft != NULL && ft->parentFid == parentFid && ft->name == myname); }};boolKfsClientImpl::IsFileTableEntryValid(int fte){ // The entries for files open for read/write are valid. This is a // handle that is given to the application. The entries for // directories need to be revalidated every N secs. The one // exception for directory entries is that for "/"; that is always // 2 and is valid. That entry will never be deleted from the fs. // Any other directory can be deleted and we don't want to hold on // to stale entries. time_t now = time(NULL); if (((!FdAttr(fte)->isDirectory) && (FdInfo(fte)->openMode != 0)) || (FdAttr(fte)->fileId == KFS::ROOTFID) || (now - FdInfo(fte)->validatedTime < FILE_CACHE_ENTRY_VALID_TIME)) return true; return false;}intKfsClientImpl::LookupFileTableEntry(kfsFileId_t parentFid, const char *name){ FTMatcher match(parentFid, name); vector <FileTableEntry *>::iterator i; i = find_if(mFileTable.begin(), mFileTable.end(), match); if (i == mFileTable.end()) return -1; int fte = i - mFileTable.begin(); if (IsFileTableEntryValid(fte)) return fte; KFS_LOG_VA_DEBUG("Entry for <%lld, %s> is likely stale; forcing revalidation", parentFid, name); // the entry maybe stale; force revalidation ReleaseFileTableEntry(fte); return -1;}intKfsClientImpl::LookupFileTableEntry(const char *pathname){ string p(pathname); NameToFdMapIter iter = mPathCache.find(p); if (iter != mPathCache.end()) { int fte = iter->second; if (IsFileTableEntryValid(fte)) { assert(mFileTable[fte]->pathname == pathname); return fte; } ReleaseFileTableEntry(fte); return -1; } kfsFileId_t parentFid; string name; int res = GetPathComponents(pathname, &parentFid, name); if (res < 0) return res; return LookupFileTableEntry(parentFid, name.c_str());}intKfsClientImpl::ClaimFileTableEntry(kfsFileId_t parentFid, const char *name, string pathname){ int fte = LookupFileTableEntry(parentFid, name); if (fte >= 0) return fte; return AllocFileTableEntry(parentFid, name, pathname);}intKfsClientImpl::AllocFileTableEntry(kfsFileId_t parentFid, const char *name, string pathname){ int fte = FindFreeFileTableEntry(); if (fte >= 0) { /* if (parentFid != KFS::ROOTFID) KFS_LOG_VA_INFO("Alloc'ing fte: %d for %d, %s", fte, parentFid, name); */ mFileTable[fte] = new FileTableEntry(parentFid, name); mFileTable[fte]->validatedTime = mFileTable[fte]->lastAccessTime = time(NULL); if (pathname != "") { string fullpath = build_path(mCwd, pathname.c_str()); mPathCache[pathname] = fte; // mFileTable[fte]->pathCacheIter = mPathCache.find(pathname); } mFileTable[fte]->pathname = pathname; } return fte;}voidKfsClientImpl::ReleaseFileTableEntry(int fte){ if (mFileTable[fte]->pathname != "") mPathCache.erase(mFileTable[fte]->pathname); /* if (mFileTable[fte]->pathCacheIter != mPathCache.end()) mPathCache.erase(mFileTable[fte]->pathCacheIter); */ KFS_LOG_VA_DEBUG("Closing filetable entry: %d, openmode = %d, path = %s", fte, mFileTable[fte]->openMode, mFileTable[fte]->pathname.c_str()); delete mFileTable[fte]; mFileTable[fte] = NULL;}////// Given a parentFid and a file in that directory, return the/// corresponding entry in the file table. If such an entry has not/// been seen before, download the file attributes from the server and/// save it in the file table.///intKfsClientImpl::Lookup(kfsFileId_t parentFid, const char *name){ int fte = LookupFileTableEntry(parentFid, name); if (fte >= 0) return fte; LookupOp op(nextSeq(), parentFid, name); (void) DoOpCommon(&op, &mMetaServerSock); if (op.status < 0) { return op.status; } // Everything is good now... fte = ClaimFileTableEntry(parentFid, name, ""); if (fte < 0) // too many open files return -EMFILE; FileAttr *fa = FdAttr(fte); *fa = op.fattr; return fte;}////// Given a path, break it down into: parentFid and filename. If the/// path does not begin with "/", the current working directory is/// inserted in front of it./// @param[in] path The path string that needs to be extracted/// @param[out] parentFid The file-id corresponding to the parent dir/// @param[out] name The filename following the final "/"./// @retval 0 on success; -errno on failure///intKfsClientImpl::GetPathComponents(const char *pathname, kfsFileId_t *parentFid, string &name){ const char slash = '/'; string pathstr = build_path(mCwd, pathname); string::size_type pathlen = pathstr.size(); if (pathlen == 0 || pathstr[0] != slash) return -EINVAL; // find the trailing '/' string::size_type rslash = pathstr.rfind('/'); if (rslash + 1 == pathlen) { // path looks like: /.../.../; so get rid of the last '/' pathstr.erase(rslash); pathlen = pathstr.size(); rslash = pathstr.rfind('/'); } if (pathlen == 0) name = "/"; else { // the component of the name we want is between trailing slash // and the end of string name.assign(pathstr, rslash + 1, string::npos); // get rid of the last component of the path as we have copied // it out. pathstr.erase(rslash + 1, string::npos); pathlen = pathstr.size(); } if (name.size() == 0) return -EINVAL; *parentFid = KFS::ROOTFID; if (pathlen == 0) return 0; // Verify that the all the components in pathname leading to // "name" are directories. string::size_type start = 1; while (start != string::npos) { string::size_type next = pathstr.find(slash, start); if (next == string::npos) break; if (next == start) return -EINVAL; // don't allow "//" in path string component(pathstr, start, next - start); int fte = Lookup(*parentFid, component.c_str()); if (fte < 0) return fte; else if (!FdAttr(fte)->isDirectory) return -ENOTDIR; else *parentFid = FdAttr(fte)->fileId; start = next + 1; // next points to '/' } KFS_LOG_VA_DEBUG("file-id for dir: %s (file = %s) is %lld", pathstr.c_str(), name.c_str(), *parentFid); return 0;}stringKFS::ErrorCodeToStr(int status){ if (status == 0) return ""; char buf[4096]; char *errptr = NULL;#if defined (__APPLE__) || defined(__sun__) if (strerror_r(-status, buf, sizeof buf) == 0) errptr = buf; else { strcpy(buf, "<unknown error>"); errptr = buf; }#else if ((errptr = strerror_r(-status, buf, sizeof buf)) == NULL) { strcpy(buf, "<unknown error>"); errptr = buf; }#endif return string(errptr);}intKfsClientImpl::GetLease(kfsChunkId_t chunkId){ int res; assert(chunkId >= 0); for (int i = 0; i < 3; i++) { // XXX Evil constant LeaseAcquireOp op(nextSeq(), chunkId); res = DoOpCommon(&op, &mMetaServerSock); if (op.status
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -