📄 kfsclient.cc
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: KfsClient.cc 245 2009-01-12 20:12:46Z sriramsrao $ //// Created 2006/04/18// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// \file KfsClient.cc// \brief Kfs Client-library code.////----------------------------------------------------------------------------#include "KfsClient.h"#include "KfsClientInt.h"#include "common/config.h"#include "common/properties.h"#include "common/log.h"#include "meta/kfstypes.h"#include "libkfsIO/Checksum.h"#include "Utils.h"extern "C" {#include <signal.h>#include <stdlib.h>}#include <cerrno>#include <iostream>#include <string>#include <netinet/in.h>#include <arpa/inet.h>#include <boost/scoped_array.hpp>using std::string;using std::ostringstream;using std::istringstream;using std::min;using std::max;using std::map;using std::vector;using std::sort;using std::transform;using std::random_shuffle;using std::cout;using std::endl;using namespace KFS;const int CMD_BUF_SIZE = 1024;// Set the default timeout for server I/O's to be 5 mins for now.// This is intentionally large so that we can do stuff in gdb and not// have the client timeout in the midst of a debug session.struct timeval gDefaultTimeout = {300, 0};namespace { Properties & theProps() { static Properties p; return p; }} KfsClientFactory *KFS::getKfsClientFactory(){ return KfsClientFactory::Instance();}KfsClientPtrKfsClientFactory::GetClient(const char *propFile){ bool verbose = false;#ifdef DEBUG verbose = true;#endif if (theProps().loadProperties(propFile, '=', verbose) != 0) { KfsClientPtr clnt; return clnt; } return GetClient(theProps().getValue("metaServer.name", ""), theProps().getValue("metaServer.port", -1));}class MatchingServer { ServerLocation loc;public: MatchingServer(const ServerLocation &l) : loc(l) { } bool operator()(KfsClientPtr &clnt) const { return clnt->GetMetaserverLocation() == loc; } bool operator()(const ServerLocation &other) const { return other == loc; }};KfsClientPtrKfsClientFactory::GetClient(const std::string metaServerHost, int metaServerPort){ vector<KfsClientPtr>::iterator iter; ServerLocation loc(metaServerHost, metaServerPort); iter = find_if(mClients.begin(), mClients.end(), MatchingServer(loc)); if (iter != mClients.end()) return *iter; KfsClientPtr clnt; clnt.reset(new KfsClient()); clnt->Init(metaServerHost, metaServerPort); if (clnt->IsInitialized()) mClients.push_back(clnt); else clnt.reset(); return clnt;}KfsClient::KfsClient(){ mImpl = new KfsClientImpl();}KfsClient::~KfsClient(){ delete mImpl;}voidKfsClient::SetLogLevel(string logLevel){ if (logLevel == "DEBUG") MsgLogger::SetLevel(log4cpp::Priority::DEBUG); else if (logLevel == "INFO") MsgLogger::SetLevel(log4cpp::Priority::INFO);}int KfsClient::Init(const std::string metaServerHost, int metaServerPort){ return mImpl->Init(metaServerHost, metaServerPort);}bool KfsClient::IsInitialized(){ return mImpl->IsInitialized();}intKfsClient::Cd(const char *pathname){ return mImpl->Cd(pathname);}stringKfsClient::GetCwd(){ return mImpl->GetCwd();}intKfsClient::Mkdirs(const char *pathname){ return mImpl->Mkdirs(pathname);}int KfsClient::Mkdir(const char *pathname){ return mImpl->Mkdir(pathname);}int KfsClient::Rmdir(const char *pathname){ return mImpl->Rmdir(pathname);}int KfsClient::Rmdirs(const char *pathname){ return mImpl->Rmdirs(pathname);}int KfsClient::Readdir(const char *pathname, std::vector<std::string> &result){ return mImpl->Readdir(pathname, result);}int KfsClient::ReaddirPlus(const char *pathname, std::vector<KfsFileAttr> &result){ return mImpl->ReaddirPlus(pathname, result);}int KfsClient::GetDirSummary(const char *pathname, uint64_t &numFiles, uint64_t &numBytes){ return mImpl->GetDirSummary(pathname, numFiles, numBytes);}int KfsClient::Stat(const char *pathname, struct stat &result, bool computeFilesize){ return mImpl->Stat(pathname, result, computeFilesize);}bool KfsClient::Exists(const char *pathname){ return mImpl->Exists(pathname);}bool KfsClient::IsFile(const char *pathname){ return mImpl->IsFile(pathname);}bool KfsClient::IsDirectory(const char *pathname){ return mImpl->IsDirectory(pathname);}intKfsClient::EnumerateBlocks(const char *pathname){ return mImpl->EnumerateBlocks(pathname);}boolKfsClient::VerifyDataChecksums(const char *pathname, const vector<uint32_t> &checksums){ return mImpl->VerifyDataChecksums(pathname, checksums);}boolKfsClient::VerifyDataChecksums(int fd, off_t offset, const char *buf, size_t numBytes){ return mImpl->VerifyDataChecksums(fd, offset, buf, numBytes);}int KfsClient::Create(const char *pathname, int numReplicas, bool exclusive){ return mImpl->Create(pathname, numReplicas, exclusive);}int KfsClient::Remove(const char *pathname){ return mImpl->Remove(pathname);}int KfsClient::Rename(const char *oldpath, const char *newpath, bool overwrite){ return mImpl->Rename(oldpath, newpath, overwrite);}int KfsClient::Open(const char *pathname, int openFlags, int numReplicas){ return mImpl->Open(pathname, openFlags, numReplicas);}int KfsClient::Fileno(const char *pathname){ return mImpl->Fileno(pathname);}int KfsClient::Close(int fd){ return mImpl->Close(fd);}ssize_t KfsClient::Read(int fd, char *buf, size_t numBytes){ return mImpl->Read(fd, buf, numBytes);}ssize_t KfsClient::Write(int fd, const char *buf, size_t numBytes){ return mImpl->Write(fd, buf, numBytes);}int KfsClient::Sync(int fd){ return mImpl->Sync(fd);}off_t KfsClient::Seek(int fd, off_t offset, int whence){ return mImpl->Seek(fd, offset, whence);}off_t KfsClient::Seek(int fd, off_t offset){ return mImpl->Seek(fd, offset, SEEK_SET);}off_t KfsClient::Tell(int fd){ return mImpl->Tell(fd);}int KfsClient::Truncate(int fd, off_t offset){ return mImpl->Truncate(fd, offset);}int KfsClient::GetDataLocation(const char *pathname, off_t start, size_t len, std::vector< std::vector <std::string> > &locations){ return mImpl->GetDataLocation(pathname, start, len, locations);}int KfsClient::GetDataLocation(int fd, off_t start, size_t len, std::vector< std::vector <std::string> > &locations){ return mImpl->GetDataLocation(fd, start, len, locations);}int16_t KfsClient::GetReplicationFactor(const char *pathname){ return mImpl->GetReplicationFactor(pathname);}int16_t KfsClient::SetReplicationFactor(const char *pathname, int16_t numReplicas){ return mImpl->SetReplicationFactor(pathname, numReplicas);}ServerLocationKfsClient::GetMetaserverLocation() const{ return mImpl->GetMetaserverLocation();}//// Now, the real work is done by the impl object....//KfsClientImpl::KfsClientImpl(){ pthread_mutexattr_t mutexAttr; int rval; const int hostnamelen = 256; char hostname[hostnamelen]; if (gethostname(hostname, hostnamelen)) { perror("gethostname: "); exit(-1); } mHostname = hostname; // store the entry for "/" int UNUSED_ATTR rootfte = ClaimFileTableEntry(KFS::ROOTFID, "/", "/"); assert(rootfte == 0); mFileTable[0]->fattr.fileId = KFS::ROOTFID; mFileTable[0]->fattr.isDirectory = true; mCwd = "/"; mIsInitialized = false; mCmdSeqNum = 0; // Setup the mutex to allow recursive locking calls. This // simplifies things when a public method (eg., read) in KFS client calls // another public method (eg., seek) and both need to take the lock rval = pthread_mutexattr_init(&mutexAttr); assert(rval == 0); rval = pthread_mutexattr_settype(&mutexAttr, PTHREAD_MUTEX_RECURSIVE); assert(rval == 0); rval = pthread_mutex_init(&mMutex, &mutexAttr); assert(rval == 0); // whenever a socket goes kaput, don't crash the app signal(SIGPIPE, SIG_IGN); // for random # generation, seed it srand(getpid());}int KfsClientImpl::Init(const string metaServerHost, int metaServerPort){ // Initialize the logger MsgLogger::Init(NULL); MsgLogger::SetLevel(log4cpp::Priority::INFO); //MsgLogger::SetLevel(log4cpp::Priority::DEBUG); mRandSeed = time(NULL); mMetaServerLoc.hostname = metaServerHost; mMetaServerLoc.port = metaServerPort; KFS_LOG_VA_DEBUG("Connecting to metaserver at: %s:%d", metaServerHost.c_str(), metaServerPort); if (!mMetaServerLoc.IsValid()) { mIsInitialized = false; KFS_LOG_VA_ERROR("Unable to connect to metaserver at: %s:%d", metaServerHost.c_str(), metaServerPort); return -1; } if (!ConnectToMetaServer()) { mIsInitialized = false; KFS_LOG_VA_ERROR("Unable to connect to metaserver at: %s:%d", metaServerHost.c_str(), metaServerPort); return -1; } // setup the telemetry stuff... struct ip_mreq imreq; string srvIp = "10.2.0.10"; // int srvPort = 12000; //int multicastPort = 13000; imreq.imr_multiaddr.s_addr = inet_addr("226.0.0.1"); imreq.imr_interface.s_addr = INADDR_ANY; // use DEFAULT interface // will setup this for release // mTelemetryReporter.Init(imreq, multicastPort, srvIp, srvPort); mIsInitialized = true; return 0;}boolKfsClientImpl::ConnectToMetaServer(){ return mMetaServerSock.Connect(mMetaServerLoc) >= 0;}/// A notion of "cwd" in KFS.///intKfsClientImpl::Cd(const char *pathname){ MutexLock l(&mMutex); struct stat s; string path = build_path(mCwd, pathname); int status = Stat(path.c_str(), s); if (status < 0) { KFS_LOG_VA_DEBUG("Non-existent path: %s", pathname); return -ENOENT; } if (!S_ISDIR(s.st_mode)) { KFS_LOG_VA_DEBUG("Non-existent dir: %s", pathname); return -ENOTDIR; } // strip the trailing '/' string::size_type pathlen = path.size(); string::size_type rslash = path.rfind('/'); if (rslash + 1 == pathlen) { // path looks like: /.../; so, get rid of the last '/' path.erase(rslash); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -