📄 monitorsvr.cpp
字号:
#include "allmgr.h"
#include "monitorsvr.h"

/*
 * Disabled legacy frame helper (CDFrame). Retained for reference only.
 *
void CDFrame::SetupFrame(BOOL bIsAsk,BYTE bySAddr,BYTE byDAddr,BYTE byCID,BYTE* pbyData,WORD wDataSize)
{
    m_wDataSize = wDataSize;
    if (m_wDataSize)
    {
        m_pbyData = new BYTE[m_wDataSize];
        memcpy(m_pbyData,pbyData,m_wDataSize);
    }
    m_bIsAsk = bIsAsk;
    m_bySAddr = bySAddr;
    m_byDAddr = byDAddr;
    m_byCID = byCID;
}

void CDFrame::FrameToBuf(BYTE*& pbyData,WORD& wSize)
{
    wSize = m_wDataSize+6;
    pbyData = new BYTE[wSize];
    WORD* pw = (WORD *)pbyData;
    *pw++ = wSize;
    BYTE* pby = (BYTE *)pw;
    *pby++ = m_bIsAsk?1:0;
    *pby++ = m_byCID;
    *pby++ = m_bySAddr;
    *pby++ = m_byDAddr;
    if (m_pbyData && m_wDataSize)
    {
        memcpy(pby,m_pbyData,m_wDataSize);
    }
}

BOOL CDFrame::BufToFrame(BYTE* pbyData,WORD wSize)
{
    if (m_pbyData && m_wDataSize)
    {
        delete m_pbyData;
        m_pbyData = NULL;
        m_wDataSize = 0;
    }
    if (pbyData && (wSize>=6))
    {
        BYTE* pby = pbyData+2;
        m_bIsAsk = BOOL(*pby++);
        m_byCID = *pby++;
        m_bySAddr = *pby++;
        m_byDAddr = *pby++;
        if ((m_wDataSize = wSize - 6) > 0)
        {
            m_pbyData = new BYTE [m_wDataSize];
            memcpy(m_pbyData,pby,m_wDataSize);
        }
        return TRUE;
    }
    return FALSE;
}
*/

/**
 * Reads one request frame from a client stream, executes it and writes the
 * reply back into the same stream.
 *
 * Frame layout as used by this function and by CMonitorSvr::CDT():
 *   bytes 0-1  : 0xeb90 (sync word)
 *   bytes 2-3  : 0xeb90 (sync word)
 *   bytes 4-5  : WORD size = number of bytes following the size field
 *   byte  6    : command code
 *   bytes 7... : command payload
 *
 * Commands handled (labels taken from the original comments):
 *   5 - "YK"            : 11-byte frame, reply code 0x15 (success) / 0x25 (failure)
 *   6 - Read FixValue   : 11-byte frame, reply code 0x16 / 0x26
 *   7 - Write FixValue  : 6+size bytes,  reply code 0x17 / 0x27
 *
 * Anything that does not match a known frame is read and silently discarded.
 *
 * @param stTcpStream  Client connection to read from / reply on.
 * @return Always true (the connection is never dropped from here).
 */
bool CNBEchoSvr::OnRead(CTcpStream& stTcpStream)
{
    // Smallest parseable frame: 6 header bytes plus the command byte.
    const int kMinFrameSize = 7;

    const int iBufSize = stTcpStream.BufSize();
    char* pchBuffer = new char [iBufSize];

    // Never read more than we could echo back into the output buffer.
    const int iReadAvail  = stTcpStream.rdbuf()->in_avail();
    const int iWriteSpace = iBufSize - stTcpStream.rdbuf()->out_waiting();
    const int iToRead     = (iReadAvail < iWriteSpace) ? iReadAvail : iWriteSpace;

    if (iToRead > 0)
    {
        stTcpStream.read(pchBuffer, iToRead);

        WORD* pwHeader = (WORD *)pchBuffer;

        // The length check comes first so that the header and command byte
        // are only inspected when they were actually received (the buffer
        // is uninitialised beyond iToRead).
        if ((iToRead >= kMinFrameSize) &&
            (pwHeader[0] == 0xeb90) && (pwHeader[1] == 0xeb90))
        {
            WORD* pwSize      = &pwHeader[2];
            const WORD wSize  = *pwSize;
            const BYTE byCmd  = (BYTE)pchBuffer[6];

/*          Disabled: Read RealData request
            if ((wSize == 1) && (pchBuffer[6] == 2) && (iToRead == 7)) //Read RealData
            {
                BYTE abyData[8192];
                int nSize=0;
                m_pMonSvr->CDT(abyData,nSize);
//              int nSend = 0;
//              while (nSend<nSize)
                {
                    stTcpStream.write(abyData,8);//nSize);
                }
            }
            else*/
            if ((iToRead == 11) && (wSize == 5) && (byCmd == 5)) //YK
            {
                // Bytes are widened through BYTE so values >= 0x80 are not
                // sign-extended on platforms where plain char is signed.
                WORD wDevNo = (BYTE)pchBuffer[7];
                WORD wAddr  = *(WORD *)&pchBuffer[8];
                bool bOn    = (pchBuffer[10] == 0x40);
//              *pwSize = 5;
                if (m_pAllMgr->YK(wDevNo, wAddr, bOn))
                    pchBuffer[6] = 0x15; //success
                else
                    pchBuffer[6] = 0x25; //failure
                stTcpStream.write(pchBuffer, 11);
            }
            else if ((iToRead == 11) && (wSize == 5) && (byCmd == 6)) //Read FixValue
            {
                WORD  wDevNo  = (BYTE)pchBuffer[7];
                WORD  wFC     = *(WORD *)&pchBuffer[8];
                WORD  wFVSize = (BYTE)pchBuffer[10];
                BYTE* pbyFV   = NULL;

                const bool bGot = m_pAllMgr->GetFV(wDevNo, pbyFV, wFVSize, wFC) ? true : false;

                // The reply is only usable when the returned data fits into
                // our buffer behind the 10-byte reply header.
                const bool bFits = (10 + wFVSize <= iBufSize) &&
                                   ((wFVSize == 0) || (pbyFV != NULL));

                if (bGot && bFits)
                {
                    *pwSize = wFVSize + 4;
                    pchBuffer[6] = 0x16; //success
                    memcpy(&pchBuffer[10], pbyFV, wFVSize);
                    stTcpStream.write(pchBuffer, 10 + wFVSize);
                }
                else
                {
                    // NOTE(review): the size field is left at 5 here although
                    // only 10 bytes are sent (Write FixValue sets it to 4) -
                    // confirm what the client expects before changing.
                    pchBuffer[6] = 0x26; //failure
                    stTcpStream.write(pchBuffer, 10);
                }

                if (bGot)
                {
                    // NOTE(review): GetFV is not visible here; if it allocates
                    // with new[] this must become delete [] - confirm.
                    delete pbyFV;
                }
            }
            else if ((byCmd == 7) && (wSize >= 4) && (iToRead == (6 + wSize))) //Write FixValue
            {
                // wSize >= 4 guarantees the 4 fixed bytes (cmd, dev, FC) are
                // present and that wSize - 4 below cannot wrap around.
                WORD  wDevNo  = (BYTE)pchBuffer[7];
                WORD  wFC     = *(WORD *)&pchBuffer[8];
                BYTE* pbyFV   = (BYTE *)&pchBuffer[10];
                WORD  wFVSize = wSize - 4;

                if (m_pAllMgr->PutFV(wDevNo, pbyFV, wFVSize, wFC))
                    pchBuffer[6] = 0x17; //success
                else
                    pchBuffer[6] = 0x27; //failure
                *pwSize = 4;
                stTcpStream.write(pchBuffer, 10);
            }
        }
    }

    delete [] pchBuffer;
    stTcpStream << flush;
    return true;
}

/**
 * Per-connection write hook invoked by WriteToAll() before data is pushed.
 * Currently a no-op that always reports success.
 *
 * @param stTcpStream  Client connection (unused).
 * @return Always true; returning false would make WriteToAll() drop the
 *         connection.
 */
bool CNBEchoSvr::OnWrite(CTcpStream& stTcpStream)
{
//  int iWriteSpace = stTcpStream.BufSize() - stTcpStream.rdbuf()->out_waiting();
//  cout << iWriteSpace <<" | "<< stTcpStream.SocketFd()<< endl;
//  BYTE abyData[8192];
//  m_pMonSvr->CDT(abyData,iWriteSpace);
//  if (iWriteSpace > 0)
//  {
//      stTcpStream.read(pchBuffer, iToRead);
//      stTcpStream.write(abyData, iWriteSpace);
//  }
//  stTcpStream << flush;
    return true;
}

/**
 * Sends the same buffer to every connected client.
 *
 * Connections whose stream is no longer good(), or for which OnWrite()
 * returns false, are deleted and removed from the connection list.
 * After each successful write the function sleeps 200 ms.
 *
 * @param pchData  Bytes to send.
 * @param nSize    Number of bytes in pchData.
 */
void CNBEchoSvr::WriteToAll(char* pchData,int nSize)
{
    CConnList::iterator itConn = m_stConnList.begin();
    while (itConn != m_stConnList.end())
    {
        (*itConn)->m_pstTcpStream->flush();

        // Drop connections that broke since the last round.
        if (!(*itConn)->m_pstTcpStream->good())
        {
            delete (*itConn);
            itConn = m_stConnList.erase(itConn);
            continue;
        }

        (*itConn)->m_iLastAccessTime = (int)time(NULL);

        if (!OnWrite(*((*itConn)->m_pstTcpStream)))
        {
            delete (*itConn);
            itConn = m_stConnList.erase(itConn);
            continue;
        }

//      int iWriteSpace = (*itConn)->m_pstTcpStream->BufSize() - (*itConn)->m_pstTcpStream->rdbuf()->out_waiting();
//      if (nSize <= iWriteSpace)
//      {
            (*itConn)->m_pstTcpStream->write(pchData, nSize);
//          cout << "+++++++++iWriteSpace:" << iWriteSpace <<" "<< nSize<<endl;
            (*itConn)->m_pstTcpStream->flush();
//      }
//      else
//      {
//          (*itConn)->m_pstTcpStream->underflow();
//          cout << "-----------iWriteSpace:" << iWriteSpace <<" "<< nSize<<endl;
//      }
        usleep(200000);

        ++itConn;
    }
}

/**
 * Constructs the monitor service and its embedded TCP server
 * (listening address/port are hard-coded below).
 *
 * @param pAllMgr  Manager object used to serve client requests and to
 *                 collect the data broadcast by CDT().
 */
CMonitorSvr::CMonitorSvr(CAllMgr* pAllMgr)
//  : m_stNBEchoSvr(this,pAllMgr,"192.168.1.18", 8899)
    : m_stNBEchoSvr(this,pAllMgr,"192.168.1.40", 8899)
{
    m_pAllMgr = pAllMgr;
    m_hThread = 0;
    // The destructor tests m_hThreadCDT, so it must start from a known
    // value even while the CDT thread start-up in Init() is disabled.
    m_hThreadCDT = 0;
    pthread_mutex_init(&m_mutexSOE,NULL);   // PTHREAD_MUTEX_INITIALIZER)
    pthread_mutex_init(&m_mutexEvent,NULL); // PTHREAD_MUTEX_INITIALIZER)
}

/**
 * Requests cancellation of the worker threads and releases the mutexes.
 *
 * Cancellation is requested before the mutexes are destroyed so the workers
 * are at least asked to stop first. The threads are created detached and
 * cannot be joined, so this is best-effort only.
 */
CMonitorSvr::~CMonitorSvr()
{
    if (m_hThread > 0)
    {
        cout << "SOCK Monitor:" << m_hThread << endl;
        pthread_cancel(m_hThread);
    }
    if (m_hThreadCDT > 0) //4.16
    {
        cout << "CMonitorSvr -CDT:" << m_hThreadCDT << endl; //4.16
        pthread_cancel(m_hThreadCDT); //4.16
    }

    pthread_mutex_destroy(&m_mutexSOE);
    pthread_mutex_destroy(&m_mutexEvent);
}

/**
 * Performs one round of server housekeeping unless shutdown was requested.
 *
 * @return true while the server should keep running, false once
 *         m_bShutdown is set.
 */
bool CNBEchoSvr::Run()
{
    if (m_bShutdown)
        return false;

    CheckConnTimeout();
    CheckEvent();
    OnCheck();

    // Re-read the flag: one of the calls above may have requested shutdown.
    return !m_bShutdown;
}

/**
 * Thread entry point for the monitor worker.
 *
 * After a 10 s start-up delay it loops forever: each iteration runs the
 * server once and then alternates between broadcasting a data frame (CDT)
 * and sleeping 100 ms. The thread only ends through pthread_cancel().
 *
 * @param pVoid  The owning CMonitorSvr instance.
 * @return Never returns in practice; NULL is present for the signature.
 */
void* MonitorService(void *pVoid)
{
    CMonitorSvr *pMSvr = (CMonitorSvr *)pVoid;
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);

    sleep(10);

    // A toggle replaces the original ever-growing int counter, which could
    // eventually overflow.
    bool bBroadcastTurn = true;
    while (true)
    {
        try
        {
            if (pMSvr->Run())
            {
                // Flip before acting so an exception in CDT() still
                // advances to the sleeping turn, as the counter did.
                const bool bBroadcastNow = bBroadcastTurn;
                bBroadcastTurn = !bBroadcastTurn;

                if (bBroadcastNow)
                    pMSvr->CDT();
                else
                    usleep(100000);
            }
            else
            {
                // Server reports shutdown: idle instead of spinning at
                // full CPU until the thread is cancelled.
                usleep(100000);
            }
        }
        catch (CException&)
        {
            // Deliberately ignored so the service keeps running.
//          if (!OnException(e))
//              break;
        }
    }
    return NULL;
}

/**
 * Thread entry point that broadcasts data frames back-to-back. //4.16
 * Its start-up code in CMonitorSvr::Init() is currently commented out.
 *
 * @param pVoid  The owning CMonitorSvr instance.
 * @return Never returns in practice; NULL is present for the signature.
 */
void* thCDT(void* pVoid) //4.16
{
    CMonitorSvr *pMSvr = (CMonitorSvr *)pVoid; //4.16
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); //4.16
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL); //4.16
    while (true)
    {
        pMSvr->CDT();
    }
    return NULL;
}

/**
 * Starts the detached monitor worker thread (MonitorService).
 *
 * @return true when the thread was created, false otherwise (m_hThread is
 *         reset to 0 in that case).
 */
bool CMonitorSvr::Init()
{
    bool bRet = false;

    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);

    if (pthread_create(&m_hThread,&attr,MonitorService,(void *)this) == 0)
    {
        // Previously success was only reported from the (now disabled)
        // second-thread branch below, so Init() always returned false.
        bRet = true;
/*      Disabled: separate CDT broadcast thread
        pthread_attr_t attr1;
        pthread_attr_init(&attr1);
        pthread_attr_setdetachstate(&attr1,PTHREAD_CREATE_DETACHED);
        if (pthread_create(&m_hThreadCDT,&attr1,thCDT,(void *)this) == 0)
        {
            bRet = true;
        }
        else
        {
            pthread_cancel(m_hThread);
            m_hThread = 0;
            m_hThreadCDT = 0;
        }
        pthread_attr_destroy(&attr1);*/
    }
    else
    {
        m_hThread = 0;
    }

    pthread_attr_destroy(&attr);
    return bRet;
}

/**
 * Builds one data frame and broadcasts it to all connected clients.
 *
 * Four sources are served in rotation across calls (the position is kept
 * in a function-local static): 0 = YX, 1 = YC, 2 = queued SOE records,
 * 3 = one queued event string. A source yielding fewer than 3 payload bytes
 * is skipped. If none of the four has data, nothing is sent and the function
 * returns so the caller can retry later.
 *
 * Frame: 0xeb90, 0xeb90, WORD(payload + 1), source code byte, payload.
 *
 * NOTE: uses static buffers, so it must only be called from one thread.
 */
void CMonitorSvr::CDT()
{
    static BYTE abyData[8192];
    static BYTE bbb = 0;            // rotating source selector

    const int kHeaderSize = 7;      // 2 sync words + size word + code byte
    const int kMaxPayload = (int)sizeof(abyData) - kHeaderSize;

    int  nSize = 0;
    BYTE b     = 0;

    // Try each source at most once per call instead of looping until one
    // of them produces data.
    for (int nAttempt = 0; (nAttempt < 4) && (nSize < 3); nAttempt++)
    {
        nSize = 0;
        switch (bbb++ % 4)
        {
        case 0: //YX
            m_pAllMgr->GetAllYX(&abyData[kHeaderSize], nSize);
            b = 0;
            break;

        case 1: //YC
            m_pAllMgr->GetAllYC(&abyData[kHeaderSize], nSize);
            b = 1;
            break;

        case 2: //SOE
            // The queue is inspected and drained under the lock; AddSOE()
            // modifies it from other threads.
            pthread_mutex_lock(&m_mutexSOE);
            while ((m_quSOEData.size() > 0) &&
                   (nSize <= 4096) &&
                   (nSize + (int)sizeof(SOEData) <= kMaxPayload))
            {
                const SOEData& soe = m_quSOEData.front();
                memcpy(&abyData[kHeaderSize + nSize], (const BYTE *)&soe, sizeof(soe));
                nSize += sizeof(soe);
                m_quSOEData.pop();
            }
            pthread_mutex_unlock(&m_mutexSOE);
            b = 2;
            break;

        case 3: //EVENT
        {
            char* pszEvent = NULL;

            pthread_mutex_lock(&m_mutexEvent);
            if (m_quEvent.Size() > 0)
                m_quEvent.Get(pszEvent);
            pthread_mutex_unlock(&m_mutexEvent);

            if (pszEvent != NULL)
            {
                // Clamp so an over-long event cannot overrun abyData.
                size_t nLen = strlen(pszEvent);
                if (nLen > (size_t)kMaxPayload)
                    nLen = (size_t)kMaxPayload;

                memcpy(&abyData[kHeaderSize], pszEvent, nLen);
                nSize += (int)nLen;

                // Allocated with new char[] in AddEvent().
                delete [] pszEvent;
            }
            b = 3;
            break;
        }
        }
    }

    if (nSize < 3)
        return;     // nothing worth sending this round

    WORD* pwHeader = (WORD *)abyData;
    pwHeader[0] = 0xeb90;
    pwHeader[1] = 0xeb90;
    pwHeader[2] = (WORD)(nSize + 1);    // code byte + payload
    abyData[6]  = b;
    m_stNBEchoSvr.WriteToAll((char *)abyData, nSize + kHeaderSize);
}

/**
 * Queues SOE records for a later broadcast by CDT().
 *
 * The queue is capped at 100 entries; the oldest records are discarded
 * when the cap is exceeded.
 *
 * @param pSOEValue  Array of records to append (ignored when NULL).
 * @param wCount     Number of records in pSOEValue.
 */
void CMonitorSvr::AddSOE(SOEData* pSOEValue,WORD wCount)
{
    if (pSOEValue == NULL)
        return;

    pthread_mutex_lock(&m_mutexSOE);

    for (WORD wIdx = 0; wIdx < wCount; wIdx++)
        m_quSOEData.push(pSOEValue[wIdx]);

    while (m_quSOEData.size() > 100)
        m_quSOEData.pop();

    pthread_mutex_unlock(&m_mutexSOE);
}

/**
 * Queues a copy of an event string for a later broadcast by CDT().
 *
 * The queue is capped at 200 entries; the oldest events are freed and
 * discarded when the cap is exceeded.
 *
 * @param wDevNo    Device number (currently unused).
 * @param pszEvent  NUL-terminated event text (ignored when NULL).
 */
void CMonitorSvr::AddEvent(WORD wDevNo,char* pszEvent)
{
    (void)wDevNo;

    if (pszEvent == NULL)
        return;

    // Copy outside the critical section so a failing allocation cannot
    // leave the mutex locked.
    const size_t nBytes = strlen(pszEvent) + 1;
    char* pszCopy = new char[nBytes];
    memcpy(pszCopy, pszEvent, nBytes);

    pthread_mutex_lock(&m_mutexEvent);

    m_quEvent.Add(pszCopy);
    while (m_quEvent.Size() > 200)
    {
        char* pszOldest = NULL;
        m_quEvent.Get(pszOldest);
        delete [] pszOldest;    // allocated with new char[] above
//      m_quEvent.pop();
    }

    pthread_mutex_unlock(&m_mutexEvent);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -