⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 masterapp.cpp

📁 GiPS是一个面向数据密集型应用的分布式文件系统
💻 CPP
字号:
#include "MasterApp.h"
#include <cstdlib>
#include <fstream>
#include <set>
#include <string>
#include <IceUtil/IceUtil.h>

using namespace std;
using namespace Cluster;
using namespace ClientMaster;

void readLog(GDirectory* root, BlockTable* blockTable, ClerkFileList* clerkFileList,string logName, set<string>* prevClerks)
{
	cout << "readLog...";
	ifstream inLog(logName.c_str());
	
	int MAX_LENGTH = 1024;
	char* cont = new char[MAX_LENGTH];
	// read file directory
	inLog.getline(cont, MAX_LENGTH);
	if (inLog.eof())
		return;

	inLog.getline(cont, MAX_LENGTH);
	int size = atoi(cont);
	for (int i = 0; i < size; i++)
	{
		inLog.getline(cont, MAX_LENGTH);
		bool hasSpace = false;

		int space=0;
		while (*(cont+space) != '\0') {
			if (*(cont+space) == ' ') {
				hasSpace = true;
				break;
			}
			space++;
		}

		if (hasSpace) {
			string name(cont, 0, space);
			string blockTableName(cont+space+1);
			root->createFile(name, 0, &blockTableName);
		} else {
			string name(cont);
			root->createFile(name, 1, NULL);
		}
	}
	
	// read block table
	inLog.getline(cont, MAX_LENGTH);
	inLog.getline(cont, MAX_LENGTH);
	size = atoi(cont);
	for (int i = 0; i < size; i++)
	{
		// read file uid
		inLog.getline(cont, MAX_LENGTH);
		string fid(cont);
		FileObject* fobj = new FileObject();

		inLog.getline(cont, MAX_LENGTH);
		int blockNum = atoi(cont);
		for (int j = 0; j < blockNum; j++)
		{
			FileBlock* fb = new FileBlock();
			// read blockID
			inLog.getline(cont, MAX_LENGTH);
			fb->blockID = atoi(cont);
			// read indexID
			inLog.getline(cont, MAX_LENGTH);
			fb->indexID = string(cont);
			
			inLog.getline(cont, MAX_LENGTH);
			int ipsize = atoi(cont);

			PCFBlock pcfb = new ClerkFileBlock();
			pcfb->blockid = fb->blockID;
			pcfb->fid = fid;
			for (int k = 0; k < ipsize; k++)
			{
				inLog.getline(cont, MAX_LENGTH);
				string ip = string(cont);
				prevClerks->insert(ip);
				fb->IPs.push_back(ip);

				ClerkFileList::iterator cit = clerkFileList->find(ip);
				if (cit == clerkFileList->end()) {
					SubFileList* sflist = new SubFileList();
					sflist->push_back(pcfb);
					clerkFileList->insert(ClerkFileList::value_type(ip, sflist));
				} else {
					cit->second->push_back(pcfb);
				}
			}
			fobj->push_back(*fb);
		}
		blockTable->insert(BlockTable::value_type(fid, fobj));
	}

	delete cont;

	inLog.close();
	cout << "finished" << endl;
}

// Writes the master state to the text file `logName`, replacing its content.
//
// Output layout (every item on its own line):
//   "Files Map", the number of nodes returned by root->allChildren(), then one
//   line per node: the absolute path, followed by a space and the block-table
//   name when the node is not a directory.
//   "Block Map", the number of entries in blockTable, then per entry the key,
//   the number of blocks, and per block its blockID, indexID, number of IPs
//   and the IPs themselves.
void writeLog(GDirectory* root, BlockTable* blockTable, string logName)
{
	ofstream outLog(logName.c_str());

	// Section 1: the directory tree.
	vector<GFNode*> nodes = root->allChildren();
	outLog << "Files Map" << endl;
	outLog << nodes.size() << endl;
	for (vector<GFNode*>::size_type n = 0; n < nodes.size(); ++n) {
		GFNode* node = nodes[n];
		const bool isDir = node->isDirectory();
		outLog << node->absolutePath();
		if (!isDir)
			outLog << " " << node->blockTableName();
		outLog << endl;
	}

	// Section 2: the block table.
	outLog << "Block Map" << endl;
	outLog << blockTable->size() << endl;
	BlockTable::iterator entry = blockTable->begin();
	while (entry != blockTable->end())
	{
		FileObject* blocks = entry->second;
		outLog << entry->first << endl;
		outLog << blocks->size() << endl;

		for (FileObject::iterator blk = blocks->begin(); blk != blocks->end(); ++blk)
		{
			outLog << blk->blockID << endl
				<< blk->indexID << endl
				<< blk->IPs.size() << endl;
			for (desIPList::iterator ip = blk->IPs.begin(); ip != blk->IPs.end(); ++ip)
				outLog << *ip << endl;
		}
		++entry;
	}

	outLog.close();
}

// Thread entry point that polls every registered clerk in an endless loop.
//
// pvParam - the MasterApp instance whose tables are inspected and updated.
//
// Each round (one every 15 seconds):
//   1. calls reportState() on every clerk in m_clerkMap and stores the
//      reported block count in the matching m_loadTable entry, appending a new
//      entry when no entry has the clerk's map key as clerkIP;
//   2. removes every clerk whose call threw from m_clerkMap and m_loadTable
//      and passes the list of those clerks to integralityManager->maintain();
//   3. writes the current state to the log file with writeLog().
//
// The loop never exits, so the final return is not reached.
//
// NOTE(review): the tables are accessed here without any locking visible in
// this file -- confirm how access from other threads is synchronized.
// NOTE(review): existing entries are matched on the map key (it->first) while
// new entries store hbMsg.ip -- confirm the two are always the same string.
DWORD WINAPI clerkCheckIn(PVOID pvParam)
{
	MasterApp* app = static_cast<MasterApp*>(pvParam);

	// Clerks that failed to answer in the current round. Kept outside the
	// loop so its storage is reused from round to round.
	vector<string> bads;
	while (true)
	{
		Sleep(15000);
		cout << "communication with clerks" << endl;
		if (app->m_clerkMap->size() > 0)
		{
			for (ClerkMap::iterator it = app->m_clerkMap->begin(); it != app->m_clerkMap->end(); ++it)
			{
				try {
					HeartBeatMessage hbMsg;
					it->second->reportState(hbMsg);

					// Refresh the load entry of an already known clerk.
					bool newclerk = true;
					for (LoadTable::iterator lit = app->m_loadTable->begin(); lit != app->m_loadTable->end(); ++lit) {
						if (!it->first.compare((*lit)->clerkIP)) {
							(*lit)->blockNum = hbMsg.blockNum;
							newclerk = false;
							break;
						}
					}

					if (newclerk) {
						PClerkState state = new ClerkState;
						state->blockNum = hbMsg.blockNum;
						state->clerkIP = hbMsg.ip;
						app->m_loadTable->push_back(state);
					}
					cout << hbMsg.ip << " is alive." << endl;
				} catch (const Ice::Exception& ex) {
					cerr << ex << endl;
					cerr << "clerk ip:" << it->first << endl;
					bads.push_back(it->first);
				} catch (const char* msg) {
					cout << msg << endl;
					cerr << "clerk ip:" << it->first << endl;
					bads.push_back(it->first);
				}
			}

			if (!bads.empty())
			{
				// handle clerk down: removal happens after the polling loop so
				// the map is not modified while it is being iterated
				for (vector<string>::iterator bad = bads.begin(); bad != bads.end(); ++bad) {
					app->m_clerkMap->erase(*bad);
					for (LoadTable::iterator lit = app->m_loadTable->begin(); lit != app->m_loadTable->end(); ++lit) {
						if (!(*lit)->clerkIP.compare(*bad)) {
							// erase invalidates lit, so leave the loop at once
							app->m_loadTable->erase(lit);
							break;
						}
					}
				}

				app->integralityManager->maintain(&bads);
				bads.clear();
			}

			cout << "Load Table size: " << app->m_loadTable->size() << endl;
		} else {
			cout << "no clerk" << endl;
		}
		writeLog(app->m_root, app->m_blockTable, app->logname());
	}

	return 0;
}

// Creates the empty master state: MasterMode 0, a clerk map, a root directory
// named "/", a block table, a load table and a clerk file list. The
// IntegralityManager is created last because it is handed the four tables.
MasterApp::MasterApp()
{
	MasterMode = 0;

	m_clerkMap = new ClerkMap();
	m_root = new GDirectory("/", NULL);
	m_blockTable = new BlockTable();
	m_loadTable = new LoadTable();
	m_clerkFileList = new ClerkFileList();

	integralityManager = new IntegralityManager(
		this->m_blockTable, this->m_loadTable, this->m_clerkMap, this->m_clerkFileList);
}

// Returns a copy of the log file path stored in _logName.
string
MasterApp::logname()
{
	return this->_logName;
}

void
MasterApp::initial()
{
	set<string>* preClerks = new set<string>();
	readLog(this->m_root, this->m_blockTable, this->m_clerkFileList, this->_logName, preClerks);
	Sleep(20000);
	if (this->m_clerkMap->size() > 1)
	{
		for (ClerkMap::iterator it = m_clerkMap->begin(); it != m_clerkMap->end(); it++)
		{
			HeartBeatMessage hbMsg;
			it->second->reportState(hbMsg);
			PClerkState state = new ClerkState;
			state->blockNum = hbMsg.blockNum;
			state->clerkIP = hbMsg.ip;
			m_loadTable->push_back(state);
		}
		cout << m_loadTable->size() << endl;

		vector<string>* ibads = new vector<string>();
		for (set<string>::iterator sit = preClerks->begin(); sit != preClerks->end(); sit++) {
			ClerkMap::iterator cit = m_clerkMap->find(*sit);
			if (cit == m_clerkMap->end())
				ibads->push_back(*sit);
		}

		if (ibads->size()) {
			cout << "Initial nees Integrality maintain()" << endl;
			integralityManager->maintain(ibads);
		}
	} else {
		cout << "Too less clerks" << endl;
	}
}

// Placeholder: performs no verification and always reports success.
bool
MasterApp::integralityCheck()
{
	return true;
}

int
MasterApp::run(int argc, char* argv[])
{
	shutdownOnInterrupt();
	this->_logName = communicator()->getProperties()->getProperty("Master.LogFile");

	ClusterMessengerI::_adapter = communicator()->createObjectAdapter("ClusterMessenger");
	m_clusterMessenger = new ClusterMessengerI(communicator(), this->m_clerkMap);
	ClusterMessengerI::_adapter->activate();

	cout << "SAFE MODE CHECK" << endl;
	initial();

	cout << "START WORK" << endl;
	FileObjectsManagerI::_adapter = communicator()->createObjectAdapter("FileObjectsManager");
	m_fileObjectManager = new FileObjectsManagerI(communicator(), m_root, m_blockTable, m_clerkMap, m_loadTable, m_clerkFileList);
	FileObjectsManagerI::_adapter->activate();

	DWORD clerkCheckInThreadID;
	HANDLE clerkCheckInThread = CreateThread(
		NULL, 0, clerkCheckIn, this, 0, &clerkCheckInThreadID);

	communicator()->waitForShutdown();
	if (interrupted()) {
		cerr << appName()
			<< ": reveived signal, shutting down" << endl;
	}

	return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -