⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dataqueue.h

📁 一个通讯管理机的源代码。比较好用。推荐
💻 H
字号:
#ifndef DATAQUEUE_H#define DATAQUEUE_H/**  *@author   */#define DATAQUEUESIZE	2048class CDataNode{public:	BYTE* m_pbyData;//[DATAQUEUESIZE];	WORD m_wSize;	timeval m_tv;	bool m_bDIR;	CDataNode()	{		m_pbyData = NULL;		m_wSize = 0;	}	CDataNode(timeval tv,BYTE *pData,WORD wSize,bool bDIR = false)	{		m_tv.tv_sec = tv.tv_sec;		m_tv.tv_usec = tv.tv_usec;		m_bDIR = bDIR;		m_wSize = wSize;		if (pData && wSize)		{			m_pbyData = new BYTE[wSize];			memcpy(m_pbyData,pData,wSize);		}		else m_pbyData = NULL;//		memcpy(m_pbyData,pData,wSize);	}	~CDataNode() 	{		if (m_pbyData && m_wSize)		{			delete [] m_pbyData;			m_pbyData = NULL;			m_wSize = 0;		}	}	void SetSize(WORD wSize)	{		if (m_pbyData && m_wSize)		{			delete [] m_pbyData;			m_pbyData = NULL;			m_wSize = 0;		}		m_wSize = wSize;		if (wSize)			m_pbyData = new BYTE[wSize];		else m_pbyData = NULL;	}	void operator=(const CDataNode& dn)	{		if (m_pbyData && m_wSize)		{			delete [] m_pbyData;			m_pbyData = NULL;			m_wSize = 0;		}		m_tv.tv_sec = dn.m_tv.tv_sec;		m_tv.tv_usec = dn.m_tv.tv_usec;		m_bDIR = dn.m_bDIR;		m_wSize = dn.m_wSize;//		m_byType = dn.m_byType;		if (dn.m_pbyData && dn.m_wSize)		{			m_pbyData = new BYTE[dn.m_wSize];			memcpy(m_pbyData,dn.m_pbyData,dn.m_wSize);		}//		memcpy(m_pbyData,dn.m_pbyData,dn.m_wSize);	}	bool operator==(const CDataNode& dn) const	{		bool bRet = false;		if (m_wSize == dn.m_wSize)		{			if (m_wSize && m_pbyData && dn.m_pbyData)			{				if (memcmp(m_pbyData,dn.m_pbyData,m_wSize) == 0)					bRet = true;			}			else if (m_wSize == 0)				bRet = true;		}		return bRet;	}//	void Serialize(int hFile,bool bIsLoad);	void AddData(BYTE* pData,WORD wSize)	{		if (pData && wSize)		{			WORD wNewSize = m_wSize+wSize;			BYTE* pbyNewData = new BYTE[wNewSize];			if (m_pbyData && m_wSize)			{				memcpy(pbyNewData,m_pbyData,m_wSize);				delete [] m_pbyData;			}			memcpy(pbyNewData+m_wSize,pData,wSize);			m_wSize = wNewSize;			m_pbyData = pbyNewData;		}/*		if (pData && wSize)		{			memcpy(m_pbyData+m_wSize,pData,wSize);			m_wSize += wSize;		}*/	}};class CFrameNode : public CDataNode{public:	BYTE	m_bySrcAddr;	BYTE	m_byDesAddr;	WORD m_wSAddr;	WORD m_wDAddr;	BYTE	m_byCID;	CFrameNode()	{		m_byCID = 0;		m_bySrcAddr = 0;		m_byDesAddr = 0;		m_pbyData = NULL;		m_wSize = 0;	}	CFrameNode(timeval tv,BYTE *pData,WORD wSize,BYTE byDesAddr,BYTE bySrcAddr = 0,BYTE byCID=0)		: CDataNode(tv,pData,wSize)	{		m_byCID = byCID;		m_bySrcAddr = bySrcAddr;		m_byDesAddr = byDesAddr;	}	CFrameNode(timeval tv,BYTE *pData,WORD wSize,WORD wDAddr,WORD wSAddr,BYTE byCID=0)		: CDataNode(tv,pData,wSize)	{		m_byCID = byCID;		m_wDAddr = wDAddr;		m_wSAddr = wSAddr;	}	void operator=(const CFrameNode& fn)	{		m_byCID = fn.m_byCID;		m_bySrcAddr = fn.m_bySrcAddr;		m_byDesAddr = fn.m_byDesAddr;		m_wDAddr = fn.m_wDAddr;		m_wSAddr = fn.m_wSAddr;		m_tv.tv_sec = fn.m_tv.tv_sec;		m_tv.tv_usec = fn.m_tv.tv_usec;		if (m_pbyData && m_wSize)		{			delete [] m_pbyData;			m_pbyData = NULL;			m_wSize = 0;		}		m_wSize = fn.m_wSize;		if (fn.m_pbyData && fn.m_wSize)		{			m_pbyData = new BYTE[fn.m_wSize];			memcpy(m_pbyData,fn.m_pbyData,fn.m_wSize);		}		else m_pbyData = NULL;//		m_wSize = dn.m_wSize;//		memcpy(m_pbyData,dn.m_pbyData,dn.m_wSize);	}};template <class TNode>class CQue{public:	CQue()	{		pthread_mutex_init(&m_mutex,NULL);// PTHREAD_MUTEX_INITIALIZER)		sem_init(&m_sem,0,0);	}	~CQue()	{		RemoveAll();		pthread_mutex_destroy(&m_mutex);		sem_destroy(&m_sem);	}	void RemoveAll()	{		while (Size())		{			if (sem_trywait(&m_sem) ==0)			{//				int n;	//			sem_getvalue(&m_sem,&n);				pthread_mutex_lock(&m_mutex);				TNode *pDN = m_queDN.front();				m_queDN.pop();				delete pDN;				pthread_mutex_unlock(&m_mutex);			}		}	}	int Size() {return m_queDN.size();}	void Add(TNode *pn)	{		pthread_mutex_lock(&m_mutex);		sem_post(&m_sem);		m_queDN.push(pn);		pthread_mutex_unlock(&m_mutex);	}	bool Get(TNode *&pDN,int nMilliSecs = 0)	{		bool bRet = true;		int nWaitCount = 0;	wait:		if (sem_trywait(&m_sem) ==0)		{			pthread_mutex_lock(&m_mutex);			pDN = m_queDN.front();			m_queDN.pop();			pthread_mutex_unlock(&m_mutex);		}		else		{			if (nWaitCount++ < nMilliSecs)			{				usleep(10000);//				cout << "wait 10ms" << nWaitCount <<endl;				goto wait;			}			else bRet = false;		}		return bRet;	}	void Serialize(bool bIsLoad);protected:	queue<TNode *> m_queDN;	pthread_mutex_t	m_mutex;	sem_t			m_sem;};typedef CQue<CDataNode> CDQue;typedef CQue<CFrameNode> CFQue;class CIOStream{public:	CIOStream(bool bGetIOStream = true)	{		m_pdn = NULL;		m_bGetIOStream = bGetIOStream;	}	bool GetStreamNode(CDataNode*& pdnStream)	{		bool bRet = false;		if (m_dqStream.Get(pdnStream,1))		{			bRet = true;		}		return bRet;	}protected:	void AddIOStream(timeval& tv,BYTE* pbyBuf,WORD wSize,bool bDIR=false)	{		if (m_bGetIOStream)		{			if (bDIR == false)			{				if (m_pdn)				{					m_dqStream.Add(m_pdn);					m_pdn = NULL;				}				CDataNode* pdn = new CDataNode(tv,pbyBuf,wSize,bDIR);				m_dqStream.Add(pdn);			}			else if (m_pdn)				m_pdn->AddData(pbyBuf,wSize);			else				m_pdn = new CDataNode(tv,pbyBuf,wSize,bDIR);					#ifdef _DEBUGTEMP/*	cout  << '{' <<pdnStream->m_wSize << '}';	for (int nnn=0; nnn<pdnStream->m_wSize; nnn++)	{		char c[16];		sprintf(c," %02x ",pdnStream->m_pbyData[nnn]);		cout << c;	}	cout <<"m_dqStream.Size() = " << m_dqStream.Size() << endl;*/	#endif			if (m_dqStream.Size() > 10)			{				CDataNode* pdn1 = NULL;				if (m_dqStream.Get(pdn1,0))					delete pdn1;			}		}	}	bool m_bGetIOStream;	CDQue	m_dqStream;	CDataNode* m_pdn;};/*class CFQue{public:	CFQue()	{		pthread_mutex_init(&m_mutex,NULL);// PTHREAD_MUTEX_INITIALIZER)		sem_init(&m_sem,0,0);	}	~CFQue()	{		while (Size())		{			if (sem_trywait(&m_sem) ==0)			{				CFrameNode *pDN;				int n;				sem_getvalue(&m_sem,&n);				pthread_mutex_lock(&m_mutex);				queue<CFrameNode*>::reference pQue = m_queDN.front();				pDN = pQue;				m_queDN.pop();				delete pDN;				pthread_mutex_unlock(&m_mutex);			}		}				pthread_mutex_destroy(&m_mutex);		sem_destroy(&m_sem);	}	void RemoveAll()	{		while (Size())		{			if (sem_trywait(&m_sem) ==0)			{				CFrameNode *pDN;				int n;				sem_getvalue(&m_sem,&n);				pthread_mutex_lock(&m_mutex);				queue<CFrameNode*>::reference pQue = m_queDN.front();				pDN = pQue;				m_queDN.pop();				delete pDN;				pthread_mutex_unlock(&m_mutex);			}		}			}	int Size() {return m_queDN.size();}	void Add(CFrameNode *pn)	{		pthread_mutex_lock(&m_mutex);		sem_post(&m_sem);		m_queDN.push(pn);		pthread_mutex_unlock(&m_mutex);	}	bool Get(CFrameNode *&pDN,int nMilliSecs = 0)	{		bool bRet = true;		int nWaitCount = 0;	wait:		if (sem_trywait(&m_sem) ==0)		{			pthread_mutex_lock(&m_mutex);			queue<CFrameNode*>::reference pQue = m_queDN.front();			pDN = pQue;			m_queDN.pop();			pthread_mutex_unlock(&m_mutex);		}		else		{			if (nWaitCount++ < nMilliSecs)			{				usleep(10000);//				cout << "wait 10毫秒" <<endl;				goto wait;			}			else bRet = false;		}		return bRet;	}protected:	queue<CFrameNode *> m_queDN;	pthread_mutex_t	m_mutex;	sem_t			m_sem;};*/class CEventQue{public:	CEventQue()	{		pthread_mutex_init(&m_mutex,NULL);// PTHREAD_MUTEX_INITIALIZER)	}	~CEventQue()	{		while (Size())		{			char *pszEvent;			pthread_mutex_lock(&m_mutex);			queue<char *>::reference pQue = m_queEvent.front();			pszEvent = pQue;			m_queEvent.pop();			delete pszEvent;			pthread_mutex_unlock(&m_mutex);		}				pthread_mutex_destroy(&m_mutex);	}	int Size() {return m_queEvent.size();}	void Add(char* pszEvent)	{		pthread_mutex_lock(&m_mutex);		m_queEvent.push(pszEvent);		pthread_mutex_unlock(&m_mutex);	}	bool Get(char* &pszEvent)	{		bool bRet = true;		if (Size())		{			pthread_mutex_lock(&m_mutex);			queue<char*>::reference pQue = m_queEvent.front();			pszEvent = pQue;			m_queEvent.pop();			pthread_mutex_unlock(&m_mutex);		}		else 	bRet = false;		return bRet;	}protected:	queue<char *> m_queEvent;	pthread_mutex_t	m_mutex;};/*template <class T>class CPool{public:	CPool()	{		pthread_mutex_init(&m_mutex,NULL);	}	~CPool()	{		map<T,CDQue * >::iterator it = m_pool.begin();		for (;it!= m_pool.end();it++)			delete it->second;		pthread_mutex_destroy(&m_mutex);	}	void AddDTU(T& strDTUID)	{		pair<T,CDQue *> p;		p.first = strDTUID;		p.second = new CDQue;		m_pool.insert(p);	}	CDQue* GetDTUDQue(T& strDTUID)	{		CDQue* pRet = NULL;		for (int i=0; i<2; i++)		{			map<T,CDQue * >::iterator p = m_pool.find(strDTUID);			if (p != m_pool.end())			{				pRet = p->second;				break;			}			else				AddDTU(strDTUID);		}		return pRet;	}	bool AddRecvNode(T& strDTUID,CDataNode* pdn)	{		bool bRet = true;		pthread_mutex_lock(&m_mutex);		CDQue* pDQue = GetDTUDQue(strDTUID);		if (pDQue)			pDQue->Add(pdn);		else bRet = false;		pthread_mutex_unlock(&m_mutex);		return bRet;	}	CDQue *GetRecvQueue(BYTE byIndex)	{		CDQue* pRet = NULL;		pthread_mutex_lock(&m_mutex);		map<T,CDQue *>::iterator p = m_pool.begin();		for (BYTE b=0;p!=m_pool.end() && (b<byIndex);b++)			p ++;		if (p!=m_pool.end())			pRet = p->second;		pthread_mutex_unlock(&m_mutex);		return pRet;	}protected:	map<T,CDQue * > m_pool;	pthread_mutex_t	m_mutex;};//typedef CPool<CString> CDTUPool;typedef CPool<DWORD> CPeerPool;*/#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -