// masterapp.cpp
#include "MasterApp.h"
#include <fstream>
#include <set>
#include <string>
#include <IceUtil/IceUtil.h>
using namespace std;
using namespace Cluster;
using namespace ClientMaster;
// Reads the next line of `in` into `line`. Returns false, leaving `line`
// empty, when no further line can be read.
static bool readLogLine(istream& in, string& line)
{
    if (getline(in, line))
        return true;
    line.erase();
    return false;
}

// Restores master state from the text log produced by writeLog().
//
// Log layout (one value per line):
//   section title
//   <entry count>
//   <path>                      for a directory
//   <path> <blockTableName>     for a file (split at the first space)
//   section title
//   <file count>
//   per file:  <file uid>, <block count>
//   per block: <blockID>, <indexID>, <ip count>, then one <ip> per line
//
// root          - receives one createFile() call per directory-tree entry.
// blockTable    - receives one (file uid -> FileObject*) entry per file.
// clerkFileList - receives, per clerk IP, the blocks that clerk held.
// logName       - path of the log file.
// prevClerks    - receives every clerk IP mentioned in the log.
//
// A missing or empty log restores nothing. Lines may be of any length. If the
// log is truncated, parsing stops where the data runs out; whatever was read
// up to that point is kept.
//
// NOTE(review): a path that itself contains a space cannot be told apart from
// "<path> <blockTableName>" in this format - confirm such paths cannot occur.
void readLog(GDirectory* root, BlockTable* blockTable, ClerkFileList* clerkFileList,string logName, set<string>* prevClerks)
{
    cout << "readLog...";
    ifstream inLog(logName.c_str());
    string line;

    // read file directory: title line first
    if (!readLogLine(inLog, line)) {
        // Nothing to restore (file missing, unreadable or empty).
        cout << "empty log" << endl;
        return;
    }
    readLogLine(inLog, line);
    int size = atoi(line.c_str());
    for (int i = 0; i < size; i++)
    {
        if (!readLogLine(inLog, line))
            break; // truncated log
        const string::size_type space = line.find(' ');
        if (space != string::npos) {
            // "<path> <blockTableName>": a file entry
            string name(line, 0, space);
            string blockTableName(line, space + 1);
            root->createFile(name, 0, &blockTableName);
        } else {
            // no space: writeLog() emits this form for directories
            string name(line);
            root->createFile(name, 1, NULL);
        }
    }

    // read block table: title line, then the number of files
    readLogLine(inLog, line);
    readLogLine(inLog, line);
    size = atoi(line.c_str());
    for (int i = 0; i < size; i++)
    {
        // read file uid
        if (!readLogLine(inLog, line))
            break; // truncated log
        string fid(line);
        FileObject* fobj = new FileObject();
        readLogLine(inLog, line);
        int blockNum = atoi(line.c_str());
        for (int j = 0; j < blockNum; j++)
        {
            // read blockID
            if (!readLogLine(inLog, line))
                break; // truncated log
            // Value-initialised automatic object: it is copied into *fobj
            // below, so no heap allocation is needed (and none is leaked).
            FileBlock fb = FileBlock();
            fb.blockID = atoi(line.c_str());
            // read indexID
            readLogLine(inLog, line);
            fb.indexID = line;
            readLogLine(inLog, line);
            int ipsize = atoi(line.c_str());
            // One shared descriptor of this block, referenced from the list
            // of every clerk that holds a replica.
            PCFBlock pcfb = new ClerkFileBlock();
            pcfb->blockid = fb.blockID;
            pcfb->fid = fid;
            for (int k = 0; k < ipsize; k++)
            {
                if (!readLogLine(inLog, line))
                    break; // truncated log
                const string& ip = line;
                prevClerks->insert(ip);
                fb.IPs.push_back(ip);
                ClerkFileList::iterator cit = clerkFileList->find(ip);
                if (cit == clerkFileList->end()) {
                    SubFileList* sflist = new SubFileList();
                    sflist->push_back(pcfb);
                    clerkFileList->insert(ClerkFileList::value_type(ip, sflist));
                } else {
                    cit->second->push_back(pcfb);
                }
            }
            fobj->push_back(fb);
        }
        // NOTE(review): if the same file uid appeared twice in the log, the
        // second FileObject would presumably not be stored - confirm the
        // writer never emits duplicates.
        blockTable->insert(BlockTable::value_type(fid, fobj));
    }
    cout << "finished" << endl;
}
// Writes the directory tree and the block table to the text file `logName`,
// replacing its previous contents.
//
// root       - directory tree; every node returned by allChildren() is logged.
// blockTable - map from file uid to the list of that file's blocks.
// logName    - path of the log file.
//
// Output layout, one value per line:
//   "Files Map", <entry count>, then per entry either "<path>" (directory)
//   or "<path> <blockTableName>" (file);
//   "Block Map", <file count>, then per file <uid>, <block count>, and per
//   block <blockID>, <indexID>, <ip count> followed by one IP per line.
void writeLog(GDirectory* root, BlockTable* blockTable, string logName)
{
    ofstream outLog(logName.c_str());

    // Section 1: the directory tree.
    vector<GFNode*> nodes = root->allChildren();
    outLog << "Files Map" << endl;
    outLog << nodes.size() << endl;
    for (vector<GFNode*>::iterator nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt)
    {
        GFNode* node = *nodeIt;
        if (node->isDirectory()) {
            outLog << node->absolutePath() << endl;
            continue;
        }
        // Files additionally carry the name of their block-table entry.
        outLog << node->absolutePath() << " " << node->blockTableName() << endl;
    }

    // Section 2: the block table.
    outLog << "Block Map" << endl;
    outLog << blockTable->size() << endl;
    for (BlockTable::iterator fileIt = blockTable->begin(); fileIt != blockTable->end(); ++fileIt)
    {
        FileObject* blocks = fileIt->second;
        outLog << fileIt->first << endl;
        outLog << blocks->size() << endl;
        for (FileObject::iterator blockIt = blocks->begin(); blockIt != blocks->end(); ++blockIt)
        {
            outLog << blockIt->blockID << endl;
            outLog << blockIt->indexID << endl;
            outLog << blockIt->IPs.size() << endl;
            for (desIPList::iterator ipIt = blockIt->IPs.begin(); ipIt != blockIt->IPs.end(); ++ipIt)
            {
                outLog << *ipIt << endl;
            }
        }
    }
    outLog.close();
}
// Thread entry point: polls every registered clerk for a heartbeat.
//
// pvParam - the MasterApp* whose clerk map and load table are maintained.
//
// Every 15 seconds the loop:
//   1. calls reportState() on each clerk in m_clerkMap and refreshes (or
//      creates) that clerk's entry in m_loadTable;
//   2. removes clerks whose call threw from m_clerkMap and m_loadTable and
//      hands their IPs to integralityManager->maintain();
//   3. rewrites the log file via writeLog().
// The loop never terminates, so the trailing return is not reached.
//
// NOTE(review): m_clerkMap and m_loadTable are used here without any visible
// locking - confirm how access from other threads is synchronised.
DWORD WINAPI clerkCheckIn(PVOID pvParam)
{
    MasterApp* master = (MasterApp*) pvParam;
    // IPs of the clerks that failed to answer in the current round.
    vector<string>* deadClerks = new vector<string>();
    for (;;)
    {
        Sleep(15000);
        cout << "communication with clerks" << endl;
        if (master->m_clerkMap->size() == 0)
        {
            cout << "no clerk" << endl;
        }
        else
        {
            for (ClerkMap::iterator clerk = master->m_clerkMap->begin(); clerk != master->m_clerkMap->end(); ++clerk)
            {
                try {
                    HeartBeatMessage hbMsg;
                    clerk->second->reportState(hbMsg);

                    // Look for the load-table entry whose IP equals the map key.
                    LoadTable::iterator entry = master->m_loadTable->begin();
                    while (entry != master->m_loadTable->end()
                           && clerk->first.compare((*entry)->clerkIP) != 0)
                    {
                        ++entry;
                    }

                    if (entry != master->m_loadTable->end()) {
                        (*entry)->blockNum = hbMsg.blockNum;
                    } else {
                        // First heartbeat from this clerk: start tracking it.
                        PClerkState state = new ClerkState;
                        state->blockNum = hbMsg.blockNum;
                        state->clerkIP = hbMsg.ip;
                        master->m_loadTable->push_back(state);
                    }
                    cout << hbMsg.ip << " is alive." << endl;
                } catch (const Ice::Exception& ex) {
                    cerr << ex << endl;
                    cerr << "clerk ip:" << clerk->first << endl;
                    deadClerks->push_back(clerk->first);
                } catch (const char* msg) {
                    cout << msg << endl;
                    cerr << "clerk ip:" << clerk->first << endl;
                    deadClerks->push_back(clerk->first);
                }
            }

            if (deadClerks->size() != 0)
            {
                // handle clerk down
                for (vector<string>::iterator dead = deadClerks->begin(); dead != deadClerks->end(); ++dead)
                {
                    master->m_clerkMap->erase(*dead);

                    // Drop the first load-table entry recorded for this IP.
                    LoadTable::iterator entry = master->m_loadTable->begin();
                    while (entry != master->m_loadTable->end()
                           && (*entry)->clerkIP.compare(*dead) != 0)
                    {
                        ++entry;
                    }
                    if (entry != master->m_loadTable->end())
                        master->m_loadTable->erase(entry);
                }
                master->integralityManager->maintain(deadClerks);
                deadClerks->clear();
            }
            cout << "Load Table size: " << master->m_loadTable->size() << endl;
        }
        writeLog(master->m_root, master->m_blockTable, master->logname());
    }
    return 0;
}
// Builds an application object with empty state: mode 0, no clerks, a root
// directory named "/", and empty block, load and clerk-file tables.
MasterApp::MasterApp()
{
    MasterMode = 0;

    // Containers shared with the integrality manager created below.
    m_clerkMap = new ClerkMap();
    m_root = new GDirectory("/", NULL);
    m_blockTable = new BlockTable();
    m_loadTable = new LoadTable();
    m_clerkFileList = new ClerkFileList();

    // Must come last: it is handed the tables allocated above.
    integralityManager = new IntegralityManager(m_blockTable,
                                                m_loadTable,
                                                m_clerkMap,
                                                m_clerkFileList);
}
string
MasterApp::logname()
{
return _logName;
}
void
MasterApp::initial()
{
set<string>* preClerks = new set<string>();
readLog(this->m_root, this->m_blockTable, this->m_clerkFileList, this->_logName, preClerks);
Sleep(20000);
if (this->m_clerkMap->size() > 1)
{
for (ClerkMap::iterator it = m_clerkMap->begin(); it != m_clerkMap->end(); it++)
{
HeartBeatMessage hbMsg;
it->second->reportState(hbMsg);
PClerkState state = new ClerkState;
state->blockNum = hbMsg.blockNum;
state->clerkIP = hbMsg.ip;
m_loadTable->push_back(state);
}
cout << m_loadTable->size() << endl;
vector<string>* ibads = new vector<string>();
for (set<string>::iterator sit = preClerks->begin(); sit != preClerks->end(); sit++) {
ClerkMap::iterator cit = m_clerkMap->find(*sit);
if (cit == m_clerkMap->end())
ibads->push_back(*sit);
}
if (ibads->size()) {
cout << "Initial nees Integrality maintain()" << endl;
integralityManager->maintain(ibads);
}
} else {
cout << "Too less clerks" << endl;
}
}
// Stub: performs no checks and unconditionally reports success.
bool
MasterApp::integralityCheck()
{
    return true;
}
int
MasterApp::run(int argc, char* argv[])
{
shutdownOnInterrupt();
this->_logName = communicator()->getProperties()->getProperty("Master.LogFile");
ClusterMessengerI::_adapter = communicator()->createObjectAdapter("ClusterMessenger");
m_clusterMessenger = new ClusterMessengerI(communicator(), this->m_clerkMap);
ClusterMessengerI::_adapter->activate();
cout << "SAFE MODE CHECK" << endl;
initial();
cout << "START WORK" << endl;
FileObjectsManagerI::_adapter = communicator()->createObjectAdapter("FileObjectsManager");
m_fileObjectManager = new FileObjectsManagerI(communicator(), m_root, m_blockTable, m_clerkMap, m_loadTable, m_clerkFileList);
FileObjectsManagerI::_adapter->activate();
DWORD clerkCheckInThreadID;
HANDLE clerkCheckInThread = CreateThread(
NULL, 0, clerkCheckIn, this, 0, &clerkCheckInThreadID);
communicator()->waitForShutdown();
if (interrupted()) {
cerr << appName()
<< ": reveived signal, shutting down" << endl;
}
return 0;
}
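// (Code-viewer page residue, not part of the program.)
// Keyboard shortcuts: copy code - Ctrl + C; search code - Ctrl + F;
// full-screen mode - F11; switch theme - Ctrl + Shift + D; show shortcuts - ?;
// increase font size - Ctrl + =; decrease font size - Ctrl + -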