📄 dataqueue.h
字号:
#ifndef DATAQUEUE_H#define DATAQUEUE_H/** *@author */#define DATAQUEUESIZE 2048class CDataNode{public: BYTE* m_pbyData;//[DATAQUEUESIZE]; WORD m_wSize; timeval m_tv; bool m_bDIR; CDataNode() { m_pbyData = NULL; m_wSize = 0; } CDataNode(timeval tv,BYTE *pData,WORD wSize,bool bDIR = false) { m_tv.tv_sec = tv.tv_sec; m_tv.tv_usec = tv.tv_usec; m_bDIR = bDIR; m_wSize = wSize; if (pData && wSize) { m_pbyData = new BYTE[wSize]; memcpy(m_pbyData,pData,wSize); } else m_pbyData = NULL;// memcpy(m_pbyData,pData,wSize); } ~CDataNode() { if (m_pbyData && m_wSize) { delete [] m_pbyData; m_pbyData = NULL; m_wSize = 0; } } void SetSize(WORD wSize) { if (m_pbyData && m_wSize) { delete [] m_pbyData; m_pbyData = NULL; m_wSize = 0; } m_wSize = wSize; if (wSize) m_pbyData = new BYTE[wSize]; else m_pbyData = NULL; } void operator=(const CDataNode& dn) { if (m_pbyData && m_wSize) { delete [] m_pbyData; m_pbyData = NULL; m_wSize = 0; } m_tv.tv_sec = dn.m_tv.tv_sec; m_tv.tv_usec = dn.m_tv.tv_usec; m_bDIR = dn.m_bDIR; m_wSize = dn.m_wSize;// m_byType = dn.m_byType; if (dn.m_pbyData && dn.m_wSize) { m_pbyData = new BYTE[dn.m_wSize]; memcpy(m_pbyData,dn.m_pbyData,dn.m_wSize); }// memcpy(m_pbyData,dn.m_pbyData,dn.m_wSize); } bool operator==(const CDataNode& dn) const { bool bRet = false; if (m_wSize == dn.m_wSize) { if (m_wSize && m_pbyData && dn.m_pbyData) { if (memcmp(m_pbyData,dn.m_pbyData,m_wSize) == 0) bRet = true; } else if (m_wSize == 0) bRet = true; } return bRet; }// void Serialize(int hFile,bool bIsLoad); void AddData(BYTE* pData,WORD wSize) { if (pData && wSize) { WORD wNewSize = m_wSize+wSize; BYTE* pbyNewData = new BYTE[wNewSize]; if (m_pbyData && m_wSize) { memcpy(pbyNewData,m_pbyData,m_wSize); delete [] m_pbyData; } memcpy(pbyNewData+m_wSize,pData,wSize); m_wSize = wNewSize; m_pbyData = pbyNewData; }/* if (pData && wSize) { memcpy(m_pbyData+m_wSize,pData,wSize); m_wSize += wSize; }*/ }};class CFrameNode : public CDataNode{public: BYTE m_bySrcAddr; BYTE m_byDesAddr; WORD m_wSAddr; WORD m_wDAddr; BYTE m_byCID; CFrameNode() { m_byCID = 0; m_bySrcAddr = 0; m_byDesAddr = 0; m_pbyData = NULL; m_wSize = 0; } CFrameNode(timeval tv,BYTE *pData,WORD wSize,BYTE byDesAddr,BYTE bySrcAddr = 0,BYTE byCID=0) : CDataNode(tv,pData,wSize) { m_byCID = byCID; m_bySrcAddr = bySrcAddr; m_byDesAddr = byDesAddr; } CFrameNode(timeval tv,BYTE *pData,WORD wSize,WORD wDAddr,WORD wSAddr,BYTE byCID=0) : CDataNode(tv,pData,wSize) { m_byCID = byCID; m_wDAddr = wDAddr; m_wSAddr = wSAddr; } void operator=(const CFrameNode& fn) { m_byCID = fn.m_byCID; m_bySrcAddr = fn.m_bySrcAddr; m_byDesAddr = fn.m_byDesAddr; m_wDAddr = fn.m_wDAddr; m_wSAddr = fn.m_wSAddr; m_tv.tv_sec = fn.m_tv.tv_sec; m_tv.tv_usec = fn.m_tv.tv_usec; if (m_pbyData && m_wSize) { delete [] m_pbyData; m_pbyData = NULL; m_wSize = 0; } m_wSize = fn.m_wSize; if (fn.m_pbyData && fn.m_wSize) { m_pbyData = new BYTE[fn.m_wSize]; memcpy(m_pbyData,fn.m_pbyData,fn.m_wSize); } else m_pbyData = NULL;// m_wSize = dn.m_wSize;// memcpy(m_pbyData,dn.m_pbyData,dn.m_wSize); }};template <class TNode>class CQue{public: CQue() { pthread_mutex_init(&m_mutex,NULL);// PTHREAD_MUTEX_INITIALIZER) sem_init(&m_sem,0,0); } ~CQue() { RemoveAll(); pthread_mutex_destroy(&m_mutex); sem_destroy(&m_sem); } void RemoveAll() { while (Size()) { if (sem_trywait(&m_sem) ==0) {// int n; // sem_getvalue(&m_sem,&n); pthread_mutex_lock(&m_mutex); TNode *pDN = m_queDN.front(); m_queDN.pop(); delete pDN; pthread_mutex_unlock(&m_mutex); } } } int Size() {return m_queDN.size();} void Add(TNode *pn) { pthread_mutex_lock(&m_mutex); sem_post(&m_sem); m_queDN.push(pn); pthread_mutex_unlock(&m_mutex); } bool Get(TNode *&pDN,int nMilliSecs = 0) { bool bRet = true; int nWaitCount = 0; wait: if (sem_trywait(&m_sem) ==0) { pthread_mutex_lock(&m_mutex); pDN = m_queDN.front(); m_queDN.pop(); pthread_mutex_unlock(&m_mutex); } else { if (nWaitCount++ < nMilliSecs) { usleep(10000);// cout << "wait 10ms" << nWaitCount <<endl; goto wait; } else bRet = false; } return bRet; } void Serialize(bool bIsLoad);protected: queue<TNode *> m_queDN; pthread_mutex_t m_mutex; sem_t m_sem;};typedef CQue<CDataNode> CDQue;typedef CQue<CFrameNode> CFQue;class CIOStream{public: CIOStream(bool bGetIOStream = true) { m_pdn = NULL; m_bGetIOStream = bGetIOStream; } bool GetStreamNode(CDataNode*& pdnStream) { bool bRet = false; if (m_dqStream.Get(pdnStream,1)) { bRet = true; } return bRet; }protected: void AddIOStream(timeval& tv,BYTE* pbyBuf,WORD wSize,bool bDIR=false) { if (m_bGetIOStream) { if (bDIR == false) { if (m_pdn) { m_dqStream.Add(m_pdn); m_pdn = NULL; } CDataNode* pdn = new CDataNode(tv,pbyBuf,wSize,bDIR); m_dqStream.Add(pdn); } else if (m_pdn) m_pdn->AddData(pbyBuf,wSize); else m_pdn = new CDataNode(tv,pbyBuf,wSize,bDIR); #ifdef _DEBUGTEMP/* cout << '{' <<pdnStream->m_wSize << '}'; for (int nnn=0; nnn<pdnStream->m_wSize; nnn++) { char c[16]; sprintf(c," %02x ",pdnStream->m_pbyData[nnn]); cout << c; } cout <<"m_dqStream.Size() = " << m_dqStream.Size() << endl;*/ #endif if (m_dqStream.Size() > 10) { CDataNode* pdn1 = NULL; if (m_dqStream.Get(pdn1,0)) delete pdn1; } } } bool m_bGetIOStream; CDQue m_dqStream; CDataNode* m_pdn;};/*class CFQue{public: CFQue() { pthread_mutex_init(&m_mutex,NULL);// PTHREAD_MUTEX_INITIALIZER) sem_init(&m_sem,0,0); } ~CFQue() { while (Size()) { if (sem_trywait(&m_sem) ==0) { CFrameNode *pDN; int n; sem_getvalue(&m_sem,&n); pthread_mutex_lock(&m_mutex); queue<CFrameNode*>::reference pQue = m_queDN.front(); pDN = pQue; m_queDN.pop(); delete pDN; pthread_mutex_unlock(&m_mutex); } } pthread_mutex_destroy(&m_mutex); sem_destroy(&m_sem); } void RemoveAll() { while (Size()) { if (sem_trywait(&m_sem) ==0) { CFrameNode *pDN; int n; sem_getvalue(&m_sem,&n); pthread_mutex_lock(&m_mutex); queue<CFrameNode*>::reference pQue = m_queDN.front(); pDN = pQue; m_queDN.pop(); delete pDN; pthread_mutex_unlock(&m_mutex); } } } int Size() {return m_queDN.size();} void Add(CFrameNode *pn) { pthread_mutex_lock(&m_mutex); sem_post(&m_sem); m_queDN.push(pn); pthread_mutex_unlock(&m_mutex); } bool Get(CFrameNode *&pDN,int nMilliSecs = 0) { bool bRet = true; int nWaitCount = 0; wait: if (sem_trywait(&m_sem) ==0) { pthread_mutex_lock(&m_mutex); queue<CFrameNode*>::reference pQue = m_queDN.front(); pDN = pQue; m_queDN.pop(); pthread_mutex_unlock(&m_mutex); } else { if (nWaitCount++ < nMilliSecs) { usleep(10000);// cout << "wait 10毫秒" <<endl; goto wait; } else bRet = false; } return bRet; }protected: queue<CFrameNode *> m_queDN; pthread_mutex_t m_mutex; sem_t m_sem;};*/class CEventQue{public: CEventQue() { pthread_mutex_init(&m_mutex,NULL);// PTHREAD_MUTEX_INITIALIZER) } ~CEventQue() { while (Size()) { char *pszEvent; pthread_mutex_lock(&m_mutex); queue<char *>::reference pQue = m_queEvent.front(); pszEvent = pQue; m_queEvent.pop(); delete pszEvent; pthread_mutex_unlock(&m_mutex); } pthread_mutex_destroy(&m_mutex); } int Size() {return m_queEvent.size();} void Add(char* pszEvent) { pthread_mutex_lock(&m_mutex); m_queEvent.push(pszEvent); pthread_mutex_unlock(&m_mutex); } bool Get(char* &pszEvent) { bool bRet = true; if (Size()) { pthread_mutex_lock(&m_mutex); queue<char*>::reference pQue = m_queEvent.front(); pszEvent = pQue; m_queEvent.pop(); pthread_mutex_unlock(&m_mutex); } else bRet = false; return bRet; }protected: queue<char *> m_queEvent; pthread_mutex_t m_mutex;};/*template <class T>class CPool{public: CPool() { pthread_mutex_init(&m_mutex,NULL); } ~CPool() { map<T,CDQue * >::iterator it = m_pool.begin(); for (;it!= m_pool.end();it++) delete it->second; pthread_mutex_destroy(&m_mutex); } void AddDTU(T& strDTUID) { pair<T,CDQue *> p; p.first = strDTUID; p.second = new CDQue; m_pool.insert(p); } CDQue* GetDTUDQue(T& strDTUID) { CDQue* pRet = NULL; for (int i=0; i<2; i++) { map<T,CDQue * >::iterator p = m_pool.find(strDTUID); if (p != m_pool.end()) { pRet = p->second; break; } else AddDTU(strDTUID); } return pRet; } bool AddRecvNode(T& strDTUID,CDataNode* pdn) { bool bRet = true; pthread_mutex_lock(&m_mutex); CDQue* pDQue = GetDTUDQue(strDTUID); if (pDQue) pDQue->Add(pdn); else bRet = false; pthread_mutex_unlock(&m_mutex); return bRet; } CDQue *GetRecvQueue(BYTE byIndex) { CDQue* pRet = NULL; pthread_mutex_lock(&m_mutex); map<T,CDQue *>::iterator p = m_pool.begin(); for (BYTE b=0;p!=m_pool.end() && (b<byIndex);b++) p ++; if (p!=m_pool.end()) pRet = p->second; pthread_mutex_unlock(&m_mutex); return pRet; }protected: map<T,CDQue * > m_pool; pthread_mutex_t m_mutex;};//typedef CPool<CString> CDTUPool;typedef CPool<DWORD> CPeerPool;*/#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -