⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msgqueue.cpp

📁 这个程序包里有3个部分:直播服务器
💻 CPP
字号:
#include "StdAfx.h"
#include "MsgQueue.h"
#include <assert.h>
#include <stdio.h>

// Constructs a detached queue object: no shared memory is mapped and no
// kernel handles are held until Create() or Open() succeeds.
// Blocking mode (ifBlock=TRUE) is the default for GetMsg/PutMsg.
MsgQueue::MsgQueue()
{
	Data=NULL;
	szBuf=NULL;
	FileMappingHandle=NULL;
	// The destructor and Close() compare these handles against NULL before
	// calling CloseHandle, so they must never hold indeterminate values.
	hSemaForGet=NULL;
	hSemaForPut=NULL;
	// Read by the polling loops in GetMsg/PutMsg; start in the "running" state.
	bExitFlag=FALSE;
	ifBlock=TRUE;
}

// Detaches from the queue: decrements the shared attach counter, unmaps the
// view and closes the semaphore and file-mapping handles.
MsgQueue::~MsgQueue()
{
	if(Data==NULL){
		// No view is mapped (never attached, Close() already ran, or
		// MapViewOfFile failed inside Create/Open). In the last case the
		// mapping handle is still open and must not be leaked.
		// The semaphore handles are only assigned after the view has been
		// mapped, so they are deliberately not touched on this path.
		if(FileMappingHandle!=NULL) CloseHandle(FileMappingHandle);
		return;
	}
	Data->AttachCount--;
	UnmapViewOfFile((LPVOID)Data);
	if(hSemaForGet!=NULL) CloseHandle(hSemaForGet);
	if(hSemaForPut!=NULL) CloseHandle(hSemaForPut);
	if(FileMappingHandle!=NULL) CloseHandle(FileMappingHandle);
}

// Semaphore "P" (acquire): waits without timeout on hSema and hands back
// the WaitForSingleObject result unchanged.
DWORD MsgQueue::P(HANDLE hSema)
{
	const DWORD waitResult=WaitForSingleObject(hSema,INFINITE);
	return waitResult;
}

// Semaphore "V" (release): raises the count of hSema by one and hands back
// the ReleaseSemaphore result unchanged.
BOOL MsgQueue::V(HANDLE hSema)
{
	const BOOL released=ReleaseSemaphore(hSema,1,NULL);
	return released;
}

int MsgQueue::Create(char *MsgQueueName,unsigned long MsgQueueSize)
{
	//int		Ret;
	char    a[256];	
	
	//Ret=Open(MsgQueueName);	
	//if(Ret==-1) return(-1);	//如果已经创建则退出

	FileMappingHandle=CreateFileMapping(INVALID_HANDLE_VALUE,NULL,PAGE_READWRITE,0x0,MsgQueueSize+sizeof(MSG_QUEUE),MsgQueueName);
	if(FileMappingHandle==NULL)	return(-1);

	Data=(MSG_QUEUE *)MapViewOfFile(FileMappingHandle,FILE_MAP_WRITE,0,0,0);
	if(Data==NULL) return(-1);

	szBuf=(char*)((unsigned long)Data+(unsigned long)sizeof(MSG_QUEUE));
	Data->MQSize=MsgQueueSize;

	sprintf(a,"G%s",MsgQueueName);
	hSemaForGet=CreateSemaphore(NULL,1,1,a);
	if(hSemaForGet==NULL) return(-1);

	sprintf(a,"P%s",MsgQueueName);
	hSemaForPut=CreateSemaphore(NULL,1,1,a);
	if(hSemaForPut==NULL) return(-1);

	Data->AttachCount++;
	Data->IndexHead=0;
	Data->IndexTail=0;
	Data->DataHead=0;
	Data->DataTail=0;
	bExitFlag = FALSE;

	return(0);
}

int MsgQueue::Open(char *MsgQueueName)
{
	char    a[256];

	if(Data!=0) return(-1);		

	FileMappingHandle=OpenFileMapping(FILE_MAP_WRITE,TRUE,MsgQueueName);
	if(FileMappingHandle==NULL)	return(-1);

	Data=(MSG_QUEUE *)MapViewOfFile(FileMappingHandle,FILE_MAP_WRITE,0,0,0);	
	if(Data==NULL) return(-1);
	szBuf=(char*)((unsigned long)Data+(unsigned long)sizeof(MSG_QUEUE));

	sprintf(a,"G%s",MsgQueueName);
	hSemaForGet=OpenSemaphore(SEMAPHORE_ALL_ACCESS,TRUE,a);
	if(hSemaForGet==NULL) return(-1);
	
	sprintf(a,"P%s",MsgQueueName);
	hSemaForPut=OpenSemaphore(SEMAPHORE_ALL_ACCESS,TRUE,a);
	if(hSemaForPut==NULL) return(-1);
	
	Data->AttachCount++;
	bExitFlag = FALSE;
	return(0);
}

MsgQueue::Close()
{
	bExitFlag = TRUE;
	if(Data==NULL) return(-1);
	if(hSemaForGet!=NULL) CloseHandle(hSemaForGet);
	if(hSemaForPut!=NULL) CloseHandle(hSemaForPut);
	Sleep(20);
	hSemaForGet=NULL;
	hSemaForPut=NULL;
	Data->AttachCount--;
	UnmapViewOfFile((LPCVOID)Data);
	if(FileMappingHandle!=NULL) CloseHandle(FileMappingHandle);
	FileMappingHandle=NULL;
	Data=NULL;
	return(0);
}

// Resets the shared queue header so that both the index ring and the data
// ring are empty. No semaphore is taken here.
// Returns 0 on success, -1 when no queue is attached.
int MsgQueue::Init()
{
	if(Data!=NULL){
		Data->IndexHead=0;
		Data->IndexTail=0;
		Data->DataHead=0;
		Data->DataTail=0;
		return 0;
	}
	return(-1);
}

// Removes the oldest message from the queue and copies it into Buffer.
//
// Buffer  destination for the message bytes. No capacity is passed in, so the
//         caller must supply a buffer large enough for any queued message.
//
// Returns the number of bytes copied; 0 when the queue is empty and ifBlock
// is FALSE; -1 when Buffer is NULL, no queue is attached, the wait on
// hSemaForGet fails, or bExitFlag/Data==NULL is observed while polling for
// a message.
//
// hSemaForGet is held for the whole call, including while polling, so only
// one reader at a time is inside this function.
int MsgQueue::GetMsg(char *Buffer)
{
	unsigned long IndexTail;
	unsigned long MsgBegin,MsgEnd;
	unsigned long len;

	if(Buffer==NULL) return(-1);
	if(Data==NULL)   return(-1);

	if(P(hSemaForGet) != WAIT_OBJECT_0) return -1;

	//The message queue is empty
	if(Data->IndexHead==Data->IndexTail){		
		if(ifBlock==FALSE){
			// Non-blocking mode: report "no message" right away.
			V(hSemaForGet);
			return(0);
		}
		// Blocking mode: poll every SLEEP_TIME until a message is indexed or
		// the object is being closed (checked both before and after sleeping).
		for(;;){
			if(Data == NULL || bExitFlag){
				V(hSemaForGet);
				return -1;
			}
			Sleep(SLEEP_TIME);
			if(Data == NULL || bExitFlag){
				V(hSemaForGet);
				return -1;
			}
			if(Data->IndexHead!=Data->IndexTail) break;
		}
	}

	IndexTail=Data->IndexTail;

	// Each index entry stores [begin offset, end offset) into the data area.
	MsgBegin=Data->Index[IndexTail][0];
	MsgEnd  =Data->Index[IndexTail][1];
	if(MsgBegin==MsgEnd){
		// Equal offsets fall into the wrap-around branch below and are read
		// as a message of MQSize bytes.
		//TRACE0("MsgBegin==MsgEnd.\n");
		//assert(0);
	}
	if(MsgEnd>MsgBegin){
		// Message is contiguous in the data area.
	    len=MsgEnd-MsgBegin;
	    memcpy(Buffer,(szBuf+MsgBegin),len);
	}
	else{
		// Message wraps: copy the part up to the end of the data area, then
		// the remainder from the start of the data area.
		len=Data->MQSize-MsgBegin;
		memcpy(Buffer,(szBuf+MsgBegin),len);
		memcpy((Buffer+len),szBuf,MsgEnd);
		len=Data->MQSize-MsgBegin+MsgEnd;
	}
	// Consume the index slot and release the message's bytes in the data area.
	IndexTail=(IndexTail+1)%MAX_MSG_NUMBER;
	Data->IndexTail=IndexTail;
	Data->DataTail=MsgEnd;
	V(hSemaForGet);
	return len;
}

// Appends one message to the queue.
//
// Buffer     source bytes of the message.
// MsgLength  number of bytes to enqueue; must not exceed the data-area size.
//
// Returns 0 once the message is queued. Returns -1 when Buffer is NULL, no
// queue is attached, MsgLength is larger than the data area, the wait on
// hSemaForPut fails, the queue is full and ifBlock is FALSE, or
// bExitFlag/Data==NULL is observed while waiting for room.
//
// hSemaForPut is held for the whole call, including while polling, so only
// one writer at a time is inside this function.
int MsgQueue::PutMsg(char *Buffer,unsigned long MsgLength)
{
	unsigned long DataHead,DataTail,len;
	unsigned long MsgBegin;
	unsigned long IndexHead;
	int			  RetVal;

	// Same argument checks as GetMsg; without them a detached object or a
	// NULL buffer is dereferenced below.
	if(Buffer==NULL) return(-1);
	if(Data==NULL)   return(-1);
	// A message larger than the data area can never fit; letting it through
	// makes the wrap-around copy below write past the end of the data area.
	if(MsgLength>Data->MQSize) return(-1);

	if(P(hSemaForPut) != WAIT_OBJECT_0) return -1;

	// Wait until the data ring has room for MsgLength bytes.
	for(;;){
		DataHead=Data->DataHead;
		DataTail=Data->DataTail;
		RetVal=0;
		if((DataHead>=DataTail)&&(DataHead+MsgLength)>Data->MQSize){
			// Message would wrap; the wrapped end must stay below the tail.
			if((DataHead+MsgLength)%Data->MQSize>=DataTail) RetVal=-1;//full
		}
		if(DataHead<DataTail){
			if((DataHead+MsgLength)>=DataTail) RetVal=-1;//full
		}
		if(RetVal==-1){
			if(ifBlock==FALSE){
				V(hSemaForPut);
				return(-1);
			}
			if(Data == NULL || bExitFlag){
				V(hSemaForPut);
				return -1;
			}
			Sleep(SLEEP_TIME);
			if(Data == NULL || bExitFlag){
				V(hSemaForPut);
				return -1;
			}
		}
		else break;
	}

	// Copy the payload; DataHead becomes the offset just past the message.
	MsgBegin=DataHead;
	if((DataHead+MsgLength)>=Data->MQSize){
		// Split the copy at the end of the data area and continue at offset 0.
		len=Data->MQSize-DataHead;
		memcpy((szBuf+DataHead),Buffer,len);
		memcpy(szBuf,(Buffer+len),MsgLength-len);
		DataHead=MsgLength-len;
	}
	else{
		memcpy((szBuf+DataHead),Buffer,MsgLength);
		DataHead=DataHead+MsgLength;
	}

	// Wait for a free slot in the index ring.
	if((Data->IndexHead+1)%MAX_MSG_NUMBER==Data->IndexTail){
		if(ifBlock==FALSE){
			// The message has not been indexed, so it is not queued: report
			// failure. (This path used to return 0, silently dropping the
			// message while signalling success.)
			V(hSemaForPut);
			return(-1);
		}
		for(;;){
			if(Data == NULL || bExitFlag){
				V(hSemaForPut);
				return -1;
			}
			Sleep(SLEEP_TIME);
			if(Data == NULL || bExitFlag){
				V(hSemaForPut);
				return -1;
			}
			if((Data->IndexHead+1)%MAX_MSG_NUMBER!=Data->IndexTail) break;
		}
	}

	// Record [begin, end) for the message, then publish the new heads.
	IndexHead=Data->IndexHead;
	Data->Index[IndexHead][0]=MsgBegin;
	Data->Index[IndexHead][1]=DataHead;
	IndexHead=(IndexHead+1)%MAX_MSG_NUMBER;

	Data->IndexHead=IndexHead;
	Data->DataHead=DataHead;
	V(hSemaForPut);
	return 0;
}

// Placeholder: destroying a queue by name is not implemented.
// The argument is ignored and -1 is always returned.
int MsgQueue::Destory(char *MsgQueueName)
{
	(void)MsgQueueName;
	return(-1);
}

int MsgQueue::ClsQueue()
{
	if(P(hSemaForPut) != WAIT_OBJECT_0) return -1;	
	if(P(hSemaForGet) != WAIT_OBJECT_0) return -1;	
	Data->IndexHead = Data->IndexTail = 0;
	Data->DataHead = Data->DataTail = 0;
	V(hSemaForGet);
	V(hSemaForPut);
	return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -