📄 kfsclient.cc
字号:
}boolKfsClientImpl::IsFile(const char *pathname){ MutexLock l(&mMutex); struct stat statInfo; if (Stat(pathname, statInfo, false) != 0) return false; return S_ISREG(statInfo.st_mode);}boolKfsClientImpl::IsDirectory(const char *pathname){ MutexLock l(&mMutex); struct stat statInfo; if (Stat(pathname, statInfo, false) != 0) return false; return S_ISDIR(statInfo.st_mode);}intKfsClientImpl::LookupAttr(kfsFileId_t parentFid, const char *filename, KfsFileAttr &result, bool computeFilesize){ MutexLock l(&mMutex); if (parentFid < 0) return -EINVAL; int fte = LookupFileTableEntry(parentFid, filename); LookupOp op(nextSeq(), parentFid, filename); if (fte >= 0) { result = mFileTable[fte]->fattr; } else { (void)DoMetaOpWithRetry(&op); if (op.status < 0) return op.status; result = op.fattr; } if ((!result.isDirectory) && computeFilesize) { if (result.fileSize < 0) { result.fileSize = ComputeFilesize(result.fileId); } } else { result.fileSize = -1; } if (fte >= 0) { // if we computed the filesize, then we stash it; otherwise, we'll // set the value to -1 and force a recompute later... mFileTable[fte]->fattr.fileSize = result.fileSize; return 0; } // cache the entry if possible fte = AllocFileTableEntry(parentFid, filename, ""); if (fte < 0) return op.status; mFileTable[fte]->fattr = op.fattr; mFileTable[fte]->openMode = 0; // if we computed the filesize, then we stash it; otherwise, we'll // set the value to -1 and force a recompute later... mFileTable[fte]->fattr.fileSize = result.fileSize; return op.status;}intKfsClientImpl::Create(const char *pathname, int numReplicas, bool exclusive){ MutexLock l(&mMutex); kfsFileId_t parentFid; string filename; int res = GetPathComponents(pathname, &parentFid, filename); if (res < 0) { KFS_LOG_VA_DEBUG("status %d for pathname %s", res, pathname); return res; } if (filename.size() >= MAX_FILENAME_LEN) return -ENAMETOOLONG; CreateOp op(nextSeq(), parentFid, filename.c_str(), numReplicas, exclusive); (void)DoMetaOpWithRetry(&op); if (op.status < 0) { KFS_LOG_VA_DEBUG("status %d from create RPC", op.status); return op.status; } // Everything is good now... int fte = ClaimFileTableEntry(parentFid, filename.c_str(), pathname); if (fte < 0) { // XXX Too many open files KFS_LOG_VA_DEBUG("status %d from ClaimFileTableEntry", fte); return fte; } FileAttr *fa = FdAttr(fte); fa->fileId = op.fileId; fa->Init(false); // is an ordinary file FdInfo(fte)->openMode = O_RDWR; return fte;}intKfsClientImpl::Remove(const char *pathname){ MutexLock l(&mMutex); kfsFileId_t parentFid; string filename; int res = GetPathComponents(pathname, &parentFid, filename); if (res < 0) return res; int fte = LookupFileTableEntry(parentFid, filename.c_str()); if (fte > 0) ReleaseFileTableEntry(fte); RemoveOp op(nextSeq(), parentFid, filename.c_str()); (void)DoMetaOpWithRetry(&op); return op.status;}intKfsClientImpl::Rename(const char *oldpath, const char *newpath, bool overwrite){ MutexLock l(&mMutex); kfsFileId_t parentFid; string oldfilename; int res = GetPathComponents(oldpath, &parentFid, oldfilename); if (res < 0) return res; string absNewpath = build_path(mCwd, newpath); RenameOp op(nextSeq(), parentFid, oldfilename.c_str(), absNewpath.c_str(), overwrite); (void)DoMetaOpWithRetry(&op); KFS_LOG_VA_DEBUG("Status of renaming %s -> %s is: %d", oldpath, newpath, op.status); // update the path cache if (op.status == 0) { int fte = LookupFileTableEntry(parentFid, oldfilename.c_str()); if (fte > 0) { string oldn = string(oldpath); NameToFdMapIter iter = mPathCache.find(oldn); if (iter != mPathCache.end()) mPathCache.erase(iter); mPathCache[absNewpath] = fte; mFileTable[fte]->pathname = absNewpath; } } return op.status;}intKfsClientImpl::Fileno(const char *pathname){ kfsFileId_t parentFid; string filename; int res = GetPathComponents(pathname, &parentFid, filename); if (res < 0) return res; return LookupFileTableEntry(parentFid, filename.c_str());}intKfsClientImpl::Open(const char *pathname, int openMode, int numReplicas){ MutexLock l(&mMutex); kfsFileId_t parentFid; string filename; int res = GetPathComponents(pathname, &parentFid, filename); if (res < 0) return res; if (filename.size() >= MAX_FILENAME_LEN) return -ENAMETOOLONG; LookupOp op(nextSeq(), parentFid, filename.c_str()); (void)DoMetaOpWithRetry(&op); if (op.status < 0) { if (openMode & O_CREAT) { // file doesn't exist. Create it return Create(pathname, numReplicas, openMode & O_EXCL); } return op.status; } else { // file exists; now fail open if: O_CREAT | O_EXCL if ((openMode & (O_CREAT|O_EXCL)) == (O_CREAT|O_EXCL)) return -EEXIST; } int fte = AllocFileTableEntry(parentFid, filename.c_str(), pathname); if (fte < 0) // Too many open files return fte; // O_RDONLY is 0 and we use 0 to tell if the entry isn't used if ((openMode & O_RDWR) || (openMode & O_RDONLY)) mFileTable[fte]->openMode = O_RDWR; else if (openMode & O_WRONLY) mFileTable[fte]->openMode = O_WRONLY; else // in this mode, we open the file to cache the attributes mFileTable[fte]->openMode = 0; // We got a path...get the fattr mFileTable[fte]->fattr = op.fattr; if (mFileTable[fte]->fattr.chunkCount > 0) { mFileTable[fte]->fattr.fileSize = ComputeFilesize(op.fattr.fileId); } if (openMode & O_TRUNC) Truncate(fte, 0); if (openMode & O_APPEND) Seek(fte, 0, SEEK_END); return fte;}intKfsClientImpl::Close(int fd){ MutexLock l(&mMutex); int status = 0; if ((!valid_fd(fd)) || (mFileTable[fd] == NULL)) return -EBADF; if (mFileTable[fd]->buffer.dirty) { status = FlushBuffer(fd); } KFS_LOG_VA_DEBUG("Closing filetable entry: %d", fd); ReleaseFileTableEntry(fd); return status;}intKfsClientImpl::Sync(int fd){ MutexLock l(&mMutex); if ((!valid_fd(fd)) || (mFileTable[fd] == NULL)) return -EBADF; if (mFileTable[fd]->buffer.dirty) { int status = FlushBuffer(fd); if (status < 0) return status; } return 0;}intKfsClientImpl::Truncate(int fd, off_t offset){ MutexLock l(&mMutex); if (!valid_fd(fd)) return -EBADF; // for truncation, file should be opened for writing if (mFileTable[fd]->openMode == O_RDONLY) return -EBADF; ChunkBuffer *cb = FdBuffer(fd); if (cb->dirty) { int res = FlushBuffer(fd); if (res < 0) return res; } // invalidate buffer in case it is past new EOF cb->invalidate(); FilePosition *pos = FdPos(fd); pos->ResetServers(); FileAttr *fa = FdAttr(fd); TruncateOp op(nextSeq(), fa->fileId, offset); (void)DoMetaOpWithRetry(&op); int res = op.status; if (res == 0) { fa->fileSize = offset; if (fa->fileSize == 0) fa->chunkCount = 0; // else // chunkcount is off...but, that is ok; it is never exposed to // the end-client. gettimeofday(&fa->mtime, NULL); // force a re-lookup of locations FdInfo(fd)->cattr.clear(); } return res;}intKfsClientImpl::GetDataLocation(const char *pathname, off_t start, size_t len, vector< vector <string> > &locations){ MutexLock l(&mMutex); int fd; // Non-existent if (!IsFile(pathname)) return -ENOENT; // load up the fte fd = LookupFileTableEntry(pathname); if (fd < 0) { // Open the file and cache the attributes fd = Open(pathname, 0); // we got too many open files? if (fd < 0) return fd; } return GetDataLocation(fd, start, len, locations);}intKfsClientImpl::GetDataLocation(int fd, off_t start, size_t len, vector< vector <string> > &locations){ MutexLock l(&mMutex); int res; // locate each chunk and get the hosts that are storing the chunk. for (size_t pos = start; pos < start + len; pos += KFS::CHUNKSIZE) { ChunkAttr *chunkAttr; int chunkNum = pos / KFS::CHUNKSIZE; if ((res = LocateChunk(fd, chunkNum)) < 0) { return res; } chunkAttr = &(mFileTable[fd]->cattr[chunkNum]); vector<string> hosts; for (vector<string>::size_type i = 0; i < chunkAttr->chunkServerLoc.size(); i++) hosts.push_back(chunkAttr->chunkServerLoc[i].hostname); locations.push_back(hosts); } return 0;}int16_tKfsClientImpl::GetReplicationFactor(const char *pathname){ MutexLock l(&mMutex); int fd; // Non-existent if (!IsFile(pathname)) return -ENOENT; // load up the fte fd = LookupFileTableEntry(pathname); if (fd < 0) { // Open the file for reading...this'll get the attributes setup fd = Open(pathname, 0); // we got too many open files? if (fd < 0) return fd; } return mFileTable[fd]->fattr.numReplicas;}int16_tKfsClientImpl::SetReplicationFactor(const char *pathname, int16_t numReplicas){ MutexLock l(&mMutex); int res, fd; // Non-existent if (!IsFile(pathname)) return -ENOENT; // load up the fte fd = LookupFileTableEntry(pathname); if (fd < 0) { // Open the file and get the attributes cached fd = Open(pathname, 0); // we got too many open files? if (fd < 0) return fd; } ChangeFileReplicationOp op(nextSeq(), FdAttr(fd)->fileId, numReplicas); (void) DoMetaOpWithRetry(&op); if (op.status == 0) { FdAttr(fd)->numReplicas = op.numReplicas; res = op.numReplicas; } else res = op.status; return res;}off_tKfsClientImpl::Seek(int fd, off_t offset){ return Seek(fd, offset, SEEK_SET);}off_tKfsClientImpl::Seek(int fd, off_t offset, int whence){ MutexLock l(&mMutex); if (!valid_fd(fd) || mFileTable[fd]->fattr.isDirectory) return (off_t) -EBADF; FilePosition *pos = FdPos(fd); off_t newOff; switch (whence) { case SEEK_SET: newOff = offset; break; case SEEK_CUR: newOff = pos->fileOffset + offset; break; case SEEK_END: newOff = mFileTable[fd]->fattr.fileSize + offset; break; default: return (off_t) -EINVAL; } int32_t chunkNum = newOff / KFS::CHUNKSIZE; // If we are changing chunks, we need to reset the socket so that // it eventually points to the right place if (pos->chunkNum != chunkNum) { ChunkBuffer *cb = FdBuffer(fd); if (cb->dirty) { FlushBuffer(fd); } assert(!cb->dirty); // better to panic than silently lose a write if (cb->dirty) { KFS_LOG_ERROR("Unable to flush data to server...aborting"); abort(); } // Disconnect from all the servers we were connected for this chunk pos->ResetServers(); } pos->fileOffset = newOff; pos->chunkNum = chunkNum; pos->chunkOffset = newOff % KFS::CHUNKSIZE; return newOff;}off_t KfsClientImpl::Tell(int fd){ MutexLock l(&mMutex); return mFileTable[fd]->currPos.fileOffset;}////// Send a request to the meta server to allocate a chunk./// @param[in] fd The index for an entry in mFileTable[] for which/// space should be allocated./// @param[in] numBytes The # of bytes we will write to this file/// @retval 0 if successful; -errno otherwise///intKfsClientImpl::AllocChunk(int fd){ FileAttr *fa = FdAttr(fd);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -