📄 chunkmanager.cc
字号:
//---------------------------------------------------------- -*- Mode: C++ -*-// $Id: ChunkManager.cc 238 2009-01-09 07:46:35Z sriramsrao $ //// Created 2006/03/28// Author: Sriram Rao//// Copyright 2008 Quantcast Corp.// Copyright 2006-2008 Kosmix Corp.//// This file is part of Kosmos File System (KFS).//// Licensed under the Apache License, Version 2.0// (the "License"); you may not use this file except in compliance with// the License. You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or// implied. See the License for the specific language governing// permissions and limitations under the License.//// //----------------------------------------------------------------------------extern "C" {#include <dirent.h>#include <sys/time.h>#include <sys/resource.h>#include <sys/stat.h>#include <unistd.h>#include <sys/types.h>#include <sys/statvfs.h>}#include "common/log.h"#include "common/kfstypes.h"#include "ChunkManager.h"#include "ChunkServer.h"#include "MetaServerSM.h"#include "LeaseClerk.h"#include "Utils.h"#include "libkfsIO/Counter.h"#include "libkfsIO/Checksum.h"#include "libkfsIO/Globals.h"#include <fstream>#include <sstream>#include <algorithm>#include <string>#include <set>#include <boost/lexical_cast.hpp>using std::ofstream;using std::ifstream;using std::istringstream;using std::ostringstream;using std::ios_base;using std::list;using std::min;using std::max;using std::endl;using std::find_if;using std::string;using std::vector;using std::set;using namespace KFS;using namespace KFS::libkfsio;ChunkManager KFS::gChunkManager;// Cleanup fds on which no I/O has been done for the past N secsconst int INACTIVE_FDS_CLEANUP_INTERVAL_SECS = 300;// The # of fd's that we allow to be open before cleanup kicks in.// This value will be set to : # of files that the process can open / 2int OPEN_FDS_LOW_WATERMARK = 0;ChunkManager::ChunkManager(){ mTotalSpace = mUsedSpace = 0; mNumChunks = 0; mChunkManagerTimeoutImpl = new ChunkManagerTimeoutImpl(this); // we want a timeout once in 10 secs // mChunkManagerTimeoutImpl->SetTimeoutInterval(10 * 1000); mIsChunkTableDirty = false; mLastDriveChosen = -1;}ChunkManager::~ChunkManager(){ ChunkInfoHandle_t *cih; for (CMI iter = mChunkTable.begin(); iter != mChunkTable.end(); ++iter) { cih = iter->second; delete cih; } mChunkTable.clear(); globals().netManager.UnRegisterTimeoutHandler(mChunkManagerTimeoutImpl); delete mChunkManagerTimeoutImpl;}void ChunkManager::Init(const vector<string> &chunkDirs, int64_t totalSpace){ mTotalSpace = totalSpace; for (uint32_t i = 0; i < chunkDirs.size(); i++) { ChunkDirInfo_t c; c.dirname = chunkDirs[i]; mChunkDirs.push_back(c); } // force a stat of the dirs and update space usage counts GetTotalSpace();}intChunkManager::AllocChunk(kfsFileId_t fileId, kfsChunkId_t chunkId, kfsSeq_t chunkVersion, bool isBeingReplicated){ string s, chunkdir; int fd; ChunkInfoHandle_t *cih; CMI tableEntry = mChunkTable.find(chunkId); mIsChunkTableDirty = true; if (tableEntry != mChunkTable.end()) { cih = tableEntry->second; ChangeChunkVers(cih->chunkInfo.fileId, cih->chunkInfo.chunkId, chunkVersion); return 0; } // Find the directory to use chunkdir = GetDirForChunk(); if (chunkdir == "") { KFS_LOG_VA_INFO("No directory has space to host chunk %ld", chunkId); return -ENOSPC; } s = MakeChunkPathname(chunkdir, fileId, chunkId, chunkVersion); KFS_LOG_VA_INFO("Creating chunk: %s", s.c_str()); CleanupInactiveFds(); if ((fd = creat(s.c_str(), S_IRUSR | S_IWUSR)) < 0) { perror("Create failed: "); return -KFS::ESERVERBUSY; } close(fd); mNumChunks++; cih = new ChunkInfoHandle_t(); cih->chunkInfo.Init(fileId, chunkId, chunkVersion); cih->chunkInfo.SetDirname(chunkdir); cih->isBeingReplicated = isBeingReplicated; mChunkTable[chunkId] = cih; return 0;}intChunkManager::DeleteChunk(kfsChunkId_t chunkId){ string s; ChunkInfoHandle_t *cih; CMI tableEntry = mChunkTable.find(chunkId); if (tableEntry == mChunkTable.end()) return -EBADF; mIsChunkTableDirty = true; cih = tableEntry->second; s = MakeChunkPathname(cih); unlink(s.c_str()); KFS_LOG_VA_DEBUG("Deleting chunk: %s", s.c_str()); mNumChunks--; assert(mNumChunks >= 0); if (mNumChunks < 0) mNumChunks = 0; UpdateDirSpace(cih, -cih->chunkInfo.chunkSize); mUsedSpace -= cih->chunkInfo.chunkSize; mChunkTable.erase(chunkId); delete cih; return 0;}voidChunkManager::DumpChunkMap(){ ChunkInfoHandle_t *cih; ofstream ofs; ofs.open("chunkdump.txt"); // Dump chunk map in the format of // chunkID fileID chunkSize for (CMI tableEntry = mChunkTable.begin(); tableEntry != mChunkTable.end(); ++tableEntry) { cih = tableEntry->second; ofs << cih->chunkInfo.chunkId << " " << cih->chunkInfo.fileId << " " << cih->chunkInfo.chunkSize << endl; } ofs.flush(); ofs.close();}voidChunkManager::DumpChunkMap(ostringstream &ofs){ ChunkInfoHandle_t *cih; // Dump chunk map in the format of // chunkID fileID chunkSize for (CMI tableEntry = mChunkTable.begin(); tableEntry != mChunkTable.end(); ++tableEntry) { cih = tableEntry->second; ofs << cih->chunkInfo.chunkId << " " << cih->chunkInfo.fileId << " " << cih->chunkInfo.chunkSize << endl; }}intChunkManager::WriteChunkMetadata(kfsChunkId_t chunkId, KfsOp *cb){ CMI tableEntry = mChunkTable.find(chunkId); int res; if (tableEntry == mChunkTable.end()) return -EBADF; ChunkInfoHandle_t *cih = tableEntry->second; WriteChunkMetaOp *wcm = new WriteChunkMetaOp(chunkId, cb); DiskConnection *d = SetupDiskConnection(chunkId, wcm); if (d == NULL) return -KFS::ESERVERBUSY; wcm->diskConnection.reset(d); wcm->dataBuf = new IOBuffer(); cih->chunkInfo.Serialize(wcm->dataBuf); cih->lastIOTime = time(0); res = wcm->diskConnection->Write(0, wcm->dataBuf->BytesConsumable(), wcm->dataBuf); if (res < 0) { delete wcm; } return res >= 0 ? 0 : res;}intChunkManager::ReadChunkMetadata(kfsChunkId_t chunkId, KfsOp *cb){ CMI tableEntry = mChunkTable.find(chunkId); int res = 0; if (tableEntry == mChunkTable.end()) return -EBADF; ChunkInfoHandle_t *cih = tableEntry->second; cih->lastIOTime = time(0); if (cih->chunkInfo.AreChecksumsLoaded()) return cb->HandleEvent(EVENT_CMD_DONE, (void *) &res); if (cih->isMetadataReadOngoing) { // if we have issued a read request for this chunk's metadata, // don't submit another one; otherwise, we will simply drive // up memory usage for useless IO's cih->readChunkMetaOp->AddWaiter(cb); return 0; } ReadChunkMetaOp *rcm = new ReadChunkMetaOp(chunkId, cb); DiskConnection *d = SetupDiskConnection(chunkId, rcm); if (d == NULL) return -KFS::ESERVERBUSY; rcm->diskConnection.reset(d); res = rcm->diskConnection->Read(0, KFS_CHUNK_HEADER_SIZE); if (res < 0) { delete rcm; } cih->isMetadataReadOngoing = true; cih->readChunkMetaOp = rcm; return res >= 0 ? 0 : res;}voidChunkManager::ReadChunkMetadataDone(kfsChunkId_t chunkId){ CMI tableEntry = mChunkTable.find(chunkId); if (tableEntry == mChunkTable.end()) return; ChunkInfoHandle_t *cih = tableEntry->second; cih->lastIOTime = time(0); cih->isMetadataReadOngoing = false; cih->readChunkMetaOp = NULL;}intChunkManager::SetChunkMetadata(const DiskChunkInfo_t &dci){ ChunkInfoHandle_t *cih; if (GetChunkInfoHandle(dci.chunkId, &cih) < 0) return -EBADF; cih->chunkInfo.SetChecksums(dci.chunkBlockChecksum); return 0;}voidChunkManager::MarkChunkStale(ChunkInfoHandle_t *cih){ string s = MakeChunkPathname(cih); string staleChunkPathname = MakeStaleChunkPathname(cih); rename(s.c_str(), staleChunkPathname.c_str()); KFS_LOG_VA_INFO("Moving chunk %ld to staleChunks dir", cih->chunkInfo.chunkId); }intChunkManager::StaleChunk(kfsChunkId_t chunkId){ ChunkInfoHandle_t *cih; CMI tableEntry = mChunkTable.find(chunkId); if (tableEntry == mChunkTable.end()) return -EBADF; mIsChunkTableDirty = true; cih = tableEntry->second; MarkChunkStale(cih); mNumChunks--; assert(mNumChunks >= 0); if (mNumChunks < 0) mNumChunks = 0; UpdateDirSpace(cih, -cih->chunkInfo.chunkSize); mUsedSpace -= cih->chunkInfo.chunkSize; mChunkTable.erase(chunkId); delete cih; return 0;}intChunkManager::TruncateChunk(kfsChunkId_t chunkId, off_t chunkSize){ string chunkPathname; ChunkInfoHandle_t *cih; int res; uint32_t lastChecksumBlock; CMI tableEntry = mChunkTable.find(chunkId); // the truncated size should not exceed chunk size. if (chunkSize > (off_t) KFS::CHUNKSIZE) return -EINVAL; if (tableEntry == mChunkTable.end()) return -EBADF; mIsChunkTableDirty = true; cih = tableEntry->second; chunkPathname = MakeChunkPathname(cih); res = truncate(chunkPathname.c_str(), chunkSize); if (res < 0) { res = errno; return -res; } UpdateDirSpace(cih, -cih->chunkInfo.chunkSize); mUsedSpace -= cih->chunkInfo.chunkSize; mUsedSpace += chunkSize; cih->chunkInfo.chunkSize = chunkSize; UpdateDirSpace(cih, cih->chunkInfo.chunkSize); lastChecksumBlock = OffsetToChecksumBlockNum(chunkSize); // XXX: Could do better; recompute the checksum for this last block cih->chunkInfo.chunkBlockChecksum[lastChecksumBlock] = 0; return 0;}intChunkManager::ChangeChunkVers(kfsFileId_t fileId, kfsChunkId_t chunkId, int64_t chunkVersion){ string chunkPathname; ChunkInfoHandle_t *cih; CMI tableEntry = mChunkTable.find(chunkId); string oldname, newname, s; if (tableEntry == mChunkTable.end()) { return -1; } cih = tableEntry->second; oldname = MakeChunkPathname(cih); mIsChunkTableDirty = true; if (cih->chunkInfo.AreChecksumsLoaded()) s = "Checksums are loaded"; else s = "Checksums are not loaded"; KFS_LOG_VA_INFO("Chunk %s already exists; changing version # to %ld; %s", oldname.c_str(), chunkVersion, s.c_str()); cih->chunkInfo.chunkVersion = chunkVersion; newname = MakeChunkPathname(cih); rename(oldname.c_str(), newname.c_str());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -