📄 kfsclientint.h
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: KfsClientInt.h 226 2008-12-19 06:21:46Z mikeov $//// Created 2006/04/18// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// //----------------------------------------------------------------------------#ifndef LIBKFSCLIENT_KFSCLIENTINT_H#define LIBKFSCLIENT_KFSCLIENTINT_H#include <string>#include <vector>#include <tr1/unordered_map>#include <sys/select.h>#include "common/log.h"#include "common/hsieh_hash.h"#include "common/kfstypes.h"#include "libkfsIO/TcpSocket.h"#include "libkfsIO/Checksum.h"#include "libkfsIO/TelemetryClient.h"#include "KfsAttr.h"#include "KfsOps.h"#include "LeaseClerk.h" #include "concurrency.h"namespace KFS {/// Set this to 1MB: 64K * 16const size_t MIN_BYTES_PIPELINE_IO = CHECKSUM_BLOCKSIZE * 16 * 4;/// Per write, push out at most one checksum block size worth of dataconst size_t MAX_BYTES_PER_WRITE_IO = CHECKSUM_BLOCKSIZE;/// on zfs, blocks are 128KB on disk; so, align reads appropriatelyconst size_t MAX_BYTES_PER_READ_IO = CHECKSUM_BLOCKSIZE * 2;/// If an op fails because the server crashed, retry the op. This/// constant defines the # of retries before declaring failure.const uint8_t NUM_RETRIES_PER_OP = 3;/// Whenever an op fails, we need to give time for the server to/// recover. So, introduce a delay of 5 secs between retries.const int RETRY_DELAY_SECS = 5;/// Whenever we have issues with lease failures, we retry the op after a minuteconst int LEASE_RETRY_DELAY_SECS = 60;/// Directory entries that we may have cached are valid for 30 secs;/// after that force a revalidataion.const int FILE_CACHE_ENTRY_VALID_TIME = 30;////// A KfsClient maintains a file-table that stores information about/// KFS files on that client. Each file in the file-table is composed/// of some number of chunks; the meta-information about each/// chunk is stored in a chunk-table associated with that file. Thus, given a/// <file-id, offset>, we can map it to the appropriate <chunk-id,/// offset within the chunk>; we can also find where that piece of/// data is located and appropriately access it.///////// \brief Buffer for speeding up small reads and writes; holds/// on to a piece of data from one chunk.///struct ChunkBuffer { // set the client buffer to be fairly big...for sequential reads, // we will hit the network few times and on each occasion, we read // a ton and thereby get decent performance; having a big buffer // obviates the need to do read-ahead :-) static const size_t ONE_MB = 1 << 20; // to reduce memory footprint, keep a decent size buffer; we used to have // 64MB before static const size_t BUF_SIZE = KFS::CHUNKSIZE < 4 * ONE_MB ? KFS::CHUNKSIZE : 4 * ONE_MB; // static const size_t BUF_SIZE = KFS::CHUNKSIZE; ChunkBuffer():chunkno(-1), start(0), length(0), dirty(false), buf(NULL) { } ~ChunkBuffer() { delete [] buf; } void invalidate() { chunkno = -1; start = 0; length = 0; dirty = false; delete [] buf; } void allocate() { if (buf) return; buf = new char[BUF_SIZE]; } int chunkno; // which chunk off_t start; // offset with chunk size_t length; // length of valid data bool dirty; // must flush to server if true char *buf; // the data};struct ChunkServerConn { /// name/port of the chunk server to which this socket is /// connected. ServerLocation location; /// connected TCP socket. If this object is copy constructed, we /// really can't afford this socket to close when the "original" /// is destructed. To protect such issues, make this a smart pointer. TcpSocketPtr sock; ChunkServerConn(const ServerLocation &l) : location(l) { sock.reset(new TcpSocket()); } void Connect(bool nonblockingConnect = false) { if (sock->IsGood()) return; int res; res = sock->Connect(location, nonblockingConnect); if (res == -EINPROGRESS) { struct timeval selectTimeout; fd_set writeSet; int sfd = sock->GetFd(); FD_ZERO(&writeSet); FD_SET(sfd, &writeSet); selectTimeout.tv_sec = 30; selectTimeout.tv_usec = 0; res = select(sfd + 1, NULL, &writeSet, NULL, &selectTimeout); if ((res > 0) && (FD_ISSET(sfd, &writeSet))) { // connection completed return; } KFS_LOG_VA_INFO("Non-blocking connect to location %s failed", location.ToString().c_str()); res = -EHOSTUNREACH; } if (res < 0) { sock.reset(new TcpSocket()); } } bool operator == (const ServerLocation &other) const { return other == location; }};////// \brief Location of the file pointer in a file consists of two/// parts: the offset in the file, which then translates to a chunk #/// and an offset within the chunk. Also, for performance, we do some/// client-side buffering (for both reads and writes). The buffer/// stores data corresponding to the "current" chunk.///struct FilePosition { FilePosition() { fileOffset = chunkOffset = 0; chunkNum = 0; preferredServer = NULL; } ~FilePosition() { } void Reset() { fileOffset = chunkOffset = 0; chunkNum = 0; chunkServers.clear(); preferredServer = NULL; } off_t fileOffset; // offset within the file /// which chunk are we at: this is an index into fattr.chunkTable[] int32_t chunkNum; /// offset within the chunk off_t chunkOffset; /// For the purpose of write, we may have to connect to multiple servers std::vector<ChunkServerConn> chunkServers; /// For reads as well as meta requests about a chunk, this is the /// preferred server to goto. This is a pointer to a socket in /// the vector<ChunkServerConn> structure. TcpSocket *preferredServer; /// Track the location of the preferred server so we can print debug messages ServerLocation preferredServerLocation; void ResetServers() { chunkServers.clear(); preferredServer = NULL; } TcpSocket *GetChunkServerSocket(const ServerLocation &loc, bool nonblockingConnect = false) { std::vector<ChunkServerConn>::iterator iter; iter = std::find(chunkServers.begin(), chunkServers.end(), loc); if (iter != chunkServers.end()) { iter->Connect(nonblockingConnect); TcpSocket *s = iter->sock.get(); if (s->IsGood()) return s; return NULL; } // Bit of an issue here: The object that is being pushed is // copy constructed; when that object is destructed, the // socket it has will go. To avoid that, we need the socket // to be a smart pointer. chunkServers.push_back(ChunkServerConn(loc)); chunkServers[chunkServers.size()-1].Connect(nonblockingConnect); TcpSocket *s = chunkServers[chunkServers.size()-1].sock.get(); if (s->IsGood()) return s; return NULL; } /// take out the chunkserver from the list of servers that we can talk to. void AvoidServer(const ServerLocation &loc) { std::vector<ChunkServerConn>::iterator iter; iter = std::find(chunkServers.begin(), chunkServers.end(), loc); if (iter != chunkServers.end()) chunkServers.erase(iter); if (preferredServerLocation == loc) preferredServer = NULL; } void SetPreferredServer(const ServerLocation &loc, bool nonblockingConnect = false) { preferredServer = GetChunkServerSocket(loc, nonblockingConnect); preferredServerLocation = loc; } const ServerLocation &GetPreferredServerLocation() const { return preferredServerLocation; } TcpSocket *GetPreferredServer() { return preferredServer; } int GetPreferredServerAddr(struct sockaddr_in &saddr) { if (preferredServer == NULL) return -1; return preferredServer->GetPeerName((struct sockaddr *) &saddr); }};typedef std::tr1::unordered_map<std::string, int, Hsieh_hash_fcn> NameToFdMap; typedef std::tr1::unordered_map<std::string, int, Hsieh_hash_fcn>::iterator NameToFdMapIter;////// \brief A table of entries that describe each open KFS file.///struct FileTableEntry { // the fid of the parent dir in which this entry "resides" kfsFileId_t parentFid; // stores the name of the file/directory. std::string name; // store a pointer to the associated name-cache entry // NameToFdMapIter pathCacheIter; // the full pathname std::string pathname;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -