📄 mdatabase.cpp
字号:
//----------------------------------------------------------------------------
// 程序名称: Table.cpp
// 程序说明: CTable类实现
// 程序作者: 陈立峰
// 程序版本: 1.0
// 开始日期: 2005-06-09
//----------------------------------------------------------------------------
//#include <sys/types.h>
//#include <signal.h>
#include <stdio.h>
#include <string>
#include <iostream>
#include "MDatabase.h"
#include "log.h"
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
//CMDatabase::CMDatabase():m_cPoolOpt(ACE_DEFAULT_BASE_ADDR,ACE_DEFAULT_MAX_SEGMENTS,ACE_DEFAULT_FILE_PERMS,50*1024*1024,30*1024*1024),m_cMalloc(szPoolName,szLockName,&m_cPoolOpt),m_cDictLatch(szDictLatchName),m_cProcLatch(szProcLatchName),m_cMdbDict(&m_cMalloc)
CMDatabase::CMDatabase(const char *szDbName):m_cPoolOpt(MDB_MAP_BASE_ADDR,1,0777,getPoolSize(),0),m_cMalloc(getPoolName(szDbName),getLockName(szDbName),&m_cPoolOpt),m_cDictLatch(getDictLatchName(szDbName)),m_cProcLatch(getProcLatchName(szDbName)),m_cMdbDict(&m_cMalloc)
{
m_nMyPid = ACE_OS::getpid();
ACE_Guard<ACE_Process_Mutex> cGuard(m_cDictLatch);
if(!cGuard.locked())
{
throwException(MDB_EC_NORMAL,MDB_ET_GET_DICT_LOCK_FAIL,"CMDatabase::CMDatabase Error,Can not get Dict Lock\n");
}
void *pData = 0;
ACE_Based_Pointer<CDbTableMeta> *ppsTableMeta;
int result = m_cMalloc.find(getDictName(szDbName), pData);
if(result == -1)
{
if((pData = m_cMalloc.malloc(sizeof(ACE_Based_Pointer<CDbTableMeta>))) == NULL)
{
throwException(MDB_EC_NORMAL,MDB_ET_MEM_ALLOC_ERROR,"CMDatabase::CMDatabase Error,Can not Alloc Memory for ACE_Based_Pointer<CDbTableMeta>)\n");
}
ppsTableMeta = new (pData) ACE_Based_Pointer<CDbTableMeta>();
*ppsTableMeta = (CDbTableMeta *)NULL;
result = m_cMalloc.bind (getDictName(), pData);
if(result == -1)
{
throwException(MDB_EC_NORMAL,MDB_ET_DICT_MISSING,"CMDatabase::CMDatabase Error,Can not Bind MdbDict\n");
}
}
else
{
ppsTableMeta = reinterpret_cast<ACE_Based_Pointer<CDbTableMeta> *>(pData);
}
m_cMdbDict.touch(ppsTableMeta);
cGuard.release();
ACE_Guard<ACE_Process_Mutex> cGuard2(m_cProcLatch);
if(!cGuard2.locked())
{
throwException(MDB_EC_NORMAL,MDB_ET_GET_PROC_LOCK_FAIL,"CMDatabase::CMDatabase Error,Can not get Proc Lock\n");
}
result = m_cMalloc.find(getProcName(szDbName), pData);
if(result == -1)
{
if((pData = m_cMalloc.malloc(sizeof(ACE_Based_Pointer<CProcNode>))) == NULL)
{
throwException(MDB_EC_NORMAL,MDB_ET_MEM_ALLOC_ERROR,"CMDatabase::CMDatabase Error,Can not Alloc Memory for ACE_Based_Pointer<CProcNode>)\n");
}
m_ppsProcList = new (pData) ACE_Based_Pointer<CProcNode>();
*m_ppsProcList = (CProcNode *)NULL;
result = m_cMalloc.bind (getProcName(), pData);
if(result == -1)
{
throwException(MDB_EC_NORMAL,MDB_ET_PROC_LIST_MISSING,"CMDatabase::CMDatabase Error,Can not Bind MdbProc\n");
}
}
else
{
m_ppsProcList = reinterpret_cast<ACE_Based_Pointer<CProcNode> *>(pData);
}
if((pData = m_cMalloc.malloc(sizeof(CProcNode))) == NULL)
{
throwException(MDB_EC_NORMAL,MDB_ET_MEM_ALLOC_ERROR,"CMDatabase::CMDatabase Error,Can not Alloc Memory for CProcNode)\n");
}
// CProcNode *psProcNode = reinterpret_cast<CProcNode *>(pData);
CProcNode *psProcNode = new (pData) CProcNode();
psProcNode->m_nPid = m_nMyPid;
psProcNode->m_psNext = *m_ppsProcList;
*m_ppsProcList = psProcNode;
cGuard2.release();
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
CMDatabase::~CMDatabase()
{
m_cMalloc.release();
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
CMdbQuery *CMDatabase::queryCreate(SCreateTblPara *psCreateTblPara,SCreateIdxesPara *psCreateIdxesPara)
{
return new CMdbQuery(&m_cMalloc,&m_cDictLatch,&m_cMdbDict,m_nMyPid,psCreateTblPara,psCreateIdxesPara);
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
void CMDatabase::queryDrop(const char *szTableName)
{
CMdbQuery *pcMdbQuery = new CMdbQuery(&m_cMalloc,&m_cDictLatch,&m_cMdbDict,m_nMyPid,szTableName);
pcMdbQuery->destroy();
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
CMdbQuery *CMDatabase::querySelectForUpdate(SSelectPara *psSelectPara)
{
SWherePara *psWherePara = &(psSelectPara->m_sWherePara);
return new CMdbQuery(&m_cMalloc,&m_cDictLatch,&m_cMdbDict,m_nMyPid,psSelectPara->m_szTblName,psWherePara->m_nConCnt,psWherePara->m_szFieldNames,psWherePara->m_pnFieldTypes,psWherePara->m_ppVals,true);
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
CMdbQuery *CMDatabase::querySelect(SSelectPara *psSelectPara)
{
SWherePara *psWherePara = &(psSelectPara->m_sWherePara);
return new CMdbQuery(&m_cMalloc,&m_cDictLatch,&m_cMdbDict,m_nMyPid,psSelectPara->m_szTblName,psWherePara->m_nConCnt,psWherePara->m_szFieldNames,psWherePara->m_pnFieldTypes,psWherePara->m_ppVals,false);
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
void CMDatabase::queryDelete(SDeletePara *psDeletePara)
{
SWherePara *psWherePara = &(psDeletePara->m_sWherePara);
CMdbQuery *pcMdbQuery = new CMdbQuery(&m_cMalloc,&m_cDictLatch,&m_cMdbDict,m_nMyPid,psDeletePara->m_szTblName,psWherePara->m_nConCnt,psWherePara->m_szFieldNames,psWherePara->m_pnFieldTypes,psWherePara->m_ppVals,true);
pcMdbQuery->clear();
pcMdbQuery->destroy();
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
void CMDatabase::queryInsert(SInsertPara *psInsertPara)
{
CMdbQuery *pcMdbQuery = new CMdbQuery(&m_cMalloc,&m_cDictLatch,&m_cMdbDict,m_nMyPid,psInsertPara->m_szTblName,-1,NULL,NULL,NULL,true);
pcMdbQuery->createRec();
SValPara *psValPara = &(psInsertPara->m_sValPara);
if(psValPara->m_nFieldCnt != pcMdbQuery->getColCnt())
{
throwException(MDB_EC_NORMAL,MDB_ET_INSERT_VAL_NOT_ENOUGH,"CMDatabase::queryInsert Error,Val Count is not Enough\n");
}
for(int i=0;i<psValPara->m_nFieldCnt;i++)
{
pcMdbQuery->setFieldValByIndex(i,psValPara->m_pnFieldTypes[i],psValPara->m_ppVals[i]);
}
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
void CMDatabase::queryUpdate(SUpdatePara *psUpdatePara)
{
SSetPara *psSetPara = &(psUpdatePara->m_sSetPara);
SWherePara *psWherePara = &(psUpdatePara->m_sWherePara);
CMdbQuery *pcMdbQuery = new CMdbQuery(&m_cMalloc,&m_cDictLatch,&m_cMdbDict,m_nMyPid,psUpdatePara->m_szTblName,psWherePara->m_nConCnt,psWherePara->m_szFieldNames,psWherePara->m_pnFieldTypes,psWherePara->m_ppVals,true);
pcMdbQuery->resetRec();
while(pcMdbQuery->nextRec())
{
for(int i=0;i<psSetPara->m_nFieldCnt;i++)
{
pcMdbQuery->setFieldValByName(psSetPara->m_szFieldNames[i],psSetPara->m_pnFieldTypes[i],psSetPara->m_ppVals[i]);
}
}
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
void CMDatabase::checkForUnLock()
{
ACE_Guard<ACE_Process_Mutex> cGuard(m_cDictLatch);
if(!cGuard.locked())
{
throwException(MDB_EC_NORMAL,MDB_ET_GET_DICT_LOCK_FAIL,"checkForUnLock Error,Can not get Dict Lock\n");
//continue;
}
vector<pid_t> cProcIds;
getSysProcIds(&cProcIds);
CDbTableMeta *psTblMeta = NULL;
while((psTblMeta = m_cMdbDict.enumTableMeta(psTblMeta)) != NULL)
{
LOG4CPLUS_DEBUG(logger, "***Tbl Lock Show Begin,Tbl:" << psTblMeta->m_szTblName << ",ELock:" << psTblMeta->m_nOwnerPid);
if(psTblMeta->m_nOwnerPid != -1 && !checkProcExist(psTblMeta->m_nOwnerPid,&cProcIds))
{
LOG4CPLUS_INFO(logger,"found Missing ELock: " << psTblMeta->m_nOwnerPid);
psTblMeta->m_nOwnerPid = -1;
}
CProcNode *pcSharePid = NULL;
while((pcSharePid = m_cMdbDict.enumSharePid(psTblMeta,pcSharePid)) != NULL)
{
LOG4CPLUS_DEBUG(logger,"SLock:" << pcSharePid->m_nPid);
if(!checkProcExist(pcSharePid->m_nPid,&cProcIds))
{
LOG4CPLUS_INFO(logger,"found missing SLock: " << pcSharePid->m_nPid);
m_cMdbDict.deleteSharePid(psTblMeta, pcSharePid);
}
}
LOG4CPLUS_DEBUG(logger,"Tbl Lock Show End***");
}
cGuard.release();
/*
ACE_Guard<ACE_Process_Mutex> cGuard2(m_cProcLatch);
if(!cGuard2.locked())
{
throwException(MDB_EC_NORMAL,MDB_ET_GET_PROC_LOCK_FAIL,"checkForUnLock Error,Can not get Proc Lock\n");
//continue;
}
CProcNode *psPrevNode = NULL;
CProcNode *psProcNode = *m_ppsProcList;
LOG4CPLUS_DEBUG(logger,"***Proc List Show Begin");
while(psProcNode)
{
LOG4CPLUS_DEBUG(logger,"procid:" << psProcNode->m_nPid);
if(!checkProcExist(psProcNode->m_nPid,&cProcIds))
{
LOG4CPLUS_DEBUG(logger,"found missing Proc");
CProcNode *psNextNode = psProcNode->m_psNext;
if(!psPrevNode)
{
*m_ppsProcList = psNextNode;
}
else
{
psPrevNode->m_psNext = psNextNode;
}
m_cMalloc.free((void *)psProcNode);
psProcNode = psNextNode;
m_cMalloc.release();
}
else
{
psPrevNode = psProcNode;
psProcNode = psProcNode->m_psNext;
}
}
LOG4CPLUS_DEBUG(logger,"Proc List Show End***");
cGuard2.release();
*/
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
void CMDatabase::getSysProcIds(vector<pid_t> *pcProcIds)
{
FILE *pFp;
pFp = popen("ps -ef | sed 's/^ *//g' | sed 's/ */ /g' | cut -d ' ' -f 2 | sed '1d'","r");
if(!pFp)
{
throwException(MDB_EC_NORMAL,MDB_ET_PIPE_OPEN_FAIL,"CMDatabase::getProcs,Can not Open Pipe\n");
}
char pBuf[1025];
int nReadCnt;
string s;
while((nReadCnt = fread(pBuf,1,1024,pFp)) > 0)
{
pBuf[nReadCnt] = 0;
s += pBuf;
}
//err << s.c_str() << endl;
string::size_type nPrevPos = 0, nPos = 0;
while((nPos = s.find_first_of('\n',nPos)) != string::npos)
{
//cerr << "procid:" << atoi(s.substr(nPrevPos,nPos-nPrevPos).c_str());
pcProcIds->push_back((pid_t)(atoi(s.substr(nPrevPos,nPos-nPrevPos).c_str())));
nPrevPos = ++ nPos;
}
if(nPrevPos < s.length())
{
//cerr << "procid:" << atoi(s.substr(nPrevPos,nPos-nPrevPos).c_str());
pcProcIds->push_back((pid_t)(atoi(s.substr(nPrevPos).c_str())));
}
pclose(pFp);
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
bool CMDatabase::checkProcExist(pid_t nPid,const vector<pid_t> *pcProcIds)
{
for(unsigned int i=0;i<pcProcIds->size();i++)
{
if((*pcProcIds)[i] == nPid)
return true;
}
return false;
}
//----------------------------------------------------------------------------
// 函数原型: CWorkFlow::CWorkFlow(int nBucketNum)
// 函数功能: CWorkFlow 构造函数
// 传入参数: 无
// 传出参数: 无
// 函数返回: 无
// 注意事项: 无
//---------------------------------------------------------------------------
CDatabase *getMdb(const char *szDbName)
{
static CMDatabase cMDatabase(szDbName);
return &cMDatabase;
}
void removeMdb(const char *szDbName)
{
string szPoolName = "MDB_"; szPoolName += szDbName; szPoolName += "_POOL";
string szLockName = "MDB_"; szLockName += szDbName; szLockName += "_LOCK";
char *p = getenv("MDB_POOL_SIZE");
off_t poolSize = (off_t) ((p == NULL)? 200*1024*1024:atol(p));
MALLOC::MEMORY_POOL_OPTIONS aPoolOpt(MDB_MAP_BASE_ADDR,1,0777,poolSize,0);
MALLOC aMalloc(szPoolName.c_str(),szLockName.c_str(),&aPoolOpt);
aMalloc.remove();
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -